Kafka Cluster Requirements

Cluster mode pipelines that read from a Kafka cluster have the following requirements:
Component Requirement
Spark Streaming for cluster streaming modes Spark version 2.1 or later
Apache Kafka Spark Streaming on YARN requires a Cloudera or Hortonworks distribution of an Apache Kafka cluster version or later.
Note: By default, a Cloudera CDH cluster sets the Kafka-Spark integration version as 0.9. However, Data Collector cluster streaming pipelines require version 0.10 of the Kafka-Spark integration. As a result, the SPARK_KAFKA_VERSION environment variable is set to 0.10 by default in the Data Collector environment configuration file - sdc.env.sh or sdcd.env.sh. Do not change this environment variable value.

Kafka Consumer Maximum Batch Size

When using a Kafka Consumer origin in cluster mode, the Max Batch Size property is ignored. Instead, the effective batch size is <Batch Wait Time> x <Rate Limit Per Partition>.

For example, if Batch Wait Time is 60 seconds and Rate Limit Per Partition is 1000 messages/second, then the effective batch size from the Spark Streaming perspective is 60 x 1000 = 60000 messages/second. In this example, there is only one partition so only one cluster pipeline is spawned and the batch size for that pipeline is 60000.

If there are two partitions, then the effective batch size from the Spark Streaming perspective is 60 x 1000 x 2 = 120000 messages/second. By default, two cluster pipelines are created. If the number of messages in each partition are equal, then each pipeline receives 60000 messages in one batch. If, however, all 120000 messages are in a single partition, then the cluster pipeline processing that partition receives all 120000 messages.

To reduce the maximum batch size, either reduce the wait time or reduce the rate limit per partition. Similarly, to increase the maximum batch size, either increase the wait time or increase the rate limit per partition.

Configuring Cluster YARN Streaming for Kafka

Complete the following steps to configure a cluster pipeline to read from a Kafka cluster on YARN:

  1. Verify the installation of Kafka, Spark Streaming, and YARN as the cluster manager.
  2. Install the Data Collector on a Spark and YARN gateway node.
  3. To enable checkpoint metadata storage, grant the user defined in the user environment variable write permission on /user/$SDC_USER.
    The user environment variable defines the system user used to run Data Collector as a service. The file that defines the user environment variable depends on your operating system. For more information, see User and Group for Service Start.
    For example, say the user environment variable is defined as sdc and the cluster does not use Kerberos. Then you might use the following commands to create the directory and configure the necessary write permissions:
    $sudo -u hdfs hadoop fs -mkdir /user/sdc
    $sudo -u hdfs hadoop fs -chown sdc /user/sdc
  4. If necessary, specify the location of the spark-submit script that points to Spark version 2.1 or later.
    Data Collector assumes that the spark-submit script used to submit job requests to Spark Streaming is located in the following directory:
    If the script is not in this directory, use the SPARK_SUBMIT_YARN_COMMAND environment variable to define the location of the script.
    The location of the script may differ depending on the Spark version and distribution that you use.
    For example, when using CDH Spark 2.1, the spark-submit script is in the following directory by default: /usr/bin/spark2-submit. Then, you might use the following command to define the location of the script:
    export SPARK_SUBMIT_YARN_COMMAND=/usr/bin/spark2-submit
    Or, if using Hortonworks Data Platform (HDP) 2.6 which includes Spark 2.2.0, the spark-submit script is in the following directory by default: /usr/hdp/2.6/spark2/bin/spark-submit. Then, you might use the following command to define the location of the script:
    export SPARK_SUBMIT_YARN_COMMAND=/usr/hdp/2.6/spark2/bin/spark-submit
    Note: If you change the location of the spark-submit script, you must restart Data Collector to capture the change.
  5. To enable Data Collector to submit YARN jobs, perform one of the following tasks:
    • On YARN, set the min.user.id to a value equal to or lower than the user ID associated with the Data Collector user ID, typically named "sdc".
    • On YARN, add the Data Collector user name, typically "sdc", to the allowed.system.users property.
  6. On YARN, verify that the Spark logging level is set to a severity of INFO or lower.
    YARN sets the Spark logging level to INFO by default. To change the logging level:
    1. Edit the log4j.properties file, located in the following directory:
    2. Set the log4j.rootCategory property to a severity of INFO or lower, such as DEBUG or TRACE.
  7. If YARN is configured to use Kerberos authentication, configure Data Collector to use Kerberos authentication.
    When you configure Kerberos authentication for Data Collector, you enable Data Collector to use Kerberos and define the principal and keytab.
    Important: For cluster pipelines, enter an absolute path to the keytab when configuring Data Collector. Standalone pipelines do not require an absolute path.
    Once enabled, Data Collector automatically uses the Kerberos principal and keytab to connect to any YARN cluster that uses Kerberos. For more information about enabling Kerberos authentication for Data Collector, see Kerberos Authentication.
  8. In the pipeline properties, on the General tab, set the Execution Mode property to Cluster YARN Streaming.
  9. On the Cluster tab, configure the following properties:
    Cluster Property Description
    Worker Count Number of workers used in a Cluster Yarn Streaming pipeline. Use to limit the number of workers spawned for processing. By default, one worker is spawned for every partition in the topic.

    Default is 0 for one worker for each partition.

    Worker Java Options Additional Java properties for the pipeline. Separate properties with a space.

    The following properties are set by default.

    • XX:+UseConcMarkSweepGC and XX:+UseParNewGC are set to the Concurrent Mark Sweep (CMS) garbage collector.
    • Dlog4j.debug enables debug logging for log4j.

    Changing the default properties is not recommended.

    You can add any valid Java property.

    Launcher Env Configuration

    Additional configuration properties for the cluster launcher. Using simple or bulk edit mode, click the Add icon and define the property name and value.

    Worker Memory (MB) Maximum amount of memory allocated to each Data Collector worker in the cluster.

    Default is 1024 MB.

    Extra Spark Configuration For Cluster Yarn Streaming pipelines, you can configure additional Spark configurations to pass to the spark-submit script. Enter the Spark configuration name and the value to use.
    The specified configurations are passed to the spark-submit script as follows:
    spark-submit --conf <key>=<value>

    For example, to limit the off-heap memory allocated to each executor, you can use the spark.yarn.executor.memoryOverhead configuration and set it to the number of MB that you want to use.

    Data Collector does not validate the property names or values.

    For details on additional Spark configurations that you can use, see the Spark documentation for the Spark version that you are using.

  10. In the pipeline, use a Kafka Consumer origin.
    If necessary, select a cluster mode stage library on the General tab of the origin. For more information about the origin, see Kafka Consumer (deprecated).
    Note: Batch Wait Time is ignored for the Kafka Consumer origin in cluster mode. For more information, see Kafka Consumer Maximum Batch Size.
  11. If the Kafka cluster is configured to use SSL/TLS, Kerberos, or both, configure the Kafka Consumer origin to securely connect to the cluster, as described in Security in Kafka Stages.