Kafka Cluster Requirements
Component | Requirement |
---|---|
Spark Streaming for cluster streaming modes | Spark version 2.1 or later |
Apache Kafka | Spark Streaming on YARN requires a Cloudera or Hortonworks distribution of an Apache Kafka cluster version 0.10.0.0 or later. |
sdc.env.sh
or sdcd.env.sh
. Do not change this environment variable
value.Kafka Consumer Maximum Batch Size
When using a Kafka
Consumer origin in cluster mode, the Max Batch Size property is ignored.
Instead, the effective batch size is <Batch Wait Time> x <Rate Limit Per
Partition>
.
For example, if Batch Wait Time is 60 seconds and Rate Limit Per Partition is 1000 messages/second, then the effective batch size from the Spark Streaming perspective is 60 x 1000 = 60000 messages/second. In this example, there is only one partition so only one cluster pipeline is spawned and the batch size for that pipeline is 60000.
If there are two partitions, then the effective batch size from the Spark Streaming perspective is 60 x 1000 x 2 = 120000 messages/second. By default, two cluster pipelines are created. If the number of messages in each partition are equal, then each pipeline receives 60000 messages in one batch. If, however, all 120000 messages are in a single partition, then the cluster pipeline processing that partition receives all 120000 messages.
To reduce the maximum batch size, either reduce the wait time or reduce the rate limit per partition. Similarly, to increase the maximum batch size, either increase the wait time or increase the rate limit per partition.
Configuring Cluster YARN Streaming for Kafka
Complete the following steps to configure a cluster pipeline to read from a Kafka cluster on YARN:
- Verify the installation of Kafka, Spark Streaming, and YARN as the cluster manager.
- Install the Data Collector on a Spark and YARN gateway node.
-
To enable checkpoint metadata storage, grant the user defined in the user
environment variable write permission on
/user/$SDC_USER.
The user environment variable defines the system user used to run Data Collector as a service. The file that defines the user environment variable depends on your operating system. For more information, see User and Group for Service Start in the Data Collector documentation.For example, say the user environment variable is defined as sdc and the cluster does not use Kerberos. Then you might use the following commands to create the directory and configure the necessary write permissions:
$sudo -u hdfs hadoop fs -mkdir /user/sdc $sudo -u hdfs hadoop fs -chown sdc /user/sdc
-
If necessary, specify the location of the
spark-submit script that points to Spark version 2.1 or later.
Data Collector assumes that the spark-submit script used to submit job requests to Spark Streaming is located in the following directory:
/usr/bin/spark-submit
If the script is not in this directory, use the SPARK_SUBMIT_YARN_COMMAND environment variable to define the location of the script.The location of the script may differ depending on the Spark version and distribution that you use.For example, when using CDH Spark 2.1, the spark-submit script is in the following directory by default: /usr/bin/spark2-submit. Then, you might use the following command to define the location of the script:export SPARK_SUBMIT_YARN_COMMAND=/usr/bin/spark2-submit
Or, if using Hortonworks Data Platform (HDP) 2.6 which includes Spark 2.2.0, the spark-submit script is in the following directory by default: /usr/hdp/2.6/spark2/bin/spark-submit. Then, you might use the following command to define the location of the script:export SPARK_SUBMIT_YARN_COMMAND=/usr/hdp/2.6/spark2/bin/spark-submit
Note: If you change the location of the spark-submit script, you must restart Data Collector to capture the change. -
To enable Data Collector to submit YARN jobs, perform one of the following tasks:
- On YARN, set the min.user.id to a value equal to or lower than the user ID associated with the Data Collector user ID, typically named "sdc".
- On YARN, add the Data Collector user name, typically "sdc", to the allowed.system.users property.
-
On YARN, verify that the Spark logging level is set to a severity of INFO or
lower.
YARN sets the Spark logging level to INFO by default. To change the logging level:
- Edit the log4j.properties file, located in the following directory:
<spark-home>/conf/log4j.properties
- Set the log4j.rootCategory property to a severity of INFO or lower, such as DEBUG or TRACE.
- Edit the log4j.properties file, located in the following directory:
-
If YARN is configured to use Kerberos authentication, configure Data Collector to use Kerberos
authentication.
When you configure Kerberos authentication for Data Collector, you enable Data Collector to use Kerberos and define the principal and keytab.Important: For cluster pipelines, enter an absolute path to the keytab when configuring Data Collector. Standalone pipelines do not require an absolute path.Once enabled, Data Collector automatically uses the Kerberos principal and keytab to connect to any YARN cluster that uses Kerberos. For more information about enabling Kerberos authentication for Data Collector, see Kerberos Authentication in the Data Collector documentation.
- In the pipeline properties, on the General tab, set the Execution Mode property to Cluster YARN Streaming.
-
On the Cluster tab, configure the following
properties:
Cluster Property Description Worker Count Number of workers used in a Cluster Yarn Streaming pipeline. Use to limit the number of workers spawned for processing. By default, one worker is spawned for every partition in the topic. Default is 0 for one worker for each partition.
Worker Java Options Additional Java properties for the pipeline. Separate properties with a space. The following properties are set by default.
- XX:+UseConcMarkSweepGC and XX:+UseParNewGC are set to the Concurrent Mark Sweep (CMS) garbage collector.
- Dlog4j.debug enables debug logging for log4j.
Changing the default properties is not recommended.
You can add any valid Java property.
Launcher Env Configuration Additional configuration properties for the cluster launcher. Using simple or bulk edit mode, click the Add icon and define the property name and value.
Worker Memory (MB) Maximum amount of memory allocated to each Data Collector worker in the cluster. Default is 1024 MB.
Extra Spark Configuration For Cluster Yarn Streaming pipelines, you can configure additional Spark configurations to pass to the spark-submit script. Enter the Spark configuration name and the value to use. The specified configurations are passed to the spark-submit script as follows:spark-submit --conf <key>=<value>
For example, to limit the off-heap memory allocated to each executor, you can use the
spark.yarn.executor.memoryOverhead
configuration and set it to the number of MB that you want to use.Data Collector does not validate the property names or values.
For details on additional Spark configurations that you can use, see the Spark documentation for the Spark version that you are using.
-
In the pipeline, use a Kafka Consumer origin.
If necessary, select a cluster mode stage library on the General tab of the origin. For more information about the origin, see Kafka Consumer (deprecated).Note: Batch Wait Time is ignored for the Kafka Consumer origin in cluster mode. For more information, see Kafka Consumer Maximum Batch Size.
- If the Kafka cluster is configured to use SSL/TLS, Kerberos, or both, configure the Kafka Consumer origin to securely connect to the cluster, as described in Security in Kafka Stages.