Scala
The Scala processor runs custom Scala code to transform data. You develop the custom code using the Spark APIs for the version of Spark installed on your cluster. Complete the prerequisite tasks before using the processor in a pipeline.
The processor can have one or more input streams and a single output stream.
The Scala processor receives a Spark DataFrame from each input stream, runs your custom Scala code to transform the DataFrames, and then returns a single DataFrame as output. The processor does not run the code on empty DataFrames, by default.
When you configure the Scala processor, you specify the code to run and whether to run the code on empty DataFrames.
Scala Code
The custom code that you develop for the Scala processor can include any code valid for the Spark APIs for the version of Spark installed on your cluster.
The Scala processor can have one or more input streams and a single output stream.
When the processor has multiple input streams, it receives a DataFrame from each stream. The custom code must call Scala operations to transform the DataFrames and produce a single output DataFrame.
In the custom code, use the inputs
and output
variables
to interact with DataFrames. Reference fields in the DataFrames using the same notation
required by Spark.
When you validate a pipeline, invalid code generates compilation errors. You can find compiler output in the Spark driver log that is accessible when you monitor a draft run..
For more information about Spark APIs, see the Spark Scala API documentation for a supported version of Spark.
Scala Requirements
- Compatible Scala version on the cluster
- The Scala version on the cluster must be compatible with the Scala processing mechanism included with Transformer.
- Valid Scala code
- The Scala code must be compatible with the Scala and Spark versions used by the cluster.
Input and Output Variables
- inputs
- Use the
inputs
variable to access an incoming DataFrame. - output
- Use the
output
variable to pass the transformed DataFrame downstream.
input_file
column as it
passes the DataFrame
downstream:import org.apache.spark.sql.functions._
output = inputs(1).withColumn("input_file", input_file_name())
The code operates on the DataFrame received from the processor's second input stream.
Performing Spark SQL Queries
Spark provides multiple ways to perform processing on the data within a DataFrame. One way is to create a temporary table based on the data in a DataFrame, then perform a SQL query on the temporary table.
For example, say you have a processor that receives data from two origins. The following script performs a SQL query on data from the second origin:
val salesData = inputs(1)
salesData.createTempView("sales")
val aggregates = salesData.sparkSession.sql("select avg(total) as avg_total, max(total) as max_total from sales")
output = aggregates
- The first line of the script creates the
salesData
local variable that represents the DataFrame from the second input stream for the processor. - The second line creates a temporary table called
sales
using the data from that DataFrame. - The third line uses
sparkSession.sql
to perform the specified SQL query and saves the results as a local variable calledaggregates
.The query calculates the average for the data in the
total
column and writes the results to a newavg_total
field. It also finds the maximum value in thetotal
field and writes it to a newmax_total
field. - The fourth line passes the results of the query downstream.
For more information about Spark SQL and DataFrames, see the Spark SQL Guide.
For information about Spark SQL functions, see the Spark functions documentation for the version of Spark that you are using.
Examples
Here are some examples of custom Scala code developed for the Scala processor:
Increment Value in Field
import spark.implicits._
import scala.collection.mutable.Buffer
output = inputs(1).map(r => r.getLong(0) + 1).toDF()
The code operates on the DataFrame received from the stage attached to the processor's second input stream.
Retrieve the Origin File Name
input_file_name
Spark SQL function to retrieve the name of the
file that produced each row of the data. Then, the code passes the original
DataFrame downstream with the input file information included in a new
input_file
column:import org.apache.spark.sql.functions._
output = inputs(0).withColumn("input_file", input_file_name())
The code operates on the DataFrame received from the stage attached to the processor's first input stream.
Configuring a Scala Processor
Configure a Scala processor to transform data based on custom Scala code.
-
In the Properties panel, on the General
tab, configure the following properties:
General Property Description Name Stage name. Description Optional description. -
On the Scala tab, configure the following properties:
Scala Property Description Scala Code Scala code to run. Develop the code using the Spark APIs for the version of Spark installed on your cluster. Note: When you validate a pipeline, invalid code generates compilation errors. You can find compiler output in the Spark driver log that is accessible when you monitor a draft run..Skip Empty Batches Skips running custom code on empty DataFrames. Empty DataFrames are passed to downstream stages.