Window

The Window processor produces new batches of data from incoming batches based on the configured window type. Use the Window processor in streaming pipelines only. The processor is not supported in batch pipelines.

The Window processor creates windows of data and passes each window downstream as a batch. The processor can create the following types of windows: tumbling, sliding, session, and hopping.

You might use the Window processor in streaming pipelines to merge small batches into larger batches for more meaningful downstream processing.

For example, when using an Aggregate processor in a streaming pipeline, you might place the Window processor before the Aggregate processor to create larger batches for aggregate calculations. Similarly, when using a Join processor in a streaming pipeline, you might use the Window processor before the Join processor to enable joins on larger data sets.

When you configure the Window processor, you specify the window type to use and the properties for the window type.

Important: Depending on the window type and configuration, this processor can drop records or pass duplicate records downstream.

When a window contains no data, the Window processor passes an empty batch downstream.

Window Types

The Window processor creates windows based on the configured window type, then passes the window downstream as a batch.

The Window processor provides the following window types:
Tumbling
Creates windows of the specified length, opening a new window as soon as the previous one closes. You specify the window length in milliseconds. The first window begins when you start the pipeline.
The tumbling window type is the simplest of the window types. Use this window type to pass all data downstream in sequential windows of the specified length. With this window, all data is passed downstream exactly once. The tumbling window does not drop records or pass duplicate records downstream.
For example, say you want to use a Sort processor to pass sorted data downstream. Based on the data flow, you want to pass one-hour windows of data to the Sort processor, so you configure the Window processor to use a time window of 3,600,000 milliseconds. The Window processor creates a window every hour based on the time when the pipeline started.
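As a rough illustration, the following Python sketch models how tumbling windows divide time. It assumes that each record is represented only by its arrival time in milliseconds since pipeline start. The function name tumbling_windows is hypothetical, and the sketch is a conceptual model, not how Transformer implements the processor. Each arrival falls into exactly one window, and a window with no arrivals yields an empty list, much like the empty batch the processor passes downstream.

```python
from collections import defaultdict


def tumbling_windows(arrivals_ms, length_ms, end_ms):
    """Group arrival times (ms since pipeline start) into tumbling windows.

    Returns (window_start, window_end, arrivals) tuples covering [0, end_ms).
    A window with no arrivals gets an empty list, like the empty batch the
    processor passes downstream. Arrivals at or after end_ms are ignored.
    """
    buckets = defaultdict(list)
    for t in arrivals_ms:
        # Integer division picks exactly one window per arrival,
        # so nothing is dropped and nothing is duplicated.
        buckets[t // length_ms].append(t)
    window_count = -(-end_ms // length_ms)  # ceiling division
    return [
        (i * length_ms, (i + 1) * length_ms, buckets.get(i, []))
        for i in range(window_count)
    ]


# One-hour windows (3,600,000 ms) over the first three hours of a run.
HOUR_MS = 3_600_000
for start, end, arrivals in tumbling_windows([10, 3_599_999, 7_300_000], HOUR_MS, 3 * HOUR_MS):
    print(start, end, arrivals)
```

Running the sketch prints three one-hour windows: the first holds the arrivals at 10 and 3,599,999 ms, the second is empty, and the third holds the arrival at 7,300,000 ms.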
Sliding

Creates a window of a specified length when a triggering condition occurs. You specify the window length in milliseconds. You can configure the condition using any Spark SQL syntax that can be used in the WHERE clause of a query.

Use the sliding window type to create windows when a specified condition occurs. When the condition occurs, the processor creates a window that includes the data that arrived within the window length and the batch that contains the triggering condition. Depending on the window configuration and the flow of data, the sliding window can drop records or pass duplicate records downstream.

For example, say you set the window length to 1,000 milliseconds and the condition to inventory is NULL, and ten batches arrive every second. Then, every time the inventory field contains a null value, the processor creates a window that includes the ten batches that arrived in the previous second as well as the batch that includes the record with the null inventory field.

In this example, if the next batch also contains a null inventory field, the new window contains duplicate data. If the next null inventory field arrives five seconds later, then the data that arrives between the two windows is dropped from the pipeline.
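The sliding window example can be modeled with a short Python sketch. It assumes that batches are (arrival time in milliseconds, list of records) pairs, that records are dictionaries, and that a Python predicate stands in for the Spark SQL condition inventory is NULL. The function name sliding_windows and the sample data are hypothetical, and the sketch does not reflect how Transformer evaluates conditions. With triggers at 1,000, 1,100, and 6,100 ms, the first two windows share ten batches, and the batches from 1,200 through 5,000 ms are not part of any window.

```python
def sliding_windows(batches, length_ms, condition):
    """Create a window each time a batch holds a record matching `condition`.

    `batches` is a list of (arrival_ms, records) pairs in arrival order.
    Each window holds the batches that arrived in the `length_ms` before the
    triggering batch, followed by the triggering batch itself.
    """
    windows = []
    for arrival, records in batches:
        if any(condition(record) for record in records):
            window = [
                (t, recs) for t, recs in batches
                if arrival - length_ms <= t < arrival
            ]
            window.append((arrival, records))
            windows.append(window)
    return windows


# Ten batches per second (one every 100 ms), one record per batch.
# Hypothetical data: inventory is null at 1,000, 1,100, and 6,100 ms.
NULL_AT = {1_000, 1_100, 6_100}
batches = [
    (t, [{"id": t, "inventory": None if t in NULL_AT else 5}])
    for t in range(0, 7_000, 100)
]

# Python stand-in for the Spark SQL condition: inventory is NULL
windows = sliding_windows(batches, 1_000, lambda r: r["inventory"] is None)
for window in windows:
    print(window[0][0], "to", window[-1][0], ":", len(window), "batches")
```

Each window contains 11 batches: the ten from the preceding second plus the triggering batch.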

Session
Creates a window based on arriving data, closing the window when the specified interval of time elapses without incoming data. You configure the session timeout in milliseconds.

Use the session window type to create windows based on expected breaks in the incoming flow of data. With this window, all data is passed downstream exactly once. The session window does not drop records or pass duplicate records downstream. However, the size of each window depends on the window configuration and the flow of data.

For example, say you want a downstream Deduplicate processor to deduplicate records from a regular flow of log data that pauses for one second at the beginning of every hour. To do this, in the Window processor, you set an 800-millisecond session timeout to trigger window creation.

Though this is 200 milliseconds less than one second, you know that this gap is enough to indicate that the session should be closed. The gap also allows for the occasional case where the pause is a bit less than one second. The 200 milliseconds without data at the beginning of the next window is short enough that it does not trigger window creation.

In this example, if the flow of data changes, preventing a consistent one-second break, then the windows created by the processor may expand to an unexpected size.
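The following Python sketch models session windows over sorted arrival times in milliseconds. It assumes that a session closes once a gap of at least the session timeout passes with no new data; the function name session_windows and the sample arrival times are hypothetical. With an 800-millisecond timeout, a regular 100-millisecond flow stays in one session, while a pause of one second, or slightly less, closes it.

```python
def session_windows(arrivals_ms, timeout_ms):
    """Split sorted arrival times (ms) into session windows.

    Assumes a session closes once a gap of at least `timeout_ms` passes with
    no new arrival; the next arrival then opens a new session. Every arrival
    lands in exactly one session.
    """
    sessions = []
    current = []
    for t in arrivals_ms:
        if current and t - current[-1] >= timeout_ms:
            sessions.append(current)  # gap reached the timeout: close session
            current = []
        current.append(t)
    if current:
        # Emit the final session (in a live stream it would still be open).
        sessions.append(current)
    return sessions


# Hypothetical log flow: a record every 100 ms, then a one-second pause.
arrivals = list(range(0, 1_000, 100)) + list(range(1_900, 2_900, 100))
for session in session_windows(arrivals, 800):
    print(session[0], "to", session[-1], ":", len(session), "records")
```

If the pause disappears, the loop never closes a session, which is how a session window can grow to an unexpected size.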

Hopping
Creates a window of a specified length, then hops forward to create another window. You configure both the length of the window and the hop interval in milliseconds. The hop interval is measured from the beginning of the previous window.
Use the hopping window type to create windows of a specified length at regular intervals. Depending on the window configuration and the flow of data, the hopping window can drop records or pass duplicate records downstream.
For example, say you want a downstream Aggregate processor to count the number of records that arrive every 15 seconds, refreshing the count every 5 seconds. To do this, you set the window length to 15,000 milliseconds and the hop interval to 5,000 milliseconds. In this example, sequential windows can contain duplicate data.
Or, say you want the Aggregate processor to count a sample of incoming data: 1 minute of data every 15 minutes. To do this, you set the window length to 60,000 milliseconds and the hop interval to 900,000 milliseconds. In this example, data that arrives between windows is dropped from the pipeline.
If you set the hop interval to the same value as the window length, then the processor creates a new window as soon as the previous window closes, like a tumbling window.
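To see where hopping windows overlap or leave gaps, the following Python sketch lists the window bounds for a given window length and hop interval, then counts how many windows contain a given moment. A count of 0 means data arriving at that moment is dropped, and a count above 1 means it is passed downstream more than once. The function names are hypothetical, and the sketch assumes that the first window starts at 0 ms.

```python
def hopping_windows(length_ms, hop_ms, end_ms):
    """Return (start, end) bounds of hopping windows that start before end_ms.

    Assumes the first window starts at 0 ms; each later window starts
    `hop_ms` after the start of the previous one.
    """
    return [(start, start + length_ms) for start in range(0, end_ms, hop_ms)]


def coverage(t_ms, windows):
    """Count the windows containing time t_ms: 0 = dropped, >1 = duplicated."""
    return sum(1 for start, end in windows if start <= t_ms < end)


# 15-second windows every 5 seconds: data can appear in up to three windows.
rolling = hopping_windows(15_000, 5_000, 30_000)
print(coverage(12_000, rolling))

# 1 minute of data every 15 minutes: data outside the sample is dropped.
sampled = hopping_windows(60_000, 900_000, 1_800_000)
print(coverage(30_000, sampled), coverage(120_000, sampled))
```

Setting length_ms equal to hop_ms gives every moment a coverage of exactly 1, which matches the tumbling behavior described above.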

Sliding Window Condition

When you use the sliding window type, you configure a condition. The condition determines when the Window processor creates a sliding window. The condition must evaluate to true or false. When the condition evaluates to true, the Window processor creates a window of the specified size and passes it downstream as a batch. The window includes the data that arrived within the window length and the batch that contains the triggering condition.

You can use a condition as simple as ID is NULL to create windows when a record with a null ID field arrives, or you can create as complex a condition as needed.

Here are some guidelines for sliding window conditions:
  • When you define a condition, you typically base it on field values in the record. For information about referencing fields in the condition, see Referencing Fields in Spark SQL Expressions.
  • You can use any Spark SQL syntax that can be used in the WHERE clause of a query, including functions such as isnull or trim and operators such as = or <=.

    You can also use user-defined functions (UDFs), but you must define the UDFs in the pipeline. Use a pipeline preprocessing script to define UDFs.

    For more information about Spark SQL functions, see the Apache Spark SQL Functions documentation.

For example, the following condition creates a sliding window when the year of the transaction date is 2018 or earlier:
year(transaction_date) <= 2018
Important: Ensure that the condition is valid before using it in the processor. When you validate a pipeline, Transformer does not validate the syntax of the condition.

Sample Conditions

The following table lists some common scenarios that you might adapt for your use:
Condition Example Description
transactions <= 0 When the value in the transactions field is less than or equal to 0, the processor creates a sliding window of the specified size.
accountId is NULL When the accountId field contains a null value, the processor creates a sliding window of the specified size.

NULL is not case sensitive. For example, you can also use null or Null in the condition.

upper(message) like '%ERROR%' When the message field contains the string ERROR, the processor creates a sliding window of the specified size.

The condition converts the value of the message field to uppercase before performing the comparison. This allows the condition to also match error and Error, for example. The field value itself is not changed.

initcap(country) like 'China' OR initcap(country) like 'Japan' When the value in the country field is China or Japan, the processor creates a sliding window of the specified size.

The condition converts the value of the country field so that the first letter is uppercase and the remaining letters are lowercase before performing the comparison. This allows the condition to also match CHINA and japan, for example.
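To check which values the sample text conditions match, the following Python sketch approximates them outside Spark. The helper sql_like is a hypothetical, simplified stand-in for SQL LIKE that supports % and _ but not escape characters; Python's str.title stands in for initcap and matches it for single words; None models NULL. The sketch is only for reasoning about the conditions. Transformer evaluates the actual conditions with Spark SQL.

```python
import re


def sql_like(value, pattern):
    """Simplified SQL LIKE: % matches any run of characters, _ matches one.

    Escape characters are not supported. A None (NULL) value returns None,
    following SQL's three-valued logic.
    """
    if value is None:
        return None
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    return re.fullmatch(regex, value, flags=re.DOTALL) is not None


def error_condition(record):
    """Models: upper(message) like '%ERROR%'"""
    message = record.get("message")
    return sql_like(None if message is None else message.upper(), "%ERROR%")


def country_condition(record):
    """Models: initcap(country) like 'China' OR initcap(country) like 'Japan'"""
    country = record.get("country")
    if country is None:
        return None
    # str.title() matches initcap for single words such as "CHINA" -> "China".
    capped = country.title()
    return sql_like(capped, "China") or sql_like(capped, "Japan")


for message in ["Disk ERROR on node 3", "error: timeout", "all good"]:
    print(message, "->", error_condition({"message": message}))
```

Only a result of True would create a window; False and None (NULL) do not.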

Configuring a Window Processor

Configure a Window processor to create new batches of data from incoming batches based on the configured window type.
  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Cache Data Caches data processed for a batch so the data can be reused for multiple downstream stages. Use to improve performance when the stage passes data to multiple stages.

    Caching can limit pushdown optimization when the pipeline runs in ludicrous mode.

  2. On the Window tab, configure the following properties:
    Window Property Description
    Window Type Type of window to create:
    • Hopping - Creates a window of a specified length, then hops forward to create another window.

      Depending on the window configuration and the flow of data, this window can drop records or pass duplicate records downstream.

    • Session - Creates a window based on arriving data, closing the window when the specified interval of time passes without incoming data.

      With this window, all data is passed downstream exactly once.

    • Sliding - Creates a window of a specified length when a triggering condition occurs. The resulting window includes the data that arrived within the window length and the batch that contains the triggering condition.

      Depending on the window configuration and the flow of data, this window can drop records or pass duplicate records downstream.

    • Tumbling - Creates windows of the specified length, opening a new window as soon as the previous one closes.

      With this window, all data is passed downstream exactly once.

    Window Length (milliseconds) Size of the time window to create, in milliseconds.

    Available for hopping, sliding, and tumbling windows.

    Hop Interval (milliseconds) Milliseconds between the start of sequential hopping windows.

    For example, if you set the hop interval to 10 and the window length to 3, then the processor starts a new 3-millisecond window 7 milliseconds after the previous window closes.

    Session Timeout (milliseconds) The amount of time that must elapse without incoming data to close a session window.
    Close Window Condition Condition that must evaluate to true to create a sliding window.
    Important: Ensure that the condition is valid before using it in the processor. When you validate a pipeline, Transformer does not validate the syntax of the condition.
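Because the hop interval is measured from the start of the previous window, the difference between the Hop Interval and Window Length values determines whether consecutive hopping windows leave a gap, overlap, or behave like tumbling windows. The following Python sketch computes that relationship; the function name hop_relationship is hypothetical and is not part of Transformer.

```python
def hop_relationship(length_ms, hop_ms):
    """Describe how consecutive hopping windows relate to each other.

    Returns ("gap", ms) when data between windows is dropped,
    ("overlap", ms) when windows share data, or ("tumbling", 0) when
    each window starts as soon as the previous one closes.
    """
    if length_ms <= 0 or hop_ms <= 0:
        raise ValueError("window length and hop interval must be positive")
    difference = hop_ms - length_ms  # hop is measured start-to-start
    if difference > 0:
        return ("gap", difference)
    if difference < 0:
        return ("overlap", -difference)
    return ("tumbling", 0)


# The Hop Interval example: interval 10, window length 3.
print(hop_relationship(3, 10))
```

For the hop interval example in the table, the sketch reports a 7-millisecond gap between windows.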