Basic Tutorial
The basic tutorial creates a pipeline that reads a file from an HTTP resource URL, processes the data in two branches, and writes all data to a local file system. You'll use data preview to help configure the pipeline, then create a data alert and run the pipeline. In this tutorial, you'll complete the following tasks:
- Configure pipeline properties, primarily error handling.
- Add an HTTP Client origin to represent the data to be processed.
- Preview source data to determine the field-level details needed for the pipeline.
- Use a Stream Selector processor to route credit card transactions to the primary branch and cash transactions to the secondary branch. Define a required field to discard records without a payment type.
- Configure a Jython Evaluator processor to perform custom processing that determines the credit card type based on the credit card number.
- Add a Field Masker processor to mask credit card numbers. Use a required field to discard records without credit card numbers.
- Connect both branches to a Local FS destination.
- In the secondary branch, use an Expression Evaluator processor to add fields to the cash records to match the credit card records. Use data preview to verify the fields to add.
- Add a data rule to raise an alert if too many credit card payments are missing credit card numbers.
- Start the pipeline and monitor the results.
Create a Pipeline and Define Pipeline Properties
When you configure a pipeline, you need to decide what to do with error records. You can discard them or, more productively, write them to a file, to another pipeline, or to a destination such as Amazon S3 or Azure Event Hub.
Writing error records to one of these locations is a convenient way to review and reprocess error records without having to stop the pipeline.
This tutorial writes the records to a local file.
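You can sketch the idea of setting error records aside while processing continues in plain Python. This is not Data Collector code. `run_pipeline`, `require_payment_type`, and the JSON-lines error file format are hypothetical names and choices made for illustration.

```python
import json
import os
import tempfile


def run_pipeline(records, process, error_path):
    """Process every record; append failures to a JSON-lines error file."""
    output = []
    with open(error_path, "a", encoding="utf-8") as error_file:
        for record in records:
            try:
                output.append(process(record))
            except (KeyError, ValueError) as exc:
                # Keep the failed record for later review and keep going.
                entry = {"record": record, "error": str(exc)}
                error_file.write(json.dumps(entry) + "\n")
    return output


def require_payment_type(record):
    """Hypothetical processing step that rejects records without a payment type."""
    if not record.get("payment_type"):
        raise ValueError("missing payment_type")
    return record


error_path = os.path.join(tempfile.mkdtemp(), "errors.jsonl")
good = run_pipeline([{"payment_type": "CRD"}, {"payment_type": ""}],
                    require_payment_type, error_path)
print(good)  # [{'payment_type': 'CRD'}]
```

The bad record ends up in the error file instead of stopping the run. You can then inspect it and reprocess it later.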
Now we'll start building the pipeline...
Configure the Origin
The origin represents the incoming data for the pipeline. When you configure the origin, you define how to connect to the origin system, the type of data to be processed, and other properties specific to the origin.
This tutorial builds a pipeline that reads a sample CSV file from an HTTP resource URL, processes the data in two branches, and then writes the data to a JSON file on your local machine.
The sample CSV file includes some invalid data, so you'll also see how StreamSets handles errors when you preview the pipeline.
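The sketch below gives a rough idea of what the origin produces. It parses delimited text with a header line into one Python dict per row, which is roughly analogous to a record with named fields. The sample rows are hypothetical. Only the payment_type and credit_card fields come from this tutorial; the amount column and the CSH value are made up. The row with an empty payment type stands in for invalid data.

```python
import csv
import io

# Hypothetical sample data for illustration only.
SAMPLE_CSV = """payment_type,credit_card,amount
CRD,4111111111111111,12.50
CSH,,7.00
,,3.25
"""


def read_records(text):
    """Parse delimited text with a header line into one dict per row."""
    # DictReader keys each value by its column header, so fields are
    # addressed by name, similar in spirit to a List-Map record.
    return [dict(row) for row in csv.DictReader(io.StringIO(text))]


records = read_records(SAMPLE_CSV)
for record in records:
    print(record)
```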
Preview Data
To become more familiar with the data set and gather some important details for pipeline configuration, let's preview the source data. We're looking for two details:
- The field that contains payment information - We'll use this to route data in the Stream Selector processor.
- The field that contains credit card numbers - We'll use this to mask the data in the Field Masker processor.
When you access data in a field, you specify the field path for the field. The field path depends on the complexity of the record: /<fieldname> for simple records and <path to field>/<fieldname> for more complex records.
Because we're using the List-Map root field type, we can use /<fieldname>.
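The following field-path lookup assumes that records are nested Python dicts and shows how /<fieldname> and <path to field>/<fieldname> resolve. The `get_field` helper is hypothetical. It ignores list indexes and other parts of Data Collector's field-path syntax.

```python
def get_field(record, path):
    """Resolve a simplified field path such as /payment_type or /a/b."""
    if not path.startswith("/"):
        raise ValueError("field paths start with '/': %r" % path)
    value = record
    # Walk one map level per path segment; a missing key raises KeyError.
    for name in path[1:].split("/"):
        value = value[name]
    return value


flat = {"payment_type": "CRD", "credit_card": "4111111111111111"}
nested = {"payment": {"type": "CRD"}}
print(get_field(flat, "/payment_type"))    # CRD
print(get_field(nested, "/payment/type"))  # CRD
```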
To start data preview, all stages must be connected and all required properties defined, though not necessarily correctly. Because the origin is the only stage and it is already configured, the pipeline should be ready to preview as is.
In the preview data, note the following:
- Payment type information is in the payment_type field, and credit card information is in the credit_card field.
- To use these fields in expressions, we'll use their field paths: /payment_type and /credit_card.
- To route records paid by credit card, we'll look for records where the payment type is CRD.
Route Data with the Stream Selector
To route data to different streams for processing, we use the Stream Selector processor.
The Stream Selector processor routes data to different streams based on user-defined conditions. Any data not captured by a user-defined condition routes to the default stream.
We'll route credit card transactions to a credit card stream for processing. All other transactions will go to the default stream.
We'll also define a required field to drop records with no payment type. When you define a required field, a record must include data for the specified field to enter the stage. Records that don't include data in required fields are sent to the pipeline for error handling. If you configured the pipeline to write error records to a file, that's where they go.
To represent data in a field, we use the record:value function, which returns the value of the specified field. To capture records paid by credit card, we use the following condition:
${record:value('/payment_type') == 'CRD'}
Note that we enclose expressions in a dollar sign and curly brackets. You can use single or double quotation marks around strings. For more information about the expression language, see Expression Language.
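The Python sketch below approximates the Stream Selector's routing logic, including the required-field check. It is hypothetical and is not the processor's implementation. `is_credit_card` mirrors the expression above, and treating an empty string as missing data is an assumption.

```python
def is_credit_card(record):
    # Python analogue of ${record:value('/payment_type') == 'CRD'}
    return record.get("payment_type") == "CRD"


def stream_selector(records, condition, required_fields=("payment_type",)):
    """Split records into a condition stream, a default stream, and errors."""
    matched, default, errors = [], [], []
    for record in records:
        # Assumption: a missing field or an empty string counts as "no data".
        if any(record.get(name) in (None, "") for name in required_fields):
            errors.append(record)      # sent to pipeline error handling
        elif condition(record):
            matched.append(record)     # user-defined condition stream
        else:
            default.append(record)     # everything else
    return matched, default, errors


records = [
    {"payment_type": "CRD", "credit_card": "4111111111111111"},
    {"payment_type": "CSH", "credit_card": ""},
    {"payment_type": "", "credit_card": ""},
]
card, other, errors = stream_selector(records, is_credit_card)
print(len(card), len(other), len(errors))  # 1 1 1
```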
Use Jython for Card Typing
Next, we'll evaluate credit card numbers to determine the credit card type. You can use an Expression Evaluator processor to do the same calculations, but with a short script, the Jython Evaluator processor is easier.
You can use custom scripts with the JavaScript Evaluator and the Jython Evaluator processors to perform processing that is not easily performed using other Data Collector processors. When using scripts to handle list-map data, the script must treat the data as maps.
The Jython script that we provide creates an additional field, credit_card_type, and generates the credit card type by evaluating the first few digits of the credit card number. The script returns an error message if the record has a credit card payment type without a corresponding credit card number.
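The following sketch shows the kind of logic such a script performs. It is written as plain Python functions rather than against the Jython Evaluator's scripting objects, and it is not the tutorial's script. The function names, the simplified prefix table, and the error message are assumptions. The Stream Selector sends only CRD records to this branch, so every record here is treated as a credit card payment. Records are handled as maps (Python dicts).

```python
def card_type(number):
    """Guess the card network from leading digits (simplified prefix table)."""
    if number.startswith("4"):
        return "Visa"
    if number.startswith(("34", "37")):
        return "AMEX"
    if number.startswith(("51", "52", "53", "54", "55")):
        return "MasterCard"
    if number.startswith(("6011", "65")):
        return "Discover"
    return "Other"


def add_card_type(records):
    """Return (output, errors); records without a card number become errors."""
    output, errors = [], []
    for record in records:
        number = record.get("credit_card") or ""
        if not number:
            # Hypothetical message text.
            errors.append((record, "credit card payment without a card number"))
            continue
        enriched = dict(record)  # copy so the input record is not modified
        enriched["credit_card_type"] = card_type(number)
        output.append(enriched)
    return output, errors


out, errs = add_card_type([
    {"payment_type": "CRD", "credit_card": "4111111111111111"},
    {"payment_type": "CRD", "credit_card": ""},
])
print(out[0]["credit_card_type"], len(errs))  # Visa 1
```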
Mask Credit Card Numbers
Now let's prevent sensitive information from reaching internal databases by using a Field Masker processor to mask the credit card numbers.
The Field Masker processor provides fixed and variable-length masks to mask all data in a field. To reveal specified positions in the data, you can use custom masking. To reveal a group of positions within the data, you can use a regular expression mask to define the structure of the data and then reveal one or more groups.
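To reveal only the last four digits of each credit card number, we'll use a regular expression mask with the following expression:
(.*)([0-9]{4})
The regular expression defines two groups: the first matches everything before the last four digits, and the second matches the last four digits. This lets us reveal the second group.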
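You can sketch the effect of the regular expression mask with Python's re module. This sketch assumes x as the mask character and passes non-matching values through unchanged. The Field Masker's actual output may differ.

```python
import re

# Group 1: everything before the last four digits; group 2: the last four digits.
CARD_PATTERN = re.compile(r"(.*)([0-9]{4})")


def regex_mask(value, pattern=CARD_PATTERN, groups_to_show=(2,), mask_char="x"):
    """Mask every regex group except those listed in groups_to_show."""
    match = pattern.fullmatch(value)
    if match is None:
        return value  # assumption: non-matching values pass through unchanged
    parts = []
    for index in range(1, pattern.groups + 1):
        text = match.group(index)
        parts.append(text if index in groups_to_show else mask_char * len(text))
    return "".join(parts)


print(regex_mask("4111111111111111"))  # xxxxxxxxxxxx1111
```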
Write to the Destination
Data Collector can write data to many destinations. The Local FS destination writes to files in a local file system.
When you configure the Local FS destination, you define the directory template. This determines the naming convention for the output directories that are created.
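Directory templates typically combine a base path with datetime functions that are evaluated when records are written. The sketch below resolves a template written in that style. The template string and the function-to-format mapping are assumptions for illustration, not the destination's documented defaults.

```python
import re
from datetime import datetime

# Assumed mapping from template functions to strftime codes (a small subset).
TOKENS = {"YYYY": "%Y", "MM": "%m", "DD": "%d", "hh": "%H", "mm": "%M", "ss": "%S"}


def resolve_template(template, when):
    """Replace ${FN()} tokens in a directory template with parts of `when`."""
    return re.sub(
        r"\$\{(\w+)\(\)\}",
        lambda m: when.strftime(TOKENS[m.group(1)]),
        template,
    )


template = "/tmp/out/${YYYY()}-${MM()}-${DD()}-${hh()}"
print(resolve_template(template, datetime(2024, 3, 5, 7, 30)))
# /tmp/out/2024-03-05-07
```

With an hourly template like this one, records written within the same hour land in the same output directory.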
Now, we'll go back to the Stream Selector processor and complete the secondary branch.
Add a Corresponding Field with the Expression Evaluator
The Jython Evaluator script added a new field, credit_card_type, to records in the credit card branch. To ensure that all records have the same structure when written to the destination, we'll use the Expression Evaluator processor to add the same field to records in the non-credit branch.
To do this, let's use data preview to verify how the Jython Evaluator processor adds the credit card type to records.
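Conceptually, the Expression Evaluator step adds to each cash record the field that the Jython Evaluator adds to card records. The following is a minimal Python sketch that assumes a hypothetical placeholder value of n/a. The tutorial may use a different value.

```python
def add_card_type_placeholder(record, placeholder="n/a"):
    """Give a cash record the credit_card_type field the card branch adds."""
    result = dict(record)  # copy so the input record is not modified
    # setdefault leaves an existing credit_card_type untouched.
    result.setdefault("credit_card_type", placeholder)
    return result


cash = {"payment_type": "CSH", "credit_card": ""}
print(add_card_type_placeholder(cash))
```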
Create a Data Rule and Alert
Before we run the basic pipeline, let's add a data rule and alert. Data rules are user-defined rules used to inspect data moving between two stages. They are a powerful way to look for outliers and anomalous data.
Data rules and alerts require a detailed understanding of the data passing through the pipeline. For more general pipeline monitoring information, you can use metric rules and alerts.
The script in the Jython Evaluator processor creates error records for credit card transactions without credit card numbers. We can create a data rule and alert to let us know when the record count reaches a specified threshold.
We'll use an expression with the record:value() function to identify when the credit card number field, /credit_card, is null. The function returns the data in the specified field.
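The data rule's logic amounts to counting matching records and comparing the count with a threshold. The sketch below is hypothetical. Data Collector evaluates data rules on the records passing over a link, and this sketch assumes that a missing field, None, or an empty string counts as null.

```python
def missing_card_number(record):
    # Assumption: a missing field, None, or "" all count as a null card number.
    return record.get("credit_card") in (None, "")


def check_data_rule(records, condition, threshold):
    """Count matching records; report whether the alert threshold is reached."""
    count = sum(1 for record in records if condition(record))
    return count, count >= threshold


records = [
    {"payment_type": "CRD", "credit_card": "4111111111111111"},
    {"payment_type": "CRD", "credit_card": ""},
    {"payment_type": "CRD"},
]
print(check_data_rule(records, missing_card_number, threshold=2))  # (2, True)
```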
Run the Basic Pipeline
When you start the pipeline, the UI enters Monitor mode and shows summary statistics in the Monitor panel. At some point as the pipeline runs, the data alert triggers and the location of the triggered data alert turns red.
Click the Data Alert icon to view the data alert notification. Then, close the notification and explore the information available in the Monitor panel.
Note that when you select an unused part of the canvas, the Monitor panel displays monitoring information for the entire pipeline. When you select a stage, it displays information for the stage.
The Jython Evaluator processor shows 40 error records. Click the error records number to see the list of cached error records and related error messages.
You can also select the red Data Inspection icon to view information about the data alert and view the error records associated with the data alert.

In the Extended Tutorial, we'll add more stages and see more data transformations. We can use data preview to step through the pipeline to review how each stage processes the data.