Spark

The Spark executor starts a Spark application each time it receives an event. You can use the Spark executor with Spark on YARN. For information about supported versions, see Supported Systems and Versions.

Use the Spark executor to start a Spark application as part of an event stream. You can use the executor in any logical way, such as running Spark applications after the Hadoop FS, MapR FS, or Amazon S3 destination closes files. For example, you might use the executor to start a Spark application that converts Avro files to Parquet each time the Hadoop FS destination closes a file.

Note that the Spark executor starts an application in an external system. It does not monitor the application or wait for it to complete. The executor becomes available for additional processing as soon as it successfully submits an application.

The Spark executor can run the application in client or cluster mode. Run the application in client mode only when resource use is not a concern.

Before you use the Spark executor, make sure to perform the prerequisite task.

When you configure the Spark executor, you can specify the number of worker nodes Spark should use, or you can enable dynamic allocation and specify the minimum and maximum number of worker nodes. Dynamic allocation allows Spark to use additional worker nodes as needed, within the specified range.

You can specify additional cluster manager properties to pass to Spark, such as the maximum amount of memory that the application driver and executor can use.

You can also configure additional Spark arguments and environment variables. Any arguments and variables that you enter override any previous definitions, including those in the Spark application, elsewhere in the Spark executor, and the Data Collector machine.

You can specify custom Spark and Java home directories, and a Hadoop proxy user. You can also enter Kerberos credentials if needed.

When you configure the application details, you specify the language used to write the application and then define language-specific properties.

You can also configure the executor to generate events for another event stream. For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Spark Versions and Stage Libraries

The Spark executor supports only Spark version 2.1 or later.

When you use the Spark executor, make sure the Spark version is the same across all related components, as follows:

  • When using the executor to run an application on Spark on YARN, make sure the Spark version used in the selected stage library matches the Spark version used to build the application.

    For example, if you use Spark 2.1 to build the application, use a Spark executor provided in one of the Spark 2.1 stage libraries.

  • When using the executor in a cluster streaming pipeline, the Spark version in the selected stage library must also match the Spark version used by the cluster.

    For example, if your cluster uses Spark 2.2, use a stage library that includes Spark 2.2.

The Spark executor is available in Cloudera and MapR stage libraries. To verify the Spark version that a stage library includes, see the Cloudera or HPE Ezmeral Data Fabric documentation. For more information about the stage libraries that include the stage, see Common Stage Libraries.

Prerequisite

Before you run a Spark executor pipeline that starts applications on YARN, you must enable the Spark executor to submit an application.

You can enable the Spark executor to submit an application in several different ways. Perform one of the following tasks to enable the executor to submit applications:
Configure the YARN Minimum User ID property, min.user.id
The min.user.id property is set to 1000 by default. To allow job submission:
  1. Verify the user ID being used by the Data Collector user, typically named "sdc".
  2. In Hadoop, configure the YARN min.user.id property.

    Set the property to equal to or lower than the Data Collector user ID.

Configure the YARN Allowed System Users property, allowed.system.users
The allowed.system.users property lists allowed user names. To allow job submission:
  1. In Hadoop, configure the YARN allowed.system.users property.

    Add the Data Collector user name, typically "sdc", to the list of allowed users.

Configure the Spark executor Proxy User property
In the Spark executor, the Proxy User property allows you to enter a user name for the stage to use when submitting applications. To allow application submission:
  1. In the Spark executor stage, on the Spark tab, configure the Proxy User property.

    Enter a user with an ID that is higher than the min.user.id property, or with a user name that is listed in the allowed.system.users property.

For information about using a Hadoop User, see Using a Proxy Hadoop User.

Spark Home Requirement

When running an application on YARN, the Spark executor requires access to the spark-submit script located in the Spark installation directory.

By default, the Spark executor uses the directory defined in the SPARK_HOME environment variable on the Data Collector machine. The SPARK_HOME environment variable must be set before you start Data Collector.
Note: When Spark 2 is installed on a Cloudera cluster, set the SPARK_HOME environment variable for Data Collector as follows:
export SPARK_HOME=/opt/cloudera/parcels/SPARK2/lib/spark2

You can override the environment variable as needed by configuring the Custom Spark Home property in the executor stage properties. Use the Custom Spark Home property when the SPARK_HOME environment variable is not set, or when it points to a conflicting version of Spark.

For example, if you are using a Spark 2.1 stage library for the Spark executor and SPARK_HOME points to an earlier version of Spark, use the Custom Spark Home property to specify the location of the Spark 2.1 spark-submit script.

Application Properties

When using the Spark executor, you specify an application name. The application name displays in the cluster manager and Spark server logs, so use a distinctive name to enable distinguishing the Spark application from others. For example, SDC_<pipeline name>_<app_type>.

In the executor, you can enable verbose logging to help test the pipeline and debug the application.

Configure additional application details based on the language used to write the application:
Java or Scala
For applications written in Java or Scala, you specify the main class and application resource - the full path to the primary JAR or file.
You can specify additional arguments and JARs to use. You can also pass additional files to the application using the --files protocol.
Python
For applications written in Python, you specify the application resource - the full path to the primary Python file - and any required dependencies. You can define application arguments and pass additional files to the application using the --files protocol.
Note: Make sure the user that runs Data Collector - or the Hadoop proxy user, if configured - has read permission on all required paths.

Using a Proxy Hadoop User

You can configure the Spark executor to use a Hadoop user as a proxy user to submit applications to Spark on YARN.

By default, the Data Collector uses the user account who started it to connect to external systems. When using Kerberos, the Data Collector can use the Kerberos principal specified in the executor.

To use a Hadoop user, perform the following tasks:
  1. On the external system, configure the Data Collector user as a proxy user and authorize the Data Collector user to impersonate the Hadoop user.

    For more information, see the Hadoop documentation.

  2. In the Spark executor, on the Spark tab, configure the Proxy User property to use the Hadoop user name.

Kerberos Authentication

You can use Kerberos authentication to connect to the destination system where output files are written. To enable this, on the Credentials tab of the Spark executor, enter the Kerberos principal and keytab for the YARN cluster where the application runs.

Event Generation

The Spark executor can generate events that you can use in an event stream. When you enable event generation, the executor generates events each time it starts a Spark application.

Spark executor events can be used in any logical way. For example:

Since Spark executor events include the application ID for each application that it starts, you might generate events to keep a log of the application IDs.

For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Event Records

Event records generated by the Spark executor have the following event-related record header attributes. Record header attributes are stored as String values:
Record Header Attribute Description
sdc.event.type Event type. Uses the following type:
  • AppSubmittedEvent - Generated when the executor starts a Spark application.
sdc.event.version Integer that indicates the version of the event record type.
sdc.event.creation_timestamp Epoch timestamp when the stage created the event.
Event records generated by the Spark executor have the following fields:
Event Field Name Description
app_id YARN application ID for the Spark application.

Monitoring

Data Collector does not monitor Spark applications. Use your regular cluster monitor application to view the status of applications.

Applications started by the Spark executor display using the application name specified in the stage. The application name is the same for all instances of the application. You can find the application ID for a particular instance in the Data Collector log.

The Spark executor also writes the application ID to the event record. To keep a record of all application IDs, enable event generation for the stage.

Configuring a Spark Executor

Configure a Spark executor to start a Spark application each time the executor receives an event record.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Stage Library Library version that you want to use.
    Produce Events Generates event records when events occur. Use for event handling.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

  2. On the Spark tab, configure the following properties:
    Spark Property Description
    Deploy Mode Deploy mode for the application:
    • Client - Runs the application in Spark client mode. Use only when resources are not a concern.
    • Cluster - Runs the application in Spark cluster mode. Cluster mode deploys the application on the YARN cluster.
    Driver Memory Maximum amount of memory the driver can use for the application.

    Enter the number and a standard Java unit of measure without additional spaces. For example, 10m.

    You can use k or K, m or M, or g or G.

    Executor Memory Maximum amount of memory the executor can use.

    Enter the number and a standard Java unit of measure without additional spaces. For example, 100k.

    You can use k or K, m or M, or g or G.

    Dynamic Allocation Enables the dynamic allocation of executors to start an applications.
    Number of Worker Nodes The exact number of worker nodes for Spark to use. Configure when not using dynamic allocation.
    Minimum Number of Worker Nodes The minimum number of worker nodes for Spark to use. Configure when using dynamic allocation.
    Maximum Number of Worker Nodes The maximum number of worker nodes for Spark to use. Configure when using dynamic allocation.
    Proxy User Hadoop user to connect to the external system and run the application. When using this property, make sure the external system is configured appropriately.

    By default, the pipeline uses the Data Collector user.

    Custom Spark Home Use to enter a custom Spark home directory. By default, the origin uses the directory specified in the SPARK_HOME environment variable on the Data Collector machine.

    This property overrides the SPARK_HOME environment variable.

    Required if the environment variable is not set for the Data Collector machine or if the variable is set for an incorrect version of Spark.

    For example, to run a job against Spark 2.1, point this property to the Spark 2.1 directory if the SPARK_HOME environment variable points to an earlier version of Spark.

    Custom Java Home Use to enter a custom Java home directory. By default, the origin uses the directory specified in the JAVA_HOME environment variable on the Data Collector machine.

    This property overrides the Data Collector environment variable.

    Required if the environment variable is not set for the Data Collector machine.

    Additional Spark Arguments Additional arguments to pass to Spark. Overrides any previous configuration for the specified arguments. For a list of available arguments, see the Spark documentation.
    Additional Spark Arguments and Values Additional arguments with values to pass to Spark. Overrides any previous configuration for the specified arguments. For a list of available arguments, see the Spark documentation.
    Environment Variables Additional environment variables to use. Overrides any previous configuration for the specified arguments. For a list of valid environment variables, see the Spark documentation.
  3. Click the Application tab, select the Language used to write the application, and then configure the following properties:
    For applications written in Java or Scala, configure the following properties:
    Java/Scala Application Properties Description
    Application Name Name to display in YARN resource manager and logs. Also displays in Spark server history pages.
    Tip: Use a name that distinguishes the application from those started by other processes and other pipelines, such as SDC_<pipeline name>_<app_type>.
    Application Resource The full path to the JAR that contains the main class.
    Main Class The full path to the main class for the Spark application.
    Application Arguments You can add additional arguments to pass to the application.

    Enter the arguments exactly as expected, and in the expected order. The executor does not validate the arguments.

    Additional JARs You can specify additional JARs to use. Enter the full path to the JAR.
    Additional Files Additional files to pass to the application using the --files protocol. Enter the full path to the files.

    For information about the protocol, see the Spark documentation.

    Enable Verbose Logging Enables logging additional information to the Data Collector log.

    To avoid filling the log with unnecessary information, enable this property only when testing the pipeline.

    For applications written in Python, configure the following properties:
    Python Application Properties Description
    Application Name Name to display in YARN resource manager and logs. Also displays in Spark server history pages.
    Tip: Use a name that distinguishes the application from those started by other processes and other pipelines, such as SDC_<pipeline name>_<app_type>.
    Application Resource The full path to the Python file to run.
    Application Arguments You can add additional arguments to pass to the application.

    Enter the arguments exactly as expected, and in the expected order. The executor does not validate the arguments.

    Dependencies Full path to any files the Python application resource requires.
    Additional Files Additional files to pass to the application using the --files protocol. Enter the full path to the files.

    For information about the protocol, see the Spark documentation.

    Enable Verbose Logging Enables logging additional information to the Data Collector log.

    To avoid filling the log with unnecessary information, enable this property only when testing the pipeline.

  4. Optionally, click the Credentials tab and configure the following properties:
    Credentials Properties Description
    Kerberos Principal Kerberos principal for the YARN cluster where the application runs.
    Kerberos Keytab Kerberos keytab for the YARN cluster where the application runs.