Kafka
The Kafka destination writes data to a Kafka cluster. The destination supports Apache Kafka 0.10 and later. When using a Cloudera distribution of Apache Kafka, use CDH Kafka 3.0 or later.
The destination writes each record as a Kafka message to the specified topic. The Kafka cluster determines the number of partitions that the destination uses to write the data.
When you configure the Kafka destination, you specify the Kafka brokers that the destination connects to, the Kafka topic to write to, and the data format to use. You can configure the destination to connect securely to Kafka. You can also use a connection to configure the destination.
You can configure the destination to pass Kafka message keys to Kafka along with the data. You can also specify additional Kafka configuration properties.
Generated Messages and Kafka Message Keys
Each Kafka message contains two parts: an optional message key and a required value. By default, the destination generates a null value for the message key and writes the record data to the message value. However, when the destination processes data that is not delimited, you can configure the destination to process Kafka message keys.
Example: Default messages
Suppose a pipeline processes the following data:
order_id | customer_id | amount |
---|---|---|
1075623 | 2 | 34.56 |
1076645 | 47 | 234.67 |
1050945 | 342 | 126.05 |
When the destination uses the default message key behavior and JSON as the data format, it writes the following messages to Kafka:
Key | Value |
---|---|
null | {"order_id":1075623,"customer_id":2,"amount":34.56} |
null | {"order_id":1076645,"customer_id":47,"amount":234.67} |
null | {"order_id":1050945,"customer_id":342,"amount":126.05} |
Example: Messages with message keys
Suppose a pipeline processes the following data, which includes a key field:
key | order_id | customer_id | amount |
---|---|---|---|
123 | 1075623 | 2 | 34.56 |
124 | 1076645 | 47 | 234.67 |
125 | 1050945 | 342 | 126.05 |
When you configure the destination to use the key field for Kafka message keys, and to use JSON as the data format, the destination writes the following messages to Kafka:
Key | Value |
---|---|
123 | {"order_id":1075623,"customer_id":2,"amount":34.56} |
124 | {"order_id":1076645,"customer_id":47,"amount":234.67} |
125 | {"order_id":1050945,"customer_id":342,"amount":126.05} |
Note that the data in the key field is used as the message key and is not included in the message value.
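The key handling in the two examples above can be sketched in a few lines of Python. This is only an illustration of the resulting message shape, not the destination's implementation; the function name to_kafka_message and its key_field parameter are hypothetical.

```python
import json


def to_kafka_message(record, key_field=None):
    """Return a (key, value) pair for one record.

    Hypothetical helper: with no key_field the key is None (a null Kafka
    key). Otherwise the named field becomes the key and is left out of
    the JSON value. A missing key field raises KeyError.
    """
    value_fields = dict(record)  # copy so the caller's record is untouched
    key = value_fields.pop(key_field) if key_field is not None else None
    value = json.dumps(value_fields, separators=(",", ":"))
    return key, value


record = {"key": 123, "order_id": 1075623, "customer_id": 2, "amount": 34.56}
print(to_kafka_message(record, key_field="key"))
```

Calling the helper without key_field on a record that has no key column returns None as the key, which mirrors the default messages shown in the first example.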
Kafka Security
You can configure the destination to connect securely to Kafka through SSL/TLS, SASL, or both. For more information about the methods and details on how to configure each method, see Security in Kafka Stages.
Data Formats
The Kafka destination writes records based on the specified data format.
- Avro
- The destination writes records based on the Avro schema.
Note: To use the Avro data format, Apache Spark version 2.4 or later must be installed on the Transformer machine and on each node in the cluster.
You can use one of the following methods to specify the location of the Avro schema definition:
- In Pipeline Configuration - Use the schema defined in the stage properties. Optionally, you can configure the destination to register the specified schema with Confluent Schema Registry at a URL with a schema subject.
- Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry. Confluent Schema Registry is a distributed storage layer for Avro schemas. You specify the URL to Confluent Schema Registry and whether to look up the schema by the schema ID or subject.
You can also compress data with an Avro-supported compression codec.
- Delimited
- The destination writes a delimited message for every record. You can specify a custom delimiter, quote, and escape character to use in the data.
- JSON
- The destination writes a JSON line message for every record. For more information, see the JSON Lines website.
- Text
- The destination writes a message with a single String field for every record. When you configure the destination, you select the field to use.
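As a rough illustration of how one record maps to a message value in the Delimited, JSON, and Text formats, the following Python sketch serializes a record three ways. The function names are hypothetical and the code uses only the Python standard library. It does not reproduce the destination's exact quoting rules, and Avro is omitted because it requires a schema and an Avro library.

```python
import csv
import io
import json


def as_delimited(record, delimiter=",", quotechar='"', escapechar="\\"):
    """Write the record's values as one delimited line (no header)."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, delimiter=delimiter, quotechar=quotechar,
                        escapechar=escapechar, lineterminator="\n")
    writer.writerow(record.values())
    return buffer.getvalue()[:-1]  # drop the trailing line terminator


def as_json_line(record):
    """Write the record as a single JSON Lines entry."""
    return json.dumps(record, separators=(",", ":"))


def as_text(record, text_field):
    """Write only the selected String field."""
    return str(record[text_field])


order = {"order_id": 1075623, "customer_id": 2, "amount": 34.56}
print(as_delimited(order, delimiter="|"))
print(as_json_line(order))
print(as_text({"line": "order 1075623 shipped"}, "line"))
```

In this sketch, the Text format drops every field except the selected one, which is why all data to be written must already be in that field.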
Configuring a Kafka Destination
Configure a Kafka destination to write data to a Kafka cluster.
-
On the Properties panel, on the General tab, configure the following properties:
General Property | Description |
---|---|
Name | Stage name. |
Description | Optional description. |
Stage Library | Stage library to use to connect to Kafka: Kafka cluster-provided libraries (the cluster where the pipeline runs has Kafka libraries installed, and therefore has all of the necessary libraries to run the pipeline) or Kafka Transformer-provided libraries (Transformer passes the necessary libraries with the pipeline to enable running the pipeline; use when running the pipeline locally or when the cluster where the pipeline runs does not include the Kafka libraries). Note: When using additional Kafka stages in the pipeline, ensure that they use the same stage library. |
-
On the Kafka tab, configure the following properties:
Kafka Property | Description |
---|---|
Connection | Connection that defines the information required to connect to an external system. To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline. |
Broker URIs | List of comma-separated pairs of hosts and ports used to establish the initial connection to the Kafka cluster. Use the following format: <host1>:<port1>,<host2>:<port2>,… Once a connection is established, the stage discovers the full set of available brokers. |
Topic | Kafka topic to write to. If the topic doesn't exist, the destination creates the topic using the default configurations defined in the Kafka cluster. |
Send Message Keys | Enables the destination to send data in the specified field to Kafka to be used as message keys. Can be used with all data formats except Delimited. To process Kafka message keys with the JSON data format, Apache Spark version 2.4 or later must be installed on the Transformer machine and on each node in the cluster. |
Key Field | Name of the field to pass to Kafka as message keys. The default is keys, which is the field where the Kafka origin stores message key data when configured to include message keys in records. |
Additional Configurations | Additional Kafka configuration properties to pass to Kafka. To add properties, click the Add icon and define the Kafka property name and value. Use kafka. as a prefix for the property names, as follows: kafka.<kafka property name> |
-
On the Security tab, configure the security properties to enable the destination to securely connect to Kafka.
For information about the security options and additional steps required to enable security, see Security in Kafka Stages.
-
On the Data Format tab, configure the following property:
Data Format Property | Description |
---|---|
Data Format | Format of the data to write to messages. Select one of the following formats: Avro, Delimited, JSON, or Text. |
-
For Avro data, click the Schema tab and configure the following properties:
Schema Property | Description |
---|---|
Avro Schema Location | Location of the Avro schema definition to use to process data: In Pipeline Configuration (use the schema specified in the Avro Schema property) or Confluent Schema Registry (retrieve the schema from Confluent Schema Registry). |
Avro Schema | Avro schema definition used to write the data. You can optionally use the runtime:loadResource function to use a schema definition stored in a runtime resource file. Available when Avro Schema Location is set to In Pipeline Configuration. |
Register Schema | Registers the specified Avro schema with Confluent Schema Registry. Available when Avro Schema Location is set to In Pipeline Configuration. |
Schema Registry URLs | Confluent Schema Registry URLs used to look up the schema. To add a URL, click Add. Use the following format to enter the URL: http://<host name>:<port number> Available when Avro Schema Location is set to Confluent Schema Registry, or when registering a schema defined in the pipeline. |
Basic Auth User Info | Confluent Schema Registry basic.auth.user.info credential. Available when Avro Schema Location is set to Confluent Schema Registry. |
Lookup Schema By | Method used to look up the schema in Confluent Schema Registry: Subject (look up the specified Avro schema subject) or Schema ID (look up the specified Avro schema ID). Available when Avro Schema Location is set to Confluent Schema Registry. |
Schema Subject | Avro schema subject to look up or to register in Confluent Schema Registry. If the specified subject to look up has multiple schema versions, the destination uses the latest schema version for that subject. To use an older version, find the corresponding schema ID, and then set the Lookup Schema By property to Schema ID. Available when Avro Schema Location is set to Confluent Schema Registry. |
Schema ID | Avro schema ID to look up in the Confluent Schema Registry. Available when Avro Schema Location is set to Confluent Schema Registry. |
Avro Compression Codec | Avro compression type to use. |
-
For delimited data, on the Data Format tab, configure the following properties:
Delimited Property | Description |
---|---|
Delimiter Character | Delimiter character to use in the data. Select one of the available options or select Other to enter a custom character. You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter. |
Quote Character | Quote character to use in the data. |
Escape Character | Escape character to use in the data. |
-
For text data, on the Data Format tab, configure the following property:
Text Property | Description |
---|---|
Text Field | String field in the record that contains the data to be written. All data must be incorporated into the specified field. |
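Several of the property values described in the steps above follow small textual conventions: the Broker URIs list, the kafka. prefix for additional configurations, and the \uNNNN notation for delimiter characters. The Python sketch below shows one way to read each convention. All function names are hypothetical, the host names are placeholders, and the code is not part of Transformer; it only illustrates the formats.

```python
import re


def parse_broker_uris(broker_uris):
    """Split '<host1>:<port1>,<host2>:<port2>' into (host, port) tuples."""
    brokers = []
    for pair in broker_uris.split(","):
        host, _, port = pair.strip().rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"expected <host>:<port>, got {pair!r}")
        brokers.append((host, int(port)))
    return brokers


def with_kafka_prefix(properties):
    """Add the 'kafka.' prefix expected for additional configurations."""
    return {f"kafka.{name}": value for name, value in properties.items()}


def decode_delimiter(text):
    r"""Turn '\uNNNN' notation into the character it names."""
    if re.fullmatch(r"\\u[0-9A-Fa-f]{4}", text):
        return chr(int(text[2:], 16))
    return text  # already a literal character such as '|'


print(parse_broker_uris("broker1:9092,broker2:9092"))  # placeholder hosts
print(with_kafka_prefix({"compression.type": "gzip"}))
print(repr(decode_delimiter("\\u2028")))
```

The sketch rejects a broker entry that lacks a port, since the format described above always pairs a host with a port.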