PySpark

The PySpark processor transforms data based on custom PySpark code. You develop the custom code using the Python API for Spark, or PySpark. The PySpark processor supports Python 3.

The processor can receive multiple input streams, but can produce only a single output stream. When the processor receives multiple input streams, it receives one Spark DataFrame from each input stream. The custom PySpark code must produce a single DataFrame.
Tip: In streaming pipelines, you can use a Window processor upstream from this processor to generate larger batch sizes for evaluation.

You can use the PySpark processor in pipelines that provision a Databricks cluster, in standalone pipelines, and in pipelines that run on any existing cluster except for Dataproc. Do not use the processor in Dataproc pipelines or in pipelines that provision non-Databricks clusters.

Prerequisites

Complete all prerequisite tasks before using the PySpark processor in pipelines that run on existing clusters or in standalone pipelines.

When using the processor in pipelines that provision a Databricks cluster, you must complete the cluster configuration described in Provisioned Databricks Cluster Requirements.

Provisioned Databricks Cluster Requirements

When you use the PySpark processor in a pipeline that provisions a Databricks cluster, you must include several environment variables in the pipeline properties.

On the Cluster tab of the pipeline properties, add the following Spark environment variables to the cluster details configured in the Cluster Configuration property:

"spark_env_vars": {
        "PYTHONPATH": "/databricks/spark/python/lib/py4j-<version>-src.zip:/databricks/spark/python:/databricks/spark/bin/pyspark",
        "PYSPARK_PYTHON": "/databricks/python3/bin/python3",
        "PYSPARK_DRIVER_PYTHON": "/databricks/python3/bin/python3"
 } 

Note that in the PYTHONPATH variable, you must replace <version> with the py4j version used by the cluster.

For example, the cluster configuration sketched below provisions a Databricks cluster that uses py4j version 0.10.7 to run a pipeline with a PySpark processor.
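The sketch follows the Databricks cluster definition format used in the Cluster Configuration property. The num_workers, spark_version, and node_type_id entries are placeholders to replace with values for your environment; only the spark_env_vars entries are required by the PySpark processor:

{
    "num_workers": <number-of-workers>,
    "spark_version": "<spark-version>",
    "node_type_id": "<node-type-id>",
    "spark_env_vars": {
        "PYTHONPATH": "/databricks/spark/python/lib/py4j-0.10.7-src.zip:/databricks/spark/python:/databricks/spark/bin/pyspark",
        "PYSPARK_PYTHON": "/databricks/python3/bin/python3",
        "PYSPARK_DRIVER_PYTHON": "/databricks/python3/bin/python3"
    }
}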

For more information about provisioning a Databricks cluster, see Provisioned Cluster.

Develop the PySpark Code

When you develop custom code for the PySpark processor, you can include any valid code available with PySpark, as described in the Spark Python API documentation.

The PySpark processor receives one or more Spark DataFrames as input. Your custom code calls PySpark operations to transform the DataFrames. The custom code must produce a single DataFrame as output.

In the custom code, use the inputs and output variables to interact with DataFrames. Reference fields in the DataFrames using the notation required by Spark.

Important: Ensure that the custom PySpark code is valid before using it in the processor. When you validate a pipeline, Transformer does not validate the PySpark code.

Input and Output Variables

In the custom PySpark code, use the following variables to interact with DataFrames:
inputs
Use the inputs variable to access input DataFrames.
Because the PySpark processor can receive multiple DataFrames, the inputs variable is an array. Use bracket notation ([#]) to indicate the position in the array. Use 0 to access the DataFrame from the first input stream connected to the processor. Use 1 to access the DataFrame from the second input stream, and so on.
For example, when the processor receives a single DataFrame, use inputs[0] to access the DataFrame. When the processor receives two DataFrames, use inputs[0] to access the DataFrame from the first input stream connected to the processor, and use inputs[1] to access the DataFrame from the second input stream.
output
Use the output variable to pass the transformed DataFrame downstream.
After the custom code performs transformations on the DataFrame, assign the transformed DataFrame to the output variable. The processor passes the contents of the output variable to the output stream.
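For example, the following sketch reads the DataFrame from the first input stream, applies a transformation, and assigns the result to the output variable. The total column used in the filter is a hypothetical field name chosen for illustration:

# Read the DataFrame from the first input stream connected to the processor.
df = inputs[0]

# Keep only rows where the hypothetical 'total' column is greater than zero.
filtered = df.filter(df.total > 0)

# Assign the transformed DataFrame to the output variable
# so the processor passes it to the output stream.
output = filtered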

Referencing Fields

To reference specific DataFrame fields in the custom code, use dot notation (.) after the inputs variable and then use the field reference notation required by Spark.

For example, to reference the first-level ID field in the input DataFrame, use:
inputs[0].ID
To reference the zip_code field nested in the address map field in the input DataFrame, use:
inputs[0].address.zip_code
To reference the third item in an order_item list field in the input DataFrame, use:
inputs[0].order_item[2]
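For example, a sketch that combines these references in a single select call, using the same illustrative ID, address, and order_item fields, might look like this:

# Select the top-level ID field, the nested zip_code field,
# and the third item in the order_item list.
output = inputs[0].select(
    inputs[0].ID,
    inputs[0].address.zip_code,
    inputs[0].order_item[2]
)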

For more details, see Referencing Fields.

Examples

Here are a few simple examples of custom PySpark code developed for the PySpark processor.

Compute Summary Statistics of a DataFrame

Let's say that your pipeline processes order data. You want to use the PySpark describe operation to calculate basic summary statistics including the mean, standard deviation, count, min, and max for all numeric and string columns.

You connect a single input stream to the PySpark processor, and then add the following PySpark code to the processor:
output = inputs[0].describe()

Combine Data in Two DataFrames

Let's say that your pipeline processes employee data from two separate databases. The employee data sets contain the same number of columns with the same data types. You want to use the PySpark union operation to combine data from both DataFrames into a single DataFrame.

You connect both input streams to the PySpark processor, and then add the following PySpark code to the processor:
output = inputs[0].union(inputs[1])
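Note that union combines the DataFrames by column position. If the two DataFrames contain the same column names in a different order, a sketch using unionByName, available in Spark 2.3 and later, combines them by column name instead:

output = inputs[0].unionByName(inputs[1])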

Configuring a PySpark Processor

Configure a PySpark processor to transform data based on custom PySpark code. You can use the PySpark processor in pipelines that provision a Databricks cluster, in standalone pipelines, and in pipelines that run on any existing cluster except for Dataproc. Do not use the processor in Dataproc pipelines or in pipelines that provision non-Databricks clusters.

Note: Complete all prerequisite tasks before using the PySpark processor in pipelines that run on existing clusters or in standalone pipelines. When provisioning a Databricks cluster to run the pipeline, be sure to specify the required cluster configuration.
  1. In the Properties panel, on the General tab, configure the following properties:
    Name - Stage name.
    Description - Optional description.
    Cache Data - Caches data processed for a batch so the data can be reused for multiple downstream stages. Use to improve performance when the stage passes data to multiple stages.

    Caching can limit pushdown optimization when the pipeline runs in ludicrous mode.

  2. On the PySpark tab, enter the PySpark code to run.

    Ensure that the custom PySpark code is valid before using it in the processor. When you validate a pipeline, Transformer does not validate the PySpark code.