Handling Error Records#

Handling Error Records in the Platform SDK is very similar to handling error records in the UI.

For more information, see the StreamSets Documentation for Error Handling.

Hint

All of the examples below focus on stages for SDC pipelines; however, streamsets.sdk.sch_models.SchStStage instances can be swapped into these examples for Transformer pipelines without issue.

Handling Error Records in a Pipeline#

In the UI, you can edit how error records are handled by clicking on Show Advanced Options and then Error Records.

../_images/error_records_main.png

By default, a pipeline discards any error records it produces. You can change this behavior, for example by writing the error records to a file or sending them to an MQTT broker.

To do so in the UI, click on the options for Error Records and select the appropriate option.

You can do this in the Platform SDK by using streamsets.sdk.sch_models.PipelineBuilder.add_error_stage() as follows:

error_stage = pipeline_builder.add_error_stage('Write to File')

Note

Depending on the stage libraries in your engine's deployment configuration, you may see options beyond those available by default. For example, including the AWS library in your deployment adds an option to write your error records to S3.

Configuring Error Record Handling for a Pipeline#

In the UI, choosing an option other than Discard will provide additional configuration options under the Error Records tab, as seen below:

../_images/error_records_configuration.png

You can do this in the Platform SDK by accessing the configuration property of the streamsets.sdk.sch_models.SchSdcStage instance that represents the error stage for your pipeline:

error_stage.configuration.directory = '/path/to/error/directory'
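When an error stage needs several settings, it can be convenient to apply them from a dictionary. The following is a minimal sketch of that idea, not part of the Platform SDK: apply_error_stage_settings is a hypothetical helper, and a SimpleNamespace object stands in for the real error stage so the example runs without a Control Hub connection. Only the directory attribute comes from this page; any other attribute names would depend on the error stage you selected.

```python
from types import SimpleNamespace


def apply_error_stage_settings(error_stage, settings):
    """Hypothetical helper: copy each key/value pair onto error_stage.configuration."""
    for name, value in settings.items():
        setattr(error_stage.configuration, name, value)
    return error_stage


# Stand-in for the stage returned by add_error_stage('Write to File');
# in practice you would pass the real error stage instance.
demo_error_stage = SimpleNamespace(configuration=SimpleNamespace())
apply_error_stage_settings(
    demo_error_stage,
    {'directory': '/path/to/error/directory'},
)
```

With a real error stage, the call has the same effect as the single assignment shown above, repeated for each entry in the dictionary.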

Hint

When writing to a directory, you must first create the directory in the deployment. For details, see the StreamSets Documentation to manage a Local FS.

Changing the Error Record Policy for a Pipeline#

In the UI, you can change the Error Record Policy from the main Error Records tab:

../_images/error_records_main.png

The default value for Error Record Policy in the UI is Original record as it was generated by the origin, but it can be changed to Record as it was seen by the stage that sent it to error stream.

Hint

The equivalent values you can supply in the Platform SDK for Error Record Policy are ORIGINAL_RECORD and STAGE_RECORD.

To do this in the Platform SDK you can do the following:

# store the original record as it was generated by the origin
pipeline.error_record_policy = 'ORIGINAL_RECORD'

# store the record as seen by the stage that raised the error
pipeline.error_record_policy = 'STAGE_RECORD'
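Because the policy is supplied as a plain string, a typo such as a lowercase value can be easy to miss. The sketch below shows one way to guard against that by checking the value against the two options listed in the hint above before assigning it. The helper set_error_record_policy and the constant VALID_ERROR_RECORD_POLICIES are hypothetical names introduced for illustration, and a SimpleNamespace object stands in for a real pipeline.

```python
from types import SimpleNamespace

# The two values this page lists for Error Record Policy.
VALID_ERROR_RECORD_POLICIES = ('ORIGINAL_RECORD', 'STAGE_RECORD')


def set_error_record_policy(pipeline, policy):
    """Hypothetical helper: validate the policy, then assign it as shown above."""
    if policy not in VALID_ERROR_RECORD_POLICIES:
        raise ValueError(
            f"Unsupported error record policy {policy!r}; "
            f"expected one of {VALID_ERROR_RECORD_POLICIES}"
        )
    pipeline.error_record_policy = policy
    return pipeline


# Stand-in object so the sketch runs without a Control Hub connection;
# in practice you would pass a real pipeline instance.
demo_pipeline = SimpleNamespace(error_record_policy='ORIGINAL_RECORD')
set_error_record_policy(demo_pipeline, 'STAGE_RECORD')
```

Passing any other string raises a ValueError and leaves the pipeline's current policy unchanged.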

Bringing It All Together#

The complete script from this section is shown below. Commands that only served to verify output from the examples have been removed.

# creating a pipeline builder and adding stages
pipeline_builder = sch.get_pipeline_builder(...)
stage_1 = pipeline_builder.add_stage(...)

# adding an appropriate error stage for our use case; skip this step to keep the default of discarding error records
error_stage = pipeline_builder.add_error_stage('Write to File')

# remember to configure the error stage appropriately
error_stage.configuration.directory = '/path/to/error/directory'

# build the pipeline
pipeline = pipeline_builder.build('Pipeline name')

# you can, optionally, change the error records policy
pipeline.configuration.error_record_policy = 'STAGE_RECORD'

# now, the pipeline is ready to be published
sch.publish_pipeline(pipeline)
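If you build many pipelines with the same error handling, the builder steps from the script above can be collected into one function. The following is a rough sketch under stated assumptions: build_pipeline_with_error_stage is a hypothetical wrapper, not an SDK function, and unittest.mock.MagicMock stands in for a real pipeline builder so the example runs offline. It assumes the chosen error stage exposes a directory setting, as 'Write to File' does on this page.

```python
from unittest.mock import MagicMock


def build_pipeline_with_error_stage(pipeline_builder, pipeline_name,
                                    error_stage_label, error_directory):
    """Hypothetical wrapper around the builder calls shown in the script above."""
    # Add the error stage, configure its target directory, then build.
    error_stage = pipeline_builder.add_error_stage(error_stage_label)
    error_stage.configuration.directory = error_directory
    return pipeline_builder.build(pipeline_name)


# MagicMock stands in for a real pipeline builder so the sketch runs offline;
# in practice you would pass the builder obtained from Control Hub.
mock_builder = MagicMock()
pipeline = build_pipeline_with_error_stage(
    mock_builder, 'Pipeline name', 'Write to File', '/path/to/error/directory'
)
```

The function returns the built pipeline without publishing it, so the caller can still adjust the error record policy before publishing.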