Creating Pipelines#


Pipeline creation and management in Control Hub is extremely similar to the syntax and structure used in DataCollector.

Instantiating a Pipeline Builder#

The first step of creating a pipeline on Control Hub is to instantiate a streamsets.sdk.sch_models.PipelineBuilder instance. This class handles the majority of the pipeline configuration on your behalf by building the initial JSON representation of the pipeline, and setting default values for essential properties (instead of requiring each to be set manually). Use the streamsets.sdk.ControlHub.get_pipeline_builder() method to instantiate the builder object:

pipeline_builder = sch.get_pipeline_builder()

If you wish to specify a particular streamsets.sdk.DataCollector instance as the Authoring Data Collector for the pipeline, it can be passed into the builder’s instantiation:

sdc = sch.data_collectors.get(url='<data_collector_address>')
pipeline_builder = sch.get_pipeline_builder(data_collector=sdc)

Adding Stages to the Pipeline Builder#

Adding stages to the pipeline can be done by calling the streamsets.sdk.sch_models.PipelineBuilder.add_stage() method - see the API reference for this method for details on the arguments this method takes.

As shown in the first example, the simplest type of pipeline directs one origin into one destination. For this example, you can do this with Dev Raw Data Source origin and Trash destination, respectively:

dev_raw_data_source = pipeline_builder.add_stage('Dev Raw Data Source')
trash = pipeline_builder.add_stage('Trash')

Connecting the Stages#

With streamsets.sdk.sch_models.SchSdcStage instances in hand, you can connect them by using the >> operator. Connecting the Dev Raw Data Source origin and Trash destination from the example above would look like the following:

dev_raw_data_source >> trash

Output:

<com_streamsets_pipeline_stage_destination_devnull_NullDTarget (instance_name=Trash_01)>

You can also connect a stage’s event stream to another stage, like a pipeline finisher, using a similar convention. To connect a stage’s event stream to another stage, use the >= operator:

pipeline_finisher = pipeline_builder.add_stage('Pipeline Finisher Executor')
dev_raw_data_source >= pipeline_finisher

Output:

True

Once the stages are connected, you can build the streamsets.sdk.sch_models.Pipeline instance with the streamsets.sdk.sch_models.PipelineBuilder.build() method:

pipeline = pipeline_builder.build('My first pipeline')
pipeline

Output:

<Pipeline (pipeline_id=None, commit_id=None, name=My first pipeline, version=None)>

Add the Pipeline to Control Hub#

Finally, to add this pipeline to your Control Hub organization, pass it to the streamsets.sdk.ControlHub.publish_pipeline() method:

sch.publish_pipeline(pipeline, commit_message='First commit of my first pipeline')

Output:

<streamsets.sdk.sch_api.Command object at 0x7f8f2e0579b0>