Spark SQL Query

The Spark SQL Query processor runs a Spark SQL query to transform batches of data. To perform record-level calculations using Spark SQL expressions, use the Spark SQL Expression processor.

For each batch of data, the processor receives a single Spark DataFrame as input and registers it as a temporary table in Spark. The processor then runs a Spark SQL query to transform the temporary table and returns a new DataFrame as output.

When you configure the processor, you define the Spark SQL query that the processor runs. The Spark SQL query can include Spark SQL and a subset of the functions provided with the StreamSets expression language.

Tip: In streaming pipelines, you can use a Window processor upstream from this processor to generate larger batch sizes for evaluation. For example, to use a Spark SQL window function such as rank in the query, place a Window processor before the Spark SQL Query processor, as in the sketch below.
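For instance, assuming a Window processor has already assembled a suitably large batch, a query along the following lines ranks the records in each batch. The store_id and order_quantity fields are hypothetical placeholders for fields in your own data:
SELECT *, rank() OVER (PARTITION BY store_id ORDER BY order_quantity DESC) AS quantity_rank FROM $table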

Spark SQL Query Syntax

Spark SQL is a Spark module that acts as a distributed SQL query engine. Spark SQL lets you run SQL queries along with Spark functions to transform DataFrames.

You can include the following elements in the query that you define for the Spark SQL Query processor:

Spark SQL
Use Spark SQL syntax in the query, such as:
SELECT <column name> FROM $table WHERE <condition> 
You can include any Spark SQL function or operator in the query. You can also use user-defined functions (UDFs), but you must define them in a pipeline preprocessing script; see the sketch after this list. For more information about Spark SQL functions, see the Apache Spark SQL Functions documentation.
When you start the pipeline, Transformer automatically registers the input DataFrame as a temporary table in Spark. Use the $table variable in the query to refer to the registered temporary table.
When entering the query in the processor, press Ctrl + Space Bar to view the list of valid SQL keywords you can use.
StreamSets expression language functions
You can include a subset of the functions provided with the StreamSets expression language in the query. To use a StreamSets function in a query, enclose the function in single quotes, for example:
'${uuid:uuid()}'
When entering the query in the processor, press Ctrl + Space Bar to view the list of valid functions you can use.
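As a sketch of UDF usage, assuming that a pipeline preprocessing script has registered a hypothetical UDF named normalize_zip, the query can call it like any built-in Spark SQL function:
SELECT normalize_zip(store_zip) AS store_zip FROM $table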

Referencing Fields

To reference specific DataFrame fields in Spark SQL queries, use the same field reference notation required by Spark.

For example, to reference the first-level ID field in the input DataFrame, use:
SELECT ID FROM $table
To reference the zip_code field nested in the address map field in the input DataFrame, use:
SELECT address.zip_code FROM $table
To reference the third item in an order_item list field in the input DataFrame, use:
SELECT order_item[2] FROM $table
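You can also combine these notations. For example, assuming that the elements of the order_item list field are structs containing a product_id field, you can reference the product ID of the first item with:
SELECT order_item[0].product_id FROM $table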

For more details, see Referencing Fields.

Examples

Here are a few simple examples of Spark SQL queries developed for the Spark SQL Query processor.

Basic Spark SQL Query

Let's say that your pipeline processes the clickstream data of all website users. Your data scientists want to analyze the website usage of teenagers, so you need to pass only the clickstream data for users between the ages of 10 and 19 to the next stage in the pipeline.

You define the following query:
SELECT * FROM $table WHERE age >= 10 AND age <= 19
Tip: To filter pipeline data using a simple query such as this, use the Filter processor. The Spark SQL Query processor is designed to run complex queries that cannot be accomplished with the Filter processor.
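As a sketch of a query that goes beyond simple filtering, the following combines the age filter with an aggregation that counts clickstream events per user; the user_id field is a hypothetical field in the clickstream data:
SELECT user_id, count(*) AS event_count FROM $table WHERE age >= 10 AND age <= 19 GROUP BY user_id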

Spark SQL Query with Spark SQL Functions

Let's say that your pipeline processes order data. The order data doesn't have a primary key, so you want to create a surrogate key. You can use the Spark SQL function monotonically_increasing_id in the query to generate a unique ID for each record.

You define the following query:
SELECT
monotonically_increasing_id() AS incID,
order_date_time, store_id, store_zip, product_id, unit_of_measure, order_quantity,
unit_price, average_monthly_usage
FROM $table

Spark SQL Query with StreamSets Expression Language Function

Let's say that you want to tag each record processed by your pipeline with the pipeline ID. You can use the StreamSets function pipeline:id to return the ID of the pipeline and add it as a new field to each record.

You define the following query:
SELECT
'${pipeline:id()}' AS pipelineID,
*
FROM $table

Configuring a Spark SQL Query Processor

Configure a Spark SQL Query processor to transform data with a Spark SQL query.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property    Description
    Name                Stage name.
    Description         Optional description.
    Cache Data          Caches data processed for a batch so the data can be reused for multiple downstream stages. Use to improve performance when the stage passes data to multiple stages.
                        Caching can limit pushdown optimization when the pipeline runs in ludicrous mode.

  2. On the Query tab, enter the Spark SQL query to run.