Reports


Interacting with reports and report definitions allows you to define a custom data delivery report that provides data processing metrics for a given job or topology in Control Hub.

Creating a Report Definition

A report definition can be built and added to Control Hub using the streamsets.sdk.sch_models.ReportDefinitionBuilder class. Use the streamsets.sdk.ControlHub.get_report_definition_builder() method to instantiate the builder object:

report_definition_builder = sch.get_report_definition_builder()
# Set the report generation time frame for last 30 minutes.
report_definition_builder.set_data_retrieval_period(start_time='${time:now() - 30 * MINUTES}', end_time='${time:now()}')

# Add resources to the Report.
job = sch.jobs.get(job_name='name')
topology = sch.topologies.get(topology_name='name')
report_definition_builder.add_report_resource(job)
report_definition_builder.add_report_resource(topology)

# Build and publish.
report_definition = report_definition_builder.build(name='from sdk')
sch.add_report_definition(report_definition)

Creating Report Definitions using absolute time range

You can also create a report definition for a fixed, absolute time range using the same arguments and methods as above. Simply specify the timestamp of both start_time and end_time in milliseconds:

import datetime
start_time = datetime.datetime(2019, 4, 1).timestamp() * 1000
end_time = datetime.datetime(2019, 4, 10).timestamp() * 1000
report_definition_builder.set_data_retrieval_period(start_time=start_time, end_time=end_time)

Generating a Report

Once you have a report definition created for a particular job and/or topology, you can then trigger the generation of a data delivery report for that definition by using the streamsets.sdk.sch_models.ReportDefinition.generate_report() method:

report_defintion = sch.report_definitions.get(name='from sdk')
report_command = report_defintion.generate_report()

report_command.report

# After the report is generated
report_command.report

Output:

# report_command.report
Report is still being generated...

# report_command.report
<Report (id=13114c45-15ce-44d1-8ff5-bc5ba73f5b8a:admin, name=from sdk at 04-12-2019 18:38:00 UTC)>

Getting existing Report Definitions and Reports

It is also possible to retrieve existing report definitions and their corresponding reports. Simply reference the report_definitions attribute of your streamsets.sdk.ControlHub instance to get a list of all streamsets.sdk.sch_models.ReportDefinition objects:

sch.report_definitions

Output:

[<ReportDefinition (id=c8982001-41f3-4581-8fb0-dcabc5fd7115:admin, name=Report for test job)>,
 <ReportDefinition (id=8cca181f-b9a2-4489-b493-accf128e9901:admin, name=Report for test topology)>,
 <ReportDefinition (id=4c7dccf1-30a8-4b81-9463-7723e0697d62:admin, name=from sdk)>]

You can also further filter and refine which report definition you’re interested in with attributes like name or id:

# Get Report Definitions
sch.report_definitions.get(name='from sdk')

Output:

<ReportDefinition (id=4c7dccf1-30a8-4b81-9463-7723e0697d62:admin, name=from sdk)>

Once you have obtained the report definition object you’re interested in, you can view additional data associated with that definition - such as the streamsets.sdk.sch_models.ReportDefinition.report_resources attributed to it, or the streamsets.sdk.sch_models.ReportDefinition.reports the definition has already generated:

# Get Report Resources
sch.report_definitions.get(name='from sdk').report_resources
[<ReportResource (resource_type=JOB, resource_id=fa9517c8-c93d-432e-b880-9c2d2d1c5dfe:admin)>,
 <ReportResource (resource_type=TOPOLOGY, resource_id=b124dedf-cbc9-4632-a765-8fc59b9636ab:admin)>]

# Get Reports
sch.report_definitions.get(name='from sdk').reports

# These properties can also be referenced directly from the object itself
report_definition = sch.report_definitions.get(name='from sdk')
report_definition.report_resources
report_definition.reports

Output:

# sch.report_definitions.get(name='from sdk').report_resources
[<ReportResource (resource_type=JOB, resource_id=fa9517c8-c93d-432e-b880-9c2d2d1c5dfe:admin)>,
 <ReportResource (resource_type=TOPOLOGY, resource_id=b124dedf-cbc9-4632-a765-8fc59b9636ab:admin)>]

# sch.report_definitions.get(name='from sdk').reports
[<Report (id=13114c45-15ce-44d1-8ff5-bc5ba73f5b8a:admin, name=from sdk at 04-12-2019 18:38:00 UTC)>,
 <Report (id=663490aa-b413-460d-8b0d-38b52592cfb2:admin, name=from sdk at 04-12-2019 18:31:00 UTC)>]

# report_definition.report_resources
[<ReportResource (resource_type=JOB, resource_id=fa9517c8-c93d-432e-b880-9c2d2d1c5dfe:admin)>,
 <ReportResource (resource_type=TOPOLOGY, resource_id=b124dedf-cbc9-4632-a765-8fc59b9636ab:admin)>]

# report_definition.reports
[<Report (id=13114c45-15ce-44d1-8ff5-bc5ba73f5b8a:admin, name=from sdk at 04-12-2019 18:38:00 UTC)>,
 <Report (id=663490aa-b413-460d-8b0d-38b52592cfb2:admin, name=from sdk at 04-12-2019 18:31:00 UTC)>]

Downloading existing Reports as PDF

Reports generated by a report definition are stored in PDF format, and can be downloaded and modified as needed. Simply obtain the report definition you’re interested in, identify which report you wish to download, and then use the streamsets.sdk.sch_models.Report.download() method:

report_defintion = sch.report_definitions.get(name='from sdk')
# Show the reports in the report definition
report_definition.reports

# Download the report, store it in report_content
report_content = report_defintion.reports[0].download()

# Write the report's contents to a file
with open('report.pdf', 'wb') as f:
    f.write(report_content)

Output:

# report_definition.reports
[<Report (id=13114c45-15ce-44d1-8ff5-bc5ba73f5b8a:admin, name=from sdk at 04-12-2019 18:38:00 UTC)>,
 <Report (id=663490aa-b413-460d-8b0d-38b52592cfb2:admin, name=from sdk at 04-12-2019 18:31:00 UTC)>]

Updating an existing Report Definition

Updating an existing report definition is similar to creating a new report definition for the first time. It makes use of the streamsets.sdk.sch_models.ReportDefinitionBuilder class to import the existing report definition object first, which then allows the report definition to be modified. Once the definition has been modified as desired, the streamsets.sdk.sch_models.ReportDefinitionBuilder.build() method is used to construct the streamsets.sdk.sch_models.ReportDefinition which can then be passed to Control Hub via the streamsets.sdk.ControlHub.update_report_definition() method:

report_definition_builder = sch.get_report_definition_builder()
report_definition = sch.report_definitions.get(name='from sdk')

# Import Report Definition into Report Definition Builder.
report_definition_builder.import_report_definition(report_definition)

# Remove topology from resources
topology = sch.topologies.get(topology_id='topology_id=2c8a398c-775f-45cf-a338-5425c47b7084:admin')
report_definition_builder.remove_report_resource(topology)

# Add job to resources
job = sch.jobs.get(job_name='another job')
report_definition_builder.add_report_resource(job)

# Update time range from last 30 minutes to last 2 days
report_definition_builder.set_data_retrieval_period(start_time='${time:now() - 2 * DAYS}', end_time='${time:now()}')
sch.update_report_definition(report_defintion)

Scheduling Report generation

Reports can also be generated at a set internal for a particular report definition. Periodic report generation is handled as a scheduled task, and requires a cron expression to be specified for the interval. To schedule periodic report generation, retrieve the streamsets.sdk.sch_models.ReportDefinition object you wish to schedule generation for and pass it into the streamsets.sdk.sch_models.ScheduledTaskBuilder.build() method:

# Get the report definition to be scheduled
report_def = sch.report_definitions.get(name='from sdk')

# Instantiate a ScheduledTaskBuilder, and build the scheduled task with the report_def (from above)
# as the task_object
task = sch.get_scheduled_task_builder().build(task_object=report_def,
                                              action='START',
                                              name='Task for Report {}'.format(report_def.name),
                                              cron_expression='0/1 * 1/1 * ? *',
                                              time_zone='UTC')

# Publish the scheduled task (built above) to Control Hub
sch.add_scheduled_task(task)