Window
The Window processor produces new batches of data from incoming batches based on the configured window type. Use the Window processor in streaming pipelines only. The processor is not supported in batch pipelines.
The Window processor creates windows of data and passes each window downstream as a batch. The processor can create the following types of windows: tumbling, sliding, session, and hopping.
You might use the Window processor in streaming pipelines to merge small batches into larger batches for more meaningful downstream processing.
For example, when using an Aggregate processor in a streaming pipeline, you might place the Window processor before the Aggregate processor to create larger batches for aggregate calculations. Similarly, when using a Join processor in a streaming pipeline, you might use the Window processor before the Join processor to enable joins on larger data sets.
When you configure the Window processor, you specify the window type to use and the properties for the window type.
When a window contains no data, the Window processor passes an empty batch downstream.
Window Types
The Window processor creates windows based on the configured window type, then passes the window downstream as a batch.
- Tumbling
- Creates windows of the specified length, opening a new window as soon as the previous closes. You specify the window length in milliseconds. The first window begins when you start the pipeline.
- Sliding
- Creates a window of a specified length when a triggering condition occurs. You specify the window length in milliseconds. You can configure the condition using any Spark SQL syntax that can be used in the WHERE clause of a query.
Use the sliding window type to create windows when a specified condition occurs. When the condition occurs, the processor creates a window that includes the data that arrived within the window length and the batch that contains the triggering condition. Depending on the window configuration and the flow of data, the sliding window can drop records or pass duplicate records downstream.
For example, say you set the window length to 1000 milliseconds and the condition to inventory is NULL, and say ten batches arrive every second. Then every time the inventory field contains a null value, the processor creates a window that includes the ten batches that arrived in the previous second as well as the batch that includes the record with the null inventory field.
In this example, if the next batch also contains a null inventory field, the new window contains duplicate data. If the next null inventory field arrives 5 seconds later, then the data that arrived between the two windows is dropped from the pipeline.
- Session
- Creates a window based on arriving data, closing the window when the specified interval of time elapses without incoming data. You configure the session timeout in milliseconds.
Use the session window type to create windows based on expected breaks in the incoming flow of data. With this window, all data is passed downstream exactly once. The session window does not drop records or pass duplicate records downstream. However, the size of the windows depends on the window configuration and the flow of data.
For example, say you want a downstream Deduplicate processor to deduplicate records from a regular flow of log data that pauses for one second at the beginning of every hour. To do this, in the Window processor, you set an 800-millisecond session timeout to trigger window creation.
Though this is 200 milliseconds less than one second, you know that this gap is enough to indicate that the session should be closed. The shorter timeout also allows for the occasional case where the pause is a bit less than one second. The remaining 200 milliseconds without data at the beginning of the next window is shorter than the timeout, so it does not trigger window creation.
In this example, if the flow of data changes, preventing a consistent one-second break, then the windows created by the processor may expand to an unexpected size.
- Hopping
- Creates a window of a specified length, then hops forward to create another window. You configure both the length of the window and the hop interval in milliseconds. The hop interval is measured from the beginning of the previous window.
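To make the differences between the window types concrete, the following minimal Python sketch groups arrival times the way each type is described above. It is an illustration only, not how the Window processor is implemented, and it rests on assumptions that do not come from Transformer: arrival times are integer milliseconds since the pipeline started, windows are half-open [start, end) intervals, a sliding window covers the closed interval [trigger time - length, trigger time], and all function names are hypothetical.

```python
# Hypothetical sketch: how each window type could group data by arrival time.
# Assumptions (not from Transformer): times are integer milliseconds since
# pipeline start, and tumbling/hopping windows are half-open [start, end).
from typing import Any, Callable, Dict, List, Sequence, Tuple

Window = Tuple[int, int]
Batch = Tuple[int, List[Dict[str, Any]]]  # (arrival time, records)


def tumbling_windows(length: int, horizon: int) -> List[Window]:
    # Each window opens as soon as the previous one closes, starting at 0.
    return [(start, start + length) for start in range(0, horizon, length)]


def hopping_windows(length: int, hop: int, horizon: int) -> List[Window]:
    # A new window starts every `hop` ms, measured from the previous start.
    return [(start, start + length) for start in range(0, horizon, hop)]


def assign(arrivals: Sequence[int], windows: Sequence[Window]) -> List[List[int]]:
    # Arrivals outside every window are dropped; arrivals that fall in
    # overlapping windows appear more than once.
    return [[t for t in arrivals if start <= t < end] for start, end in windows]


def session_windows(arrivals: Sequence[int], timeout: int) -> List[List[int]]:
    # Close the current session once `timeout` ms pass with no new data.
    sessions: List[List[int]] = []
    for t in sorted(arrivals):
        if sessions and t - sessions[-1][-1] < timeout:
            sessions[-1].append(t)
        else:
            sessions.append([t])
    return sessions


def sliding_windows(
    batches: Sequence[Batch], length: int, condition: Callable[[Dict[str, Any]], bool]
) -> List[List[int]]:
    # A batch containing a matching record triggers a window made of that
    # batch plus every batch that arrived in the preceding `length` ms.
    windows = []
    for t, records in batches:
        if any(condition(r) for r in records):
            windows.append([bt for bt, _ in batches if t - length <= bt <= t])
    return windows


if __name__ == "__main__":
    print(tumbling_windows(1000, 3000))  # [(0, 1000), (1000, 2000), (2000, 3000)]
    hops = hopping_windows(3, 10, 30)  # [(0, 3), (10, 13), (20, 23)]
    print(assign([1, 5, 11], hops))  # [[1], [11], []]: 5 falls in a gap
    print(session_windows([0, 100, 200, 1200, 1300], 800))  # [[0, 100, 200], [1200, 1300]]
    # Ten batches per second; the batches at 1000 and 1100 ms have a null inventory.
    batches = [(t, [{"inventory": None if t in (1000, 1100) else 5}]) for t in range(0, 1200, 100)]
    for w in sliding_windows(batches, 1000, lambda r: r["inventory"] is None):
        print(w[0], w[-1], len(w))  # "0 1000 11", then "100 1100 11"
```

The usage lines mirror the examples in this topic: the hopping windows skip the record at 5 ms, the 800 ms session timeout splits the data at a one-second pause, and two consecutive triggering batches produce sliding windows that share ten batches.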
Sliding Window Condition
When you use the sliding window type, you configure a condition. The condition determines when the Window processor creates a sliding window. The condition must evaluate to true or false. When the condition evaluates to true, the Window processor creates a window of the specified size and passes it downstream as a batch. The window includes the data that arrived within the window length and the batch that contains the triggering condition.
You can use a condition as simple as ID is NULL to create windows when a record with a null ID field arrives, or you can create as complex a condition as needed.
- When you define a condition, you typically base it on field values in the record. For information about referencing fields in the condition, see Referencing Fields in Spark SQL Expressions.
- You can use any Spark SQL syntax that can be used in the WHERE clause of a query, including functions such as isnull or trim and operators such as = or <=. You can also use user-defined functions (UDFs), but you must define the UDFs in the pipeline. Use a pipeline preprocessing script to define UDFs.
For more information about Spark SQL functions, see the Apache Spark SQL Functions documentation.
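For example, the following condition uses the year function to create a window when a record with a transaction date in 2018 or earlier arrives:
year(transaction_date) <= 2018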
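Because Transformer does not validate the syntax of the condition when you validate a pipeline, it can help to try a condition against a few sample rows first. The following minimal sketch does that with Python's built-in sqlite3 module as a stand-in for Spark SQL. This substitution is an assumption: the two dialects differ in functions and semantics (for example, SQLite has no built-in year function, and its LIKE is case-insensitive for ASCII characters while Spark SQL's is not), so a condition that passes here can still fail or behave differently in the pipeline. The table name, columns, and function names are hypothetical.

```python
# Hypothetical pre-check for a sliding window condition. SQLite (Python's
# built-in sqlite3 module) stands in for Spark SQL here, which is an
# assumption: the two dialects differ in functions and semantics.
import sqlite3
from datetime import date
from typing import Optional, Sequence, Tuple

Row = Tuple[int, Optional[int], Optional[str]]  # (id, inventory, transaction_date)


def sqlite_year(value: Optional[str]) -> Optional[int]:
    # SQLite has no year() function, so register one, much like a UDF.
    return None if value is None else date.fromisoformat(value).year


def batch_triggers(condition: str, rows: Sequence[Row]) -> bool:
    """Return True if any row in the sample batch satisfies `condition`.

    `condition` is interpolated into SQL, so only pass trusted test text.
    Syntax errors surface as sqlite3.OperationalError.
    """
    conn = sqlite3.connect(":memory:")
    try:
        conn.create_function("year", 1, sqlite_year)
        conn.execute("CREATE TABLE batch (id INTEGER, inventory INTEGER, transaction_date TEXT)")
        conn.executemany("INSERT INTO batch VALUES (?, ?, ?)", rows)
        query = f"SELECT EXISTS (SELECT 1 FROM batch WHERE {condition})"
        (hit,) = conn.execute(query).fetchone()
        return bool(hit)
    finally:
        conn.close()


if __name__ == "__main__":
    sample = [(1, 5, "2019-03-01"), (2, None, "2017-12-31")]
    print(batch_triggers("inventory is NULL", sample))  # True
    print(batch_triggers("year(transaction_date) <= 2018", sample))  # True
    print(batch_triggers("year(transaction_date) <= 2016", sample))  # False
    try:
        batch_triggers("inventory is", sample)
    except sqlite3.OperationalError as exc:
        print("invalid condition:", exc)
```

The EXISTS query reflects the idea that a batch triggers a window when at least one of its records satisfies the condition.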
Sample Conditions
The following table lists some common scenarios that you might adapt for your use:

Condition Example | Description |
---|---|
transactions <= 0 | When the value in the transactions field is less than or equal to 0, the processor creates a sliding window of the specified size. |
accountId is NULL | When the accountId field contains a null value, the processor creates a sliding window of the specified size. Note that NULL is not case sensitive, so you can also use null or Null in the condition. |
upper(message) like '%ERROR%' | When the message field contains the string ERROR, the processor creates a sliding window of the specified size. The condition changes the strings in the message field to uppercase before evaluating them, so the condition also matches error or Error. |
initcap(country) like 'China' OR initcap(country) like 'Japan' | When the value in the country field is China or Japan, the processor creates a sliding window of the specified size. The condition changes the strings in the country field to initial capitalization before evaluating them, so the condition also matches values such as CHINA or japan. |
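To reason about which records each sample condition matches, the following sketch restates the conditions as rough Python equivalents and applies them to hypothetical records. It is not something you enter in the processor, and it assumes that a missing or None field behaves like SQL NULL, so comparisons against it are never true. The initcap helper only approximates the Spark SQL function.

```python
# Hypothetical Python equivalents of the sample conditions, for reasoning
# about which records would trigger a sliding window. A missing or None
# field is treated as SQL NULL, and a comparison with NULL is never true.
from typing import Any, Dict, List


def initcap(value: str) -> str:
    # Approximates Spark SQL initcap(): uppercase the first letter of each
    # space-delimited word and lowercase the rest.
    return " ".join(word[:1].upper() + word[1:].lower() for word in value.split(" "))


def sample_matches(record: Dict[str, Any]) -> List[str]:
    # Return the sample conditions that evaluate to true for this record.
    transactions = record.get("transactions")
    message = record.get("message")
    country = record.get("country")
    checks = {
        "transactions <= 0": transactions is not None and transactions <= 0,
        "accountId is NULL": record.get("accountId") is None,
        "upper(message) like '%ERROR%'": message is not None and "ERROR" in message.upper(),
        # like without wildcards compares the whole string.
        "initcap(country) like 'China' OR initcap(country) like 'Japan'":
            country is not None and initcap(country) in ("China", "Japan"),
    }
    return [condition for condition, matched in checks.items() if matched]


if __name__ == "__main__":
    print(sample_matches({"transactions": 3, "accountId": "a1",
                          "message": "disk error on node 7", "country": "JAPAN"}))
    print(sample_matches({"transactions": 0, "accountId": None, "country": "south korea"}))
```

The first record matches the message and country conditions because of the case normalization; the second matches the transactions and accountId conditions only.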
Configuring a Window Processor
- In the Properties panel, on the General tab, configure the following properties:
  - Name - Stage name.
  - Description - Optional description.
  - Cache Data - Caches data processed for a batch so the data can be reused for multiple downstream stages. Use to improve performance when the stage passes data to multiple stages. Caching can limit pushdown optimization when the pipeline runs in ludicrous mode.
- On the Window tab, configure the following properties:
  - Window Type - Type of window to create:
    - Hopping - Creates a window of a specified length, then hops forward to create another window. Depending on the window configuration and the flow of data, this window can drop records or pass duplicate records downstream.
    - Session - Creates a window based on arriving data, closing the window when the specified interval of time passes without incoming data. With this window, all data is passed downstream exactly once.
    - Sliding - Creates a window of a specified length when a triggering condition occurs. The resulting window includes the data that arrived within the window length and the batch that contains the triggering condition. Depending on the window configuration and the flow of data, this window can drop records or pass duplicate records downstream.
    - Tumbling - Creates windows of the specified length, opening a new window as soon as the previous closes. With this window, all data is passed downstream exactly once.
  - Window Length (milliseconds) - Size of the time window to create, in milliseconds. Available for hopping, sliding, and tumbling windows.
  - Hop Interval (milliseconds) - Milliseconds between the start of sequential hopping windows. For example, if you set the hop interval to 10 and the window length to 3, then the processor starts a new 3-millisecond window 7 milliseconds after the previous window closes.
  - Session Timeout (milliseconds) - Amount of time that must elapse without incoming data to close a session window.
  - Close Window Condition - Condition that must evaluate to true to create a sliding window.
    Important: Ensure that the condition is valid before using it in the processor. When you validate a pipeline, Transformer does not validate the syntax of the condition.
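The Hop Interval example is simple arithmetic: because the hop interval is measured from the start of the previous window, the time between one window closing and the next opening is the hop interval minus the window length. The following hypothetical helper (not part of Transformer) spells out what the sign of that difference implies, assuming both values are positive integers in milliseconds.

```python
# Hypothetical helper: what a hopping configuration implies for the gap
# between windows. Assumes both values are positive integers in ms.
def hopping_gap_ms(hop_interval_ms: int, window_length_ms: int) -> int:
    """Return ms from the end of one hopping window to the start of the next.

    > 0: a gap; data arriving in it falls in no window and is dropped.
    < 0: an overlap; data arriving in it falls in more than one window.
    = 0: back-to-back windows, the same layout as tumbling windows.
    """
    if hop_interval_ms <= 0 or window_length_ms <= 0:
        raise ValueError("hop interval and window length must be positive")
    # The hop interval is measured from the start of the previous window.
    return hop_interval_ms - window_length_ms


if __name__ == "__main__":
    print(hopping_gap_ms(10, 3))  # 7, matching the Hop Interval example
    print(hopping_gap_ms(500, 1000))  # -500: each window overlaps the next by 500 ms
```

This is why hopping windows can drop records (positive gap) or pass duplicate records downstream (negative gap), depending on the configuration.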