Editing Pipelines
Editing pipelines in the Platform SDK follows the structure and conventions that you’re already familiar with in the UI, while offering extensible, programmatic interaction with pipeline objects.
For more details on Pipeline interaction and usage in the UI, refer to the StreamSets Platform Documentation for pipelines.
Hint
All of the examples below focus on stages for SDC pipelines; however, streamsets.sdk.sch_models.SchStStage instances can be swapped into these examples for Transformer pipelines without issue.
Retrieving An Existing Pipeline
In the Pipeline UI, you can see your existing pipelines and click into them as necessary, as seen below:
The streamsets.sdk.ControlHub.pipelines attribute retrieves all of your pipelines. It returns a streamsets.sdk.utils.SeekableList of streamsets.sdk.sch_models.Pipeline objects:
sch.pipelines
Alternatively, you can retrieve specific pipelines by specifying pipeline_id, name, version, or commit_id to filter the pipeline results when calling the streamsets.sdk.utils.SeekableList.get() or streamsets.sdk.utils.SeekableList.get_all() methods:
pipeline = sch.pipelines.get(name='Test Pipeline')
all_version_1_pipelines = sch.pipelines.get_all(version='1')
pipeline
all_version_1_pipelines
Output:
# pipeline
<Pipeline (pipeline_id=88d58863-7e8b-4831-a929-8c56db629483:admin, commit_id=600a7709-6a13-4e9b-b4cf-6780f057680a:admin, name=Test Pipeline, version=1)>
# all_version_1_pipelines
[<Pipeline (pipeline_id=88d58863-7e8b-4831-a929-8c56db629483:admin,
commit_id=600a7709-6a13-4e9b-b4cf-6780f057680a:admin,
name=Test Pipeline,
version=1)>,
<Pipeline (pipeline_id=5b67c7dc-729b-43cc-bee7-072d3feb184b:admin,
commit_id=491cf010-da8c-4e63-9918-3f5ef3b182f6:admin,
name=Test Pipeline 2,
version=1)>]
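The sketch below models that filtering with a hypothetical ToySeekableList class so it can run without a Control Hub connection. It assumes that get() returns the first item whose attributes equal every keyword argument (raising ValueError when nothing matches) and that get_all() returns every match as another filterable list; the SDK's actual implementation may differ in its details. The pipeline IDs are shortened, illustrative values.

```python
from dataclasses import dataclass


@dataclass
class ToyPipeline:
    """Hypothetical stand-in for a Pipeline, modeling only filterable fields."""
    pipeline_id: str
    name: str
    version: str


class ToySeekableList(list):
    """Hypothetical model of SeekableList filtering; not the SDK's code."""

    @staticmethod
    def _matches(item, criteria):
        # An item matches only if every keyword argument equals its attribute.
        return all(getattr(item, key) == value for key, value in criteria.items())

    def get(self, **criteria):
        # Return the first match; assumed to raise ValueError if none exists.
        for item in self:
            if self._matches(item, criteria):
                return item
        raise ValueError(f'No item matches {criteria}')

    def get_all(self, **criteria):
        # Return every match, wrapped so the result can be filtered again.
        return ToySeekableList(i for i in self if self._matches(i, criteria))


# Shortened, illustrative IDs.
pipelines = ToySeekableList([
    ToyPipeline('88d58863', 'Test Pipeline', '1'),
    ToyPipeline('5b67c7dc', 'Test Pipeline 2', '1'),
    ToyPipeline('88d58863', 'Test Pipeline', '2'),
])

# Criteria combine with AND: only version 2 of 'Test Pipeline' matches.
latest = pipelines.get(name='Test Pipeline', version='2')
# Every pipeline at version 1, whatever its name.
version_1 = pipelines.get_all(version='1')
```

Because the criteria are combined with AND, passing both name and version narrows a lookup that name alone would leave ambiguous.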
Adding Stages To An Existing Pipeline
Once the pipeline is created, you can add stages to it using the Pipeline Canvas UI, as seen below:
To add stages to an existing pipeline using the SDK, use the streamsets.sdk.sch_models.Pipeline.add_stage() method. See the API reference for this method for details on the arguments it accepts.
As shown in the image above, the simplest type of pipeline directs one origin into one destination.
To recreate the example above with the SDK, add a Dev Raw Data Source origin and a Trash destination:
dev_raw_data_source = pipeline.add_stage('Dev Raw Data Source')
trash = pipeline.add_stage('Trash')
Note
The Dev Raw Data Source origin cannot be used in Transformer for Snowflake pipelines. Use the Snowflake Table or Snowflake Query origin instead.
Once the desired stages have been added to the pipeline, you can connect them to the other stages in the pipeline as detailed in the Connecting the Stages section.
Retrieving Existing Stages In a Pipeline
When working with an existing streamsets.sdk.sch_models.Pipeline instance that you want to update, the first step is retrieving the stage instances to be modified.
To retrieve the streamsets.sdk.sch_models.SchSdcStage instances you want to update, use the pipeline's stages attribute. It returns a streamsets.sdk.utils.SeekableList of stages that can be filtered on attributes such as label, instance_name, stage_type, stage_name, or any other stage attribute.
Continuing with the example from the screenshot in the section above, any of the following commands retrieves the corresponding stage from the pipeline:
# Retrieve the Dev Raw Data Source origin in various ways
dev_raw_data_source = pipeline.stages.get(label='Dev Raw Data Source 1')
dev_raw_data_source = pipeline.stages.get(instance_name='DevRawDataSource_1')
dev_raw_data_source = pipeline.stages.get(stage_type='SOURCE')
dev_raw_data_source = pipeline.stages.get(stage_name='com_streamsets_pipeline_stage_devtest_rawdata_RawDataDSource')
# Retrieve the Trash destination in various ways
trash = pipeline.stages.get(label='Trash 1')
trash = pipeline.stages.get(instance_name='Trash_1')
trash = pipeline.stages.get(stage_type='TARGET')
trash = pipeline.stages.get(stage_name='com_streamsets_pipeline_stage_destination_devnull_NullDTarget')
To retrieve all stages in a pipeline that match certain criteria, use the streamsets.sdk.utils.SeekableList.get_all() method:
# Retrieve ALL destination stages, for example
destination_stages_list = pipeline.stages.get_all(stage_type='TARGET')
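Filtering on stage_type is unambiguous only while the pipeline contains a single stage of that type. The following sketch uses hypothetical stand-ins (ToyStage, get_first, get_every) rather than the SDK's classes, and assumes get() returns the first match in list order. With two Trash destinations, a get()-style lookup returns only one of them, which is why label or instance_name is usually the safer key for get(), while stage_type suits get_all().

```python
from dataclasses import dataclass


@dataclass
class ToyStage:
    """Hypothetical stand-in for a stage, modeling two filterable attributes."""
    label: str
    stage_type: str


def get_first(stages, **criteria):
    # Mirrors the assumed get() behavior: the first stage whose attributes all match.
    return next(s for s in stages if all(getattr(s, k) == v for k, v in criteria.items()))


def get_every(stages, **criteria):
    # Mirrors the assumed get_all() behavior: every matching stage, in order.
    return [s for s in stages if all(getattr(s, k) == v for k, v in criteria.items())]


stages = [
    ToyStage('Dev Raw Data Source 1', 'SOURCE'),
    ToyStage('Trash 1', 'TARGET'),
    ToyStage('Trash 2', 'TARGET'),
]

# Only the first TARGET is returned; 'Trash 2' is silently skipped.
first_target = get_first(stages, stage_type='TARGET')
# Both TARGET stages are returned.
all_targets = get_every(stages, stage_type='TARGET')
```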
Copy the Lanes of a Stage
When working with an existing streamsets.sdk.sch_models.SchSdcStage instance, you can copy its input and output lanes to another streamsets.sdk.sch_models.SchSdcStage instance within the same pipeline.
To copy input lanes in the SDK, call the streamsets.sdk.sch_models.SchSdcStage.copy_inputs() method on the stage that should receive the lanes, passing in the streamsets.sdk.sch_models.SchSdcStage object whose input lanes you want to copy.
To replace all input lanes of the current stage with the input lanes of the passed-in streamsets.sdk.sch_models.SchSdcStage object, set override to True when calling streamsets.sdk.sch_models.SchSdcStage.copy_inputs():
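# data_parser and dev_identity are assumed to be SchSdcStage instances already retrieved from the same pipeline
# copy the input lanes of dev_identity into data_parser
data_parser.copy_inputs(dev_identity)
# replace data_parser's input lanes with those of dev_identity
data_parser.copy_inputs(dev_identity, override=True)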
To copy output lanes in the SDK, call streamsets.sdk.sch_models.SchSdcStage.copy_outputs() on the receiving stage, passing in the streamsets.sdk.sch_models.SchSdcStage object whose output lanes you want to copy:
# copy output lanes of dev_identity into data_parser
data_parser.copy_outputs(dev_identity)
Note
The streamsets.sdk.sch_models.SchSdcStage.copy_inputs() and streamsets.sdk.sch_models.SchSdcStage.copy_outputs() methods only work for stages within the same streamsets.sdk.sch_models.Pipeline or streamsets.sdk.sch_models.PipelineBuilder instance.
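One way to picture these methods is as list operations on lane names. The sketch below is a hypothetical model, not the SDK's implementation: the ToyStage class and the lane names are illustrative, and it assumes that copying without override merges the other stage's lanes into the current ones (skipping duplicates), while override=True replaces them.

```python
class ToyStage:
    """Hypothetical stand-in for SchSdcStage that only tracks lane names."""

    def __init__(self, name, input_lanes=None, output_lanes=None):
        self.name = name
        self.input_lanes = list(input_lanes or [])
        self.output_lanes = list(output_lanes or [])

    def copy_inputs(self, other, override=False):
        # override=True discards this stage's current input lanes first.
        if override:
            self.input_lanes = []
        # Append the other stage's input lanes, skipping ones already present.
        for lane in other.input_lanes:
            if lane not in self.input_lanes:
                self.input_lanes.append(lane)

    def copy_outputs(self, other):
        # Append the other stage's output lanes, skipping ones already present.
        for lane in other.output_lanes:
            if lane not in self.output_lanes:
                self.output_lanes.append(lane)


dev_identity = ToyStage('DevIdentity_1', input_lanes=['lane_a'], output_lanes=['lane_x'])
data_parser = ToyStage('DataParser_1', input_lanes=['lane_b'])

data_parser.copy_inputs(dev_identity)                 # merged: ['lane_b', 'lane_a']
merged = list(data_parser.input_lanes)
data_parser.copy_inputs(dev_identity, override=True)  # replaced: ['lane_a']
replaced = list(data_parser.input_lanes)
data_parser.copy_outputs(dev_identity)                # output lanes: ['lane_x']
```

In this model the source stage is never modified; only the stage the method is called on changes.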
Disconnecting Stages In a Pipeline
To disconnect stages on the Pipeline Canvas in the UI, click the connection between the stages and then click the Trash icon in the pop-up that appears, as shown below:
To disconnect output lanes in the SDK, pass a list of the streamsets.sdk.sch_models.SchSdcStage objects to disconnect as the stages argument of streamsets.sdk.sch_models.SchSdcStage.disconnect_output_lanes().
To disconnect all stages that receive output from a specific stage, set all_stages to True instead:
# disconnect dev_raw_data_source from trash
dev_raw_data_source.disconnect_output_lanes(stages=[trash])
# disconnect all stages receiving output from the dev_raw_data_source stage
dev_raw_data_source.disconnect_output_lanes(all_stages=True)
To disconnect input lanes in the SDK, pass a list of the streamsets.sdk.sch_models.SchSdcStage objects to disconnect as the stages argument of streamsets.sdk.sch_models.SchSdcStage.disconnect_input_lanes().
To disconnect a specific stage from all other stages it receives input from, set all_stages to True instead:
# disconnect trash from dev_raw_data_source
trash.disconnect_input_lanes(stages=[dev_raw_data_source])
# disconnect trash from all other stages it receives input from
trash.disconnect_input_lanes(all_stages=True)
Note
It is not necessary to call both streamsets.sdk.sch_models.SchSdcStage.disconnect_output_lanes() and streamsets.sdk.sch_models.SchSdcStage.disconnect_input_lanes() to break the connection between two stages. Calling just one of these methods will disconnect the stages from one another.
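The reason one call is enough becomes clearer with a simplified model of lanes. The hypothetical sketch below assumes a connection exists when a lane name appears both in the upstream stage's output lanes and in the downstream stage's input lanes. Under that assumption, each disconnect method only needs to remove the shared lane from the downstream stage's input lanes, so calling either one breaks the link. The ToyStage class, its connect() helper, and the lane naming are illustrative only, and the all_stages option is not modeled.

```python
class ToyStage:
    """Hypothetical stage model: a connection is a lane shared by two stages."""

    def __init__(self, name):
        self.name = name
        self.input_lanes = []
        # One illustrative output lane per stage.
        self.output_lanes = [f'{name}OutputLane']

    def connect(self, downstream):
        # Hypothetical helper: the downstream stage consumes this stage's lanes.
        downstream.input_lanes.extend(self.output_lanes)

    def is_connected_to(self, downstream):
        return any(lane in downstream.input_lanes for lane in self.output_lanes)

    def disconnect_output_lanes(self, stages):
        # Stop each downstream stage from consuming this stage's lanes.
        for stage in stages:
            stage.input_lanes = [lane for lane in stage.input_lanes
                                 if lane not in self.output_lanes]

    def disconnect_input_lanes(self, stages):
        # Drop the input lanes that the given upstream stages produce.
        produced = {lane for stage in stages for lane in stage.output_lanes}
        self.input_lanes = [lane for lane in self.input_lanes if lane not in produced]


source, trash = ToyStage('DevRawDataSource_1'), ToyStage('Trash_1')

source.connect(trash)
connected_before = source.is_connected_to(trash)
source.disconnect_output_lanes([trash])   # one call from the upstream side
after_output_call = source.is_connected_to(trash)

source.connect(trash)
trash.disconnect_input_lanes([source])    # one call from the downstream side
after_input_call = source.is_connected_to(trash)
```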
Removing Stages From An Existing Pipeline
Once a stage has been added, you can remove it using the Pipeline Canvas UI, as seen below:
To remove stages from an existing pipeline using the SDK, use the streamsets.sdk.sch_models.Pipeline.remove_stages() method. See the API reference for this method for details on the arguments it accepts.
To delete the Trash destination from the example above using the SDK, pass it to remove_stages():
pipeline.remove_stages(trash)
Note
Removing a stage from an existing streamsets.sdk.sch_models.Pipeline instance also removes all output and input lane references that any connected stages had to this stage.
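To illustrate what removing the lane references means, the sketch below applies the idea to a trimmed-down, dict-based picture of a pipeline definition. The remove_stage function and the instanceName/inputLanes/outputLanes layout are assumptions for illustration, not the SDK's implementation. Keeping an upstream lane that another stage still consumes is a design decision of this sketch.

```python
def remove_stage(stages, instance_name):
    """Hypothetical sketch: drop one stage and every lane reference to it.

    `stages` is an assumed, trimmed-down list of stage dicts with
    'instanceName', 'inputLanes', and 'outputLanes' keys.
    """
    removed = next(s for s in stages if s['instanceName'] == instance_name)
    remaining = [s for s in stages if s is not removed]

    # Downstream stages stop consuming lanes the removed stage produced.
    for stage in remaining:
        stage['inputLanes'] = [lane for lane in stage['inputLanes']
                               if lane not in removed['outputLanes']]

    # Upstream stages drop lanes that fed the removed stage, unless another
    # remaining stage still consumes them.
    still_consumed = {lane for s in remaining for lane in s['inputLanes']}
    for stage in remaining:
        stage['outputLanes'] = [lane for lane in stage['outputLanes']
                                if lane not in removed['inputLanes']
                                or lane in still_consumed]
    return remaining


pipeline_stages = [
    {'instanceName': 'DevRawDataSource_1', 'inputLanes': [], 'outputLanes': ['lane_1']},
    {'instanceName': 'Trash_1', 'inputLanes': ['lane_1'], 'outputLanes': []},
    {'instanceName': 'Trash_2', 'inputLanes': ['lane_1'], 'outputLanes': []},
]

# Trash_2 still consumes lane_1, so the origin keeps it.
pipeline_stages = remove_stage(pipeline_stages, 'Trash_1')
lanes_after_first = list(pipeline_stages[0]['outputLanes'])
# With no consumer left, lane_1 is dropped from the origin.
pipeline_stages = remove_stage(pipeline_stages, 'Trash_2')
```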
Editing Pipeline/Stage Configuration Values
Once a stage has been added, you can edit its configuration values in the Pipeline Canvas like so:
To edit configuration values in the SDK, access the configuration property of the streamsets.sdk.sch_models.Pipeline or streamsets.sdk.sch_models.SchSdcStage object.
For example, to check the stop_after_first_batch configuration value of the dev_raw_data_source stage, you could do the following:
dev_raw_data_source.configuration.stop_after_first_batch
Output:
False
To set a configuration value, assign it directly on the in-memory object:
dev_raw_data_source.configuration.stop_after_first_batch = True
Note
The same workflow can be followed to access and edit configuration values of streamsets.sdk.sch_models.Pipeline objects.
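Attribute-style configuration access of this kind is commonly built on Python's __getattr__ and __setattr__ hooks. The sketch below illustrates that pattern with a hypothetical ToyConfiguration class; it is not the SDK's implementation, and the underlying key name stopAfterFirstBatch is an assumption. It also shows why publishing is still required afterwards: assignment only updates an in-memory dictionary.

```python
class ToyConfiguration:
    """Hypothetical model of attribute-style access to stage configuration.

    Python-style attribute names map to underlying configuration keys; the
    key name used below is an assumption for illustration.
    """

    def __init__(self, values, name_map):
        # Store internals with object.__setattr__ to bypass our own __setattr__.
        object.__setattr__(self, '_values', values)
        object.__setattr__(self, '_name_map', name_map)

    def __getattr__(self, attr):
        # Called only when normal lookup fails, i.e. for configuration names.
        try:
            return self._values[self._name_map[attr]]
        except KeyError:
            raise AttributeError(attr) from None

    def __setattr__(self, attr, value):
        # Reject unknown names so a typo fails loudly instead of being ignored.
        if attr not in self._name_map:
            raise AttributeError(f'Unknown configuration: {attr}')
        # Only the in-memory dict changes; nothing is sent to Control Hub.
        self._values[self._name_map[attr]] = value


raw = {'stopAfterFirstBatch': False}
configuration = ToyConfiguration(raw, {'stop_after_first_batch': 'stopAfterFirstBatch'})

before = configuration.stop_after_first_batch
configuration.stop_after_first_batch = True
```

Rejecting unknown names is a design choice of this sketch: without it, a misspelled attribute would silently become a new Python attribute and the real setting would stay unchanged.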
Once you have edited your streamsets.sdk.sch_models.Pipeline or streamsets.sdk.sch_models.SchSdcStage, the changes must be published to Control Hub. To do so, pass the updated streamsets.sdk.sch_models.Pipeline instance to the streamsets.sdk.ControlHub.publish_pipeline() method as seen below:
sch.publish_pipeline(pipeline, commit_message='My Edited Pipeline')
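The retrieve, edit, and publish steps in this section can be combined into a small helper. The sketch below is hypothetical (the edit_and_publish name and its parameters are not part of the SDK), but it only calls what this section shows: sch.pipelines.get(), pipeline.stages.get(), attribute assignment on configuration, and sch.publish_pipeline(). Each keyword argument is applied with setattr(), which is equivalent to writing stage.configuration.<name> = value.

```python
def edit_and_publish(sch, pipeline_name, stage_label, commit_message, **config_values):
    """Hypothetical helper combining the calls shown in this section.

    Retrieves a pipeline by name, updates one stage's configuration values
    in memory, and publishes the edited pipeline to Control Hub.
    """
    pipeline = sch.pipelines.get(name=pipeline_name)
    stage = pipeline.stages.get(label=stage_label)
    for key, value in config_values.items():
        # Same as e.g. stage.configuration.stop_after_first_batch = True
        setattr(stage.configuration, key, value)
    sch.publish_pipeline(pipeline, commit_message=commit_message)
    return pipeline


# Usage with an authenticated ControlHub session (not executed here):
# edit_and_publish(sch, 'Test Pipeline', 'Dev Raw Data Source 1',
#                  'My Edited Pipeline', stop_after_first_batch=True)
```

Because the helper only relies on these attributes and methods, it can also be exercised offline with simple stand-in objects.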
Bringing It All Together
The complete script from this section can be found below. Commands that only served to verify some output from the example have been removed.
from streamsets.sdk import ControlHub
sch = ControlHub(credential_id='<credential_id>', token='<token>')
#all_pipelines = sch.pipelines
#all_version_1_pipelines = sch.pipelines.get_all(version='1')
# Retrieve the pipeline to edit
pipeline = sch.pipelines.get(name='Test Pipeline')
# Add an origin and a destination to the pipeline
dev_raw_data_source = pipeline.add_stage('Dev Raw Data Source')
trash = pipeline.add_stage('Trash')
# Retrieve the Dev Raw Data Source origin in various ways
dev_raw_data_source = pipeline.stages.get(label='Dev Raw Data Source 1')
#dev_raw_data_source = pipeline.stages.get(instance_name='DevRawDataSource_1')
#dev_raw_data_source = pipeline.stages.get(stage_type='SOURCE')
#dev_raw_data_source = pipeline.stages.get(stage_name='com_streamsets_pipeline_stage_devtest_rawdata_RawDataDSource')
# Retrieve the Trash destination in various ways
trash = pipeline.stages.get(label='Trash 1')
#trash = pipeline.stages.get(instance_name='Trash_1')
#trash = pipeline.stages.get(stage_type='TARGET')
#trash = pipeline.stages.get(stage_name='com_streamsets_pipeline_stage_destination_devnull_NullDTarget')
# Retrieve ALL destination stages
destination_stages_list = pipeline.stages.get_all(stage_type='TARGET')
# Remove trash from the Pipeline
#pipeline.remove_stages(trash)
# Edit a stage configuration value in memory
dev_raw_data_source.configuration.stop_after_first_batch = True
# Publish the edited pipeline to Control Hub
sch.publish_pipeline(pipeline, commit_message='My Edited Pipeline')