Kafka Message Keys
You can configure the Kafka Multitopic Consumer origin to capture the message keys included in each Kafka message and store them in generated records. Kafka message keys can be string values or Avro messages, depending on how your Kafka system is configured.
If you have no need for message keys, you can discard them. The Kafka Multitopic Consumer origin discards message keys by default.
Storing Message Keys
You can configure the Kafka Multitopic Consumer origin to capture Kafka message keys and store them in records. Store message key values when you want to use them in the pipeline, write them to destinations, or pass them to Kafka to use as message keys. By default, the origin discards Kafka message keys.
You can store message keys in the following locations:
- In a specified record field
- Store message key values in a specified field when you want to perform processing based on field values. When you store message key values in a field, you can access the values easily with expression-based and field-based stages, such as the Stream Selector or Field Replacer processors.
- In a specified record header attribute
- Store message key values in a specified record header attribute when you want to use the Kafka Producer to pass message key values to Kafka as Kafka message keys.
- In a specified record field and record header attribute
- Store message key values in both a specified record field and a specified record header attribute when you have multiple uses for message key values.
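The three storage options can be pictured with a small Python model. This is only an illustrative sketch, not Data Collector code: the `Record` class, the `store_key` function, and the mode names are hypothetical, and the `kafkaMessageKey` names simply reuse the defaults from the example at the end of this topic.

```python
from dataclasses import dataclass, field


@dataclass
class Record:
    """Hypothetical stand-in for a pipeline record: field values plus header attributes."""
    fields: dict = field(default_factory=dict)
    header: dict = field(default_factory=dict)


def store_key(record, key, mode, field_name="kafkaMessageKey", attr_name="kafkaMessageKey"):
    """Store a string message key according to a hypothetical capture mode."""
    if mode not in ("none", "field", "header", "both"):
        raise ValueError(f"unknown mode: {mode}")
    if mode in ("field", "both"):
        record.fields[field_name] = key   # visible to field-based processing
    if mode in ("header", "both"):
        record.header[attr_name] = key    # kept outside the record data
    return record


rec = store_key(Record(fields={"amount": 12}), "customer-42", "both")
print(rec.fields)
print(rec.header)
```

With the hypothetical `"both"` mode, the same key value ends up in the field and in the header attribute, which matches the case where the key has more than one use in the pipeline.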
Message Key Formats
Kafka message keys can be string values or Avro messages, depending on how your Kafka system is configured.
The format of the message keys determines how message key values are stored in the record, and how you work with those values.
String Message Keys
When Kafka message keys are string values, the string values are simply written to the specified record field or record header attribute.
- In record fields
- To access message key values stored in a field, use the record:value function, as follows:
${record:value('/<message key field name>')}
- In record header attributes
- To access message key values stored in a record header attribute, use the record:attribute function, as follows:
${record:attribute('<message key attribute name>')}
Specify the name of the attribute where message key values are stored, as defined in the Kafka origin. By default, the Kafka origin uses the following attribute name: kafkaMessageKey.
Avro Message Keys
- In record fields
- When you store Avro message key values in a field, the Kafka origin converts the binary Avro message key to a map and writes it to the specified field.
- In record header attributes
- When you store Avro message key values in a record header attribute, the Kafka origin performs the following tasks:
  - Stores the Avro schema in an avroKeySchema attribute.
  - Encodes the binary Avro message key using Base64, then stores it in the specified attribute.
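As a rough illustration of the Base64 step, the following Python sketch encodes placeholder key bytes and places them, together with a schema string, in a dictionary that stands in for the record header attributes. The byte values, the schema text, and the `header` dictionary are made up for illustration. Only the avroKeySchema attribute name and the default kafkaMessageKey attribute name come from this topic.

```python
import base64

# Placeholder bytes standing in for a binary Avro message key (not real pipeline data).
raw_key = b"\x02\x0ccust42"

# Illustrative schema text; a real schema comes from your Kafka system.
key_schema = '{"type": "record", "name": "Key", "fields": [{"name": "id", "type": "string"}]}'

header = {
    "avroKeySchema": key_schema,
    # Base64 turns the binary key into plain text that can be stored as an attribute value.
    "kafkaMessageKey": base64.b64encode(raw_key).decode("ascii"),
}

print(header["kafkaMessageKey"])
```

Because Base64 is reversible, a later stage can recover the exact key bytes from the attribute value.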
Passing Key Values to Kafka
You can configure the Kafka Producer destination to pass message key values stored in record header attributes to Kafka to be used as message keys. This enables Kafka to use existing message keys instead of creating new keys for the records.
To pass message key values to Kafka, configure the following Kafka Producer properties:
- Kafka Message Key property on the Kafka tab - Define an expression that indicates the location of the message key values in the records.
- Message Key Format property on the Data Format tab - Select the format of the message key values: String or Avro.
Tip: When you select a message key format, the destination provides a default expression for that format in the Kafka Message Key property. You can edit the expression as needed.
- String
- For string message keys, use the following expression for the Kafka Message Key property:
${record:attribute('<message key attribute name>')}
- Avro
- For Avro message keys, use the following expression for the Kafka Message Key property:
${avro:decode(record:attribute('avroKeySchema'),base64:decodeBytes(record:attribute('<message key attribute name>')))}
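The Avro expression reverses the Base64 encoding before decoding the key with its schema. The Python sketch below mirrors only the Base64 half of that expression, using a dictionary as a hypothetical stand-in for the header attributes and a made-up attribute value. The Avro decoding itself is left out because it depends on the Avro library in use.

```python
import base64

# Hypothetical header attributes; the key value is Base64 text for the bytes b"key-001".
header = {
    "avroKeySchema": '"string"',  # illustrative schema text only
    "kafkaMessageKey": base64.b64encode(b"key-001").decode("ascii"),
}


def decode_key_bytes(attributes, attr_name="kafkaMessageKey"):
    """Return the raw key bytes stored as Base64 text in the given attribute."""
    return base64.b64decode(attributes[attr_name])


key_bytes = decode_key_bytes(header)
print(key_bytes)  # b'key-001'
```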
Example: Storing and Passing Kafka Message Keys
Say you want to use string Kafka message key values to route records to different processing streams. Then, after all pipeline processing is complete, you want to write records to a different Kafka topic and you want the resulting messages to use the original message keys.
To capture the keys, configure the Kafka Multitopic Consumer origin as follows:
- Set the Key Capture Mode property to Record Header and Field.
- Use the default value for the Key Capture Field property, /kafkaMessageKey.
- Use the default value for the Key Capture Header Attribute property, kafkaMessageKey.
This enables the origin to capture and store the Kafka message keys in the specified field and attribute.
Then, add a Stream Selector processor to the pipeline to route data to different processing streams based on the values in the /kafkaMessageKey field path.
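The routing idea can be sketched in Python as follows. The key prefixes, stream names, and sample records are invented for illustration, and the function only imitates the kind of condition-based routing a Stream Selector performs. It does not show how the processor is configured.

```python
def route(record):
    """Pick a stream name from the value in the kafkaMessageKey field."""
    key = record.get("kafkaMessageKey", "")
    if key.startswith("eu-"):
        return "europe"
    if key.startswith("us-"):
        return "north_america"
    return "default"  # records that match no condition


records = [
    {"kafkaMessageKey": "eu-1001", "amount": 5},
    {"kafkaMessageKey": "us-2002", "amount": 7},
    {"kafkaMessageKey": "ap-3003", "amount": 9},
]

streams = {}
for rec in records:
    streams.setdefault(route(rec), []).append(rec)

print({name: len(recs) for name, recs in streams.items()})
# {'europe': 1, 'north_america': 1, 'default': 1}
```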
To pass the message keys to Kafka, configure the Kafka Producer destination as follows:
- On the Data Format tab of the destination, select String for the Message Key Format property.
- On the Kafka tab, verify that the default expression for the Kafka Message Key property provides message key values to Kafka:
${record:value('/kafkaMessageKey')}
This enables the Kafka Producer destination to pass message key values stored in the kafkaMessageKey record header attribute to Kafka to be used as message keys.