Spark SQL Expression

The Spark SQL Expression processor performs record-level Spark SQL calculations and writes the results to new or existing fields.

The processor performs calculations on a record-by-record basis. To transform batches of data with a single Spark SQL query, use the Spark SQL Query processor instead.

When you configure an expression, you define the name of the field that receives the results of the calculation, and then specify the Spark SQL expression to use. You can configure multiple expressions in a single processor.

If you specify an output field that does not exist, the Spark SQL Expression processor creates the field for the results of the calculation. If you specify a field that exists, the processor replaces the existing value with the results of the calculation.
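
This create-or-replace behavior is conceptually similar to Spark's withColumn method, which adds a column when the name is new and overwrites the column when the name already exists. The following Scala sketch is illustrative only, with hypothetical field names; the processor itself requires no code:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.expr

    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    import spark.implicits._

    // Hypothetical batch with "status" and "total" fields.
    val df = Seq(("shipped", 100.0), ("pending", 42.5)).toDF("status", "total")

    // Writing to a new output field creates it:
    val created = df.withColumn("status_upper", expr("upper(status)"))

    // Writing to an existing field replaces its value:
    val replaced = df.withColumn("total", expr("round(total, 1)"))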

Expressions

You can define one or more expressions in the Spark SQL Expression processor. The processor performs the calculations on a record-by-record basis.

You can use an expression as simple as current_timestamp() to add the time of processing to each record, or an expression as complex as you need.

Here are some guidelines for configuring expressions:
  • To base a calculation on field values, you need to reference the field. For tips on referencing fields, see Referencing Fields in Spark SQL Expressions.
    For example, the following expression converts the data in the message field to uppercase:
    upper(message)
  • When you define multiple expressions, the processor evaluates them in the specified order. You can, therefore, use the results of one expression in subsequent expressions in the list.
    For example, suppose that the following expression adjusts the value of sales data in the total field and writes the results to a field named total_adjusted:
    total - (total * .03)

    Then, you can define additional expressions that use the new total_adjusted field, as long as they are listed below the expression that creates the field. This ordering is illustrated in the sketches after this list.

  • You can use any Spark SQL syntax, including functions such as isnull or trim and operators such as = or <=.
    For example, the following expression returns the hour of the current timestamp and places it in the configured output field:
    hour(current_timestamp())

    You can also use user-defined functions (UDFs), but you must define them in the pipeline. Use a pipeline preprocessing script to define UDFs, as sketched after this list.

    For more information about Spark SQL functions, see the Apache Spark SQL Functions documentation.
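
Because the processor evaluates expressions in the configured order, a list of expressions behaves like a chain of column operations, each able to reference fields produced earlier. The following Scala sketch is a conceptual illustration with hypothetical field names, not processor code:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.expr

    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq(100.0, 250.0).toDF("total")

    val result = df
      .withColumn("total_adjusted", expr("total - (total * .03)"))
      // A later expression can reference the field created above:
      .withColumn("total_with_tax", expr("total_adjusted * 1.08"))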
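
To make a UDF available to expressions, the preprocessing script can register the function with the Spark session. The function name and logic below are hypothetical, and this sketch assumes the script can access the active SparkSession as spark; spark.udf.register is the standard Spark API for registering a UDF by name:

    // Hypothetical preprocessing script body: register a UDF named "normalize".
    spark.udf.register("normalize", (s: String) =>
      if (s == null) null else s.trim.toLowerCase)

Once registered, an expression can call the UDF like any built-in function, for example normalize(message).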

Configuring a Spark SQL Expression Processor

Configure a Spark SQL Expression processor to perform record-by-record calculations using Spark SQL expressions.
  1. In the Properties panel, on the General tab, configure the following properties:
    • Name: Stage name.
    • Description: Optional description.
    • Cache Data: Caches data processed for a batch so that the data can be reused by multiple downstream stages. Use to improve performance when the stage passes data to multiple stages.
      Caching can limit pushdown optimization when the pipeline runs in ludicrous mode. For a conceptual sketch of caching, see the example after these steps.

  2. On the Expressions tab, configure the following properties for every expression:
    • Output Field: Output field for the results of the calculation.
      If the field exists in the record, the processor replaces the value in the field. If the field does not exist, the processor creates the field.
    • SQL Expression: Spark SQL expression to use.
  3. Click the Add icon to configure additional expressions.
    You can use simple or bulk edit mode to configure the expressions.
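
The Cache Data property in step 1 is conceptually similar to persisting an intermediate DataFrame so that multiple downstream consumers reuse the computed batch instead of recomputing it. A minimal Scala sketch under that assumption, with hypothetical field names:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.expr

    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq("hello", "world").toDF("message")

    // cache() keeps the computed batch available for reuse, so each
    // downstream consumer avoids recomputing the expression results:
    val transformed = df
      .withColumn("message_upper", expr("upper(message)"))
      .cache()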