Stream Selector

The Stream Selector processor passes records to different output streams based on conditions. Define a condition for each output stream that you want to use. All records that do not match a condition pass to a default stream.

When you define a condition, the Stream Selector processor creates an output stream that you connect to downstream stages. The processor passes a record to every output stream whose condition the record matches.

For example, say you create the following conditions:
  • Stream 1: State like 'CA'
  • Stream 2: ID is NULL

When you run the pipeline, records from California pass to Stream 1, records with missing IDs pass to Stream 2, and any records from California with a missing ID pass to both streams. Any records that do not match either condition pass to the default stream.
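Conceptually, each output stream behaves like an independent filter over the same input batch, which is why one record can reach several streams. The following runnable Scala sketch (the input data mirrors the example above and is otherwise hypothetical) illustrates the idea:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("streamSelectorSketch").getOrCreate()
import spark.implicits._

// Hypothetical input batch with State and ID fields.
val input = Seq(("CA", Some(1)), ("CA", None), ("NY", None), ("TX", Some(2))).toDF("State", "ID")

val stream1 = input.filter("State like 'CA'")  // records from California
val stream2 = input.filter("ID is NULL")       // records with missing IDs
// The ("CA", None) record matches both conditions, so it appears in both
// stream1 and stream2; records that match neither go to the default stream.

This sketch only illustrates the routing semantics; the processor handles the default stream for you.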

Use the Stream Selector processor when you want to create two or more output streams for processing. If you want to process only the data that matches a single condition, you can use the Filter processor instead.

Default Stream

The Stream Selector processor passes all data that does not match a condition to a default stream.

When you configure a Stream Selector processor, you connect each output stream to downstream stages in the pipeline. The default stream is the last output stream in the processor.

You can configure the branch for the default stream to perform additional processing or to write directly to a destination. If you do not need to process data from the default stream, you can connect the stream to a Trash destination or use the Filter processor instead.

Conditions

Each Stream Selector condition determines the data that the processor passes to the associated output stream. The condition must evaluate to true or false. Records for which the condition evaluates to true pass to the output stream.

You can use a condition as simple as ID is NOT NULL, or you can create a condition as complex as needed.
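For example, the following compound condition (the accountId, total, and status field names are hypothetical) passes a record only when it has an account ID and either a large total or a premium status:

NOT isnull(accountId) AND (total > 1000 OR upper(status) like '%PREMIUM%')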

Here are some guidelines for conditions:
  • When you define a condition, you typically base it on field values in the record. For information about referencing fields in the condition, see Referencing Fields in Spark SQL Expressions.
  • You can use any Spark SQL syntax that can be used in the WHERE clause of a query, including functions such as isnull or trim and operators such as = or <=.

    You can also use user-defined functions (UDFs), but you must define the UDFs in the pipeline. Use a pipeline preprocessing script to define UDFs, as shown in the sketch after this list.

    For more information about Spark SQL functions, see the Apache Spark SQL Functions documentation.
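The following minimal Scala sketch shows one way to register a UDF in a preprocessing script. It assumes that the script has access to the active SparkSession as spark; the isStateCode name and logic are hypothetical:

// Hypothetical UDF: returns true when a string field holds a two-letter code.
// Assumes the preprocessing script exposes the active SparkSession as spark.
spark.udf.register("isStateCode", (s: String) => s != null && s.trim.length == 2)

After the script registers the UDF, a condition can call it like a built-in function:

isStateCode(State)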

For example, the following condition passes records to the stream only when the year of the transaction date is 2000 or later:
year(transaction_date) >= 2000

Sample Conditions

The following examples describe some common scenarios that you might adapt for your use:
  • total > 0
    If the value in the total field is greater than 0, the record passes to the stream.
  • total <= 0
    If the value in the total field is less than or equal to 0, the record passes to the stream.
  • accountId is NULL
    If the record has a null value in the accountId field, the record passes to the stream.
    Note that NULL is not case sensitive. You can also use null or Null in the condition.
  • upper(message) like '%ERROR%'
    If the message field contains the string ERROR, the record passes to the stream.
    The condition converts the strings in the message field to uppercase before performing the evaluation, so the condition also applies to error and Error, for example.
  • initcap(country) like 'China' OR initcap(country) like 'Japan'
    If the value in the country field is China or Japan, the record passes to the stream.
    The condition capitalizes the first letter of the strings in the country field before performing the evaluation, so the condition also applies to CHINA and japan, for example.

Configuring a Stream Selector Processor

Configure a Stream Selector processor to route data to different streams based on conditions.
  1. In the Properties panel, on the General tab, configure the following properties:
    • Name: Stage name.
    • Description: Optional description.
    • Cache Data: Caches data processed for a batch so the data can be reused for multiple downstream stages. Use to improve performance when the stage passes data to multiple stages.

      Caching can limit pushdown optimization when the pipeline runs in ludicrous mode.

  2. On the Conditions tab, click the Add icon for each condition that you want to create, then specify the condition to use.
    Each new condition creates a corresponding output stream. You cannot edit or delete the condition for the default stream.