MQTT Subscriber
When you configure the origin, you specify the information needed to connect to the MQTT broker. You must define connection credentials when the MQTT broker requires a user name and password.
You can also configure SSL/TLS properties, including default transport protocols and cipher suites.
The stage supports high availability MQTT clusters. For a cluster without a load balancer, you configure a list of brokers in the cluster. After losing a connection to a broker, the stage connects to the next available broker in the list.
You specify one or more topics on the MQTT broker that the origin subscribes to. The origin includes the name of the originating topic for each record in a record header attribute.
You also configure the quality of service level and the persistence mechanism that the origin uses to enable reliable messaging.
Edge Pipeline Prerequisite
In Data Collector Edge pipelines, MQTT stages require using an intermediary MQTT broker.
For example, an edge sending pipeline uses an MQTT Publisher destination to write to an MQTT broker. The MQTT broker temporarily stores the data until the MQTT Subscriber origin in the Data Collector receiving pipeline reads the data.
Topics
The MQTT Subscriber origin reads messages from one or more topics on the MQTT broker. A topic is a string that the broker uses to filter messages for each connected client. A topic can contain multiple levels separated by forward slashes, for example:
sales/US/NorthernRegion
You can use the StreamSets expression language to define the topic names. You can also use MQTT wildcards in the topic names, such as the single level wildcard (+) or the multi level wildcard (#).
For more information, see the HiveMQ documentation on MQTT topics.
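To make the wildcard behavior concrete, the following Python sketch checks whether a topic matches a topic filter that uses the single level (+) or multi level (#) wildcard. It is an illustration of standard MQTT matching, not the code that the origin runs. The function name topic_matches is hypothetical, and the sketch ignores special cases such as topics that begin with $.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a subscription topic filter."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi level wildcard: valid only as the last level, and it
            # matches the parent level plus any number of child levels.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; "+" matches any one level.
            return False
    # Without "#", the filter and the topic must have the same depth.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    for f in ("sales/+/NorthernRegion", "sales/#", "sales/+"):
        print(f, topic_matches(f, "sales/US/NorthernRegion"))
```

In this sketch, sales/+/NorthernRegion and sales/# both match sales/US/NorthernRegion, while sales/+ does not because it covers only one level below sales.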
Record Header Attributes
The MQTT Subscriber origin creates a record header attribute that includes information about the origins of the record.
You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.
- TOPIC_HEADER_NAME - Includes the originating topic for each record.
Data Formats
The MQTT Subscriber origin processes data differently based on the data format that you select.
In Data Collector Edge pipelines, the origin supports only the Binary, Delimited, JSON, SDC Record, and Text data formats.
The MQTT Subscriber origin processes data formats as follows:
- Binary - Generates a record with a single byte array field at the root of the record.
- Datagram - Generates a record for every message. The origin can process collectd messages, NetFlow 5 and NetFlow 9 messages, and certain types of syslog messages.
- Delimited - Generates a record for each delimited line.
- JSON - Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
- Log - Generates a record for every log line.
- Protobuf - Generates a record for every protobuf message. By default, the origin assumes messages contain multiple protobuf messages.
- SDC Record - Generates a record for every record. Use to process records generated by a Data Collector pipeline using the SDC Record data format.
- Text - Generates a record for each line of text or for each section of text based on a custom delimiter.
- XML - Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the origin treats the XML file as a single record.
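As an illustration of the JSON behavior described above, the following Python sketch turns a message payload into one record per JSON object, for both multiple concatenated objects and a single JSON array. It is a minimal sketch and not the origin's parser. The function name json_records and the content values "multiple_objects" and "array_of_objects" are hypothetical.

```python
import json


def json_records(payload: str, content: str = "multiple_objects") -> list:
    """Return one record (a Python object) per JSON object in the payload."""
    if content == "array_of_objects":
        data = json.loads(payload)
        if not isinstance(data, list):
            raise ValueError("expected a single JSON array")
        return data

    # Multiple objects: decode one object at a time from the same string.
    decoder = json.JSONDecoder()
    records, pos = [], 0
    while pos < len(payload):
        # raw_decode does not skip leading whitespace, so skip it here.
        while pos < len(payload) and payload[pos].isspace():
            pos += 1
        if pos >= len(payload):
            break
        obj, pos = decoder.raw_decode(payload, pos)
        records.append(obj)
    return records


if __name__ == "__main__":
    print(json_records('{"id": 1} {"id": 2}'))
    print(json_records('[{"id": 1}, {"id": 2}]', "array_of_objects"))
```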
Configuring an MQTT Subscriber Origin
Configure an MQTT Subscriber to read messages from an MQTT broker.
In Data Collector Edge pipelines, the MQTT Subscriber origin requires an intermediary MQTT broker.
-
In the Properties panel, on the General tab, configure the following properties:
  - Name - Stage name.
  - Description - Optional description.
  - On Record Error - Error record handling for the stage:
    - Discard - Discards the record.
    - Send to Error - Sends the record to the pipeline for error handling.
    - Stop Pipeline - Stops the pipeline.
-
On the MQTT tab, configure the following properties:
  - Broker URL - MQTT broker URL. Enter in the following format: <tcp | ssl>://<hostname>:<port>
    Use ssl for secure connections to the broker.
    For example: tcp://localhost:1883
    For high availability MQTT clusters without a load balancer, specify a list of brokers from the cluster, separated by commas and without spaces. The stage connects to the first available broker, trying in the order listed. For example: tcp://hostA:1883,tcp://hostB:1883,tcp://hostC:1883
  - Topic Filter - Topics to subscribe to. Using simple or bulk edit mode, click the Add icon to read from additional topics.
  - Use Credentials - Enables entering credentials on the Credentials tab. Use when the MQTT broker requires a user name and password.
  - Clean Session - Enables connecting to the MQTT broker using a clean session, or a non-persistent connection. See the MQTT documentation for details about MQTT clean sessions.
  - Client ID - MQTT client ID. The ID must be unique across all clients connecting to the same broker. You can define an expression that evaluates to the client ID. For example, enter the following expression to use the unique pipeline ID as the client ID: ${pipeline:id()}
    If a pipeline includes multiple MQTT stages and you want to use the unique pipeline ID as the client ID for both stages, prefix the client ID with a string like this: sub-${pipeline:id()} and pub-${pipeline:id()}
    Otherwise, all stages will use the same client ID. This can cause problems, such as messages disappearing.
  - Quality of Service - Determines the quality of service level used to guarantee message delivery:
    - At Most Once (0)
    - At Least Once (1)
    - Exactly Once (2)
    For more information, see the HiveMQ documentation on quality of service levels.
  - Client Persistence Mechanism - Determines the persistence mechanism that the origin uses to guarantee message delivery when the quality of service level is at least once or exactly once. Select one of the following options:
    - Memory - Store messages in memory on the Data Collector machine until the delivery of the message is complete.
    - File - Store messages in a local file on the Data Collector machine until the delivery of the message is complete.
    Not used when the quality of service level is at most once.
    For more information, see the HiveMQ documentation on client persistence.
  - Client Persistence Data Directory - Local directory on the Data Collector machine where the origin temporarily stores messages in a file when you configure file persistence. The user who starts Data Collector must have read and write access to this directory.
  - Keep Alive Interval (secs) - Maximum time in seconds to allow the connection to the MQTT broker to remain idle. After the origin receives no messages for this amount of time, the connection is closed. The origin must reconnect to the MQTT broker. Default is 60 seconds.
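The Broker URL format and the "first available broker, in the order listed" behavior can be sketched in Python as follows. This is a rough illustration under stated assumptions, not the stage's connection logic: the function names parse_broker_urls and first_available are hypothetical, and is_reachable stands in for a real connection attempt.

```python
def parse_broker_urls(value: str) -> list:
    """Parse a comma-separated broker list into (scheme, host, port) tuples."""
    brokers = []
    for url in value.split(","):
        scheme, _, rest = url.partition("://")
        host, _, port = rest.rpartition(":")
        # Only tcp and ssl are listed in the Broker URL format. A space
        # after a comma ends up in the scheme and is rejected here.
        if scheme not in ("tcp", "ssl") or not host or not port.isdigit():
            raise ValueError(f"invalid broker URL: {url!r}")
        brokers.append((scheme, host, int(port)))
    return brokers


def first_available(brokers, is_reachable):
    """Return the first broker, in listed order, that can be reached."""
    for broker in brokers:
        if is_reachable(broker):
            return broker
    return None


if __name__ == "__main__":
    cluster = parse_broker_urls("tcp://hostA:1883,tcp://hostB:1883,tcp://hostC:1883")
    # Pretend hostA is down; the next broker in the list is chosen.
    print(first_available(cluster, lambda b: b[1] != "hostA"))
```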
-
On the Credentials tab, enter the MQTT credentials to use if you enabled credentials.
Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
-
To use SSL/TLS, on the TLS tab, configure the following properties:
In Data Collector Edge pipelines, the keystore/truststore type, password, and algorithm properties are ignored. In Data Collector Edge pipelines, the MQTT Subscriber origin always uses the default protocol and cipher suites.
  - Use TLS - Enables the use of TLS.
  - Use Remote Keystore - Enables loading the contents of the keystore from a remote credential store or from values entered in the stage properties. For more information, see Remote Keystore and Truststore.
  - Private Key - Private key used in the remote keystore. Enter a credential function that returns the key or enter the contents of the key.
    In Data Collector Edge pipelines, enter the contents of the key.
  - Certificate Chain - Each PEM certificate used in the remote keystore. Enter a credential function that returns the certificate or enter the contents of the certificate. Using simple or bulk edit mode, click the Add icon to add additional certificates.
    In Data Collector Edge pipelines, enter the contents of the certificate.
  - Keystore File - Path to the local keystore file. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:
    ${runtime:resourcesDirPath()}/keystore.jks
    By default, no keystore is used.
    In Data Collector Edge pipelines, enter an absolute path to the file that uses the PEM format.
  - Keystore Type - Type of keystore to use. Use one of the following types:
    - Java Keystore File (JKS)
    - PKCS #12 (p12 file)
    Default is Java Keystore File (JKS).
  - Keystore Password - Password to the keystore file. A password is optional, but recommended.
    Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores.
  - Keystore Key Algorithm - Algorithm to manage the keystore.
    Default is SunX509.
  - Use Remote Truststore - Enables loading the contents of the truststore from a remote credential store or from values entered in the stage properties. For more information, see Remote Keystore and Truststore.
  - Trusted Certificates - Each PEM certificate used in the remote truststore. Enter a credential function that returns the certificate or enter the contents of the certificate. Using simple or bulk edit mode, click the Add icon to add additional certificates.
    In Data Collector Edge pipelines, enter the contents of the certificate.
  - Truststore File - Path to the local truststore file. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:
    ${runtime:resourcesDirPath()}/truststore.jks
    By default, no truststore is used.
    In Data Collector Edge pipelines, enter an absolute path to the file that uses the PEM format.
  - Truststore Type - Type of truststore to use. Use one of the following types:
    - Java Keystore File (JKS)
    - PKCS #12 (p12 file)
    Default is Java Keystore File (JKS).
  - Truststore Password - Password to the truststore file. A password is optional, but recommended.
    Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores.
  - Truststore Trust Algorithm - Algorithm to manage the truststore.
    Default is SunX509.
  - Use Default Protocols - Uses the default TLSv1.2 transport layer security (TLS) protocol. To use a different protocol, clear this option.
  - Transport Protocols - TLS protocols to use. To use a protocol other than the default TLSv1.2, click the Add icon and enter the protocol name. You can use simple or bulk edit mode to add protocols.
    Note: Older protocols are not as secure as TLSv1.2.
  - Use Default Cipher Suites - Uses a default cipher suite for the SSL/TLS handshake. To use a different cipher suite, clear this option.
  - Cipher Suites - Cipher suites to use. To use a cipher suite that is not a part of the default set, click the Add icon and enter the name of the cipher suite. You can use simple or bulk edit mode to add cipher suites. Enter the Java Secure Socket Extension (JSSE) name for the additional cipher suites that you want to use.
-
On the Data Format tab, configure the following property:
  - Data Format - Type of data to be processed. Use one of the following data formats:
    - Binary
    - Datagram
    - Delimited
    - JSON
    - Log
    - Protobuf
    - SDC Record
    - Text
    - XML
    In Data Collector Edge pipelines, the origin supports only the Binary, Delimited, JSON, SDC Record, and Text data formats.
-
For binary data, on the Data Format tab, configure the following properties:
  - Compression Format - The compression format of the files:
    - None - Processes only uncompressed files.
    - Compressed File - Processes files compressed by the supported compression formats.
    - Archive - Processes files archived by the supported archive formats.
    - Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
  - File Name Pattern within Compressed Directory - For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json. Default is *, which processes all files.
  - Max Data Size (bytes) - Maximum number of bytes in the message. Larger messages cannot be processed or written to error.
-
For datagram data, on the Data Format tab, configure the following properties:
  - Datagram Packet Format - Packet format of the data:
    - collectd
    - NetFlow
    - syslog
    - Raw/separated data
  - TypesDB File Path - Path to a user-provided types.db file. Overrides the default types.db file. For collectd data only.
  - Auth File - Path to an optional authentication file. Use an authentication file to accept signed and encrypted data. For collectd data only.
  - Convert Hi-Res Time & Interval - Converts the collectd high resolution time format interval and timestamp to UNIX time, in milliseconds. For collectd data only.
  - Exclude Interval - Excludes the interval field from the output record. For collectd data only.
  - Record Generation Mode - Determines the type of values to include in the record. Select one of the following options:
    - Raw Only
    - Interpreted Only
    - Both Raw and Interpreted
    For NetFlow 9 data only.
  - Max Templates in Cache - The maximum number of templates to store in the template cache. For more information about templates, see Caching NetFlow 9 Templates. Default is -1 for an unlimited cache size.
    For NetFlow 9 data only.
  - Template Cache Timeout (ms) - The maximum number of milliseconds to cache an idle template. Templates unused for more than the specified time are evicted from the cache. For more information about templates, see Caching NetFlow 9 Templates. Default is -1 for caching templates indefinitely.
    For NetFlow 9 data only.
  - Charset - Character encoding of the messages to be processed.
  - Ignore Control Characters - Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
-
For delimited data, on the Data Format tab, configure the following properties:
  - Header Line - Indicates whether a file contains a header line, and whether to use the header line.
  - Delimiter Format Type - Delimiter format type. Use one of the following options:
    - Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
    - RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
    - MS Excel CSV - Microsoft Excel comma-separated file.
    - MySQL CSV - MySQL comma-separated file.
    - Tab-Separated Values - File that includes tab-separated values.
    - PostgreSQL CSV - PostgreSQL comma-separated file.
    - PostgreSQL Text - PostgreSQL text file.
    - Custom - File that uses user-defined delimiter, escape, and quote characters.
    - Multi Character Delimited - File that uses multiple user-defined characters to delimit fields and lines, and single user-defined escape and quote characters.
    Available when using the Apache Commons parser type.
  - Multi Character Field Delimiter - Characters that delimit fields. Default is two pipe characters (||).
    Available when using the Apache Commons parser with the multi-character delimiter format.
  - Multi Character Line Delimiter - Characters that delimit lines or records. Default is the newline character (\n).
    Available when using the Apache Commons parser with the multi-character delimiter format.
  - Delimiter Character - Delimiter character. Select one of the available options or use Other to enter a custom character. You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.
    Default is the pipe character ( | ).
    Available when using the Apache Commons parser with a custom delimiter format.
  - Field Separator - One or more characters to use as delimiter characters between columns. Available when using the Univocity parser.
  - Quote Character - Quote character. Available when using the Apache Commons parser with the custom or multi-character delimiter format. Also available when using the Univocity parser.
  - Allow Comments - Allows commented data to be ignored for custom delimiter format. Available when using the Univocity parser.
  - Comment Character - Character that marks a comment when comments are enabled for custom delimiter format.
    Available when using the Univocity parser.
  - Enable Comments - Allows commented data to be ignored for custom delimiter format. Available when using the Apache Commons parser.
  - Comment Marker - Character that marks a comment when comments are enabled for custom delimiter format. Available when using the Apache Commons parser.
  - Lines to Skip - Number of lines to skip before reading data.
  - Compression Format - The compression format of the files:
    - None - Processes only uncompressed files.
    - Compressed File - Processes files compressed by the supported compression formats.
    - Archive - Processes files archived by the supported archive formats.
    - Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
  - File Name Pattern within Compressed Directory - For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json. Default is *, which processes all files.
  - CSV Parser - Parser to use to process delimited data:
    - Apache Commons - Provides robust parsing and a wide range of delimited format types.
    - Univocity - Can provide faster processing for wide delimited files, such as those with over 200 columns.
    Default is Apache Commons.
  - Max Columns - Maximum number of columns to process per record. Available when using the Univocity parser.
  - Max Character per Column - Maximum number of characters to process in each column. Available when using the Univocity parser.
  - Skip Empty Lines - Allows skipping empty lines. Available when using the Univocity parser.
  - Allow Extra Columns - Allows processing records with more columns than exist in the header line. Available when using the Apache Commons parser to process data with a header line.
  - Extra Column Prefix - Prefix to use for any additional columns. Extra columns are named using the prefix and sequential increasing integers as follows: <prefix><integer>. For example, _extra_1. Default is _extra_.
    Available when using the Apache Commons parser to process data with a header line while allowing extra columns.
  - Max Record Length (chars) - Maximum length of a record in characters. Longer records are not read. This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.
    Available when using the Apache Commons parser.
  - Ignore Empty Lines - Allows empty lines to be ignored. Available when using the Apache Commons parser with the custom delimiter format.
  - Root Field Type - Root field type to use:
    - List-Map - Generates an indexed list of data. Enables you to use standard functions to process data. Use for new pipelines.
    - List - Generates a record with an indexed list with a map for header and value. Requires the use of delimited data functions to process data. Use only to maintain pipelines created before 1.1.0.
  - Parse NULLs - Replaces the specified string constant with null values.
  - NULL Constant - String constant to replace with null values.
  - Charset - Character encoding of the files to be processed.
  - Ignore Control Characters - Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
-
For JSON data, on the Data Format tab, configure the following properties:
  - JSON Content - Type of JSON content. Use one of the following options:
    - Array of Objects
    - Multiple Objects
  - Compression Format - The compression format of the files:
    - None - Processes only uncompressed files.
    - Compressed File - Processes files compressed by the supported compression formats.
    - Archive - Processes files archived by the supported archive formats.
    - Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
  - File Name Pattern within Compressed Directory - For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json. Default is *, which processes all files.
  - Maximum Object Length (chars) - Maximum number of characters in a JSON object. Longer objects are diverted to the pipeline for error handling.
    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.
  - Charset - Character encoding of the files to be processed.
  - Ignore Control Characters - Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
-
For log data, on the Data Format tab, configure the following properties:
  - Log Format - Format of the log files. Use one of the following options:
    - Common Log Format
    - Combined Log Format
    - Apache Error Log Format
    - Apache Access Log Custom Format
    - Regular Expression
    - Grok Pattern
    - Log4j
    - Common Event Format (CEF)
    - Log Event Extended Format (LEEF)
  - Max Line Length - Maximum length of a log line. The origin truncates longer lines. This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.
  - Retain Original Line - Determines how to treat the original log line. Select to include the original log line as a field in the resulting record. By default, the original line is discarded.
  - Charset - Character encoding of the files to be processed.
  - Ignore Control Characters - Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  - When you select Apache Access Log Custom Format, use Apache log format strings to define the Custom Log Format.
  - When you select Regular Expression, enter the regular expression that describes the log format, and then map the fields that you want to include to each regular expression group.
  - When you select Grok Pattern, you can use the Grok Pattern Definition field to define custom grok patterns. You can define a pattern on each line.
    In the Grok Pattern field, enter the pattern to use to parse the log. You can use a predefined grok pattern or create a custom grok pattern using patterns defined in Grok Pattern Definition.
    For more information about defining grok patterns and supported grok patterns, see Defining Grok Patterns.
  - When you select Log4j, define the following properties:
    - On Parse Error - Determines how to handle information that cannot be parsed:
      - Skip and Log Error - Skips reading the line and logs a stage error.
      - Skip, No Error - Skips reading the line and does not log an error.
      - Include as Stack Trace - Includes information that cannot be parsed as a stack trace to the previously-read log line. The information is added to the message field for the last valid log line.
    - Use Custom Log Format - Allows you to define a custom log format.
    - Custom Log4J Format - Use log4j variables to define a custom log format.
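The Regular Expression log format maps record fields to regular expression groups. The following Python sketch shows the idea with an invented log layout. The pattern, the field names, and the originalLine field name are all hypothetical and are not defaults of the origin.

```python
import re

# Hypothetical log layout: "<host> <level> [<timestamp>] <message>"
LOG_PATTERN = re.compile(r"(\S+) (\S+) \[(.*?)\] (.*)")

# Field-to-group mapping, similar in spirit to mapping fields to
# regular expression groups in the stage configuration.
FIELD_TO_GROUP = {"host": 1, "level": 2, "timestamp": 3, "message": 4}


def parse_log_line(line: str, retain_original: bool = False):
    """Return a dict of mapped fields, or None if the line does not match."""
    match = LOG_PATTERN.fullmatch(line)
    if match is None:
        return None
    record = {field: match.group(group) for field, group in FIELD_TO_GROUP.items()}
    if retain_original:
        # Mirrors the idea of Retain Original Line; the field name is assumed.
        record["originalLine"] = line
    return record


if __name__ == "__main__":
    print(parse_log_line("web01 ERROR [2024-01-01 10:00:00] disk full"))
```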
-
For protobuf data, on the Data Format tab, configure the following properties:
  - Protobuf Descriptor File - Descriptor file (.desc) to use. The descriptor file must be in the Data Collector resources directory, $SDC_RESOURCES.
    For information about generating the descriptor file, see Protobuf Data Format Prerequisites. For more information about environment variables, see Data Collector Environment Configuration.
  - Message Type - The fully-qualified name for the message type to use when reading data. Use the following format: <package name>.<message type>.
    Use a message type defined in the descriptor file.
  - Delimited Messages - Indicates if a message might include more than one protobuf message.
  - Compression Format - The compression format of the files:
    - None - Processes only uncompressed files.
    - Compressed File - Processes files compressed by the supported compression formats.
    - Archive - Processes files archived by the supported archive formats.
    - Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
  - File Name Pattern within Compressed Directory - For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json. Default is *, which processes all files.
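To show what it can mean for one MQTT message to carry more than one protobuf message, the following Python sketch splits a payload that uses the common length-prefixed framing, where each message is preceded by its size encoded as a base-128 varint. That the origin expects exactly this framing for delimited messages is an assumption; the source does not specify it. The function names read_varint and split_delimited are hypothetical, and the sketch only separates the raw message bytes without decoding them.

```python
def read_varint(data: bytes, pos: int):
    """Decode a base-128 varint starting at pos; return (value, next_pos)."""
    value, shift = 0, 0
    while True:
        if pos >= len(data):
            raise ValueError("truncated varint")
        byte = data[pos]
        pos += 1
        # The low 7 bits carry data; the high bit signals continuation.
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return value, pos
        shift += 7


def split_delimited(payload: bytes) -> list:
    """Split a length-prefixed payload into individual message byte strings."""
    messages, pos = [], 0
    while pos < len(payload):
        length, pos = read_varint(payload, pos)
        end = pos + length
        if end > len(payload):
            raise ValueError("truncated message")
        messages.append(payload[pos:end])
        pos = end
    return messages


if __name__ == "__main__":
    framed = bytes([3]) + b"abc" + bytes([2]) + b"de"
    print(split_delimited(framed))
```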
-
For SDC Record data, on the Data Format tab, configure the following properties:
  - Compression Format - The compression format of the files:
    - None - Processes only uncompressed files.
    - Compressed File - Processes files compressed by the supported compression formats.
    - Archive - Processes files archived by the supported archive formats.
    - Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
  - File Name Pattern within Compressed Directory - For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json. Default is *, which processes all files.
-
For text data, on the Data Format tab, configure the following properties:
  - Compression Format - The compression format of the files:
    - None - Processes only uncompressed files.
    - Compressed File - Processes files compressed by the supported compression formats.
    - Archive - Processes files archived by the supported archive formats.
    - Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
  - File Name Pattern within Compressed Directory - For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json. Default is *, which processes all files.
  - Max Line Length - Maximum number of characters allowed for a line. Longer lines are truncated. Adds a boolean field to the record to indicate if it was truncated. The field name is Truncated.
    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.
  - Use Custom Delimiter - Uses custom delimiters to define records instead of line breaks.
  - Custom Delimiter - One or more characters to use to define records.
  - Include Custom Delimiter - Includes delimiter characters in the record.
  - Charset - Character encoding of the files to be processed.
  - Ignore Control Characters - Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
-
For XML data, on the Data Format tab, configure the following properties:
  - Delimiter Element - Delimiter to use to generate records. Omit a delimiter to treat the entire XML document as one record. Use one of the following:
    - An XML element directly under the root element.
      Use the XML element name without surrounding angle brackets ( < > ). For example, msg instead of <msg>.
    - A simplified XPath expression that specifies the data to use.
      Use a simplified XPath expression to access data deeper in the XML document or data that requires a more complex access method.
      For more information about valid syntax, see Simplified XPath Syntax.
  - Compression Format - The compression format of the files:
    - None - Processes only uncompressed files.
    - Compressed File - Processes files compressed by the supported compression formats.
    - Archive - Processes files archived by the supported archive formats.
    - Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
  - File Name Pattern within Compressed Directory - For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json. Default is *, which processes all files.
  - Preserve Root Element - Includes the root element in the generated records. When omitting a delimiter to generate a single record, the root element is the root element of the XML document.
    When specifying a delimiter to generate multiple records, the root element is the XML element specified as the delimiter element or is the last XML element in the simplified XPath expression specified as the delimiter element.
  - Include Field XPaths - Includes the XPath to each parsed XML element and XML attribute in field attributes. Also includes each namespace in an xmlns record header attribute. When not selected, this information is not included in the record. By default, the property is not selected.
    Note: Field attributes and record header attributes are written to destination systems automatically only when you use the SDC RPC data format in destinations. For more information about working with field attributes and record header attributes, and how to include them in records, see Field Attributes and Record Header Attributes.
  - Namespaces - Namespace prefix and URI to use when parsing the XML document. Define namespaces when the XML element being used includes a namespace prefix or when the XPath expression includes namespaces. For information about using namespaces with an XML element, see Using XML Elements with Namespaces.
    For information about using namespaces with XPath expressions, see Using XPath Expressions with Namespaces.
    Using simple or bulk edit mode, click the Add icon to add additional namespaces.
  - Output Field Attributes - Includes XML attributes and namespace declarations in the record as field attributes. When not selected, XML attributes and namespace declarations are included in the record as fields. By default, the property is not selected.
    Note: Field attributes are automatically included in records written to destination systems only when you use the SDC RPC data format in the destination. For more information about working with field attributes, see Field Attributes.
  - Max Record Length (chars) - The maximum number of characters in a record. Longer records are diverted to the pipeline for error handling.
    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.
  - Charset - Character encoding of the files to be processed.
  - Ignore Control Characters - Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
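The effect of the Delimiter Element property can be sketched in Python with the standard library XML parser. This is a simplified illustration, not the origin's implementation: it handles only a delimiter element directly under the root element, not simplified XPath expressions or namespaces, and the function name xml_records is hypothetical.

```python
import xml.etree.ElementTree as ET


def xml_records(document: str, delimiter_element=None) -> list:
    """Return one element per record from an XML document."""
    root = ET.fromstring(document)
    if delimiter_element is None:
        # No delimiter element: the whole document is a single record.
        return [root]
    # One record per matching element directly under the root element.
    return [child for child in root if child.tag == delimiter_element]


if __name__ == "__main__":
    doc = "<root><msg><id>1</id></msg><msg><id>2</id></msg><other/></root>"
    for record in xml_records(doc, "msg"):
        print(ET.tostring(record, encoding="unicode"))
```

With msg as the delimiter element, the sample document yields two records and the other element is skipped. With no delimiter element, it yields a single record rooted at root.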