The Kafka Consumer origin reads data from a single topic in an Apache Kafka cluster. To use multiple threads to read from multiple topics, use the Kafka Multitopic Consumer.
When you configure a Kafka Consumer, you configure the consumer group name, topic, and ZooKeeper connection information.
When using Kafka version 0.8.2 or later to consume messages in the Avro format, you can configure the Kafka Consumer to work with the Confluent Schema Registry. The Confluent Schema Registry is a distributed storage layer for Avro schemas that uses Kafka as its underlying storage mechanism.
You can add additional Kafka configuration properties as needed. When using Kafka version 0.9.0.0 or later, you can also configure the origin to use Kafka security features.
Kafka Consumer includes record header attributes that enable you to use information about the record in pipeline processing.
When you start a pipeline for the first time, the Kafka Consumer registers with Kafka as a new consumer group for the topic.
By default, the origin reads only incoming data, processing data from all partitions and ignoring any existing data in the topic. After the origin passes data to destinations, it saves the offset with Kafka or ZooKeeper. When you stop and restart the pipeline, processing continues based on the offset.
For versions before Kafka 0.9.0.0, the offset is stored with Kafka or ZooKeeper based on the offsets.storage Kafka property. For Kafka version 0.9.0.0 or later, the offset is stored with Kafka.
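For example, on a Kafka 0.8.x cluster, setting the following standard Kafka property in the origin's Kafka configuration stores offsets in Kafka rather than ZooKeeper (zookeeper is the other valid value):

offsets.storage=kafka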
By default, the Kafka Consumer origin reads only incoming data. You can configure the origin to read all unread data in the topic instead.
You can add custom Kafka configuration properties to the Kafka Consumer.
When you add a Kafka configuration property, enter the exact property name and the value. The Kafka Consumer does not validate the property names or values.
You can use simple or bulk edit mode to add configuration properties.
For more information about adding custom Kafka configuration properties, see Additional Kafka Properties.
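For example, to raise the minimum amount of data that the consumer fetches in a single request, you could add the standard Kafka property fetch.min.bytes; the value here is only illustrative:

fetch.min.bytes=1024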
The Kafka Consumer origin creates record header attributes that include information about the origins of the record, such as the topic, partition, and offset. When the origin processes Avro data, it includes the Avro schema in an avroSchema record header attribute.
You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.
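For example, assuming the topic attribute is populated as described above, the following expression returns the record's topic name, or unknown when the attribute is absent:

${record:attributeOrDefault('topic', 'unknown')}

Similarly, ${record:attribute('avroSchema')} returns the Avro schema for Avro data.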
When using Kafka version 0.9.0.0 or later, you can configure the Kafka Consumer origin to connect securely through SSL/TLS, Kerberos, or both.
Earlier versions of Kafka do not support security.
Perform the following steps to enable the Kafka Consumer origin to use SSL/TLS to connect to Kafka version 0.9.0.0 or later. You can use the same steps to configure a Kafka Producer.
For details about these properties, see the Kafka documentation.
For example, the following properties allow the stage to use SSL/TLS to connect to Kafka 0.9.0.0 with client authentication:
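As a sketch, the property names below are the standard Kafka client SSL settings; the paths and passwords are placeholders to replace with values from your environment. The keystore properties are needed only because the brokers require client authentication:

security.protocol=SSL
ssl.truststore.location=<path to truststore>/kafka.client.truststore.jks
ssl.truststore.password=<truststore password>
ssl.keystore.location=<path to keystore>/kafka.client.keystore.jks
ssl.keystore.password=<keystore password>
ssl.key.password=<key password>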
When you use Kerberos authentication, Data Collector uses the Kerberos principal and keytab to connect to Kafka version 0.9.0.0 or later.
Perform the following steps to enable the Kafka Consumer origin to use Kerberos to connect to Kafka:
To use Kerberos, add a KafkaClient login section such as the following to the JAAS configuration file used by Data Collector:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="<keytab path>"
    principal="<principal name>/<host name>@<realm>";
};
For example:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="kafka-client-1/sdc-01.streamsets.net@EXAMPLE.COM";
};
On a cluster managed by Cloudera Manager, use the following template instead:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="_KEYTAB_PATH"
    principal="<principal name>/_HOST@<realm>";
};
For example:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="_KEYTAB_PATH"
    principal="kafka-client-1/_HOST@EXAMPLE.COM";
};
Cloudera Manager generates the appropriate keytab path and host name for the _KEYTAB_PATH and _HOST placeholders.
For example, the following Kafka properties enable connecting to Kafka 0.9.0.0 with Kerberos:
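A minimal sketch, assuming the brokers run under the kafka Kerberos service principal; both property names are standard Kafka client settings:

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka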
You can enable the Kafka Consumer origin to use SSL/TLS and Kerberos to connect to Kafka version 0.9.0.0 or later.
As with Kerberos alone, add a KafkaClient login section such as the following to the JAAS configuration file used by Data Collector:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="<keytab path>"
    principal="<principal name>/<host name>@<realm>";
};
For example:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="kafka-client-1/sdc-01.streamsets.net@EXAMPLE.COM";
};
On a cluster managed by Cloudera Manager, use the following template instead:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="_KEYTAB_PATH"
    principal="<principal name>/_HOST@<realm>";
};
For example:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="_KEYTAB_PATH"
    principal="kafka-client-1/_HOST@EXAMPLE.COM";
};
Cloudera Manager generates the appropriate keytab path and host name for the _KEYTAB_PATH and _HOST placeholders.
For details about these properties, see the Kafka documentation.
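For example, properties like the following combine Kerberos authentication with SSL/TLS encryption; the truststore path and password are placeholders, and the kafka service name is an assumption about the broker's Kerberos principal:

security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
ssl.truststore.location=<path to truststore>/kafka.client.truststore.jks
ssl.truststore.password=<truststore password>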
The Kafka Consumer origin processes data differently based on the data format. Kafka Consumer can process the following types of data:
You must specify the method that the origin uses to deserialize the message. If the Avro schema ID is embedded in each message (in the Confluent wire format, each message begins with a magic byte followed by the four-byte schema ID), set the key and value deserializers to Confluent on the Kafka tab.
Configure a Kafka Consumer to read data from a Kafka cluster.