Spark SQL Expression
The Spark SQL Expression processor performs record-level Spark SQL calculations and writes the results to new or existing fields.
The Spark SQL Expression processor performs calculations on a record-by-record basis. To transform batches of data using a Spark SQL query, use the Spark SQL Query processor.
When you configure an expression, you define the name of the field to receive the results of the calculation, then specify the Spark SQL expression to use. You can configure multiple expressions in a single processor.
If you specify an output field that does not exist, the Spark SQL Expression processor creates the field for the results of the calculation. If you specify a field that exists, the processor replaces the existing value with the results of the calculation.
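The output-field rule above can be modelled in a few lines. This is a minimal plain-Python sketch, not the processor's implementation and not Spark code: a record is assumed to be a dictionary, and the helper `write_result` and the field name `message_upper` are hypothetical names chosen for illustration.

```python
# Plain-Python model of the output-field rule; not the processor's actual code.
def write_result(record, output_field, value):
    """Return a copy of the record with the calculation result stored in output_field."""
    updated = dict(record)         # leave the input record untouched
    updated[output_field] = value  # creates the field if missing, replaces it if present
    return updated


record = {"message": "hello"}

# The output field does not exist yet, so a new field is created.
created = write_result(record, "message_upper", record["message"].upper())

# The output field already exists, so its value is replaced.
replaced = write_result(record, "message", record["message"].upper())
```

In the first call the record gains a field; in the second the existing `message` value is overwritten, which mirrors the two cases described above.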
Expressions
You can define one or more expressions in the Spark SQL Expression processor. The processor performs the calculations on a record-by-record basis.
You can use an expression as simple as current_timestamp() to add the time of processing to each record, or you can create as complex an expression as needed.
- To base a calculation on field values, you need to reference the field. For tips on referencing fields, see Referencing Fields in Spark SQL Expressions.
  For example, the following expression converts the data in the message field to uppercase:
      upper(message)
- When you define multiple expressions, the processor evaluates them in the specified order. You can, therefore, use the results of one expression in subsequent expressions in the list.
  For example, say the following expression adjusts the value of sales data in the total field and writes the results to a field named total_adjusted:
      total - (total * .03)
  Then, you can define additional expressions that use the new total_adjusted field, as long as they are listed below the expression that creates the field.
- You can use any Spark SQL syntax, including functions such as isnull or trim and operators such as = or <=.
  For example, the following expression returns the hour of the current timestamp and places it in the configured output field:
      hour(current_timestamp())
You can also use user-defined functions (UDFs), but you must define the UDFs in the pipeline. Use a pipeline preprocessing script to define UDFs.
For more information about Spark SQL functions, see the Apache Spark SQL Functions documentation.
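The ordering rule for multiple expressions can be illustrated with a small model. The sketch below is plain Python rather than Spark SQL, and it only approximates the behavior described above: each expression is assumed to be a pair of an output field and a function over the record, and the names `apply_expressions` and `total_rounded` are hypothetical. In this model, reading a field that has not been created yet raises a `KeyError`; how the processor itself reports that situation is not covered here.

```python
# Plain-Python model of ordered expression evaluation; not the processor's actual code.
def apply_expressions(record, expressions):
    """Evaluate (output_field, function) pairs in list order against one record."""
    result = dict(record)
    for output_field, calculate in expressions:
        # Each expression sees the fields written by the expressions listed before it.
        result[output_field] = calculate(result)
    return result


expressions = [
    # Mirrors the documented expression: total - (total * .03)
    ("total_adjusted", lambda r: r["total"] - (r["total"] * 0.03)),
    # Hypothetical follow-up expression that depends on total_adjusted.
    ("total_rounded", lambda r: round(r["total_adjusted"], 2)),
]

out = apply_expressions({"total": 100.0}, expressions)

# Reversing the list breaks the dependency: total_adjusted does not exist yet.
try:
    apply_expressions({"total": 100.0}, list(reversed(expressions)))
    reversed_failed = False
except KeyError:
    reversed_failed = True
```

The second expression works only because it is listed below the one that creates `total_adjusted`, which is the constraint the section describes.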
Configuring a Spark SQL Expression Processor
- In the Properties panel, on the General tab, configure the following properties:

  | General Property | Description |
  | --- | --- |
  | Name | Stage name. |
  | Description | Optional description. |
  | Cache Data | Caches data processed for a batch so the data can be reused for multiple downstream stages. Use to improve performance when the stage passes data to multiple stages. Caching can limit pushdown optimization when the pipeline runs in ludicrous mode. |

- On the Expressions tab, configure the following properties for every expression:

  | Expressions Property | Description |
  | --- | --- |
  | Output Field | Output field for the results of the calculation. If the field exists in the record, the processor replaces the value in the field. If the field does not exist, the processor creates the field. |
  | SQL Expression | Spark SQL expression to use. |

- Click the Add icon to configure additional expressions.
You can use simple or bulk edit mode to configure the expressions.