Configuring Cluster YARN Streaming for Kafka

Complete the following steps to configure a cluster pipeline to read from a Kafka cluster on YARN:

  1. Verify the installation of Kafka, Spark Streaming, and YARN as the cluster manager.
  2. Install Data Collector on a Spark and YARN gateway node.
  3. To enable checkpoint metadata storage, grant the user defined in the user environment variable write permission on /user/$SDC_USER.
    The user environment variable defines the system user used to run Data Collector as a service. The file that defines the user environment variable depends on your operating system. For more information, see User and Group for Service Start in the Data Collector documentation.
    For example, say the user environment variable is defined as sdc and the cluster does not use Kerberos. Then you might use the following commands to create the directory and configure the necessary write permissions:
    $ sudo -u hdfs hadoop fs -mkdir /user/sdc
    $ sudo -u hdfs hadoop fs -chown sdc /user/sdc
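    When the Data Collector user is not sdc, the same commands can be generated for any user name. The following is a minimal dry-run sketch: print_checkpoint_dir_cmds is a hypothetical helper that only prints the commands, and the final `-ls -d` line is an added check for inspecting the resulting owner, not a step required by this procedure.

```shell
#!/bin/sh
# Hypothetical helper: print (do not run) the HDFS commands that create
# /user/<user> and hand ownership to that user, as in the example above.
print_checkpoint_dir_cmds() {
    sdc_user="$1"
    printf 'sudo -u hdfs hadoop fs -mkdir /user/%s\n' "$sdc_user"
    printf 'sudo -u hdfs hadoop fs -chown %s /user/%s\n' "$sdc_user" "$sdc_user"
    # Added check (assumption): list the directory itself to inspect its owner.
    printf 'sudo -u hdfs hadoop fs -ls -d /user/%s\n' "$sdc_user"
}

# Falls back to "sdc" when SDC_USER is not set in the current shell.
print_checkpoint_dir_cmds "${SDC_USER:-sdc}"
```

    Review the printed commands before running them on a node that has the hadoop client. As in the example above, they assume a cluster that does not use Kerberos.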
  4. If necessary, specify the location of the spark-submit script that points to Spark version 2.1 or later.
    Data Collector assumes that the spark-submit script used to submit job requests to Spark Streaming is located at the following path:
    /usr/bin/spark-submit
    If the script is located elsewhere, use the SPARK_SUBMIT_YARN_COMMAND environment variable to define the location of the script.
    The location of the script may differ depending on the Spark version and distribution that you use.
    For example, with CDH Spark 2.1, the spark-submit script is located at /usr/bin/spark2-submit by default. In that case, you might use the following command to define the location of the script:
    export SPARK_SUBMIT_YARN_COMMAND=/usr/bin/spark2-submit
    Or, with Hortonworks Data Platform (HDP) 2.6, which includes Spark 2.2.0, the spark-submit script is located at /usr/hdp/2.6/spark2/bin/spark-submit by default. In that case, you might use the following command to define the location of the script:
    export SPARK_SUBMIT_YARN_COMMAND=/usr/hdp/2.6/spark2/bin/spark-submit
    Note: If you change the location of the spark-submit script, you must restart Data Collector to capture the change.
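    To find out which script is present on a node, you could probe the paths mentioned in this step. The following is a minimal sketch: first_executable is a hypothetical helper, and the candidate list and its order are assumptions taken from the examples above. A script existing at a path does not prove that it points to Spark 2.1 or later, so confirm the version separately, for example with spark-submit --version.

```shell
#!/bin/sh
# Hypothetical helper: print the first argument that is an executable file.
# Returns 1 when none of the candidates qualifies.
first_executable() {
    for candidate in "$@"; do
        if [ -f "$candidate" ] && [ -x "$candidate" ]; then
            printf '%s\n' "$candidate"
            return 0
        fi
    done
    return 1
}

# Candidate order is an assumption: distribution-specific Spark 2 paths first,
# then the default location that Data Collector assumes.
if spark_submit="$(first_executable \
        /usr/bin/spark2-submit \
        /usr/hdp/2.6/spark2/bin/spark-submit \
        /usr/bin/spark-submit)"; then
    # Only a non-default location needs the environment variable.
    if [ "$spark_submit" != /usr/bin/spark-submit ]; then
        export SPARK_SUBMIT_YARN_COMMAND="$spark_submit"
    fi
    echo "spark-submit found at: $spark_submit"
else
    echo "no spark-submit script found in the candidate paths" >&2
fi
```

    The export affects only the current shell. Data Collector sees the variable only if it is defined in the environment that Data Collector starts with.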
  5. To enable Data Collector to submit YARN jobs, perform one of the following tasks:
    • On YARN, set the min.user.id property to a value equal to or lower than the user ID of the Data Collector user, typically named "sdc".
    • On YARN, add the Data Collector user name, typically "sdc", to the allowed.system.users property.
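    Either condition can be checked from a shell. The following is a minimal sketch that assumes both properties live in a key=value file such as YARN's container-executor.cfg. The default file path and the CONTAINER_EXECUTOR_CFG variable are hypothetical, and cfg_value and can_submit_yarn_jobs are illustrative helpers. If your distribution manages these settings through an administration console, check them there instead.

```shell
#!/bin/sh
# Illustrative helper: print the value of a key from a key=value file
# (last occurrence wins, surrounding blanks trimmed).
cfg_value() {
    awk -F= -v key="$1" '
        { k = $1; gsub(/^[ \t]+|[ \t]+$/, "", k) }
        k == key { v = $2; gsub(/^[ \t]+|[ \t]+$/, "", v); val = v }
        END { print val }
    ' "$2"
}

# Succeeds when the UID is at least min.user.id, or when the user name appears
# in the comma-separated allowed.system.users list. A missing min.user.id is
# treated as "not satisfied"; no default value is assumed here.
can_submit_yarn_jobs() {
    user="$1"; uid="$2"; cfg="$3"
    min_uid="$(cfg_value min.user.id "$cfg")"
    allowed="$(cfg_value allowed.system.users "$cfg" | tr -d ' ')"
    if [ -n "$min_uid" ] && [ "$uid" -ge "$min_uid" ]; then
        return 0
    fi
    case ",$allowed," in
        *",$user,"*) return 0 ;;
    esac
    return 1
}

# Assumed file location; adjust for your distribution.
CFG="${CONTAINER_EXECUTOR_CFG:-/etc/hadoop/conf/container-executor.cfg}"
if [ -r "$CFG" ] && id -u sdc >/dev/null 2>&1; then
    if can_submit_yarn_jobs sdc "$(id -u sdc)" "$CFG"; then
        echo "sdc satisfies min.user.id or allowed.system.users"
    else
        echo "sdc satisfies neither condition" >&2
    fi
fi
```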
  6. On YARN, verify that the Spark logging level is set to a severity of INFO or lower.
    YARN sets the Spark logging level to INFO by default. To change the logging level:
    1. Edit the log4j.properties file, located in the following directory:
      <spark-home>/conf/log4j.properties
    2. Set the log4j.rootCategory property to a severity of INFO or lower, such as DEBUG or TRACE.
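    To check the current setting without opening the file, you could read the level from log4j.rootCategory. The following is a minimal sketch: root_log_level and level_is_info_or_lower are hypothetical helpers, and the SPARK_HOME variable with its /usr/lib/spark fallback is an assumed stand-in for <spark-home>. The sketch assumes the usual form of the property, a level followed by appender names, such as INFO, console.

```shell
#!/bin/sh
# Hypothetical helper: print the level (first field) of log4j.rootCategory.
# If the property appears more than once, the last occurrence is reported.
root_log_level() {
    sed -n 's/^[[:space:]]*log4j\.rootCategory[[:space:]]*=[[:space:]]*\([A-Za-z]*\).*/\1/p' "$1" | tail -n 1
}

# Succeeds for INFO and the more verbose levels below it.
# ALL is included because it is the lowest threshold in log4j.
level_is_info_or_lower() {
    case "$(printf '%s' "$1" | tr '[:lower:]' '[:upper:]')" in
        INFO|DEBUG|TRACE|ALL) return 0 ;;
        *) return 1 ;;
    esac
}

# Assumed location of <spark-home>; adjust for your installation.
LOG4J_FILE="${SPARK_HOME:-/usr/lib/spark}/conf/log4j.properties"
if [ -r "$LOG4J_FILE" ]; then
    level="$(root_log_level "$LOG4J_FILE")"
    if level_is_info_or_lower "$level"; then
        echo "log4j.rootCategory level is '$level': OK"
    else
        echo "log4j.rootCategory level is '$level': set it to INFO or lower" >&2
    fi
fi
```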
  7. If YARN is configured to use Kerberos authentication, configure Data Collector to use Kerberos authentication.
    When you configure Kerberos authentication for Data Collector, you enable Data Collector to use Kerberos and define the principal and keytab.
    Important: For cluster pipelines, enter an absolute path to the keytab when configuring Data Collector. Standalone pipelines do not require an absolute path.
    Once enabled, Data Collector automatically uses the Kerberos principal and keytab to connect to any YARN cluster that uses Kerberos. For more information about enabling Kerberos authentication for Data Collector, see Kerberos Authentication in the Data Collector documentation.
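    Because cluster pipelines need an absolute keytab path, a quick check before you enter the value can catch a relative path early. The following is a minimal sketch: is_absolute_path is a hypothetical helper, and the SDC_KEYTAB variable and its default path are placeholders, not Data Collector settings. The klist call assumes that a Kerberos client providing klist is installed.

```shell
#!/bin/sh
# Hypothetical helper: succeed only when the path starts with "/".
is_absolute_path() {
    case "$1" in
        /*) return 0 ;;
        *) return 1 ;;
    esac
}

# Placeholder keytab location; replace with your own.
KEYTAB="${SDC_KEYTAB:-/etc/sdc/sdc.keytab}"

if ! is_absolute_path "$KEYTAB"; then
    echo "keytab path is relative: $KEYTAB" >&2
elif [ ! -r "$KEYTAB" ]; then
    echo "keytab is missing or not readable by this user: $KEYTAB" >&2
elif command -v klist >/dev/null 2>&1; then
    # List the principals stored in the keytab, with timestamps.
    klist -kt "$KEYTAB"
fi
```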
  8. In the pipeline properties, on the General tab, set the Execution Mode property to Cluster YARN Streaming.
  9. On the Cluster tab, configure the following properties:
    Cluster properties:

    Worker Count
      Number of workers used in a Cluster YARN Streaming pipeline. Use to limit the number of workers spawned for processing. By default, one worker is spawned for every partition in the topic.

      Default is 0, which spawns one worker for each partition.

    Worker Java Options
      Additional Java properties for the pipeline. Separate properties with a space.

      The following properties are set by default:

      • -XX:+UseConcMarkSweepGC and -XX:+UseParNewGC select the Concurrent Mark Sweep (CMS) garbage collector.
      • -Dlog4j.debug enables debug logging for log4j.

      Changing the default properties is not recommended.

      You can add any valid Java property.

    Launcher Env Configuration
      Additional configuration properties for the cluster launcher. Using simple or bulk edit mode, click the Add icon and define the property name and value.

    Worker Memory (MB)
      Maximum amount of memory allocated to each Data Collector worker in the cluster.

      Default is 1024 MB.

    Extra Spark Configuration
      For Cluster YARN Streaming pipelines, you can configure additional Spark configurations to pass to the spark-submit script. Enter the Spark configuration name and the value to use.

      The specified configurations are passed to the spark-submit script as follows:
      spark-submit --conf <key>=<value>

      For example, to limit the off-heap memory allocated to each executor, you can use the spark.yarn.executor.memoryOverhead configuration and set it to the number of MB that you want to use.

      Data Collector does not validate the property names or values.

      For details on additional Spark configurations that you can use, see the Spark documentation for the Spark version that you are using.
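    To picture how Extra Spark Configuration entries reach spark-submit, the following minimal sketch turns key=value pairs into --conf arguments. conf_args is a hypothetical helper and 512 is an arbitrary illustrative value. Data Collector builds the real command itself, so this only mirrors the --conf <key>=<value> pattern described above.

```shell
#!/bin/sh
# Hypothetical helper: print one "--conf key=value" pair per line.
# Like Data Collector, it does not validate the names or values.
conf_args() {
    for pair in "$@"; do
        printf '%s %s\n' --conf "$pair"
    done
}

# Illustrative value only: limit executor off-heap overhead to 512 MB.
echo "spark-submit $(conf_args spark.yarn.executor.memoryOverhead=512)"
```

    Because nothing is validated, a misspelled configuration name is passed through unchanged. Check names against the Spark documentation for your version.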

  10. In the pipeline, use a Kafka Consumer origin.
    If necessary, select a cluster mode stage library on the General tab of the origin. For more information about the origin, see Kafka Consumer (deprecated).
    Note: Batch Wait Time is ignored for the Kafka Consumer origin in cluster mode. For more information, see Kafka Consumer Maximum Batch Size.
  11. If the Kafka cluster is configured to use SSL/TLS, Kerberos, or both, configure the Kafka Consumer origin to securely connect to the cluster, as described in Security in Kafka Stages.