MQTT Publisher

Supported pipeline types:
  • Data Collector

  • Data Collector Edge

The MQTT Publisher destination publishes messages to a topic on an MQTT broker. The destination functions as an MQTT client that publishes messages, writing each record as a message. For information about supported versions, see Supported Systems and Versions.

When you configure the destination, you specify the information needed to connect to the MQTT broker. You must define connection credentials when the MQTT broker requires a user name and password.

You can also configure SSL/TLS properties, including default transport protocols and cipher suites.

The stage supports high availability MQTT clusters. For a cluster without a load balancer, you configure a list of brokers in the cluster. After losing a connection to a broker, the stage connects to the next available broker in the list.

You specify the topic on the MQTT broker that the destination delivers messages to.

You also configure the quality of service level and the persistence mechanism that the destination uses to enable reliable messaging.

Edge Pipeline Prerequisite

In Data Collector Edge pipelines, MQTT stages require using an intermediary MQTT broker.

For example, an edge sending pipeline uses an MQTT Publisher destination to write to an MQTT broker. The MQTT broker temporarily stores the data until the MQTT Subscriber origin in the Data Collector receiving pipeline reads the data.

Topic

The MQTT Publisher destination writes messages to a single topic on the MQTT broker. Any MQTT client subscribed to that topic receives the messages. A topic is a string that the broker uses to filter messages for each connected client.

When you configure the destination, you define the topic name. You can include multiple topic levels in a topic. For example, the following topic has three topic levels:
sales/US/NorthernRegion

You cannot use MQTT wildcards in the topic name used by the MQTT Publisher destination.
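MQTT reserves the characters + and # as subscription wildcards, which is why they cannot appear in a publish topic. As a rough illustration, a hypothetical validation check (not Data Collector code) might look like this:

    // Hypothetical sketch: MQTT reserves + and # as subscription wildcards,
    // so a publish topic must not contain them.
    public class TopicCheck {
        static void validatePublishTopic(String topic) {
            if (topic.contains("+") || topic.contains("#")) {
                throw new IllegalArgumentException(
                        "Wildcards (+, #) are not allowed in a publish topic: " + topic);
            }
        }

        public static void main(String[] args) {
            validatePublishTopic("sales/US/NorthernRegion"); // valid: three topic levels
        }
    }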

For more information, see the HiveMQ documentation on MQTT topics.

Data Formats

The MQTT Publisher destination writes messages to an MQTT broker based on the data format that you select.

The MQTT Publisher destination processes data formats as follows:

Binary
The stage writes binary data from a single field in the record.
JSON
The destination writes records as JSON data. You can use one of the following formats (see the example after this list):
  • Array - Each message includes a single array. In the array, each element is a JSON representation of a record.
  • Multiple objects - Each message includes multiple JSON objects. Each object is a JSON representation of a record.
SDC Record
The destination writes records in the SDC Record data format.
Text
The destination writes data from a single text field to the destination system. When you configure the stage, you select the field to use.
You can configure the characters to use as record separators. By default, the destination uses a UNIX-style line ending (\n) to separate records.
When a record does not contain the selected text field, the destination can report the missing field as an error or ignore the missing field. By default, the destination reports an error.
When configured to ignore a missing text field, the destination can discard the record or write the record separator characters to create an empty line for the record. By default, the destination discards the record.
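
For example, two records written with each JSON format (field names and values are illustrative):

Array:
  [{"id":1,"region":"US"},{"id":2,"region":"EU"}]

Multiple objects:
  {"id":1,"region":"US"}
  {"id":2,"region":"EU"}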

Configuring an MQTT Publisher Destination

Configure an MQTT Publisher destination to write messages to an MQTT broker.

In Data Collector Edge pipelines, the MQTT Publisher destination requires an intermediary MQTT broker.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline. Not valid for cluster pipelines.
  2. On the MQTT tab, configure the following properties:
    MQTT Property Description
    Broker URL MQTT Broker URL. Enter in the following format:
    <tcp | ssl>://<hostname>:<port>

    Use ssl for secure connections to the broker.

    For example:
    tcp://localhost:1883
    For high availability MQTT clusters without a load balancer, specify a list of brokers from the cluster, separated by commas and without spaces. The stage connects to the first available broker, trying in the order listed (see the connection sketch after this table). For example:
    tcp://hostA:1883,tcp://hostB:1883,tcp://hostC:1883
    Topic Topic to publish to. Using simple or bulk edit mode, click the Add icon to write to additional topics.
    Use Credentials Enables entering credentials on the Credentials tab. Use when the MQTT broker requires a user name and password.
    Clean Session Enables connecting to the MQTT broker using a clean session, or a non-persistent connection. See the MQTT documentation for details about MQTT clean sessions.
    Client ID MQTT Client ID. The ID must be unique across all clients connecting to the same broker.
    You can define an expression that evaluates to the client ID. For example, enter the following expression to use the unique pipeline ID as the client ID:
    ${pipeline:id()}

    If a pipeline includes multiple MQTT stages and you want to use the unique pipeline ID as the client ID for each stage, prefix the client ID with a distinguishing string, for example:

    sub-${pipeline:id()} and pub-${pipeline:id()}

    Otherwise, all stages use the same client ID, which can cause problems such as messages disappearing.
    Quality of Service Determines the quality of service level used to guarantee message delivery:
    • At Most Once (0)
    • At Least Once (1)
    • Exactly Once (2)

    For more information, see the HiveMQ documentation on quality of service levels.

    Client Persistence Mechanism Determines the persistence mechanism that the destination uses to guarantee message delivery when the quality of service level is at least once or exactly once. Select one of the following options:
    • Memory - Store messages in memory on the Data Collector machine until the delivery of the message is complete.
    • File - Store messages in a local file on the Data Collector machine until the delivery of the message is complete.

    Not used when the quality of service level is at most once.

    For more information, see the HiveMQ documentation on client persistence.

    Client Persistence Data Directory Local directory on the Data Collector machine where the destination temporarily stores messages in a file when you configure file persistence.

    The user who starts Data Collector must have read and write access to this directory.

    Keep Alive Interval (secs) Maximum time in seconds to allow the connection to the MQTT broker to remain idle. After the destination publishes no messages for this amount of time, the connection is closed. The destination must reconnect to the MQTT broker.

    Default is 60 seconds.

    Retain the Message Determines whether the MQTT broker retains the last message published by the destination when no MQTT client is subscribed to the topic.

    When selected, the broker stores the last message published by the destination and delivers it to clients that subscribe to the topic later; messages published earlier are not available to them. When cleared, clients that subscribe later receive no previously published messages.

    For more information about MQTT retained messages, see http://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages.
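    The properties on this tab map closely onto the options of a standalone MQTT client. The following Eclipse Paho (Java) sketch illustrates that mapping. It is an illustration only, not Data Collector's internal implementation; the hosts, client ID, topic, payload, and data directory are placeholder values.

    import java.nio.charset.StandardCharsets;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;

    public class MqttTabSketch {
        public static void main(String[] args) throws MqttException {
            // Client Persistence Mechanism: File. In-flight QoS 1/2 messages are
            // stored in the Client Persistence Data Directory until delivered.
            MqttDefaultFilePersistence persistence =
                    new MqttDefaultFilePersistence("/var/tmp/mqtt-persistence");

            // Client ID: must be unique across all clients on the broker.
            MqttClient client =
                    new MqttClient("tcp://hostA:1883", "pub-my-pipeline-id", persistence);

            MqttConnectOptions options = new MqttConnectOptions();
            // Broker URL: for an HA cluster without a load balancer, list the
            // brokers; the client tries them in the order given. In Paho, this
            // list overrides the URI passed to the constructor.
            options.setServerURIs(new String[] {
                    "tcp://hostA:1883", "tcp://hostB:1883", "tcp://hostC:1883"});
            options.setCleanSession(true);    // Clean Session
            options.setKeepAliveInterval(60); // Keep Alive Interval (secs), default 60

            client.connect(options);

            // Quality of Service: At Least Once (1); Retain the Message: cleared.
            byte[] payload = "{\"region\":\"NorthernRegion\"}".getBytes(StandardCharsets.UTF_8);
            client.publish("sales/US/NorthernRegion", payload, 1, false);

            client.disconnect();
            client.close();
        }
    }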

  3. If you enabled credentials, on the Credentials tab, enter the MQTT user name and password.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
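    For example, with runtime resources, you might store the password in a file in the Data Collector resources directory (the file name here is a placeholder) and reference it in the password property with the runtime:loadResource function:

    ${runtime:loadResource('mqttPassword.txt', true)}

    When the second argument is true, the function only loads files with restricted, owner-only permissions.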
  4. To use SSL/TLS, on the TLS tab, configure the following properties:

    In Data Collector Edge pipelines, the keystore/truststore type, password, and algorithm properties are ignored, and the MQTT Publisher destination always uses the default protocol and cipher suites.

    TLS Property Description
    Use TLS Enables the use of TLS.
    Use Remote Keystore Enables loading the contents of the keystore from a remote credential store or from values entered in the stage properties. For more information, see Remote Keystore and Truststore.
    Private Key Private key used in the remote keystore. Enter a credential function that returns the key or enter the contents of the key.

    In Data Collector Edge pipelines, enter the contents of the key.

    Certificate Chain Each PEM certificate used in the remote keystore. Enter a credential function that returns the certificate or enter the contents of the certificate.

    Using simple or bulk edit mode, click the Add icon to add additional certificates.

    In Data Collector Edge pipelines, enter the contents of the certificate.

    Keystore File Path to the local keystore file. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:

    ${runtime:resourcesDirPath()}/keystore.jks

    By default, no keystore is used.

    In Data Collector Edge pipelines, enter an absolute path to the file that uses the PEM format.

    Keystore Type Type of keystore to use. Use one of the following types:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Keystore Password Password to the keystore file. A password is optional, but recommended.
    Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores.
    Keystore Key Algorithm Algorithm to manage the keystore.

    Default is SunX509.

    Use Remote Truststore Enables loading the contents of the truststore from a remote credential store or from values entered in the stage properties. For more information, see Remote Keystore and Truststore.
    Trusted Certificates Each PEM certificate used in the remote truststore. Enter a credential function that returns the certificate or enter the contents of the certificate.

    Using simple or bulk edit mode, click the Add icon to add additional certificates.

    In Data Collector Edge pipelines, enter the contents of the certificate.

    Truststore File Path to the local truststore file. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:

    ${runtime:resourcesDirPath()}/truststore.jks

    By default, no truststore is used.

    In Data Collector Edge pipelines, enter an absolute path to the file that uses the PEM format.

    Truststore Type Type of truststore to use. Use one of the following types:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Truststore Password Password to the truststore file. A password is optional, but recommended.
    Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores.
    Truststore Trust Algorithm Algorithm to manage the truststore.

    Default is SunX509.

    Use Default Protocols Uses the default TLSv1.2 transport layer security (TLS) protocol. To use a different protocol, clear this option.
    Transport Protocols TLS protocols to use. To use a protocol other than the default TLSv1.2, click the Add icon and enter the protocol name. You can use simple or bulk edit mode to add protocols.
    Note: Older protocols are not as secure as TLSv1.2.
    Use Default Cipher Suites Uses a default cipher suite for the SSL/TLS handshake. To use a different cipher suite, clear this option.
    Cipher Suites Cipher suites to use. To use a cipher suite that is not a part of the default set, click the Add icon and enter the name of the cipher suite. You can use simple or bulk edit mode to add cipher suites.

    Enter the Java Secure Socket Extension (JSSE) name for the additional cipher suites that you want to use.
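    For reference, the keystore and truststore properties above correspond to standard Java Secure Socket Extension (JSSE) objects. The following minimal sketch shows how they fit together, assuming JKS stores in the resources directory; the paths and passwords are placeholder values, and this is not Data Collector's internal code.

    import java.io.FileInputStream;
    import java.security.KeyStore;
    import javax.net.ssl.KeyManagerFactory;
    import javax.net.ssl.SSLContext;
    import javax.net.ssl.SSLSocketFactory;
    import javax.net.ssl.TrustManagerFactory;

    public class TlsTabSketch {
        public static SSLSocketFactory buildSocketFactory() throws Exception {
            // Keystore File, Keystore Type, and Keystore Password.
            KeyStore keystore = KeyStore.getInstance("JKS");
            try (FileInputStream in = new FileInputStream("/path/to/resources/keystore.jks")) {
                keystore.load(in, "keystore-password".toCharArray());
            }
            // Keystore Key Algorithm, default SunX509.
            KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
            kmf.init(keystore, "keystore-password".toCharArray());

            // Truststore File, Truststore Type, and Truststore Password.
            KeyStore truststore = KeyStore.getInstance("JKS");
            try (FileInputStream in = new FileInputStream("/path/to/resources/truststore.jks")) {
                truststore.load(in, "truststore-password".toCharArray());
            }
            // Truststore Trust Algorithm, default SunX509.
            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
            tmf.init(truststore);

            // Transport Protocols, default TLSv1.2.
            SSLContext context = SSLContext.getInstance("TLSv1.2");
            context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
            return context.getSocketFactory();
        }
    }

    A standalone client such as Paho would pass the resulting factory to its connection options with setSocketFactory().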

  5. On the Data Format tab, configure the following property:
    Data Format Property Description
    Data Format Data format for messages. Use one of the following data formats:
    • Binary
    • JSON
    • SDC Record
    • Text
  6. For binary data, on the Data Format tab, configure the following property:
    Binary Property Description
    Binary Field Path Field that contains the binary data.
  7. For JSON data, on the Data Format tab, configure the following properties:
    JSON Property Description
    JSON Content Method to write JSON data:
    • JSON Array of Objects - Each message includes a single array. In the array, each element is a JSON representation of a record.
    • Multiple JSON Objects - Each message includes multiple JSON objects. Each object is a JSON representation of a record.
    Charset Character set to use when writing data.
  8. For text data, on the Data Format tab, configure the following properties:
    Text Property Description
    Text Field Path Field that contains the text data to be written. All data must be incorporated into the specified field.
    Record Separator Characters to use to separate records. Use any valid Java string literal. For example, when writing to Windows, you might use \r\n to separate records.

    By default, the destination uses \n.

    On Missing Field When a record does not include the text field, determines whether the destination reports the missing field as an error or ignores the missing field.
    Insert Record Separator if No Text When configured to ignore a missing text field, inserts the configured record separator string to create an empty line.

    When not selected, discards records without the text field.

    Charset Character set to use when writing data.
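
    Taken together, the text format properties behave like the following hypothetical serializer. The class, method, and field names are illustrative, not Data Collector code.

    import java.util.Map;

    public class TextFormatSketch {
        // Returns the serialized message body for one record,
        // or null when the record is discarded.
        static String serialize(Map<String, String> record,
                                String textFieldPath,        // Text Field Path
                                String separator,            // Record Separator, default "\n"
                                boolean ignoreMissingField,  // On Missing Field
                                boolean insertSeparatorIfNoText) {
            String text = record.get(textFieldPath);
            if (text == null) {
                if (!ignoreMissingField) {
                    // Default behavior: report the missing field as an error.
                    throw new IllegalStateException("Missing text field: " + textFieldPath);
                }
                // Ignoring the missing field: write an empty line or discard the record.
                return insertSeparatorIfNoText ? separator : null;
            }
            return text + separator;
        }

        public static void main(String[] args) {
            Map<String, String> record = Map.of("text", "hello");
            System.out.println(serialize(record, "text", "\n", true, false)); // prints "hello"
        }
    }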