Scala

The Scala processor runs custom Scala code to transform data. You develop the custom code using the Spark APIs for the version of Spark installed on your cluster. Complete the prerequisite tasks before using the processor in a pipeline.

The processor can have one or more input streams and a single output stream.

The Scala processor receives a Spark DataFrame from each input stream, runs your custom Scala code to transform the DataFrames, and then returns a single DataFrame as output. By default, the processor does not run the code on empty DataFrames.

When you configure the Scala processor, you specify the code to run and whether to run the code on empty DataFrames.

Scala Code

The custom code that you develop for the Scala processor can include any code valid for the Spark APIs for the version of Spark installed on your cluster.

When the processor has multiple input streams, it receives a DataFrame from each stream. The custom code must call Scala operations to transform the DataFrames and produce a single output DataFrame.

In the custom code, use the inputs and output variables to interact with DataFrames. Reference fields in the DataFrames using the same notation required by Spark.
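
As a minimal sketch of combining two input streams into a single output DataFrame, the following code joins the DataFrame from the first input stream with the DataFrame from the second. The customer_id column and the joinOrdersToCustomers helper are hypothetical names used only for illustration. The inputs and output variables are provided by the processor:

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: keeps every row from the first DataFrame and attaches
// the matching columns from the second. The customer_id join column is an
// assumption made for this example.
def joinOrdersToCustomers(orders: DataFrame, customers: DataFrame): DataFrame =
  orders.join(customers, Seq("customer_id"), "left")

// inputs and output are provided by the processor.
output = joinOrdersToCustomers(inputs(0), inputs(1))
```

A left join is used here so that rows from the first stream are not dropped when the second stream has no match. Use a different join type if your data calls for it.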

When you validate a pipeline, invalid code generates compilation errors. You can find compiler output in the Spark driver log that is accessible when you monitor Transformer jobs.

For more information about Spark APIs, see the Spark Scala API documentation for a supported version of Spark.

Scala Requirements

To ensure that Scala code runs as expected, make sure that the following requirements are met:
Compatible Scala version on the cluster
    The Scala version on the cluster must be compatible with the Scala processing mechanism included with Transformer.
    Transformer is prebuilt with a specific Scala version. To handle the Scala code, Transformer uses the scala.tools.nsc package in the Scala API, which can change between Scala patch releases. For example, the package changed between Scala 2.12.10 and 2.12.14.
    To ensure that the Scala code runs as expected, use one of the following recommended Scala versions on the cluster:

    Transformer Version         Recommended Cluster Runtime Scala Versions
    Prebuilt with Scala 2.11    2.11.8 or 2.11.12
    Prebuilt with Scala 2.12    2.12.10 or 2.12.14

    Note: If the cluster includes a Scala compiler tool in the pipeline application classpath at runtime, the version of the Scala compiler tool takes precedence over the one included with Transformer.
    For example, if the cluster includes a Scala 2.12 compiler tool, then use a cluster runtime with a Scala version as recommended in the table above for Scala 2.12.
Valid Scala code
    The Scala code must be compatible with the Scala and Spark versions used by the cluster.
    Develop the code using the Spark APIs for the version of Spark installed on your cluster.
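
To confirm which versions a pipeline actually runs with, you can log them from the custom code. This is a minimal sketch that uses the standard scala.util.Properties object and the version property of the Spark session. It assumes that println output from the driver ends up in the Spark driver log, which may depend on how your cluster handles logging:

```scala
// Scala version of the runtime that executes this code, such as 2.12.10.
val scalaVersion = scala.util.Properties.versionNumberString
// Spark version, read from the session that owns the first input DataFrame.
val sparkVersion = inputs(0).sparkSession.version

// Assumption: driver stdout is captured in the Spark driver log.
println(s"Scala $scalaVersion, Spark $sparkVersion")

// Pass the data through unchanged. inputs and output are provided by the processor.
output = inputs(0)
```

You can compare the logged Scala version with the recommended versions listed above.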

Input and Output Variables

In the custom code, use the following variables to interact with a DataFrame or the data within the DataFrame:
inputs
    Use the inputs variable to access an incoming DataFrame.
    Because the processor can receive data from multiple stages, the inputs variable is an array. Use parenthesis notation, (#), to indicate the position in the array, as follows: inputs(#).
    Use 0 to access the DataFrame from the first input stream connected to the processor. Use 1 to access the DataFrame from the second input stream, and so on.
    For example, when the processor receives data from three different stages, you use inputs(0) to access the DataFrame from the stage connected to the first input stream. To access the DataFrame from the second input stream, you use inputs(1). To access the DataFrame from the third input stream, you use inputs(2).
output
    Use the output variable to pass the transformed DataFrame downstream.
    After the custom code performs transformations on the DataFrame, assign the transformed DataFrame to the output variable. The processor passes the contents of the output variable to the output stream.
    For example, the following code retrieves the name of the file that produced each row of the data and includes that information as a new input_file column as it passes the DataFrame downstream:

    import org.apache.spark.sql.functions._
    output = inputs(1).withColumn("input_file", input_file_name())

    The code operates on the DataFrame received from the processor's second input stream.
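
When every input stream carries data with the same column names, one way to use all entries of the inputs array is to stack them into a single DataFrame. The following is a minimal sketch. The unionAll helper is a hypothetical name, and the code assumes that all input DataFrames share the same set of columns with compatible types:

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: stacks DataFrames that share the same column names.
// unionByName matches columns by name rather than by position.
def unionAll(frames: Seq[DataFrame]): DataFrame =
  frames.reduce((left, right) => left.unionByName(right))

// inputs and output are provided by the processor.
output = unionAll(inputs.toSeq)
```

Because the code reduces over the whole array, it does not need to change when you connect another input stream to the processor.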

Performing Spark SQL Queries

Spark provides multiple ways to perform processing on the data within a DataFrame. One way is to create a temporary table based on the data in a DataFrame, then perform a SQL query on the temporary table.

For example, say you have a processor that receives data from two origins. The following script performs a SQL query on data from the second origin:

val salesData = inputs(1)
salesData.createOrReplaceTempView("sales")
val aggregates = salesData.sparkSession.sql("select avg(total) as avg_total, max(total) as max_total from sales")
output = aggregates
The script performs the following actions:
  • The first line of the script creates the salesData local variable that represents the DataFrame from the second input stream for the processor.
  • The second line creates a temporary table called sales using the data from that DataFrame.
  • The third line uses sparkSession.sql to perform the specified SQL query and saves the results as a local variable called aggregates.

    The query calculates the average of the values in the total field and writes the result to a new avg_total field. It also finds the maximum value in the total field and writes it to a new max_total field.

  • The fourth line passes the results of the query downstream.
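
The same aggregation can also be written with the DataFrame API instead of a temporary table. The following is a minimal sketch that should produce the same avg_total and max_total fields as the query above. The summarizeTotals helper is a hypothetical name:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{avg, max}

// Hypothetical helper that mirrors the SQL query:
// select avg(total) as avg_total, max(total) as max_total from sales
def summarizeTotals(sales: DataFrame): DataFrame =
  sales.agg(avg("total").as("avg_total"), max("total").as("max_total"))

// inputs and output are provided by the processor.
output = summarizeTotals(inputs(1))
```

This form avoids registering a named temporary table, which can be convenient when the name is not needed elsewhere in the code.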

For more information about Spark SQL and DataFrames, see the Spark SQL Guide.

For information about Spark SQL functions, see the Spark functions documentation for the version of Spark that you are using.

Examples

Here are some examples of custom Scala code developed for the Scala processor:

Increment Value in Field

The following code adds 1 to the value of the first field in each row, which must be a long, and then passes the incremented values downstream as a single-column DataFrame:
import spark.implicits._
import scala.collection.mutable.Buffer
output = inputs(1).map(r => r.getLong(0) + 1).toDF()

The code operates on the DataFrame received from the stage attached to the processor's second input stream.
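
If the remaining fields must also be passed downstream, a column expression can be used instead of map. The following is a minimal sketch. The incrementFirstColumn helper is a hypothetical name, and the code assumes that the first field is numeric and that its name contains no dots:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: adds 1 to the first column and keeps all other columns.
// withColumn replaces the existing column because the name is the same.
def incrementFirstColumn(df: DataFrame): DataFrame = {
  val first = df.columns.head
  df.withColumn(first, col(first) + 1)
}

// inputs and output are provided by the processor.
output = incrementFirstColumn(inputs(1))
```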

Retrieve the Origin File Name

The following code imports all Spark SQL functions so it can use the input_file_name Spark SQL function to retrieve the name of the file that produced each row of the data. Then, the code passes the original DataFrame downstream with the input file information included in a new input_file column:
import org.apache.spark.sql.functions._
output = inputs(0).withColumn("input_file", input_file_name())

The code operates on the DataFrame received from the stage attached to the processor's first input stream.

Configuring a Scala Processor

Configure a Scala processor to transform data based on custom Scala code.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property    Description
    Name                Stage name.
    Description         Optional description.
  2. On the Scala tab, configure the following properties:
    Scala Property        Description
    Scala Code            Scala code to run. Develop the code using the Spark APIs for the version of Spark installed on your cluster.
                          Note: When you validate a pipeline, invalid code generates compilation errors. You can find compiler output in the Spark driver log that is accessible when you monitor Transformer jobs.
    Skip Empty Batches    Skips running custom code on empty DataFrames. Empty DataFrames are passed to downstream stages.
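
When the processor is configured to run the code on empty DataFrames, the custom code may need to handle them explicitly. For example, an aggregation without grouping returns one row of null values for empty input. The following is a minimal sketch that returns zero rows in that case. The summarizeOrEmpty helper is a hypothetical name, and the total field is assumed for illustration:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{avg, max}

// Hypothetical helper: aggregates the total column, but returns an empty
// DataFrame with the same schema when the input has no rows.
def summarizeOrEmpty(sales: DataFrame): DataFrame = {
  val summary = sales.agg(avg("total").as("avg_total"), max("total").as("max_total"))
  // head(1) fetches at most one row, which avoids a full count of the data.
  if (sales.head(1).isEmpty) summary.limit(0) else summary
}

// inputs and output are provided by the processor.
output = summarizeOrEmpty(inputs(0))
```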