Jobs


A job defines the pipeline to run and the execution engine that runs the pipeline. The SDK enables you to interact with jobs on a Control Hub instance including creating a job, deleting a job, starting a job, stopping a job, retrieving a job’s metrics, and more.

Creating a Job

To create a new streamsets.sdk.sch_models.Job object and add it to Control Hub, use the streamsets.sdk.sch_models.JobBuilder class. Use the streamsets.sdk.ControlHub.get_job_builder() method to instantiate the builder object:

job_builder = sch.get_job_builder()

Next, retrieve the streamsets.sdk.sch_models.Pipeline object that you wish to create to the job for, pass it to the streamsets.sdk.sch_models.JobBuilder.build() method, and pass the resulting job object to the streamsets.sdk.ControlHub.add_job() method:

pipeline = sch.pipelines.get(name='Test Pipeline')
job = job_builder.build('job name', pipeline=pipeline)
sch.add_job(job)

Creating a Job with a particular pipeline version

To create a job with a specific version of a pipeline, you can pass a specific pipeline commit to the streamsets.sdk.sch_models.JobBuilder.build() method. Simply retrieve the desired streamsets.sdk.sch_models.PipelineCommit instance from the pipeline you’re creating the job for:

job_builder = sch.get_job_builder()
pipeline = sch.pipelines.get(name='Test Pipeline')
pipeline_commit = pipeline.commits.get(version='1')
job = job_builder.build('job name', pipeline=pipeline, pipeline_commit=pipeline_commit)
sch.add_job(job)

Upgrading a Job

When a job uses a pipeline that gets updated, it is necessary to upgrade the job to make sure the latest version of the pipeline is being used in the job. To upgrade one or more jobs to the corresponding latest pipeline version, you can use the streamsets.sdk.ControlHub.upgrade_job() method:

# Get all job instances that use the pipeline version tagged with the 'v1' commit label
jobs = sch.jobs.get_all(pipeline_commit_label='v1')
sch.upgrade_job(*jobs)

Updating a Job with a different pipeline version

A job can also be updated to use an entirely different pipeline or pipeline version. Set the streamsets.sdk.sch_models.Job.commit attribute of the streamsets.sdk.sch_models.Job instance to point to the new pipeline commit, and then pass the updated streamsets.sdk.sch_models.Job instance to the streamsets.sdk.ControlHub.update_job() method:

job = sch.jobs.get(pipeline_commit_label='v2')
pipeline = sch.pipelines.get(name='Test Pipeline')
pipeline_commit = pipeline.commits.get(version='1')
job.commit = pipeline_commit
sch.update_job(job)

Duplicating a Job

The SDK also allows for explicitly duplicating an existing job in Control Hub. Simply retrieve the streamsets.sdk.sch_models.Job instance that you want to duplicate, and pass it to the streamsets.sdk.ControlHub.duplicate_job() method along with the number of copies to create:

job = sch.jobs.get(job_id='6889df89-7aaa-4e10-9f26-bdf16af4c0db:admin')
sch.duplicate_job(job, number_of_copies=2)

Output:

[<Job (job_id=e52c4157-2aec-4b7c-b875-8244d5dc220b:admin, job_name=Job for dev copy1)>,
 <Job (job_id=c0307b6e-2eee-44e3-b8b1-9600e25a30b7:admin, job_name=Job for dev copy2)>]

Importing Jobs

Jobs can also be imported directly in the SDK. To import one or more jobs from a compressed archive, you can use the streamsets.sdk.ControlHub.import_jobs() method, passing in the compressed archive to the method. This will return a streamsets.sdk.utils.SeekableList of the streamsets.sdk.sch_models.Job objects that were imported:

# Open a compressed archive for reading, and then pass it into the import_jobs method
with open('jobs.zip', 'rb') as jobs_file:
    jobs = sch.import_jobs(archive=jobs_file)

Exporting Jobs

Similarly, jobs can also be exported from Control Hub directly in the SDK. To export one or more jobs to a compressed archive, use the streamsets.sdk.ControlHub.export_jobs() method after retrieving the streamsets.sdk.sch_models.Job object(s) you wish to export:

# Retrieve the Job objects to export - all jobs on Control Hub, in this example
jobs = sch.jobs
jobs_file_data = sch.export_jobs(jobs)

# Open an archive file for writing, and write out the exported job data
with open('jobs.zip', 'wb') as jobs_file:
    jobs_file.write(jobs_file_data)

Resetting offsets

Jobs maintain offsets to keep track of the most-recently processed data before the job was stopped. It is sometimes desirable, or necessary, to reset the offset of a particular job. To reset offsets for one or more jobs, use the streamsets.sdk.ControlHub.reset_origin() method after retrieving the streamsets.sdk.sch_models.Job instance(s) you wish to reset:

# Get all jobs available from Control Hub, then reset each of their origins
jobs = sch.jobs
sch.reset_origin(*jobs)

Retrieving Offsets

A job’s current offsets can also be retrieved via the SDK. To retrieve the current offsets of a job, reference the streamsets.sdk.sch_models.JobStatus.offsets attribute of the job’s streamsets.sdk.ControlHub.Job.current_status. This will return a streamsets.sdk.sch_models.JobOffset object

job = sch.job.get(job_name='job name')
job.current_status.offsets

Output:

[<JobOffset (sdc_id=0501dc93-8634-11e9-99f3-97919257db3c, pipeline_id=896197a7-9639-4575-9784-260f1dc46fbc:admin)>]

To retrieve offsets from a particular job run, you can reference the streamsets.sdk.sch_models.JobStatus.offsets attribute of a job’s streamsets.sdk.ControlHub.Job.history object:

# Get the most recent run (JobStatus object) from the job's history
job_status = job.history[0]
job_status.offsets

Output:

[<JobOffset (sdc_id=0501dc93-8634-11e9-99f3-97919257db3c, pipeline_id=896197a7-9639-4575-9784-260f1dc46fbc:admin)>]

Uploading offsets

It’s also possible to upload a job’s offset. For example, if you create a second job that reads the same origin data as an existing job, but you want to ensure both to start with the latest offset, you could upload the offset to the newly-created job. To upload offsets for a job use the streamsets.sdk.ControlHub.upload_offset() method:

job = sch.jobs.get(job_name='job name')

with open('offset.json') as offset_file:
    sch.upload_offset(job, offset_file=offset_file)

The streamsets.sdk.ControlHub.upload_offset() method can also be used to upload an offset in raw JSON format:

offset_json = {"version" : 2,
               "offsets" : {"$com.streamsets.datacollector.pollsource.offset$" : None}}
sch.upload_offset(job, offset_json=offset_json)

Retrieving Job Status History

Retrieving a given job’s history can also be done from the SDK. Simply retrieve the streamsets.sdk.sch_models.Job instance in question from Control Hub and reference its streamsets.sdk.sch_models.Job.history attribute. This will show the execution history for the job all contained within a streamsets.sdk.sch_models.JobStatus object:

job = sch.jobs[0]
job.history

Output:

[<JobStatus (status=INACTIVE, start_time=1585923912290, finish_time=1585923935759, run_count=2)>,
 <JobStatus (status=INACTIVE, start_time=1585923875846, finish_time=1585923897766, run_count=1)>]

Retrieving Run Events from Job History

You can introspect on an individual streamsets.sdk.sch_models.JobStatus object within a job to see the run events for it. The run events correspond to the events that occurred during that execution, like the job activating or deactivating:

# Get the most recent run (JobStatus object) from the job's history
job_status = job.history[0]
job_status.run_history

Output:

[<JobRunEvent (user=admin@admin, time=1560367534056, status=ACTIVATING)>,
 <JobRunEvent (user=admin@admin, time=1560367540929, status=DEACTIVATING)>,
 <JobRunEvent (user=None, time=1560367537771, status=DEACTIVATING)>,
 <JobRunEvent (user=None, time=1560367537814, status=DEACTIVATING)>]

Metrics

To access metrics for a job, reference the streamsets.sdk.sch_models.Job.metrics attribute of a streamsets.sdk.sch_models.Job instance. This will return a streamsets.sdk.utils.SeekableList of streamsets.sdk.sch_models.JobMetrics objects that are in reverse chronological order (newest first):

job = sch.jobs.get(job_name='job name')
job.metrics

Output:

[<JobMetrics (run_count=5, input_count=3204, output_count=3204, total_error_count=0)>,
 <JobMetrics (run_count=4, input_count=24740, output_count=24740, total_error_count=0)>,
 <JobMetrics (run_count=3, input_count=9960, output_count=9960, total_error_count=0)>,
 <JobMetrics (run_count=2, input_count=9564, output_count=9564, total_error_count=0)>,
 <JobMetrics (run_count=1, input_count=792, output_count=792, total_error_count=0)>]

We can also reference the streamsets.sdk.sch_models.Job.history attribute of a streamsets.sdk.sch_models.Job instance to figure out which job run we might be interested in. For example, if we wanted to know which job run executed at Apr 01 2021 16:39:48 GMT (unix-timestamp ‘1617295188217’) and get the metrics for it, we could use the following steps:

job.history.get(start_time=1617295188217)

Output:

<JobStatus (status=INACTIVE, start_time=1617295188217, finish_time=1617295209406, run_count=2)>

This was run_count 2, so now we know which run_count to reference for this run’s metrics

job.metrics.get(run_count=2)

Output:

<JobMetrics (run_count=2, input_count=9564, output_count=9564, total_error_count=0)>

Time Series Metrics

When time series analysis is enabled for a job, you can check the time series metrics from the SDK directly. The SDK provides a breakdown of streamsets.sdk.sch_models.JobTimeSeriesMetrics.input_records, streamsets.sdk.sch_models.JobTimeSeriesMetrics.output_records, and streamsets.sdk.sch_models.JobTimeSeriesMetrics.error_records.

To access time series metrics for a job, use the streamsets.sdk.sch_models.Job.time_series_metrics() method and pass in the metric_type you’re interested in. Available options are:

  • 'Record Count Time Series' - Total count of each category of records (input, output, error) for the given time frame.

  • 'Record Throughput Time Series' - The number of records of each category of records (input, output, error) processed, per second, for the given time frame.

  • 'Batch Throughput Time Series' - The number of record batches processed per second for the given time frame.

  • 'Stage Batch Processing Timer seconds' - The amount of time it took to process a record batch in each stage of the job’s pipeline.

# Get the number of records processed per second for a job
job_time_series_metrics = job.time_series_metrics(metric_type='Record Throughput Time Series')
job_time_series_metrics

# Drill down further to just the input_records
job_time_series_metrics.input_records

# Drill down even further and look at just the time_series metrics values of the input_records
job_time_series_metrics.input_records.time_series

Output:

# job_time_series_metrics
<JobTimeSeriesMetrics (
input_records=<JobTimeSeriesMetric (name=pipeline_batchInputRecords_meter,
                                    time_series={'2019-06-24T19:35:01.34Z': 182000.0,
                                                 '2019-06-24T19:36:03.273Z': 242000.0,
                                                 '2019-06-24T19:37:05.202Z': 303000.0,
                                                 '2019-06-24T19:38:07.135Z': 363000.0,
                                                 '2019-06-24T19:39:09.065Z': 424000.0})>,
output_records=<JobTimeSeriesMetric (name=pipeline_batchOutputRecords_meter,
                                     time_series={'2019-06-24T19:35:01.34Z': 182000.0,
                                                  '2019-06-24T19:36:03.273Z': 242000.0,
                                                  '2019-06-24T19:37:05.202Z': 303000.0,
                                                  '2019-06-24T19:38:07.135Z': 363000.0,
                                                  '2019-06-24T19:39:09.065Z': 424000.0})>,
error_records=<JobTimeSeriesMetric (name=pipeline_batchErrorRecords_meter,
                                    time_series={'2019-06-24T19:35:01.34Z': 0.0,
                                                 '2019-06-24T19:36:03.273Z': 0.0,
                                                 '2019-06-24T19:37:05.202Z': 0.0,
                                                 '2019-06-24T19:38:07.135Z': 0.0,
                                                 '2019-06-24T19:39:09.065Z': 0.0})>)>

# job_time_series_metrics.input_records
<JobTimeSeriesMetric (name=pipeline_batchInputRecords_meter, time_series={'2019-06-24T19:35:01.34Z': 182000.0,
                                                                          '2019-06-24T19:36:03.273Z': 242000.0,
                                                                          '2019-06-24T19:37:05.202Z': 303000.0,
                                                                          '2019-06-24T19:38:07.135Z': 363000.0,
                                                                          '2019-06-24T19:39:09.065Z': 424000.0})>

# job_time_series_metrics.input_records.time_series
{'2019-06-24T19:35:01.34Z': 182000.0,
 '2019-06-24T19:36:03.273Z': 242000.0,
 '2019-06-24T19:37:05.202Z': 303000.0,
 '2019-06-24T19:38:07.135Z': 363000.0,
 '2019-06-24T19:39:09.065Z': 424000.0}

By default, the streamsets.sdk.sch_models.Job.time_series_metrics() method will gather metrics for the last five minutes, but the length of time can be modified by passing in time_filter_condition arguments. The available time_filter_condition values can be found in Control Hub’s API documentation:

# Get 'Record Throughput Time Series' metrics from a job for the last 15 minutes
job_time_series_metrics = job.time_series_metrics(metric_type='Record Throughput Time Series', time_filter_condition='LAST_15M')

# Get 'Record Count Time Series' metrics from a job for the last hour, 6 hours, 12 hours, and then 24 hours
job_time_series_metrics = job.time_series_metrics(metric_type='Record Count Time Series', time_filter_condition='LAST_1H')
job_time_series_metrics = job.time_series_metrics(metric_type='Record Count Time Series', time_filter_condition='LAST_6H')
job_time_series_metrics = job.time_series_metrics(metric_type='Record Count Time Series', time_filter_condition='LAST_12H')
job_time_series_metrics = job.time_series_metrics(metric_type='Record Count Time Series', time_filter_condition='LAST_24H')

# Get 'Batch Throughput Time Series' metrics from a job for the last 2 days, 7 days, and then 30 days
job_time_series_metrics = job.time_series_metrics(metric_type='Batch Throughput Time Series', time_filter_condition='LAST_2D')
job_time_series_metrics = job.time_series_metrics(metric_type='Batch Throughput Time Series', time_filter_condition='LAST_7D')
job_time_series_metrics = job.time_series_metrics(metric_type='Batch Throughput Time Series', time_filter_condition='LAST_30D')

Balancing Data Collector instances

Control Hub allows jobs to be balanced across Data Collector instances that are tagged appropriately for the jobs in question. To balance all jobs running on specific Data Collectors, you can use the streamsets.sdk.ControlHub.balance_data_collectors() method after retrieving the specific streamsets.sdk.DataCollector instance(s) that you want to balance:

# Retrieve the Data Collector instances to be balanced - all Data Collector instances, in this example
data_collectors = sch.data_collectors
sch.balance_data_collectors(data_collectors)