Editing Pipelines


Editing pipelines in the Platform SDK follows the structure and conventions you already know from the UI, while offering extensible, programmatic access to pipeline objects.

For more details on Pipeline interaction and usage in the UI, refer to the StreamSets Platform Documentation for pipelines.

Hint

All of the examples below focus on stages for SDC pipelines. However, streamsets.sdk.sch_models.SchStStage instances can be swapped into these examples for Transformer pipelines in the same way.

Retrieving An Existing Pipeline

In the Pipelines UI, you can see your existing pipelines and click into any of them as needed, as shown below:

../_images/existing_pipelines.png


The streamsets.sdk.ControlHub.pipelines attribute can be used to retrieve all your Pipelines. This attribute returns a streamsets.sdk.utils.SeekableList of streamsets.sdk.sch_models.Pipeline objects:

sch.pipelines

Alternatively, you can retrieve specific pipelines by specifying pipeline_id, name, version, or commit_id to filter the pipeline results when calling the streamsets.sdk.utils.SeekableList.get() or streamsets.sdk.utils.SeekableList.get_all() methods:

pipeline = sch.pipelines.get(name='Test Pipeline')
all_version_1_pipelines = sch.pipelines.get_all(version='1')

pipeline
all_version_1_pipelines

Output:

# pipeline
<Pipeline (pipeline_id=88d58863-7e8b-4831-a929-8c56db629483:admin, commit_id=600a7709-6a13-4e9b-b4cf-6780f057680a:admin, name=Test Pipeline, version=1)>

# all_version_1_pipelines
[<Pipeline (pipeline_id=88d58863-7e8b-4831-a929-8c56db629483:admin,
            commit_id=600a7709-6a13-4e9b-b4cf-6780f057680a:admin,
            name=Test Pipeline,
            version=1)>,
 <Pipeline (pipeline_id=5b67c7dc-729b-43cc-bee7-072d3feb184b:admin,
            commit_id=491cf010-da8c-4e63-9918-3f5ef3b182f6:admin,
            name=Test Pipeline 2,
            version=1)>]
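The filtering behavior of get() and get_all() can be illustrated without a Control Hub connection. The plain-Python sketch below is not the SDK's implementation: `MiniSeekableList` and `PipelineRecord` are hypothetical stand-ins, and the IDs are placeholders. It only models the behavior described above, where get() returns a single matching item and get_all() returns every match. Raising ValueError when nothing matches is a choice made in this sketch; check the streamsets.sdk.utils.SeekableList API reference for the SDK's actual behavior.

```python
from dataclasses import dataclass


@dataclass
class PipelineRecord:
    """Hypothetical stand-in for a pipeline; only the filterable fields are modeled."""
    pipeline_id: str
    name: str
    version: str


class MiniSeekableList(list):
    """Hypothetical list that filters its items by attribute values (not the SDK class)."""

    def get_all(self, **filters):
        # Keep every item whose attributes equal all of the given keyword values.
        return MiniSeekableList(
            item for item in self
            if all(getattr(item, key) == value for key, value in filters.items())
        )

    def get(self, **filters):
        # Return the first match; this sketch raises ValueError when nothing matches.
        matches = self.get_all(**filters)
        if not matches:
            raise ValueError(f'No item matches {filters}')
        return matches[0]


pipelines = MiniSeekableList([
    PipelineRecord('id-1', 'Test Pipeline', '1'),
    PipelineRecord('id-2', 'Test Pipeline 2', '1'),
    PipelineRecord('id-3', 'Test Pipeline 2', '2'),
])

print(pipelines.get(name='Test Pipeline').pipeline_id)  # id-1
print(len(pipelines.get_all(version='1')))              # 2
```

Combining several keyword arguments narrows the match further, for example `pipelines.get(name='Test Pipeline 2', version='2')` in this sketch.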

Adding Stages To An Existing Pipeline

Once the pipeline is created, you can add stages to it using the Pipeline Canvas UI, as shown below:

../_images/stages_unconnected.png


To add stages to an existing pipeline using the SDK, use the streamsets.sdk.sch_models.Pipeline.add_stage() method; see the API reference for this method for details on the arguments it accepts.

As shown in the image above, the simplest type of pipeline directs one origin into one destination. To recreate that example via the SDK, add a Dev Raw Data Source origin and a Trash destination:

dev_raw_data_source = pipeline.add_stage('Dev Raw Data Source')
trash = pipeline.add_stage('Trash')

Note

The Dev Raw Data Source origin cannot be used in Transformer for Snowflake pipelines. Use the Snowflake Table or Snowflake Query origin instead.

Once the desired stages have been added to the pipeline, you can connect them to the other stages in the pipeline as detailed in the Connecting the Stages section.
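Each added stage receives a label and an instance name, which are used to retrieve stages in the next section (for example, `Dev Raw Data Source 1` and `DevRawDataSource_1`). Those example names follow a visible pattern: the stage label with a per-pipeline counter, and the same text without spaces plus `_` and the counter. The sketch below is a hypothetical model of that pattern, inferred only from the example names in this document. `StageStub` and `add_stage_stub()` are not SDK functions, and the SDK may assign names differently (for instance, after stages have been removed).

```python
from dataclasses import dataclass


@dataclass
class StageStub:
    """Hypothetical stage record holding only the naming fields."""
    label: str
    instance_name: str


def add_stage_stub(stages, base_name):
    """Hypothetical: name a new stage after how many stages already share base_name.

    The pattern is inferred from the example names in this document only; the SDK
    may number stages differently.
    """
    count = sum(1 for s in stages if s.label.rsplit(' ', 1)[0] == base_name) + 1
    stage = StageStub(
        label=f'{base_name} {count}',
        instance_name=f"{base_name.replace(' ', '')}_{count}",
    )
    stages.append(stage)
    return stage


stages = []
origin = add_stage_stub(stages, 'Dev Raw Data Source')
first_trash = add_stage_stub(stages, 'Trash')
second_trash = add_stage_stub(stages, 'Trash')
print(origin.label, origin.instance_name)              # Dev Raw Data Source 1 DevRawDataSource_1
print(second_trash.label, second_trash.instance_name)  # Trash 2 Trash_2
```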

Retrieving Existing Stages In a Pipeline

When working with an existing streamsets.sdk.sch_models.Pipeline instance that you want to update, the first step is retrieving the stage instances to be modified. To retrieve the streamsets.sdk.sch_models.SchSdcStage instances you want to update, use the pipeline's stages attribute. This returns a streamsets.sdk.utils.SeekableList of stages that can be filtered on specific attributes such as label, instance_name, stage_type, stage_name, or any other stage attribute.

Continuing with the example from the screenshot in the previous section, you could run any of the following commands to retrieve the stages in the pipeline:

# Retrieve the Dev Raw Data Source origin in various ways
dev_raw_data_source = pipeline.stages.get(label='Dev Raw Data Source 1')
dev_raw_data_source = pipeline.stages.get(instance_name='DevRawDataSource_1')
dev_raw_data_source = pipeline.stages.get(stage_type='SOURCE')
dev_raw_data_source = pipeline.stages.get(stage_name='com_streamsets_pipeline_stage_devtest_rawdata_RawDataDSource')

# Retrieve the Trash destination in various ways
trash = pipeline.stages.get(label='Trash 1')
trash = pipeline.stages.get(instance_name='Trash_1')
trash = pipeline.stages.get(stage_type='TARGET')
trash = pipeline.stages.get(stage_name='com_streamsets_pipeline_stage_destination_devnull_NullDTarget')

If you need to retrieve all stages from a pipeline that match certain criteria, use the streamsets.sdk.utils.SeekableList.get_all() method:

# Retrieve ALL destination stages, for example
destination_stages_list = pipeline.stages.get_all(stage_type='TARGET')
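Because get() returns a single stage, filtering on an attribute that several stages share (such as stage_type='TARGET' in a pipeline with more than one destination) can only identify one of them, while get_all() returns all of them. The plain-Python sketch below illustrates this with hypothetical stage records and a list comprehension in place of the SDK's SeekableList; the `Trash 2` stage is an assumption added for illustration. Filtering on a unique attribute such as instance_name avoids the ambiguity.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StageRecord:
    """Hypothetical stand-in for a stage; only the attributes used for filtering."""
    label: str
    instance_name: str
    stage_type: str


stages = [
    StageRecord('Dev Raw Data Source 1', 'DevRawDataSource_1', 'SOURCE'),
    StageRecord('Trash 1', 'Trash_1', 'TARGET'),
    StageRecord('Trash 2', 'Trash_2', 'TARGET'),  # hypothetical second destination
]


def matching(stages, **filters):
    """Return every stage whose attributes equal all of the given values."""
    return [s for s in stages if all(getattr(s, k) == v for k, v in filters.items())]


# A shared attribute matches several stages, so a single-result lookup is ambiguous.
targets = matching(stages, stage_type='TARGET')
print([s.label for s in targets])  # ['Trash 1', 'Trash 2']

# A unique attribute such as instance_name identifies exactly one stage.
print(matching(stages, instance_name='Trash_2')[0].label)  # Trash 2
```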

Copying the Lanes of a Stage

When working with an existing streamsets.sdk.sch_models.SchSdcStage instance, you can copy its input and output lanes to another streamsets.sdk.sch_models.SchSdcStage instance within the same pipeline.

To copy input lanes in the SDK, pass the streamsets.sdk.sch_models.SchSdcStage object whose input lanes you want to copy into the streamsets.sdk.sch_models.SchSdcStage.copy_inputs() method of the stage that should receive them. To replace all of the current stage's input lanes with those of the passed-in stage, rather than adding to them, set override to True. The examples below assume that dev_identity and data_parser are SchSdcStage instances already retrieved from the same pipeline:

# copy input lanes of dev_identity into data_parser
data_parser.copy_inputs(dev_identity)
# override data_parser's input lanes with that of dev_identity
data_parser.copy_inputs(dev_identity, override=True)

To copy output lanes in the SDK, pass the streamsets.sdk.sch_models.SchSdcStage object whose output lanes you want to copy into the streamsets.sdk.sch_models.SchSdcStage.copy_outputs() method:

# copy output lanes of dev_identity into data_parser
data_parser.copy_outputs(dev_identity)
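One way to picture what copy_inputs() and copy_outputs() do is to treat the pipeline as a set of directed connections between stages. The sketch below is a hypothetical plain-Python model, not the SDK's implementation: `StageGraph` takes the stage as an explicit first argument instead of being a stage method, and the labels `Dev Identity 1`, `Data Parser 1`, and `Other Origin 1` are illustrative assumptions. It mirrors the behavior described above: copying inputs makes a stage receive data from the same upstream stages as another stage (replacing its existing inputs when override=True), and copying outputs makes it send data to the same downstream stages.

```python
class StageGraph:
    """Hypothetical model of a pipeline as directed (source, target) connections."""

    def __init__(self):
        self.edges = set()

    def connect(self, source, target):
        self.edges.add((source, target))

    def upstream(self, stage):
        return {s for s, t in self.edges if t == stage}

    def downstream(self, stage):
        return {t for s, t in self.edges if s == stage}

    def copy_inputs(self, stage, other, override=False):
        # Optionally drop the stage's current inputs, then receive from other's upstream stages.
        if override:
            self.edges = {(s, t) for s, t in self.edges if t != stage}
        for source in self.upstream(other):
            self.connect(source, stage)

    def copy_outputs(self, stage, other):
        # Send data to every stage that other currently sends data to.
        for target in self.downstream(other):
            self.connect(stage, target)


graph = StageGraph()
graph.connect('Dev Raw Data Source 1', 'Dev Identity 1')
graph.connect('Other Origin 1', 'Data Parser 1')
graph.connect('Dev Identity 1', 'Trash 1')

graph.copy_inputs('Data Parser 1', 'Dev Identity 1')
print(sorted(graph.upstream('Data Parser 1')))  # ['Dev Raw Data Source 1', 'Other Origin 1']

graph.copy_inputs('Data Parser 1', 'Dev Identity 1', override=True)
print(sorted(graph.upstream('Data Parser 1')))  # ['Dev Raw Data Source 1']

graph.copy_outputs('Data Parser 1', 'Dev Identity 1')
print(sorted(graph.downstream('Data Parser 1')))  # ['Trash 1']
```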

Disconnecting Stages In a Pipeline

To disconnect stages on the Pipeline Canvas in the UI, click the connection between the stages, then click the Trash icon in the pop-up that appears, as shown below:

../_images/delete_connection.png


To disconnect output lanes in the SDK, pass a list of the streamsets.sdk.sch_models.SchSdcStage objects to disconnect as the stages argument of the streamsets.sdk.sch_models.SchSdcStage.disconnect_output_lanes() method. To disconnect every stage that receives output from a specific stage, set all_stages to True instead:

# disconnect dev_raw_data_source from trash
dev_raw_data_source.disconnect_output_lanes(stages=[trash])
# disconnect all stages receiving output from the dev_raw_data_source stage
dev_raw_data_source.disconnect_output_lanes(all_stages=True)

To disconnect input lanes in the SDK, pass a list of the streamsets.sdk.sch_models.SchSdcStage objects to disconnect as the stages argument of the streamsets.sdk.sch_models.SchSdcStage.disconnect_input_lanes() method. To disconnect a specific stage from every stage it receives input from, set all_stages to True instead:

# disconnect trash from dev_raw_data_source
trash.disconnect_input_lanes(stages=[dev_raw_data_source])
# disconnect trash from all other stages it receives input from
trash.disconnect_input_lanes(all_stages=True)

Note

It is not necessary to call both streamsets.sdk.sch_models.SchSdcStage.disconnect_output_lanes() and streamsets.sdk.sch_models.SchSdcStage.disconnect_input_lanes() to break the connection between two stages. Calling just one of these methods will disconnect the stages from one another.
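The reason a single call is enough can be seen in a model where each connection is one shared record between two stages: removing it from either end removes it for both. The sketch below is a hypothetical plain-Python model, not the SDK's implementation; `StageGraph` takes the stage as an explicit first argument, and the `Trash 2` stage is an assumption added for illustration.

```python
class StageGraph:
    """Hypothetical model of a pipeline as a set of (source, target) connections."""

    def __init__(self):
        self.edges = set()

    def connect(self, source, target):
        self.edges.add((source, target))

    def disconnect_output_lanes(self, stage, stages=(), all_stages=False):
        # Remove connections from `stage` to the given targets, or to every target.
        self.edges = {
            (s, t) for s, t in self.edges
            if not (s == stage and (all_stages or t in stages))
        }

    def disconnect_input_lanes(self, stage, stages=(), all_stages=False):
        # Remove connections into `stage` from the given sources, or from every source.
        self.edges = {
            (s, t) for s, t in self.edges
            if not (t == stage and (all_stages or s in stages))
        }


graph = StageGraph()
graph.connect('Dev Raw Data Source 1', 'Trash 1')
graph.connect('Dev Raw Data Source 1', 'Trash 2')

# One call from the receiving side is enough to break this connection.
graph.disconnect_input_lanes('Trash 1', stages=['Dev Raw Data Source 1'])
print(('Dev Raw Data Source 1', 'Trash 1') in graph.edges)  # False

# all_stages=True removes every remaining output connection of the origin.
graph.disconnect_output_lanes('Dev Raw Data Source 1', all_stages=True)
print(graph.edges)  # set()
```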

Removing Stages From An Existing Pipeline

Once a stage has been added, you can remove it using the Pipeline Canvas UI, as shown below:

../_images/remove_stage.png


To remove stages from an existing pipeline using the SDK, utilize the streamsets.sdk.sch_models.Pipeline.remove_stages() method - see the API reference for this method for details on the arguments this method accepts.

To remove the stage shown in the example above using the SDK, pass the Trash destination to the remove_stages() method:

pipeline.remove_stages(trash)

Note

Removing a stage from an existing streamsets.sdk.sch_models.Pipeline instance also removes all input and output lane references that connected stages had to that stage.
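The cleanup described in the note above can be modeled as deleting a node from a graph together with every edge that touches it. The sketch below is a hypothetical plain-Python model, not the SDK's implementation; `StageGraph` and the `Trash 2` stage are assumptions for illustration, and only the name remove_stages() mirrors the SDK method.

```python
class StageGraph:
    """Hypothetical model of a pipeline's stages and (source, target) connections."""

    def __init__(self):
        self.stages = set()
        self.edges = set()

    def add_stage(self, name):
        self.stages.add(name)

    def connect(self, source, target):
        self.edges.add((source, target))

    def remove_stages(self, *names):
        # Drop the stages and every connection that references them on either end.
        removed = set(names)
        self.stages -= removed
        self.edges = {(s, t) for s, t in self.edges if s not in removed and t not in removed}


graph = StageGraph()
for name in ('Dev Raw Data Source 1', 'Trash 1', 'Trash 2'):
    graph.add_stage(name)
graph.connect('Dev Raw Data Source 1', 'Trash 1')
graph.connect('Dev Raw Data Source 1', 'Trash 2')

graph.remove_stages('Trash 1')
print(sorted(graph.stages))  # ['Dev Raw Data Source 1', 'Trash 2']
print(graph.edges)           # {('Dev Raw Data Source 1', 'Trash 2')}
```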

Editing Pipeline/Stage Configuration Values

Once a stage has been added, you can edit its configuration values in the Pipeline Canvas, as shown below:

../_images/edit_configuration.png


To edit configuration values in the SDK, access the configuration property of the streamsets.sdk.sch_models.Pipeline or streamsets.sdk.sch_models.SchSdcStage object.

For example, if you wanted to check the configuration value of the dev_raw_data_source stage, you could do the following:

dev_raw_data_source.configuration.stop_after_first_batch

Output:

False

Setting a configuration value is as simple as assigning a new value to the attribute. This changes the object in memory only:

dev_raw_data_source.configuration.stop_after_first_batch = True

Note

The same workflow can be followed to access and edit configuration values of streamsets.sdk.sch_models.Pipeline objects.
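Attribute-style access such as dev_raw_data_source.configuration.stop_after_first_batch can be pictured as a thin wrapper over a mapping of configuration names to values. The sketch below is hypothetical: this `Configuration` class is not the SDK's class, and rejecting unknown names with AttributeError is a design choice of the sketch rather than documented SDK behavior. It also shows why an edit is in-memory only: assigning to the attribute updates a local dictionary, and nothing reaches Control Hub until the pipeline is published.

```python
class Configuration:
    """Hypothetical attribute-style view over a dict of configuration values."""

    def __init__(self, values):
        # Bypass __setattr__ so the backing dict is stored as a normal attribute.
        object.__setattr__(self, '_values', dict(values))

    def __getattr__(self, name):
        # Only called when normal lookup fails, i.e. for configuration names.
        try:
            return self._values[name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        # Refuse unknown names so a typo does not silently create a new attribute.
        if name not in self._values:
            raise AttributeError(name)
        self._values[name] = value


config = Configuration({'stop_after_first_batch': False})
print(config.stop_after_first_batch)  # False
config.stop_after_first_batch = True  # updates the local dict only
print(config.stop_after_first_batch)  # True
```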

Once you have edited your streamsets.sdk.sch_models.Pipeline or streamsets.sdk.sch_models.SchSdcStage, the changes must be published to Control Hub. To do so, pass the updated streamsets.sdk.sch_models.Pipeline instance into the streamsets.sdk.ControlHub.publish_pipeline() method, as seen below:

sch.publish_pipeline(pipeline, commit_message='My Edited Pipeline')

Bringing It All Together

The complete script from this section can be found below. Commands that only served to verify output from the examples have been removed.

from streamsets.sdk import ControlHub

sch = ControlHub(credential_id='<credential_id>', token='<token>')

#all_pipelines = sch.pipelines
#all_version_1_pipelines = sch.pipelines.get_all(version='1')
pipeline = sch.pipelines.get(name='Test Pipeline')

# Add a Dev Raw Data Source origin and a Trash destination to the pipeline
dev_raw_data_source = pipeline.add_stage('Dev Raw Data Source')
trash = pipeline.add_stage('Trash')

# Retrieve the Dev Raw Data Source origin in various ways
dev_raw_data_source = pipeline.stages.get(label='Dev Raw Data Source 1')
#dev_raw_data_source = pipeline.stages.get(instance_name='DevRawDataSource_1')
#dev_raw_data_source = pipeline.stages.get(stage_type='SOURCE')
#dev_raw_data_source = pipeline.stages.get(stage_name='com_streamsets_pipeline_stage_devtest_rawdata_RawDataDSource')

# Retrieve the Trash destination in various ways
trash = pipeline.stages.get(label='Trash 1')
#trash = pipeline.stages.get(instance_name='Trash_1')
#trash = pipeline.stages.get(stage_type='TARGET')
#trash = pipeline.stages.get(stage_name='com_streamsets_pipeline_stage_destination_devnull_NullDTarget')

# Retrieve ALL destination stages
destination_stages_list = pipeline.stages.get_all(stage_type='TARGET')

# Remove trash from the Pipeline
#pipeline.remove_stages(trash)

# Update a stage configuration value (in memory only)
dev_raw_data_source.configuration.stop_after_first_batch = True

# Publish the edited pipeline to Control Hub
sch.publish_pipeline(pipeline, commit_message='My Edited Pipeline')