Spark SQL Query
The Spark SQL Query processor runs a Spark SQL query to transform batches of data. To perform record-level calculations using Spark SQL expressions, use the Spark SQL Expression processor.
For each batch of data, the processor receives a single Spark DataFrame as input and registers the input DataFrame as a temporary table in Spark. The processor then runs a Spark SQL query to transform the temporary table, and then returns a new DataFrame as output.
When you configure the processor, you define the Spark SQL query that the processor runs. The Spark SQL query can include Spark SQL and a subset of the functions provided with the StreamSets expression language.
To use a window function such as rank in the query, you'd first want to use a Window processor before the Spark SQL Query processor.
Spark SQL Query Syntax
Spark SQL is a Spark module that acts as a distributed SQL query engine. Spark SQL lets you run SQL queries along with Spark functions to transform DataFrames.
You can include the following elements in the query that you define for the Spark SQL Query processor:
- Spark SQL - Use Spark SQL syntax in the query, such as:
  SELECT <column name> FROM $table WHERE <condition>
- StreamSets expression language functions - You can include a subset of the functions provided with the StreamSets expression language in the query. To use a StreamSets function in a query, include the function in single quotes, for example:
  '${uuid:uuid()}'
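You can combine both elements in a single query. For example, the following sketch assumes hypothetical customer_id and status columns in the input DataFrame, filters rows with Spark SQL, and adds a field generated by the uuid:uuid() function:
SELECT '${uuid:uuid()}' AS generated_uuid, customer_id FROM $table WHERE status = 'active'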
Referencing Fields
To reference specific DataFrame fields in Spark SQL queries, use the same field reference notation required by Spark.
For example, to reference the ID field in the input DataFrame, use:
SELECT ID FROM $table
To reference the zip_code field nested in the address map field in the input DataFrame, use:
SELECT address.zip_code FROM $table
To reference the third item in the order_item list field in the input DataFrame, use:
SELECT order_item[2] FROM $table
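These notations can be combined in a single query. For example, a minimal sketch using the same fields:
SELECT ID, address.zip_code, order_item[2] FROM $table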
For more details, see Referencing Fields.
Examples
Here are a few simple examples of Spark SQL queries developed for the Spark SQL Query processor.
Basic Spark SQL Query
Let's say that your pipeline processes the clickstream data of all website users. Your data scientists want to analyze the website usage of teenagers, so you need to pass only the clickstream data for users between the ages of 10 and 19 to the next stage in the pipeline.
SELECT * FROM $table WHERE age >= 10 AND age <= 19
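If you prefer standard SQL range syntax, an equivalent query, assuming the same age column, is:
SELECT * FROM $table WHERE age BETWEEN 10 AND 19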
Spark SQL Query with Spark SQL Functions
Let's say that your pipeline processes order data. The order data doesn't have a primary key, so you want to create a surrogate key. You can use the Spark SQL function monotonically_increasing_id in the query to generate a unique ID for each record.
SELECT
  monotonically_increasing_id() AS incID,
  order_date_time, store_id, store_zip, product_id, unit_of_measure, order_quantity,
  unit_price, average_monthly_usage
FROM $table
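Note that monotonically_increasing_id generates IDs that are guaranteed to be unique and increasing, but not consecutive, because the generated value encodes the partition that each record belongs to.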
Spark SQL Query with StreamSets Expression Language Function
Let's say that you want to tag each record processed by your pipeline with the pipeline ID. You can use the StreamSets function pipeline:id to return the ID of the pipeline and add it as a new field to each record.
SELECT
'${pipeline:id()}' AS pipelineID,
*
FROM $table
Configuring a Spark SQL Query Processor
Configure a Spark SQL Query processor to run a Spark SQL query to transform data.
- In the Properties panel, on the General tab, configure the following properties:
  - Name - Stage name.
  - Description - Optional description.
  - Cache Data - Caches data processed for a batch so the data can be reused for multiple downstream stages. Use to improve performance when the stage passes data to multiple stages. Caching can limit pushdown optimization when the pipeline runs in ludicrous mode.
- On the Query tab, enter the Spark SQL query to run.