Job Instances#


A job defines the pipeline to run and the execution engine that runs the pipeline. The SDK enables you to interact with jobs on the Platform including creating a job, deleting a job, starting a job, stopping a job, retrieving a job’s metrics, and more.

Creating a Job#

To create a new streamsets.sdk.sch_models.Job object and add it to the Platform, use the streamsets.sdk.sch_models.JobBuilder class. Use the streamsets.sdk.ControlHub.get_job_builder() method to instantiate the builder object:

job_builder = sch.get_job_builder()

Next, retrieve the streamsets.sdk.sch_models.Pipeline object that you wish to create to the job for, pass it to the streamsets.sdk.sch_models.JobBuilder.build() method, and pass the resulting job object to the streamsets.sdk.ControlHub.add_job() method:

pipeline = sch.pipelines.get(name='Test Pipeline')
job = job_builder.build('job name', pipeline=pipeline)
sch.add_job(job)

Creating a Job With a Particular Pipeline Version#

To create a job with a specific version of a pipeline, you can pass a specific pipeline commit to the streamsets.sdk.sch_models.JobBuilder.build() method. Simply retrieve the desired streamsets.sdk.sch_models.PipelineCommit instance from the pipeline you’re creating the job for:

job_builder = sch.get_job_builder()
pipeline = sch.pipelines.get(name='Test Pipeline')
pipeline_commit = pipeline.commits.get(version='1')
job = job_builder.build('job name', pipeline=pipeline, pipeline_commit=pipeline_commit)
sch.add_job(job)

Retrieving an Existing Job#

To retrieve existing jobs from a Control Hub instance, you can reference the streamsets.sdk.ControlHub.jobs attribute of your streamsets.sdk.ControlHub instance. You can also further filter and refine the jobs returned based on attributes including job_name, job_status, id, job_tag, and executor_type:

# Returns a list of all Job instances in 'INACTIVE' status
sch.jobs.get_all(job_status='INACTIVE')
# Returns a list of all Job instances that are tagged with 'test'
sch.jobs.get_all(job_tag='test')
# Retrieve a particular job's tag first, and then check for other jobs with the same tag
job = sch.jobs.get(job_name='job name')
tag = job.tags.get(tag='tag name')
sch.jobs.get(job_tag=tag.tag)
# Returns a single Job instance that matches the supplied ID
sch.jobs.get(id='acd9dea6-ffcd-4ae0-86e2-bef38c000304:9e1e3faa-ca28-4c05-9edb-3b18aaba4604')

Tip

As of SDK 5.0.0, do not supply the organization ID in the tag when filtering on job_tag. For example, retrieving jobs tagged with the tag labeled 'test' would equate to job_tag='test' rather than job_tag='test:<organization_id>'. For additional information on tags, please refer to the API documentation for the streamsets.sdk.sch_models.Tag class.

Upgrading a Job#

When a job uses a pipeline that gets updated, it is necessary to upgrade the job to make sure the latest version of the pipeline is being used in the job. To upgrade one or more jobs to the corresponding latest pipeline version, you can use the streamsets.sdk.ControlHub.upgrade_job() method:

# Get all job instances that use the pipeline version tagged with the 'v1' commit label
jobs = sch.jobs.get_all(pipeline_commit_label='v1')
sch.upgrade_job(*jobs)

Updating a Job With a Different Pipeline Version#

A job can also be updated to use an entirely different pipeline or pipeline version. Set the streamsets.sdk.sch_models.Job.commit attribute of the streamsets.sdk.sch_models.Job instance to point to the new pipeline commit, and then pass the updated streamsets.sdk.sch_models.Job instance to the streamsets.sdk.ControlHub.update_job() method:

job = sch.jobs.get(pipeline_commit_label='v2')
pipeline = sch.pipelines.get(name='Test Pipeline')
pipeline_commit = pipeline.commits.get(version='1')
job.commit = pipeline_commit
sch.update_job(job)

Duplicating a Job#

The SDK also allows for explicitly duplicating an existing job on the Platform. Simply retrieve the streamsets.sdk.sch_models.Job instance that you want to duplicate, and pass it to the streamsets.sdk.ControlHub.duplicate_job() method along with the number of copies to create:

job = sch.jobs.get(job_id='6889df89-7aaa-4e10-9f26-bdf16af4c0db:admin')
sch.duplicate_job(job, number_of_copies=2)

Output:

[<Job (job_id=e52c4157-2aec-4b7c-b875-8244d5dc220b:admin, job_name=Job for dev copy1)>,
 <Job (job_id=c0307b6e-2eee-44e3-b8b1-9600e25a30b7:admin, job_name=Job for dev copy2)>]

Importing Jobs#

Jobs can also be imported directly in the SDK. To import one or more jobs from a compressed archive, you can use the streamsets.sdk.ControlHub.import_jobs() method, passing in the compressed archive to the method. This will return a streamsets.sdk.utils.SeekableList of the streamsets.sdk.sch_models.Job objects that were imported:

# Open a compressed archive for reading, and then pass it into the import_jobs method
with open('jobs.zip', 'rb') as jobs_file:
    jobs = sch.import_jobs(archive=jobs_file)

Exporting Jobs#

Similarly, jobs can also be exported from the Platform directly in the SDK. To export one or more jobs to a compressed archive, use the streamsets.sdk.ControlHub.export_jobs() method after retrieving the streamsets.sdk.sch_models.Job object(s) you wish to export:

# Retrieve the Job objects to export - all jobs in the Platform organization, in this example
jobs = sch.jobs
jobs_file_data = sch.export_jobs(jobs)

# Open an archive file for writing, and write out the exported job data
with open('jobs.zip', 'wb') as jobs_file:
    jobs_file.write(jobs_file_data)

Resetting offsets#

Jobs maintain offsets to keep track of the most-recently processed data before the job was stopped. It is sometimes desirable, or necessary, to reset the offset of a particular job. To reset offsets for one or more jobs, use the streamsets.sdk.ControlHub.reset_origin() method after retrieving the streamsets.sdk.sch_models.Job instance(s) you wish to reset:

# Get all jobs available from the Platform organization, then reset each of their origins
jobs = sch.jobs
sch.reset_origin(*jobs)

Retrieving Offsets#

A job’s current offsets can also be retrieved via the SDK. To retrieve the current offsets of a job, reference the streamsets.sdk.sch_models.JobStatus.offsets attribute of the job’s streamsets.sdk.ControlHub.Job.current_status. This will return a streamsets.sdk.sch_models.JobOffset object

job = sch.jobs.get(name='job name')
job.current_status.offsets

Output:

[<JobOffset (sdc_id=0501dc93-8634-11e9-99f3-97919257db3c, pipeline_id=896197a7-9639-4575-9784-260f1dc46fbc:admin)>]

To retrieve offsets from a particular job run, you can reference the streamsets.sdk.sch_models.JobStatus.offsets attribute of a job’s streamsets.sdk.ControlHub.Job.history object:

# Get the most recent run (JobStatus object) from the job's history
job_status = job.history[0]
job_status.offsets

Output:

[<JobOffset (sdc_id=0501dc93-8634-11e9-99f3-97919257db3c, pipeline_id=896197a7-9639-4575-9784-260f1dc46fbc:admin)>]

Uploading Offsets#

It’s also possible to upload a job’s offset. For example, if you create a second job that reads the same origin data as an existing job, but you want to ensure both to start with the latest offset, you could upload the offset to the newly-created job. To upload offsets for a job use the streamsets.sdk.ControlHub.upload_offset() method:

job = sch.jobs.get(name='job name')

with open('offset.json') as offset_file:
    sch.upload_offset(job, offset_file=offset_file)

The streamsets.sdk.ControlHub.upload_offset() method can also be used to upload an offset in raw JSON format:

offset_json = {"version" : 2,
               "offsets" : {"$com.streamsets.datacollector.pollsource.offset$" : None}}
sch.upload_offset(job, offset_json=offset_json)

Retrieving Job Status History#

Retrieving a given job’s history can also be done from the SDK. Simply retrieve the streamsets.sdk.sch_models.Job instance in question from the Platform and reference its streamsets.sdk.sch_models.Job.history attribute. This will show the execution history for the job all contained within a streamsets.sdk.sch_models.JobStatus object:

job = sch.jobs[0]
job.history

Output:

[<JobStatus (status=INACTIVE, start_time=1585923912290, finish_time=1585923935759, run_count=2)>,
 <JobStatus (status=INACTIVE, start_time=1585923875846, finish_time=1585923897766, run_count=1)>]

Retrieving Run Events from Job History#

You can introspect on an individual streamsets.sdk.sch_models.JobStatus object within a job to see the run events for it. The run events correspond to the events that occurred during that execution, like the job activating or deactivating:

# Get the most recent run (JobStatus object) from the job's history
job_status = job.history[0]
job_status.run_history

Output:

[<JobRunEvent (user=admin@admin, time=1560367534056, status=ACTIVATING)>,
 <JobRunEvent (user=admin@admin, time=1560367540929, status=DEACTIVATING)>,
 <JobRunEvent (user=None, time=1560367537771, status=DEACTIVATING)>,
 <JobRunEvent (user=None, time=1560367537814, status=DEACTIVATING)>]

Metrics#

To access metrics for a job, reference the streamsets.sdk.sch_models.Job.metrics attribute of a streamsets.sdk.sch_models.Job instance. This will return a streamsets.sdk.utils.SeekableList of streamsets.sdk.sch_models.JobMetrics objects that are in reverse chronological order (newest first):

job = sch.jobs.get(job_name='job name')
job.metrics

Output:

[<JobMetrics (run_count=5, input_count=3204, output_count=3204, total_error_count=0)>,
 <JobMetrics (run_count=4, input_count=24740, output_count=24740, total_error_count=0)>,
 <JobMetrics (run_count=3, input_count=9960, output_count=9960, total_error_count=0)>,
 <JobMetrics (run_count=2, input_count=9564, output_count=9564, total_error_count=0)>,
 <JobMetrics (run_count=1, input_count=792, output_count=792, total_error_count=0)>]

We can also reference the streamsets.sdk.sch_models.Job.history attribute of a streamsets.sdk.sch_models.Job instance to figure out which job run we might be interested in. For example, if we wanted to know which job run executed at Apr 01 2021 16:39:48 GMT (unix-timestamp ‘1617295188217’) and get the metrics for it, we could use the following steps:

job.history.get(start_time=1617295188217)

Output:

<JobStatus (status=INACTIVE, start_time=1617295188217, finish_time=1617295209406, run_count=2)>

This was run_count 2, so now we know which run_count to reference for this run’s metrics

job.metrics.get(run_count=2)

Output:

<JobMetrics (run_count=2, input_count=9564, output_count=9564, total_error_count=0)>

Balancing Data Collector instances#

The Platform allows jobs to be balanced across Data Collector instances that are tagged appropriately for the jobs in question. To balance all jobs running on specific Data Collectors, you can use the streamsets.sdk.ControlHub.balance_data_collectors() method after retrieving the specific streamsets.sdk.DataCollector instance(s) that you want to balance:

# Retrieve the Data Collector instances to be balanced - all Data Collector instances, in this example
data_collectors = sch.data_collectors
sch.balance_data_collectors(data_collectors)