Kafka Producer
The Kafka Producer destination writes data to a Kafka cluster. The destination can also send responses to a microservice origin when used in a microservice pipeline. For information about supported versions, see Supported Systems and Versions.
When you configure a Kafka Producer, you define connection information, the partition strategy, and data format to use. You can also configure Kafka Producer to determine the topic to write to at runtime.
The Kafka Producer passes data to partitions in the Kafka topic based on the partition strategy that you choose. You can optionally write a batch of records to the Kafka cluster as a single message. When you want the destination to send responses to a microservice origin within a microservice pipeline, you specify the type of response to send. The destination writes all user-defined record header attributes to Kafka as Kafka message headers.
You can add additional Kafka configuration properties as needed. You can configure the destination to use Kafka security features. You can also configure the destination to pass message key values stored in the record to Kafka as Kafka message keys.
You can configure the Kafka Producer to work with the Confluent Schema Registry. The Confluent Schema Registry is a distributed storage layer for Avro schemas which uses Kafka as its underlying storage mechanism.
You can also use a connection to configure the destination.
Broker List
The Kafka Producer connects to Kafka based on the topic and associated brokers that you specify. To ensure a connection in case a specified broker goes down, list as many brokers as possible.
Runtime Topic Resolution
Kafka Producer can write a record to the topic based on an expression. When Kafka Producer evaluates a record, it calculates the expression based on record values and writes the record to the resulting topic.
When performing runtime topic resolution, Kafka Producer can write to any topic by default. You can create a white list of topics to limit the number of topics Kafka Producer attempts to use. When you create a white list, any record that resolves to an unlisted topic is sent to the stage for error handling. Use a white list when record data might resolve to invalid topic names.
Partition Strategy
The partition strategy determines how to write data to Kafka partitions. You can use a partition strategy to balance the work load or to write data semantically.
- Round-Robin
- Writes each record to a different partition using a cyclical order. Use for load balancing.
- Random
- Writes each record to a different partition using a random order. Use for load balancing.
- Expression
- Writes each record to a partition based on the results of the partition expression. Use for semantic partitioning.
- Default
- Writes each record using the default partition strategy that Kafka provides.
Send Microservice Responses
The Kafka Producer destination can send responses to a microservice origin when you use the destination in a microservice pipeline.
- All successfully written records.
- Responses from the destination system - For information about the possible responses, see the documentation for the destination system.
Additional Kafka Properties
You can add custom Kafka configuration properties to the Kafka Producer destination.
When you add a Kafka configuration property, enter the exact property name and the value. The stage does not validate the property names or values.
If custom configurations conflict with other stage properties, the stage generates an error unless you select the Override Stage Configurations check box. With the check box selected, the custom configurations override other stage properties. For example, to use a SASL mechanism other than PLAIN or GSSAPI (Kerberos), which the stage provides, add the necessary properties as custom configuration properties and select the Override Stage Configurations check box. For information about the necessary properties, see the Kafka documentation.
- key.serializer.class
- metadata.broker.list
- partitioner.class
- producer.type
- serializer.class
Writing Kafka Message Headers
When Data Collector uses a Kafka Java client version 0.11 or later, the Kafka Producer destination includes all user-defined record header attributes as Kafka message headers when writing messages to Kafka. User-defined record header attributes are those that you deliberately add to records as part of the pipeline logic.
The destination does not include internal attributes or automatically generated record header attributes in Kafka message headers.
${record:attribute('<existing attribute name>')}
.Example
Say a pipeline includes an SFTP/FTP/FTPS Client origin that generates several record
header attributes for information about the originating file for the record. The
attributes include file
for the path and name of the file, and
URI
for the URL used to access the remote server.
You want the Kafka Producer destination to include the URI
attribute
as a Kafka message header when writing to Kafka. You also want to include the
timestamp of the approximate time of processing.
- To include the processing time in an attribute, you specify the following
properties:
- Header Attribute property:
processingTime
- Header Attribute Expression property:
${time:now()}
The
time:now
function returns the current time from the Data Collector machine. - Header Attribute property:
- To include the URL used to access the remote server, you specify the
following properties:
- Header Attribute property:
originatingURL
- Header Attribute Expression property:
${record:attribute('URI')}
The expression evaluates to the value of the
URI
record header attribute generated by the SFTP/FTP/FTPS Client origin. - Header Attribute property:
When the Kafka Producer destination writes messages to Kafka, it includes these
user-defined record header attributes as message headers for each message. It does
not include the file
attribute or any other record header
attributes that are automatically generated by the stages or the pipeline.
Kafka Security
You can configure the Kafka Producer destination to connect securely to Kafka through SSL/TLS, SASL, or both. For more information about the methods and details on how to configure each method, see Security in Kafka Stages.
Data Formats
The Kafka Producer destination writes data to Kafka based on the data format that you select.
- Avro
- The destination writes records based on the Avro schema.
- Binary
- The stage writes binary data to a single field in the record.
- Delimited
- The destination writes records as delimited data. When you use this data format, the root field must be list or list-map.
- JSON
- The destination writes records as JSON data. You can use one of
the following formats:
- Array - Each file includes a single array. In the array, each element is a JSON representation of each record.
- Multiple objects - Each file includes multiple JSON objects. Each object is a JSON representation of a record.
- Protobuf
- Writes one record in a message. Uses the user-defined message type and the definition of the message type in the descriptor file to generate the message.
- SDC Record
- The destination writes records in the SDC Record data format.
- Text
- The destination writes data from a single text field to the destination system. When you configure the stage, you select the field to use.
- XML
- The destination creates a valid XML document for each record. The
destination requires the record to have a single root field that
contains the rest of the record data. For details and
suggestions for how to accomplish this, see Record Structure Requirement.
The destination can include indentation to produce human-readable documents. It can also validate that the generated XML conforms to the specified schema definition. Records with invalid schemas are handled based on the error handling configured for the destination.
Configuring a Kafka Producer Destination
Configure a Kafka Producer destination to write data to a Kafka cluster.
-
In the Properties panel, on the General tab, configure the
following properties:
General Property Description Name Stage name. Description Optional description. Stage Library Library version that you want to use. Required Fields Fields that must include data for the record to be passed into the stage. Tip: You might include fields that the stage uses.Records that do not include all required fields are processed based on the error handling configured for the pipeline.
Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions. Records that do not meet all preconditions are processed based on the error handling configured for the stage.
On Record Error Error record handling for the stage: - Discard - Discards the record.
- Send to Error - Sends the record to the pipeline for error handling.
- Stop Pipeline - Stops the pipeline.
-
On the Kafka tab, configure the following
properties:
Kafka Property Description Connection Connection that defines the information required to connect to an external system. To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.
To create a new connection, click the Add New Connection icon: . To view and edit the details of the selected connection, click the Edit Connection icon: .
Broker URI Comma-separated list of connection strings for the Kafka brokers. Use the following format for each broker: <host>:<port>
.To ensure a pipeline can connect to Kafka in case a specified broker goes down, list as many brokers as possible.
Runtime Topic Resolution Evaluates an expression at runtime to determine the topic to use for each record. Topic Topic to use. Not available when using runtime topic resolution.
Topic Expression Expression used to determine where each record is written when using runtime topic resolution. Use an expression that evaluates to a topic name. Topic White List List of valid topic names to write to when using runtime topic resolution. Use to avoid writing to invalid topics. Records that resolve to invalid topic names are passed to the stage for error handling. Use an asterisk (*) to allow writing to any topic name. By default, all topic names are valid.
Kafka Configuration Additional Kafka properties to use. Using simple or bulk edit mode, click the Add icon and define the Kafka property name and value. Use the property names and values as expected by Kafka. Do not use the broker.list property.
Partition Strategy Strategy to use to write to partitions: - Round Robin - Takes turns writing to different partitions.
- Random - Writes to partitions randomly.
- Expression - Uses an expression to write data to
different partitions. Writes records to the
partitions specified by the results of the expression.Note: The expression results are written to a specified Kafka message key attribute, overwriting any existing values. Because this partition strategy uses Kafka message keys, you cannot use the Kafka Message Key property in the destination to pass other Kafka message keys to Kafka. If records already contain Kafka message keys that you want to pass to Kafka, use a different partition strategy.
- Default - Uses an expression to extract a partition key from the record. Writes records to partitions based on a hash of the partition key.
Partition Expression Expression to use with the default or expression partition strategy. When using the default partition strategy, specify an expression that returns the partition key from the record. The expression must evaluate to a string value.
When using the expression partition strategy, specify an expression that evaluates to the partition where you want each record written. Partition numbers start with 0. The expression must evaluate to a numeric value.
Optionally, click Ctrl + Space Bar for help with creating the expression.
One Message per Batch For each batch, writes the records to each partition as a single message. Override Stage Configurations When configurations conflict, the properties configured in the Kafka Configuration property override other properties configured in the stage. Kafka Message Key Passes message key values stored in a record header attribute to Kafka as message keys. Enter an expression that specifies the attribute where the message keys are stored.
To pass string message keys stored in an attribute, use:${record:attribute('<message key attribute name>'}
To pass Avro message keys stored in an attribute, use:${avro:decode(record:attribute('avroKeySchema'),base64:decodeBytes(record: attribute('<messsage key attribute name')))}
For more information, about working with Kafka message keys, see Kafka Message Keys.
Key Serializer Method used to serialize the Kafka message key when the configured data format is Avro. Set to Confluent to embed the Avro schema ID in each message that Kafka Producer writes.
-
On the Security tab, configure the security properties
to enable the stage to securely connect to Kafka.
For information about the security options and additional steps required to enable security, see Security in Kafka Stages.
-
On the Data Format tab, configure the following
property:
Data Format Property Description Data Format Data format for messages. Use one of the following formats: - Avro
- Binary
- Delimited
- JSON
- Protobuf
- SDC Record
- Text
- XML
Message Key Format Data format of the message key values to pass to Kafka. Ignore this property when not passing message key values to Kafka. For more information about working with Kafka message key values, see Kafka Message Keys.
-
For Avro data, on the Data Format tab, configure the
following properties:
Avro Property Description Avro Schema Location Location of the Avro schema definition to use when writing data: - In Pipeline Configuration - Use the schema that you provide in the stage configuration.
- In Record Header - Use the schema in the avroSchema record header attribute. Use only when the avroSchema attribute is defined for all records.
- Confluent Schema Registry - Retrieve the schema from the Confluent Schema Registry.
Avro Schema Avro schema definition used to write the data. You can optionally use the
runtime:loadResource
function to load a schema definition stored in a runtime resource file.Register Schema Registers a new Avro schema with the Confluent Schema Registry. Schema Registry URLs Confluent Schema Registry URLs used to look up the schema or to register a new schema. To add a URL, click Add and then enter the URL in the following format: http://<host name>:<port number>
Basic Auth User Info User information needed to connect to Confluent Schema Registry when using basic authentication. Enter the key and secret from the
schema.registry.basic.auth.user.info
setting in Schema Registry using the following format:<key>:<secret>
Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.Look Up Schema By Method used to look up the schema in Confluent Schema Registry: - Subject - Look up the specified Avro schema subject.
- Schema ID - Look up the specified Avro schema ID.
Schema Subject Avro schema subject to look up or to register in Confluent Schema Registry. If the specified subject to look up has multiple schema versions, the destination uses the latest schema version for that subject. To use an older version, find the corresponding schema ID, and then set the Look Up Schema By property to Schema ID.
Schema ID Avro schema ID to look up in Confluent Schema Registry. Include Schema Includes the schema in each message. Note: If you configured Kafka Producer to embed the Avro schema ID in each message that it writes, clear this property.Avro Compression Codec The Avro compression type to use. When using Avro compression, do not enable other compression available in the destination.
-
For binary data, on the Data Format tab, configure the
following property:
Binary Property Description Binary Field Path Field that contains the binary data. -
For delimited data, on the Data Format tab, configure the
following properties:
Delimited Property Description Delimiter Format Format for delimited data: - Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
- RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
- MS Excel CSV - Microsoft Excel comma-separated file.
- MySQL CSV - MySQL comma-separated file.
- Tab-Separated Values - File that includes tab-separated values.
- PostgreSQL CSV - PostgreSQL comma-separated file.
- PostgreSQL Text - PostgreSQL text file.
- Custom - File that uses user-defined delimiter, escape, and quote characters.
Header Line Indicates whether to create a header line. Delimiter Character Delimiter character for a custom delimiter format. Select one of the available options or use Other to enter a custom character. You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.
Default is the pipe character ( | ).
Record Separator String Characters to use to separate records. Use any valid Java string literal. For example, when writing to Windows, you might use \r\n to separate records. Available when using a custom delimiter format.
Escape Character Escape character for a custom delimiter format. Select one of the available options or use Other to enter a custom character. Default is the backslash character ( \ ).
Quote Character Quote character for a custom delimiter format. Select one of the available options or use Other to enter a custom character. Default is the quotation mark character ( " ).
Replace New Line Characters Replaces new line characters with the configured string. Recommended when writing data as a single line of text.
New Line Character Replacement String to replace each new line character. For example, enter a space to replace each new line character with a space. Leave empty to remove the new line characters.
Charset Character set to use when writing data. -
For JSON data, on the Data Format tab, configure the
following properties:
JSON Property Description JSON Content Method to write JSON data: - JSON Array of Objects - Each file includes a single array. In the array, each element is a JSON representation of each record.
- Multiple JSON Objects - Each file includes multiple JSON objects. Each object is a JSON representation of a record.
Charset Character set to use when writing data. -
For protobuf data, on the Data Format tab, configure the
following properties:
Protobuf Property Description Protobuf Descriptor File Descriptor file (.desc) to use. The descriptor file must be in the Data Collector resources directory, $SDC_RESOURCES
.For more information about environment variables, see Java and Security Configuration. For information about generating the descriptor file, see Protobuf Data Format Prerequisites.
Message Type Fully-qualified name for the message type to use when writing data. Use the following format:
Use a message type defined in the descriptor file.<package name>.<message type>
. -
For text data, on the Data Format tab, configure the
following properties:
Text Property Description Text Field Path Field that contains the text data to be written. All data must be incorporated into the specified field. Record Separator Characters to use to separate records. Use any valid Java string literal. For example, when writing to Windows, you might use \r\n to separate records. By default, the destination uses \n.
On Missing Field When a record does not include the text field, determines whether the destination reports the missing field as an error or ignores the missing field. Insert Record Separator if No Text When configured to ignore a missing text field, inserts the configured record separator string to create an empty line. When not selected, discards records without the text field.
Charset Character set to use when writing data. -
For XML data, on the Data Format tab, configure the
following properties:
XML Property Description Pretty Format Adds indentation to make the resulting XML document easier to read. Increases the record size accordingly. Validate Schema Validates that the generated XML conforms to the specified schema definition. Records with invalid schemas are handled based on the error handling configured for the destination. Important: Regardless of whether you validate the XML schema, the destination requires the record in a specific format. For more information, see Record Structure Requirement.XML Schema The XML schema to use to validate records. -
When using the destination in a microservice pipeline, on the
Response tab, configure the following properties. In
non-microservice pipelines, these properties are ignored.
Response Property Description Send Response to Origin Enables sending a response to a microservice origin. Response Type The response to send to a microservice origin: - Successfully written records.
- Responses from the destination system.