Pipeline Fragments
A pipeline fragment is a stage or set of connected stages that is frequently used in pipelines. Fragments let you add the same logic to multiple pipelines quickly, while keeping the configuration and design of that logic in a single object.
The SDK gives you direct access to fragments: you can create new fragments, manage existing fragments, and add fragments to a pipeline.
Creating a fragment#
Creating a new fragment is almost identical to creating a pipeline, because fragments are themselves streamsets.sdk.sch_models.Pipeline objects. The only difference is that you pass fragment=True when initializing the streamsets.sdk.sch_models.PipelineBuilder object, which marks the result as a pipeline fragment rather than a full pipeline. Use the streamsets.sdk.ControlHub.get_pipeline_builder() method to instantiate the builder object:
# Initialize fragment builder
pipeline_builder = sch.get_pipeline_builder(fragment=True)
# Add stages to the pipeline builder
dev_data_generator = pipeline_builder.add_stage('Dev Data Generator')
expression_evaluator = pipeline_builder.add_stage('Expression Evaluator')
field_renamer = pipeline_builder.add_stage('Field Renamer')
# Connect the stages
dev_data_generator >> [expression_evaluator, field_renamer]
# Build and publish the pipeline fragment
fragment = pipeline_builder.build('Test Fragment')
sch.publish_pipeline(fragment)
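The steps above can be wrapped in a small helper when several fragments follow the same linear shape. The following is a minimal sketch, not part of the SDK: the function name build_linear_fragment and its parameters are hypothetical, and sch is assumed to be an already authenticated ControlHub instance. It uses only the calls shown above (get_pipeline_builder(fragment=True), add_stage(), the >> operator, build(), and publish_pipeline()).

```python
def build_linear_fragment(sch, name, stage_labels):
    """Build and publish a fragment whose stages are chained in the given order.

    Hypothetical helper (not an SDK function). `sch` is assumed to be an
    authenticated ControlHub instance and `stage_labels` a list of stage
    labels such as 'Dev Data Generator'.
    """
    if not stage_labels:
        raise ValueError("at least one stage label is required")

    # fragment=True marks the builder output as a fragment, not a full pipeline
    builder = sch.get_pipeline_builder(fragment=True)

    # Add each stage by label, preserving the caller's order
    stages = [builder.add_stage(label) for label in stage_labels]

    # Connect every stage to the one that follows it
    for upstream, downstream in zip(stages, stages[1:]):
        upstream >> downstream

    # Build and publish, exactly as in the manual example
    fragment = builder.build(name)
    sch.publish_pipeline(fragment)
    return fragment
```

A call such as build_linear_fragment(sch, 'Test Fragment', ['Dev Data Generator', 'Expression Evaluator']) would then replace the manual steps. Branching layouts, like the one in the example above, still need explicit connections.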
Retrieving a Pipeline Fragment#
Retrieving pipeline fragments is very similar to retrieving pipelines. Because the streamsets.sdk.ControlHub.pipelines attribute returns a streamsets.sdk.utils.SeekableList of streamsets.sdk.sch_models.Pipeline objects, you can filter the list by passing fragment=True when calling streamsets.sdk.utils.SeekableList.get() or streamsets.sdk.utils.SeekableList.get_all():
sch.pipelines.get_all(fragment=True)
Output:
[<Pipeline (pipeline_id=88d58863-7e8b-4831-a929-8c56db629483:admin,
commit_id=600a7709-6a13-4e9b-b4cf-6780f057680a:admin,
name=Dev as fragment,
version=1)>,
<Pipeline (pipeline_id=5b67c7dc-729b-43cc-bee7-072d3feb184b:admin,
commit_id=491cf010-da8c-4e63-9918-3f5ef3b182f6:admin,
name=Test Fragment,
version=1)>]
Alternatively, you can retrieve a specific pipeline fragment the same way you would any other pipeline, by specifying pipeline_id, name, or commit_id to filter the pipeline results:
pipeline_fragment = sch.pipelines.get(name='Test Fragment', fragment=True)
pipeline_fragment
pipeline_fragment.fragment
Output:
# pipeline_fragment
<Pipeline (pipeline_id=5b67c7dc-729b-43cc-bee7-072d3feb184b:admin, commit_id=491cf010-da8c-4e63-9918-3f5ef3b182f6:admin, name=Test Fragment, version=1)>
# pipeline_fragment.fragment
True
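When a script cannot be sure the fragment exists, it may help to look it up defensively. The sketch below is a hypothetical helper (find_fragment is not an SDK function) that assumes sch is an authenticated ControlHub instance. It relies only on get_all(fragment=True) and the name attribute shown above. It compares names exactly, so 'Test fragment' would not match 'Test Fragment'.

```python
def find_fragment(sch, name):
    """Return the fragment called `name`, or None if there is no such fragment.

    Hypothetical helper (not an SDK function). Raises LookupError when more
    than one fragment carries the same name, since the match is then ambiguous.
    """
    # fragment=True restricts the results to pipeline fragments
    fragments = sch.pipelines.get_all(fragment=True)

    # Exact, case-sensitive comparison on the fragment name
    matches = [fragment for fragment in fragments if fragment.name == name]

    if not matches:
        return None
    if len(matches) > 1:
        raise LookupError(
            "{} fragments are named {!r}; filter by pipeline_id instead".format(len(matches), name)
        )
    return matches[0]
```

Returning None instead of raising is a design choice for this sketch: the caller can decide whether a missing fragment should be created or reported.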
Using a fragment in a pipeline#
Adding a fragment to a pipeline is almost identical to adding a stage to a pipeline builder. Once you’ve retrieved the fragment object you wish to add to the pipeline, pass it to the streamsets.sdk.sch_models.PipelineBuilder.add_fragment() method (inherited from streamsets.sdk.sdc_models.PipelineBuilder.add_fragment()) of your streamsets.sdk.sch_models.PipelineBuilder instance. The object it returns can be treated like any other stage within the pipeline builder:
pipeline_builder = sch.get_pipeline_builder()
# Retrieve the fragment object to add to the pipeline
fragment = sch.pipelines.get(fragment=True, name='Test Fragment')
# Add the fragment to the pipeline builder, which returns a Stage object
fragment_stage = pipeline_builder.add_fragment(fragment)
# Add other stages to the pipeline using add_stage
trash1 = pipeline_builder.add_stage('Trash')
trash2 = pipeline_builder.add_stage('Trash')
# Connect the fragment to the other stages
fragment_stage >> trash1
fragment_stage >> trash2
# Build and publish the pipeline
pipeline = pipeline_builder.build('Test Pipeline')
sch.publish_pipeline(pipeline)
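If the same fragment feeds a varying number of downstream stages, the connection step can be written as a loop. This is a minimal sketch under stated assumptions: add_fragment_with_destinations is a hypothetical name, pipeline_builder is assumed to come from sch.get_pipeline_builder(), and fragment is assumed to have been retrieved as shown above. Only add_fragment(), add_stage(), and the >> operator from the example are used.

```python
def add_fragment_with_destinations(pipeline_builder, fragment, destination_labels):
    """Add `fragment` to the builder and connect it to one stage per label.

    Hypothetical helper (not an SDK function). Returns the fragment stage and
    the list of destination stages so the caller can configure them further.
    """
    # add_fragment returns an object that behaves like any other stage
    fragment_stage = pipeline_builder.add_fragment(fragment)

    destinations = []
    for label in destination_labels:
        destination = pipeline_builder.add_stage(label)
        # Connect the fragment to this destination, as in the manual example
        fragment_stage >> destination
        destinations.append(destination)

    return fragment_stage, destinations
```

The helper leaves building and publishing to the caller, so the pipeline can still be adjusted before pipeline_builder.build() and sch.publish_pipeline() are called.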
Retrieving Pipelines that use a specific Pipeline Fragment#
To retrieve all the pipelines that use a specific fragment, pass using_fragment=<fragment> when calling streamsets.sdk.utils.SeekableList.get() or streamsets.sdk.utils.SeekableList.get_all(), much as you pass fragment=True when retrieving pipeline fragments. The using_fragment parameter expects a streamsets.sdk.sch_models.Pipeline object on which to filter the results:
# Retrieve the fragment object to be used for the lookup
fragment = sch.pipelines.get(fragment=True, name='Test Fragment')
# Retrieve all pipelines from Control Hub that use the fragment retrieved above
sch.pipelines.get_all(using_fragment=fragment)
Output:
[<Pipeline (pipeline_id=0e1a42c9-7ce3-4295-84dd-ff53a7b313c3:admin,
commit_id=f3479d83-6e52-4f85-824c-e8ef4185d8f6:admin,
name=Test Pipeline,
version=1)>]
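Combining the two filters gives a quick inventory of which pipelines depend on which fragments, which can be useful before changing a fragment. The sketch below is a hypothetical helper (fragment_usage is not an SDK function) that assumes sch is an authenticated ControlHub instance. It keys the result by fragment name, so it also assumes fragment names are unique; with duplicate names, later entries would overwrite earlier ones.

```python
def fragment_usage(sch):
    """Map each fragment name to the sorted names of the pipelines that use it.

    Hypothetical helper (not an SDK function). Assumes fragment names are
    unique within the organization.
    """
    usage = {}
    # First filter: every pipeline fragment known to Control Hub
    for fragment in sch.pipelines.get_all(fragment=True):
        # Second filter: the pipelines that use this particular fragment
        users = sch.pipelines.get_all(using_fragment=fragment)
        usage[fragment.name] = sorted(pipeline.name for pipeline in users)
    return usage
```

Fragments that no pipeline uses appear with an empty list, which makes unused fragments easy to spot.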
Updating an existing pipeline with a new fragment version#
When a fragment is updated and a new version is committed, the pipelines that use that fragment need to be updated to use the latest version.
To update pipelines that use a specific fragment with the new version of that fragment, you can use the streamsets.sdk.ControlHub.update_pipelines_with_different_fragment_version() method. This method expects a list of streamsets.sdk.sch_models.Pipeline objects to be updated, as well as two streamsets.sdk.sch_models.PipelineCommit objects that represent the fragment version to upgrade from and the fragment version to upgrade to:
# Get the fragment object that was updated
fragment = sch.pipelines.get(fragment=True, name='Test Fragment')
# Get the old fragment version to upgrade from, and the new fragment version to upgrade to
from_fragment_version = fragment.commits.get(version='1')
to_fragment_version = fragment.commits.get(version='2')
# Get a SeekableList of all pipelines that are currently using the fragment in question, and then pass the list
# to the update_pipelines_with_different_fragment_version() method
pipelines = sch.pipelines.get_all(using_fragment=fragment)
sch.update_pipelines_with_different_fragment_version(pipelines=pipelines,
                                                     from_fragment_version=from_fragment_version,
                                                     to_fragment_version=to_fragment_version)
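The same sequence can be packaged as a function that skips the update call when nothing uses the fragment. This is a minimal sketch, not an SDK feature: upgrade_fragment_users and its parameters are hypothetical, sch is assumed to be an authenticated ControlHub instance, and version numbers are passed as strings to match the commits.get(version='1') usage above.

```python
def upgrade_fragment_users(sch, fragment_name, from_version, to_version):
    """Move every pipeline that uses a fragment from one fragment version to another.

    Hypothetical helper (not an SDK function). Returns the names of the
    pipelines that were passed to the update call (empty if none use the
    fragment).
    """
    # Look up the fragment and the two commits that identify the versions
    fragment = sch.pipelines.get(fragment=True, name=fragment_name)
    from_commit = fragment.commits.get(version=from_version)
    to_commit = fragment.commits.get(version=to_version)

    # Find the pipelines that currently use the fragment
    pipelines = sch.pipelines.get_all(using_fragment=fragment)
    if not pipelines:
        # Nothing to update, so avoid calling Control Hub with an empty list
        return []

    sch.update_pipelines_with_different_fragment_version(
        pipelines=pipelines,
        from_fragment_version=from_commit,
        to_fragment_version=to_commit,
    )
    return [pipeline.name for pipeline in pipelines]
```

For example, upgrade_fragment_users(sch, 'Test Fragment', '1', '2') would mirror the manual steps above and return the names of the affected pipelines for logging.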