Stream Selector
The Stream Selector processor passes records to different output streams based on conditions. Define a condition for each output stream that you want to use. All records that do not match a condition pass to a default stream.
When you define a condition, the Stream Selector processor creates an output stream that you connect to downstream stages. The processor passes a record to every stream whose condition the record matches.
For example, say you define the following conditions:
Stream 1: State like 'CA'
Stream 2: ID is NULL
When you run the pipeline, records from California pass to Stream 1, records with missing IDs pass to Stream 2, and any records from California with a missing ID pass to both streams. Any records that do not match either condition pass to the default stream.
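The routing behavior described above can be sketched in plain Python. This is only an illustrative model, not the processor's implementation: the `route` helper, the sample records, and the use of Python predicates in place of Spark SQL conditions are all assumptions made for the example. The default stream is modeled as the stream numbered one past the last condition.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]
Condition = Callable[[Record], bool]


def route(record: Record, conditions: List[Condition]) -> List[int]:
    """Return the 1-based numbers of the streams that receive the record.

    Hypothetical helper: stream len(conditions) + 1 stands for the default stream.
    """
    matched = [n for n, cond in enumerate(conditions, start=1) if cond(record)]
    # A record that matches no condition goes only to the default stream.
    return matched or [len(conditions) + 1]


# Python stand-ins for the two conditions in the example above.
conditions = [
    lambda r: r.get("State") == "CA",  # Stream 1: State like 'CA'
    lambda r: r.get("ID") is None,     # Stream 2: ID is NULL
]

# Made-up sample records for illustration.
records = [
    {"ID": 1, "State": "CA"},     # matches Stream 1 only
    {"ID": None, "State": "NY"},  # matches Stream 2 only
    {"ID": None, "State": "CA"},  # matches both streams
    {"ID": 4, "State": "TX"},     # matches neither, so default stream
]

routes = [route(r, conditions) for r in records]
```

In this model, a record can appear in several output streams at once, while the default stream receives only the records that matched nothing.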
Use the Stream Selector when you want to create two or more output streams for processing. If you only want to process data that matches one condition, you can use the Filter processor.
Default Stream
The Stream Selector processor passes all data that does not match a condition to a default stream.
When you configure a Stream Selector processor, you connect each output stream to downstream stages in the pipeline. The default stream is the last output stream in the processor.
You can configure the branch for the default stream to perform additional processing or to write directly to a destination. If you do not need to process data from the default stream, you can connect the stream to a Trash destination or use the Filter processor instead.
Conditions
The Stream Selector condition determines the data that the processor passes to the associated output stream. The condition must evaluate to true or false. Records that evaluate to true pass to the output stream.
You can use a condition as simple as ID is NOT NULL or you can create as complex a condition as needed.
- When you define a condition, you typically base it on field values in the record. For information about referencing fields in the condition, see Referencing Fields in Spark SQL Expressions.
- You can use any Spark SQL syntax that can be used in the WHERE clause of a query, including functions such as isnull or trim and operators such as = or <=.
  You can also use user-defined functions (UDFs), but you must define the UDFs in the pipeline. Use a pipeline preprocessing script to define UDFs.
  For more information about Spark SQL functions, see the Apache Spark SQL Functions documentation.
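For example, the following condition passes only records whose transaction_date value falls in the year 2000 or later:
year(transaction_date) >= 2000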
Sample Conditions
The following table lists some common scenarios that you might adapt for your use:

Condition Example | Description |
---|---|
total > 0 | If the value in the total field is greater than 0, the record passes to the stream. |
total <= 0 | If the value in the total field is less than or equal to 0, the record passes to the stream. |
accountId is NULL | If the record has a null value in the accountId field, the record passes to the stream. |
upper(message) like '%ERROR%' | If the message field contains the string ERROR, the record passes to the stream. The condition changes the strings in the message field to uppercase before the comparison. |
initcap(country) like 'China' OR initcap(country) like 'Japan' | If the value in the country field is China or Japan, the record passes to the stream. The condition changes the strings in the country field to initial capitals before the comparison. |
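To get a feel for how WHERE-style conditions like these select records, the following sketch evaluates several of them against a few made-up rows. It uses SQLite from the Python standard library as a stand-in, not Spark SQL, so treat it as an approximation: the table name, the rows, and the `matching_ids` helper are assumptions for illustration. The initcap example is omitted because SQLite has no initcap function, and note that SQLite's LIKE is case-insensitive for ASCII by default, while Spark SQL's LIKE is case-sensitive.

```python
import sqlite3

# In-memory table standing in for a batch of records (hypothetical data).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE records (id INTEGER, total REAL, accountId TEXT, message TEXT)"
)
conn.executemany(
    "INSERT INTO records VALUES (?, ?, ?, ?)",
    [
        (1, 25.0, "A-100", "payment ok"),
        (2, 0.0, None, "Error: card declined"),
        (3, -5.0, "A-300", "refund issued"),
    ],
)

# Conditions taken from the table above, written as WHERE-clause fragments.
conditions = [
    "total > 0",
    "total <= 0",
    "accountId IS NULL",
    "upper(message) LIKE '%ERROR%'",
]


def matching_ids(condition: str) -> list:
    """Return the ids of the rows for which the condition is true.

    The condition is interpolated directly into the query, which is only
    acceptable here because the strings are fixed constants in this sketch.
    """
    rows = conn.execute(f"SELECT id FROM records WHERE {condition} ORDER BY id")
    return [row[0] for row in rows]


matches = {condition: matching_ids(condition) for condition in conditions}
```

Each entry in `matches` corresponds to the set of records that one output stream would receive if its condition were the only one evaluated.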
Configuring a Stream Selector Processor
- In the Properties panel, on the General tab, configure the following properties:

General Property | Description |
---|---|
Name | Stage name. |
Description | Optional description. |
Cache Data | Caches data processed for a batch so the data can be reused for multiple downstream stages. Use to improve performance when the stage passes data to multiple stages. Caching can limit pushdown optimization when the pipeline runs in ludicrous mode. |

- On the Conditions tab, click the Add icon for each condition that you want to create, then specify the condition to use.
  Each new condition creates a corresponding output location. You cannot edit or delete the condition for the default stream.