Kafka Multitopic Consumer
When you configure a Kafka Multitopic Consumer, you configure the consumer group name and the brokers to use. You specify the topics to process and the number of threads to use. In Kafka, make sure that the partition assignment strategy is configured appropriately.
You can configure the origin to produce a single record when a message includes multiple objects. You can add additional Kafka configuration properties as needed. You can configure the origin to use Kafka security features. You can also configure the origin to capture Kafka message keys and store them in the record.
When processing Avro data, you can configure the Kafka Multitopic Consumer to work with the Confluent Schema Registry. The Confluent Schema Registry is a distributed storage layer for Avro schemas that uses Kafka as its underlying storage mechanism.
Kafka Multitopic Consumer includes record header attributes that enable you to use information about the record in pipeline processing.
Offset Management
The first time that a Kafka Multitopic Consumer origin identified by a consumer group receives messages from a topic, an offset entry is created for that consumer group and topic. The offset entry is created in Kafka.
The message where the origin starts reading depends on whether a stored offset exists:
- No stored offset - When the consumer group and topic combination does not have a previously stored offset, the Kafka Multitopic Consumer origin uses the Auto Offset Reset property to determine the first message to read. You can set the origin to read messages in the topic starting from the earliest message, the latest message, or a particular timestamp. The default setting is the earliest message, which results in the origin reading all existing messages in the topic.
- Previously stored offset - When the consumer group and topic combination has a previously stored offset, the Kafka Multitopic Consumer origin receives messages starting with the next unprocessed message after the stored offset. For example, when you stop and restart the pipeline, processing resumes from the last committed offset.
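The start-position rules above can be sketched as a small decision function. This is an illustrative model only, not Data Collector code: the function name `resolve_start`, the lowercase mode strings, and the tuple return value are assumptions made for the example.

```python
from typing import Optional, Tuple


def resolve_start(stored_offset: Optional[int],
                  auto_offset_reset: str = "earliest",
                  reset_timestamp_ms: Optional[int] = None) -> Tuple[str, Optional[int]]:
    """Return (mode, value) describing where reading would begin
    for one consumer group and topic combination."""
    if stored_offset is not None:
        # A stored offset always wins: processing resumes from it.
        return ("stored", stored_offset)
    if auto_offset_reset == "earliest":
        # Default: read all existing messages in the topic.
        return ("earliest", None)
    if auto_offset_reset == "latest":
        return ("latest", None)
    if auto_offset_reset == "timestamp":
        if reset_timestamp_ms is None:
            raise ValueError("timestamp reset needs a value in milliseconds since epoch")
        return ("timestamp", reset_timestamp_ms)
    if auto_offset_reset == "none":
        # Mirrors the None option: no stored offset is treated as an error.
        raise LookupError("no stored offset for this consumer group and topic")
    raise ValueError(f"unknown auto offset reset mode: {auto_offset_reset}")
```

For example, `resolve_start(None)` falls back to the earliest message, while `resolve_start(42, "latest")` ignores the reset setting because a stored offset exists.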
Multithreaded Processing
The Kafka Multitopic Consumer origin performs parallel processing and enables the creation of a multithreaded pipeline. The Kafka Multitopic Consumer origin uses multiple concurrent threads based on the Number of Threads property and the partition assignment strategy defined in the Kafka cluster.
When performing multithreaded processing, the Kafka Multitopic Consumer origin checks the list of topics to process and creates the specified number of threads. Each thread connects to Kafka and creates a batch of data from a partition assigned by the broker based on the Kafka partition assignment strategy. Then, it passes the batch to an available pipeline runner.
A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors, executors, and destinations in the pipeline and handles all pipeline processing after the origin. Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time pipeline property to specify the interval or to opt out of empty batch generation.
Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But because batches are processed by different pipeline runners, the order in which batches are written to destinations is not guaranteed.
For example, say you set the Number of Threads property to 5. When you start the pipeline, the origin creates five threads, and Data Collector creates a matching number of pipeline runners. The threads are assigned to different partitions based on the Kafka partition assignment strategy. Upon receiving data, the origin passes a batch to each of the pipeline runners for processing.
At any given moment, the five pipeline runners can each process a batch, so this multithreaded pipeline processes up to five batches at a time. When incoming data slows, the pipeline runners sit idle, available for use as soon as the data flow increases.
For more information about multithreaded pipelines, see Multithreaded Pipeline Overview. For more information about the Kafka partition assignment strategies, see the Kafka documentation.
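The thread and pipeline runner model described above can be approximated with a small simulation. This is a sketch under stated assumptions, not how Data Collector is implemented: `run_pipeline` is a hypothetical name, each partition is treated as a single batch, and a simple round-robin split stands in for the Kafka partition assignment strategy.

```python
import queue
import threading


def run_pipeline(partitions, num_threads=5):
    """Simulate origin threads handing batches to a pool of pipeline runners.

    partitions: dict mapping a partition name to its list of messages.
    Returns a list of (runner_id, partition_name, batch) tuples.
    """
    # One pipeline runner per thread; idle runners wait in this queue.
    runners = queue.Queue()
    for runner_id in range(num_threads):
        runners.put(runner_id)

    processed = []
    lock = threading.Lock()
    names = sorted(partitions)

    def origin_thread(thread_id):
        # Assumption: round-robin assignment replaces the broker-side strategy.
        for name in names[thread_id::num_threads]:
            batch = list(partitions[name])  # record order inside a batch is kept
            runner = runners.get()          # wait for an available runner
            try:
                with lock:
                    processed.append((runner, name, batch))
            finally:
                runners.put(runner)         # the runner becomes idle again

    threads = [threading.Thread(target=origin_thread, args=(i,))
               for i in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return processed
```

Running this with seven partitions and five threads processes seven batches. The order of entries in the returned list can differ between runs, which mirrors the point that batch order across runners is not ensured.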
Additional Kafka Properties
You can add custom Kafka configuration properties to the Kafka Multitopic Consumer.
When you add a custom Kafka configuration property, enter the exact property name and the value. The Kafka Multitopic Consumer does not validate the property names or values.
If custom configurations conflict with other stage properties, the stage generates an error unless you select the Override Stage Configurations check box. With the check box selected, the custom configurations override other stage properties. For example, to use a SASL mechanism other than PLAIN or GSSAPI (Kerberos), which the stage provides, add the necessary properties as custom configuration properties and select the Override Stage Configurations check box. For information about the necessary properties, see the Kafka documentation.
- auto.commit.interval.ms
- bootstrap.servers
- enable.auto.commit
- group.id
- max.poll.records
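The conflict handling described above can be sketched as a merge of two property maps. This is an illustrative model, not the stage's actual logic: `merge_kafka_config` is a hypothetical helper, and the assumption that the stage's SASL setting maps to the `sasl.mechanism` Kafka property is made only for the example.

```python
def merge_kafka_config(stage_config, custom_config, override=False):
    """Combine stage-defined and custom Kafka properties.

    Raises ValueError on conflicting keys unless override is True,
    in which case the custom values win.
    """
    conflicts = sorted(key for key in custom_config if key in stage_config)
    if conflicts and not override:
        raise ValueError(
            "custom properties conflict with stage properties: " + ", ".join(conflicts)
        )
    merged = dict(stage_config)
    merged.update(custom_config)  # custom values replace stage values on conflict
    return merged


# Assumed stage-derived values, for illustration only.
stage = {"bootstrap.servers": "broker1:9092", "sasl.mechanism": "PLAIN"}
custom = {"sasl.mechanism": "SCRAM-SHA-512"}
effective = merge_kafka_config(stage, custom, override=True)
```

Without `override=True`, the same call raises an error, which corresponds to leaving the Override Stage Configurations check box cleared.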
Record Header Attributes
The Kafka Multitopic Consumer origin creates record header attributes that include information about the origin of the record, such as the topic, partition, and offset. When the origin processes Avro data, it includes the Avro schema in an avroSchema record header attribute. When a Kafka message contains Kafka headers, the origin can include those headers in the record as record header attributes.
You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.
The origin creates the following record header attributes:
- avroSchema - When processing Avro data, provides the Avro schema.
- offset - The offset where the record originated.
- partition - The partition where the record originated.
- topic - The topic where the record originated.
When Data Collector uses a Kafka Java client version 0.11 or later, the origin also creates record header attributes for each message header associated with a Kafka message. The generated record header attributes have names that match the original Kafka message headers.
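As a rough model of the attributes listed above, the following sketch builds a record header map from a Kafka message and looks up values with a default. The helper names and the choice to store every value as a string are assumptions for illustration; they are not the Data Collector API.

```python
def build_header_attributes(topic, partition, offset,
                            kafka_headers=None, avro_schema=None):
    """Build a dict of record header attributes for one Kafka message.

    kafka_headers: optional list of (name, value) pairs from the message.
    """
    attrs = {
        "topic": topic,
        "partition": str(partition),
        "offset": str(offset),
    }
    if avro_schema is not None:
        attrs["avroSchema"] = avro_schema
    for name, value in (kafka_headers or []):
        # Attribute names match the original Kafka message header names.
        attrs[name] = value.decode("utf-8") if isinstance(value, bytes) else str(value)
    return attrs


def attribute_or_default(attrs, name, default):
    """Loose analogue of record:attributeOrDefault for this sketch."""
    return attrs.get(name, default)


attrs = build_header_attributes("orders", 3, 1024, [("trace-id", b"abc123")])
```

Here `attribute_or_default(attrs, "trace-id", "none")` returns the header value, and a lookup for a header that the message did not carry returns the default instead.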
Kafka Security
You can configure the Kafka Multitopic Consumer origin to connect securely to Kafka through SSL/TLS, SASL, or both. For more information about the methods and details on how to configure each method, see Security in Kafka Stages.
Data Formats
The Kafka Multitopic Consumer origin processes data differently based on the data format. Kafka Multitopic Consumer can process the following types of data:
- Avro - Generates a record for every message. Includes a precision and scale field attribute for each Decimal field.
- Binary - Generates a record with a single byte array field at the root of the record.
- Datagram - Generates a record for every message. The origin can process collectd messages, NetFlow 5 and NetFlow 9 messages, and several types of syslog messages.
- Delimited - Generates a record for each delimited line.
- JSON - Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
- Log - Generates a record for every log line.
- Protobuf - Generates a record for every protobuf message. By default, the origin assumes messages contain multiple protobuf messages.
- SDC Record - Generates a record for every record. Use to process records generated by a Data Collector pipeline using the SDC Record data format.
- Text - Generates a record for each line of text or for each section of text based on a custom delimiter.
- XML - Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the origin treats the XML file as a single record.
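To make the JSON case concrete, the sketch below splits one message payload into records for both content types: multiple JSON objects and a single JSON array. It is an illustration in plain Python; the function name `records_from_json` and the `content` values are assumptions, not origin settings.

```python
import json


def records_from_json(text, content="multiple_objects"):
    """Return one record (a parsed value) per JSON object in the payload."""
    if content == "array":
        data = json.loads(text)
        if not isinstance(data, list):
            raise ValueError("expected a JSON array of objects")
        return data

    # Multiple JSON objects: decode them one after another.
    decoder = json.JSONDecoder()
    records, pos = [], 0
    while pos < len(text):
        # Skip whitespace between objects.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            break
        obj, pos = decoder.raw_decode(text, pos)
        records.append(obj)
    return records


payload = '{"id": 1}\n{"id": 2}'
records = records_from_json(payload)
```

Both `'{"id": 1}\n{"id": 2}'` parsed as multiple objects and `'[{"id": 1}, {"id": 2}]'` parsed as an array yield two records in this model.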
Configuring a Kafka Multitopic Consumer Origin
-
In the Properties panel, on the General tab, configure the following properties:
General Property | Description
Name | Stage name.
Description | Optional description.
Stage Library | Library version that you want to use.
On Record Error | Error record handling for the stage:
- Discard - Discards the record.
- Send to Error - Sends the record to the pipeline for error handling.
- Stop Pipeline - Stops the pipeline. Not valid for cluster pipelines.
-
On the Connection tab, configure the following properties:
Kafka Property | Description
Broker URI | Comma-separated list of connection strings for the Kafka brokers. Use the following format for each broker: <host>:<port>. To ensure that a pipeline can connect to Kafka if a specified broker goes down, list as many brokers as possible.
Consumer Group | Kafka consumer group that the Data Collector belongs to.
Topic Subscription Type | Method used to specify Kafka topics:
- Topics List - Specify a list of topics to read.
- Pattern - Specify a regular expression that defines the topic names to read.
Topic List | List of Kafka topics to read. Click Add to add additional topics. Available when Topic Subscription Type is set to Topics List. Tip: You can use Raw Preview to generate a list of topics associated with a broker. The list displays in a format that you can use to list the topics in bulk edit mode.
Topic Pattern | Regular expression that defines the Kafka topics to read. Available when Topic Subscription Type is set to Pattern.
Produce Single Record | For each partition, generates a single record when a message includes multiple objects. When not selected, the origin generates multiple records when a message includes multiple objects.
Number of Threads | Number of threads the origin generates and uses for multithreaded processing. The origin uses threads as defined by the Kafka partition assignment strategy. For more information, see Multithreaded Processing.
Max Batch Size (records) | Maximum number of records processed at one time. Honors values up to the Data Collector maximum batch size. Default is 1000. The Data Collector default is 1000.
Max Batch Wait Time (ms) | Number of milliseconds to wait before sending a partial or empty batch.
Auto Offset Reset | Method to determine the first message to read when no offset exists for the combination of consumer group and topic:
- Earliest - Reads messages starting with the first message in the topic.
- Latest - Reads messages starting with the last message in the topic.
- None - Generates an error if no offset exists.
- Timestamp - Reads messages starting with the timestamp specified in the Auto Offset Reset Timestamp property. You must be using Kafka version 0.10.1.0 or later to read messages based on timestamp.
Default is Earliest.
Auto Offset Reset Timestamp (ms) | Timestamp of the earliest message to read when no offset exists for the origin. Specify in milliseconds since epoch (January 1, 1970). Available when using the Timestamp method to determine the first message to read.
Configuration Properties | Additional Kafka configuration properties to use. Using simple or bulk edit mode, click the Add icon to add properties. Define the Kafka property name and value. Use the property names and values as expected by Kafka.
Include Timestamps | Includes Kafka timestamps in the record header. The origin retrieves the timestamp from Kafka. If the Kafka message does not have timestamps, the origin leaves the timestamp attribute in the record header empty. When the pipeline writes to multiple Kafka clusters, enable this property to keep timestamps consistent across clusters. You must be using Kafka version 0.10 or later to include Kafka timestamps in the record header.
Key Capture Mode | Mode that determines how the origin captures Kafka message keys:
- None - Discards Kafka message keys.
- Record Header - Stores Kafka message keys in a specified record header attribute. Use to pass message key values to Kafka as message keys. You can use record:attribute functions to use string message key values in pipeline processing. Because Avro message key values are binary, you cannot use those values in the pipeline.
- Record Field - Stores Kafka message keys in a specified field. Use to easily use message key values in the pipeline.
- Record Header and Field - Stores Kafka message keys in a specified header and field.
For more information about Kafka message key handling, see Kafka Message Keys. For information about working with string and Avro message key values, see Message Key Formats.
Key Capture Header Attribute | Name of the header attribute to store Kafka message keys. Available when storing message keys in a record header attribute.
Key Capture Field | Name of the field to store Kafka message keys. Available when storing message keys in a field in the record.
Metadata Refresh Time | Number of milliseconds to wait before checking for additional topics that match the regular expression. Default is 180,000. Available when Topic Subscription Type is set to Pattern.
Error Record Policy | Determines the version of the record to use as a basis for an error record. For more information, see Error Records and Version.
Override Stage Configurations | When configurations conflict, the properties configured in the Configuration Properties property override other properties configured in the stage.
-
On the Security tab, configure the security properties to enable the stage to securely connect to Kafka.
For information about the security options and additional steps required to enable security, see Security in Kafka Stages.
-
On the Data Format tab, configure the following property:
Data Format Property | Description
Data Format | Type of data to be read. Use one of the following options:
- Avro
- Binary
- Datagram
- Delimited
- JSON
- Log
- Protobuf
- SDC Record
- Text
- XML
-
For Avro data, on the Data Format tab, configure the following properties:
Avro Property | Description
Avro Schema Location | Location of the Avro schema definition to use when processing data:
- Message/Data Includes Schema - Use the schema in the message.
- In Pipeline Configuration - Use the schema provided in the stage configuration.
- Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry.
Using a schema in the stage configuration or in Confluent Schema Registry can improve performance.
Avro Schema | Avro schema definition used to process the data. Overrides any existing schema definitions associated with the data. You can optionally use the runtime:loadResource function to load a schema definition stored in a runtime resource file.
Schema Registry URLs | Confluent Schema Registry URLs used to look up the schema. To add a URL, click Add and then enter the URL in the following format: http://<host name>:<port number>
Basic Auth User Info | User information needed to connect to Confluent Schema Registry when using basic authentication. Enter the key and secret from the schema.registry.basic.auth.user.info setting in Schema Registry using the following format: <key>:<secret>. Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
Lookup Schema By | Method used to look up the schema in Confluent Schema Registry:
- Subject - Look up the specified Avro schema subject.
- Schema ID - Look up the specified Avro schema ID.
- Embedded Schema ID - Look up the Avro schema ID embedded in each message.
Schema Subject | Avro schema subject to look up in Confluent Schema Registry. If the specified subject has multiple schema versions, the stage uses the latest schema version for that subject. To use an older version, find the corresponding schema ID, and then set the Lookup Schema By property to Schema ID.
Schema ID | Avro schema ID to look up in Confluent Schema Registry.
-
For binary data, on the Data Format tab, configure the following property:
Binary Property | Description
Max Data Size (bytes) | Maximum number of bytes in the message. Larger messages cannot be processed or written to error.
-
For datagram data, on the Data Format tab, configure the following properties:
Datagram Property | Description
Datagram Packet Format | Packet format of the data:
- collectd
- NetFlow
- syslog
- Raw/separated data
TypesDB File Path | Path to a user-provided types.db file. Overrides the default types.db file. For collectd data only.
Auth File | Path to an optional authentication file. Use an authentication file to accept signed and encrypted data. For collectd data only.
Convert Hi-Res Time & Interval | Converts the collectd high resolution time format interval and timestamp to UNIX time, in milliseconds. For collectd data only.
Exclude Interval | Excludes the interval field from the output record. For collectd data only.
Record Generation Mode | Determines the type of values to include in the record. Select one of the following options:
- Raw Only
- Interpreted Only
- Both Raw and Interpreted
For NetFlow 9 data only.
Max Templates in Cache | The maximum number of templates to store in the template cache. For more information about templates, see Caching NetFlow 9 Templates. Default is -1 for an unlimited cache size. For NetFlow 9 data only.
Template Cache Timeout (ms) | The maximum number of milliseconds to cache an idle template. Templates unused for more than the specified time are evicted from the cache. For more information about templates, see Caching NetFlow 9 Templates. Default is -1 for caching templates indefinitely. For NetFlow 9 data only.
Charset | Character encoding of the messages to be processed.
Ignore Control Characters | Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
-
For delimited data, on the Data Format tab, configure the following properties:
Delimited Property | Description
Delimiter Format Type | Delimiter format type. Use one of the following options:
- Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
- RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
- MS Excel CSV - Microsoft Excel comma-separated file.
- MySQL CSV - MySQL comma-separated file.
- Tab-Separated Values - File that includes tab-separated values.
- PostgreSQL CSV - PostgreSQL comma-separated file.
- PostgreSQL Text - PostgreSQL text file.
- Custom - File that uses user-defined delimiter, escape, and quote characters.
- Multi Character Delimited - File that uses multiple user-defined characters to delimit fields and lines, and single user-defined escape and quote characters.
Available when using the Apache Commons parser type.
Header Line | Indicates whether a file contains a header line, and whether to use the header line.
Delimiter Character | Delimiter character. Select one of the available options or use Other to enter a custom character. You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter. Default is the pipe character ( | ). Available when using the Apache Commons parser with a custom delimiter format.
Multi Character Field Delimiter | Characters that delimit fields. Default is two pipe characters (||). Available when using the Apache Commons parser with the multi-character delimiter format.
Multi Character Line Delimiter | Characters that delimit lines or records. Default is the newline character (\n). Available when using the Apache Commons parser with the multi-character delimiter format.
Escape Character | Escape character. Available when using the Apache Commons parser with the custom or multi-character delimiter format. Also available when using the Univocity parser.
Quote Character | Quote character. Available when using the Apache Commons parser with the custom or multi-character delimiter format. Also available when using the Univocity parser.
Enable Comments | Allows commented data to be ignored for custom delimiter format. Available when using the Apache Commons parser.
Comment Marker | Character that marks a comment when comments are enabled for custom delimiter format. Available when using the Apache Commons parser.
Lines to Skip | Number of lines to skip before reading data.
Allow Extra Columns | Allows processing records with more columns than exist in the header line. Available when using the Apache Commons parser to process data with a header line.
Extra Column Prefix | Prefix to use for any additional columns. Extra columns are named using the prefix and sequential increasing integers as follows: <prefix><integer>. For example, _extra_1. Default is _extra_. Available when using the Apache Commons parser to process data with a header line while allowing extra columns.
Max Record Length (chars) | Maximum length of a record in characters. Longer records are not read. This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size. Available when using the Apache Commons parser.
Ignore Empty Lines | Allows empty lines to be ignored. Available when using the Apache Commons parser with the custom delimiter format.
Root Field Type | Root field type to use:
- List-Map - Generates an indexed list of data. Enables you to use standard functions to process data. Use for new pipelines.
- List - Generates a record with an indexed list with a map for header and value. Requires the use of delimited data functions to process data. Use only to maintain pipelines created before 1.1.0.
Parse NULLs | Replaces the specified string constant with null values.
NULL Constant | String constant to replace with null values.
Charset | Character encoding of the files to be processed.
Ignore Control Characters | Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
-
For JSON data, on the Data Format tab, configure the following properties:
JSON Property | Description
JSON Content | Type of JSON content. Use one of the following options:
- JSON array of objects
- Multiple JSON objects
Max Object Length (chars) | Maximum number of characters in a JSON object. Longer objects are diverted to the pipeline for error handling. This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.
Charset | Character encoding of the files to be processed.
Ignore Control Characters | Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
-
For log data, on the Data Format tab, configure the following properties:
Log Property | Description
Log Format | Format of the log files. Use one of the following options:
- Common Log Format
- Combined Log Format
- Apache Error Log Format
- Apache Access Log Custom Format
- Regular Expression
- Grok Pattern
- Log4j
- Common Event Format (CEF)
- Log Event Extended Format (LEEF)
Max Line Length | Maximum length of a log line. The origin truncates longer lines. This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.
Retain Original Line | Determines how to treat the original log line. Select to include the original log line as a field in the resulting record. By default, the original line is discarded.
Charset | Character encoding of the files to be processed.
Ignore Control Characters | Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
- When you select Apache Access Log Custom Format, use Apache log format strings to define the Custom Log Format.
- When you select Regular Expression, enter the regular expression that describes the log format, and then map the fields that you want to include to each regular expression group.
- When you select Grok Pattern, you can use the Grok Pattern Definition field to define custom grok patterns. You can define a pattern on each line. In the Grok Pattern field, enter the pattern to use to parse the log. You can use a predefined grok pattern or create a custom grok pattern using patterns defined in Grok Pattern Definition. For more information about defining grok patterns and supported grok patterns, see Defining Grok Patterns.
- When you select Log4j, define the following properties:
Log4j Property | Description
On Parse Error | Determines how to handle information that cannot be parsed:
- Skip and Log Error - Skips reading the line and logs a stage error.
- Skip, No Error - Skips reading the line and does not log an error.
- Include as Stack Trace - Includes information that cannot be parsed as a stack trace to the previously-read log line. The information is added to the message field for the last valid log line.
Use Custom Log Format | Allows you to define a custom log format.
Custom Log4J Format | Use log4j variables to define a custom log format.
-
For protobuf data, on the Data Format tab, configure the following properties:
Protobuf Property | Description
Protobuf Descriptor File | Descriptor file (.desc) to use. The descriptor file must be in the Data Collector resources directory, $SDC_RESOURCES. For information about generating the descriptor file, see Protobuf Data Format Prerequisites. For more information about environment variables, see Data Collector Environment Configuration.
Message Type | The fully-qualified name for the message type to use when reading data. Use the following format: <package name>.<message type>. Use a message type defined in the descriptor file.
Delimited Messages | Indicates if a message might include more than one protobuf message.
-
For text data, on the Data Format tab, configure the following properties:
Text Property | Description
Max Line Length | Maximum number of characters allowed for a line. Longer lines are truncated. Adds a boolean field to the record to indicate if it was truncated. The field name is Truncated. This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.
Use Custom Delimiter | Uses custom delimiters to define records instead of line breaks.
Custom Delimiter | One or more characters to use to define records.
Include Custom Delimiter | Includes delimiter characters in the record.
Charset | Character encoding of the files to be processed.
Ignore Control Characters | Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
-
For XML data, on the Data Format tab, configure the following properties:
XML Property | Description
Delimiter Element | Delimiter to use to generate records. Omit a delimiter to treat the entire XML document as one record. Use one of the following:
- An XML element directly under the root element. Use the XML element name without surrounding angle brackets ( < > ). For example, msg instead of <msg>.
- A simplified XPath expression that specifies the data to use. Use a simplified XPath expression to access data deeper in the XML document or data that requires a more complex access method. For more information about valid syntax, see Simplified XPath Syntax.
Compression Format | The compression format of the files:
- None - Processes only uncompressed files.
- Compressed File - Processes files compressed by the supported compression formats.
- Archive - Processes files archived by the supported archive formats.
- Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
File Name Pattern within Compressed Directory | For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json. Default is *, which processes all files.
Preserve Root Element | Includes the root element in the generated records. When omitting a delimiter to generate a single record, the root element is the root element of the XML document. When specifying a delimiter to generate multiple records, the root element is the XML element specified as the delimiter element or is the last XML element in the simplified XPath expression specified as the delimiter element.
Include Field XPaths | Includes the XPath to each parsed XML element and XML attribute in field attributes. Also includes each namespace in an xmlns record header attribute. When not selected, this information is not included in the record. By default, the property is not selected. Note: Field attributes and record header attributes are written to destination systems automatically only when you use the SDC RPC data format in destinations. For more information about working with field attributes and record header attributes, and how to include them in records, see Field Attributes and Record Header Attributes.
Namespaces | Namespace prefix and URI to use when parsing the XML document. Define namespaces when the XML element being used includes a namespace prefix or when the XPath expression includes namespaces. For information about using namespaces with an XML element, see Using XML Elements with Namespaces. For information about using namespaces with XPath expressions, see Using XPath Expressions with Namespaces. Using simple or bulk edit mode, click the Add icon to add additional namespaces.
Output Field Attributes | Includes XML attributes and namespace declarations in the record as field attributes. When not selected, XML attributes and namespace declarations are included in the record as fields. By default, the property is not selected. Note: Field attributes are automatically included in records written to destination systems only when you use the SDC RPC data format in the destination. For more information about working with field attributes, see Field Attributes.
Max Record Length (chars) | The maximum number of characters in a record. Longer records are diverted to the pipeline for error handling. This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.
Charset | Character encoding of the files to be processed.
Ignore Control Characters | Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
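To illustrate how the Delimiter Element property shapes XML output, the sketch below generates records from a document using an element directly under the root, or a single record when no delimiter is given. It uses Python's standard library and covers only the direct-child case, not simplified XPath expressions; `records_from_xml` is a hypothetical name.

```python
import xml.etree.ElementTree as ET


def records_from_xml(document, delimiter_element=None):
    """Return the XML elements that would each become one record."""
    root = ET.fromstring(document)
    if delimiter_element is None:
        # No delimiter: the entire document is treated as one record.
        return [root]
    # Delimiter given as an element name without angle brackets, such as msg.
    return [child for child in root if child.tag == delimiter_element]


doc = "<root><msg><id>1</id></msg><msg><id>2</id></msg><meta>x</meta></root>"
ids = [record.findtext("id") for record in records_from_xml(doc, "msg")]
```

With `msg` as the delimiter element, the two `msg` elements become two records and the `meta` element is skipped. Without a delimiter, the call returns a single record rooted at `root`.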