RabbitMQ Producer

RabbitMQ Producer writes AMQP messages to a single RabbitMQ queue. For information about supported versions, see Supported Systems and Versions in the Data Collector documentation.

When you configure the destination, you specify the information needed to connect to RabbitMQ. You can also use a connection to configure the destination. You define a queue and the bindings to use. You can use multiple bindings to write messages to one or more exchanges. You can also configure SSL/TLS properties, including default transport protocols and cipher suites. You can optionally configure AMQP message properties.

You can add custom configuration options and define connection credentials as needed.

Queue

The RabbitMQ Producer destination writes messages to a specified queue. When you configure the destination, you define the queue name and other queue properties.

The destination can write messages to the following types of queues:
Classic
By default, the destination writes to classic queues. If the defined queue name is a classic queue, you do not need to configure additional properties.
Quorum
If the defined queue name is a quorum queue, you must add x-queue-type as a Declaration Property in the Queue tab and set it to quorum.

If a queue of the specified name does not exist, RabbitMQ creates the queue based on the properties that you defined.

If the queue exists but its properties do not match the properties that you defined, the destination displays error messages when it tries to connect.
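
As a rough illustration of the queue properties described above, the following Python sketch builds the settings for a queue declaration. The function name `queue_declaration` and its default values are hypothetical and are not part of Data Collector. The dictionary keys are chosen to resemble the parameters that an AMQP client such as pika accepts in `queue_declare`, but the sketch does not call any client library.

```python
def queue_declaration(name, queue_type="classic", durable=True,
                      exclusive=False, auto_delete=False):
    """Return the settings for declaring a queue (hypothetical helper).

    The defaults here are illustrative assumptions, not the stage defaults.
    """
    if queue_type not in ("classic", "quorum"):
        raise ValueError(f"unsupported queue type: {queue_type}")

    arguments = {}
    if queue_type == "quorum":
        # Mirrors adding x-queue-type as a Declaration Property set to quorum.
        arguments["x-queue-type"] = "quorum"

    return {
        "queue": name,
        "durable": durable,
        "exclusive": exclusive,
        "auto_delete": auto_delete,
        "arguments": arguments,
    }


classic = queue_declaration("orders")
quorum = queue_declaration("orders-ha", queue_type="quorum")
print(classic["arguments"])  # no extra properties needed for a classic queue
print(quorum["arguments"])   # carries the x-queue-type declaration property
```

A classic queue needs no additional declaration properties, while a quorum queue carries the single `x-queue-type` entry.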

Data Formats

RabbitMQ Producer writes data to RabbitMQ based on the data format that you select. You can use the following data formats:
Avro
The stage writes records based on the Avro schema. You can use one of the following methods to specify the location of the Avro schema definition:
  • In Pipeline Configuration - Use the schema that you provide in the stage configuration.
  • In Record Header - Use the schema included in the avroSchema record header attribute.
  • Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry. The Confluent Schema Registry is a distributed storage layer for Avro schemas. You can configure the destination to look up the schema in the Confluent Schema Registry by the schema ID or subject.

    If using the Avro schema in the stage or in the record header attribute, you can optionally configure the stage to register the Avro schema with the Confluent Schema Registry. You can also optionally include the schema definition in the message. Omitting the schema definition can improve performance, but requires the appropriate schema management to avoid losing track of the schema associated with the data.

You can include the Avro schema in the output.
You can also compress data with an Avro-supported compression codec. When using Avro compression, avoid configuring any other compression properties in the stage.
Binary
The stage writes binary data from a single field in the record.
Delimited
The destination writes records as delimited data. When you use this data format, the root field must be list or list-map.
You can use the following delimited format types:
  • Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
  • RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
  • MS Excel CSV - Microsoft Excel comma-separated file.
  • MySQL CSV - MySQL comma-separated file.
  • Tab-Separated Values - File that includes tab-separated values.
  • PostgreSQL CSV - PostgreSQL comma-separated file.
  • PostgreSQL Text - PostgreSQL text file.
  • Custom - File that uses user-defined delimiter, escape, and quote characters.
  • Multi Character Delimited - File that uses multiple user-defined characters to delimit fields and lines, and single user-defined escape and quote characters.
JSON
The destination writes records as JSON data. You can use one of the following formats:
  • Array - Each file includes a single array. In the array, each element is a JSON representation of each record.
  • Multiple objects - Each file includes multiple JSON objects. Each object is a JSON representation of a record.
Protobuf
Writes one record in a message. Uses the user-defined message type and the definition of the message type in the descriptor file to generate the message.
For information about generating the descriptor file, see Protobuf Data Format Prerequisites.
SDC Record
The destination writes records in the SDC Record data format.
Text
The destination writes data from a single text field to the destination system. When you configure the stage, you select the field to use.
You can configure the characters to use as record separators. By default, the destination uses a UNIX-style line ending (\n) to separate records.
When a record does not contain the selected text field, the destination can report the missing field as an error or ignore the missing field. By default, the destination reports an error.
When configured to ignore a missing text field, the destination can discard the record or write the record separator characters to create an empty line for the record. By default, the destination discards the record.
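
The missing-field behavior of the Text data format can be sketched in Python as follows. This is only an illustration of the rules described above, not the stage implementation. The function `render_text` and its parameter names are hypothetical, records are modeled as plain dictionaries, and the sketch assumes that the record separator is appended after each written record.

```python
def render_text(records, field, separator="\n", on_missing="error",
                insert_separator_if_no_text=False):
    """Serialize one text field per record (hypothetical sketch)."""
    if on_missing not in ("error", "ignore"):
        raise ValueError(f"unknown on_missing value: {on_missing}")

    parts = []
    for record in records:
        if field in record:
            # Assumption: the separator follows every written record.
            parts.append(str(record[field]) + separator)
        elif on_missing == "error":
            # Default behavior: report the missing field as an error.
            raise KeyError(f"record is missing text field {field!r}")
        elif insert_separator_if_no_text:
            # Ignore the missing field but keep an empty line for the record.
            parts.append(separator)
        # Otherwise the record is discarded.
    return "".join(parts)


records = [{"text": "first"}, {"other": 1}, {"text": "second"}]
discarded = render_text(records, "text", on_missing="ignore")
empty_line = render_text(records, "text", on_missing="ignore",
                         insert_separator_if_no_text=True)
```

With these inputs, `discarded` holds two lines and `empty_line` holds three, with an empty line standing in for the record that lacks the text field.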

Configuring a RabbitMQ Producer Destination

Configure a RabbitMQ Producer destination to write messages to RabbitMQ.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error
    Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the RabbitMQ tab, configure the following properties:
    RabbitMQ Property Description
    Connection Connection that defines the information required to connect to an external system.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    URI RabbitMQ URI.

    Typically uses the following format:

    amqp://<host>:<port>/<virtualhost>

    Mandatory Requires at least one active consumer subscribed to the queue.

    When selected, if the queue has no active consumers, RabbitMQ returns an error that can stop the pipeline.

    Set AMQP Message Properties Enables RabbitMQ AMQP message properties configuration. When not selected, the destination uses the default RabbitMQ properties.

    For more information about AMQP message properties, see the RabbitMQ documentation.

    Content Type MIME content type.

    Available when Set AMQP Message Properties is selected.

    Content Encoding MIME content encoding.

    Available when Set AMQP Message Properties is selected.

    Headers Message headers.

    Available when Set AMQP Message Properties is selected.

    Delivery Mode Determines if the messages are saved to disk. To save messages to disk, select Persistent.

    Available when Set AMQP Message Properties is selected.

    Priority Message priority. Select a value from 1 to 9.

    Available when Set AMQP Message Properties is selected.

    Correlation ID Correlation ID.

    Available when Set AMQP Message Properties is selected.

    Reply To Queue name for responses.

    Available when Set AMQP Message Properties is selected.

    Set Expiration Enables expiration time for messages. By default, this property is enabled.

    Available when Set AMQP Message Properties is selected.

    Expiration Number of milliseconds before messages expire. Enter an integer from 0 to 32768. Default is 0, meaning that messages expire after being written to a queue unless they can be delivered to a consumer immediately. After being delivered, messages are removed from the queue.

    Available when Set Expiration is selected.

    Message ID Message identifier string.

    Available when Set AMQP Message Properties is selected.

    Set Current Time Uses the current time as the timestamp associated with the message.

    Available when Set AMQP Message Properties is selected.

    Timestamp String to use as the timestamp for the messages.

    Available when Set AMQP Message Properties is selected.

    Message Type Type of event or command that the message represents.

    Available when Set AMQP Message Properties is selected.

    User ID User ID. When used, the user ID must be defined with RabbitMQ.

    Available when Set AMQP Message Properties is selected.

    App ID Application ID.

    Available when Set AMQP Message Properties is selected.

    Use Credentials Enables the use of RabbitMQ credentials.
    Additional Client Configuration Additional RabbitMQ client configuration properties to use. To add properties, click Add and define the RabbitMQ client property name and value.

    Use the property names and values as expected by RabbitMQ.

    One Message per Batch For each batch, writes the records in the batch to the queue as a single message.
  3. On the Credentials tab, enter the RabbitMQ credentials to use if you enabled credentials.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.
  4. On the Queue tab, configure the following queue properties:
    These properties directly correspond to RabbitMQ properties. For more information, see the RabbitMQ documentation.
    Queue Property Description
    Name Name of the queue to use or create.
    Durable Creates a durable queue when selected.

    The stage creates a durable queue by default.

    Exclusive Creates an exclusive queue when selected. When exclusive, the queue allows only the Data Collector to use it.

    The stage creates an exclusive queue by default.

    Auto-Delete Automatically deletes a queue after all consumers unsubscribe.

    When used with an exclusive queue, the queue is automatically deleted when you stop the pipeline.

    Declaration Properties Additional queue declaration properties to use.
  5. On the Exchange tab, optionally configure the following binding properties for the bindings that you want to use. When no bindings are configured, the default exchange is used.
    These properties directly correspond to RabbitMQ properties. For more information, see the RabbitMQ documentation.
    Exchange Property Description
    Name Binding name.
    Type Binding type.
    Durable Creates a durable exchange.
    Auto-Delete Automatically deletes an exchange when all queues are finished using it.
    Routing Key Routing key.

    Leave empty to default to the queue name.

    Declaration Properties Additional exchange properties to use.
    Binding Properties Additional binding properties to use.
  6. To use SSL/TLS, on the TLS tab, configure the following properties:
    TLS Property Description
    Use TLS Enables the use of TLS.
    Use Remote Keystore Enables loading the contents of the keystore from a remote credential store or from values entered in the stage properties. For more information, see Remote Keystore and Truststore.
    Private Key Private key used in the remote keystore. Enter a credential function that returns the key or enter the contents of the key.
    Certificate Chain Each PEM certificate used in the remote keystore. Enter a credential function that returns the certificate or enter the contents of the certificate.

    Using simple or bulk edit mode, click the Add icon to add additional certificates.

    Keystore File

    Path to the local keystore file. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:

    ${runtime:resourcesDirPath()}/keystore.jks

    By default, no keystore is used.

    Keystore Type Type of keystore to use. Use one of the following types:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Keystore Password

    Password to the keystore file. A password is optional, but recommended.

    Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.
    Keystore Key Algorithm

    Algorithm to manage the keystore.

    Default is SunX509.

    Use Remote Truststore Enables loading the contents of the truststore from a remote credential store or from values entered in the stage properties. For more information, see Remote Keystore and Truststore.
    Trusted Certificates Each PEM certificate used in the remote truststore. Enter a credential function that returns the certificate or enter the contents of the certificate.

    Using simple or bulk edit mode, click the Add icon to add additional certificates.

    Truststore File

    Path to the local truststore file. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:

    ${runtime:resourcesDirPath()}/truststore.jks

    By default, no truststore is used.

    Truststore Type
    Type of truststore to use. Use one of the following types:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Truststore Password

    Password to the truststore file. A password is optional, but recommended.

    Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.
    Truststore Trust Algorithm

    Algorithm to manage the truststore.

    Default is SunX509.

    Use Default Protocols Uses the default TLSv1.2 transport layer security (TLS) protocol. To use a different protocol, clear this option.
    Transport Protocols TLS protocols to use. To use a protocol other than the default TLSv1.2, click the Add icon and enter the protocol name. You can use simple or bulk edit mode to add protocols.
    Note: Older protocols are not as secure as TLSv1.2.
    Use Default Cipher Suites Uses a default cipher suite for the SSL/TLS handshake. To use a different cipher suite, clear this option.
    Cipher Suites Cipher suites to use. To use a cipher suite that is not a part of the default set, click the Add icon and enter the name of the cipher suite. You can use simple or bulk edit mode to add cipher suites.

    Enter the Java Secure Socket Extension (JSSE) name for the additional cipher suites that you want to use.

  7. Optionally configure advanced options on the Advanced tab.
    These properties directly correspond to RabbitMQ properties. For more information, see the RabbitMQ documentation.
    Generally, you should use the defaults for these properties:
    Advanced Property Description
    Automatic Recovery Enabled Determines whether to attempt to reestablish a connection.
    Network Recovery Interval Milliseconds to wait before attempting to reestablish a network connection.

    Default is 5000.

    Connection Timeout (ms) Milliseconds for the connection to establish. Use 0 to opt out of a connection timeout.

    Default is 0.

    Handshake Timeout (ms) Milliseconds for the handshake to complete.
    Shutdown Timeout (ms) Milliseconds for the shutdown to complete.
    Heartbeat Timeout (secs) Seconds to wait for a heartbeat to verify that RabbitMQ is up and the connection is still available.

    Use 0 to avoid requesting heartbeats. Default is 0.

    Maximum Frame Size (bytes) Maximum frame size in bytes. Use for performance tuning.

    Setting a larger value can improve throughput. Setting a smaller value can improve latency.

    Use 0 for no limit. Default is 0.

    Maximum Channel Number Maximum number of channels allowed.
  8. On the Data Format tab, configure the following property:
    Data Format Property Description
    Data Format
    Data format for messages:
    • Avro
    • Binary
    • Delimited
    • JSON
    • Protobuf
    • SDC Record
    • Text
  9. For Avro data, on the Data Format tab, configure the following properties:
    Avro Property Description
    Avro Schema Location Location of the Avro schema definition to use when writing data:
    • In Pipeline Configuration - Use the schema that you provide in the stage configuration.
    • In Record Header - Use the schema in the avroSchema record header attribute. Use only when the avroSchema attribute is defined for all records.
    • Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry.
    Avro Schema Avro schema definition used to write the data.

    You can optionally use the runtime:loadResource function to load a schema definition stored in a runtime resource file.

    Register Schema Registers a new Avro schema with Confluent Schema Registry.
    Schema Registry URLs Confluent Schema Registry URLs used to look up the schema or to register a new schema. To add a URL, click Add and then enter the URL in the following format:
    http://<host name>:<port number>
    Basic Auth User Info User information needed to connect to Confluent Schema Registry when using basic authentication.

    Enter the key and secret from the schema.registry.basic.auth.user.info setting in Schema Registry using the following format:

    <key>:<secret>
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.
    Look Up Schema By Method used to look up the schema in Confluent Schema Registry:
    • Subject - Look up the specified Avro schema subject.
    • Schema ID - Look up the specified Avro schema ID.
    Schema Subject Avro schema subject to look up or to register in Confluent Schema Registry.

    If the specified subject to look up has multiple schema versions, the stage uses the latest schema version for that subject. To use an older version, find the corresponding schema ID, and then set the Look Up Schema By property to Schema ID.

    Schema ID Avro schema ID to look up in Confluent Schema Registry.
    Include Schema Includes the schema in each message.
    Note: Omitting the schema definition can improve performance, but requires the appropriate schema management to avoid losing track of the schema associated with the data.
    Avro Compression Codec The Avro compression type to use.

    When using Avro compression, do not enable other compression available in the destination.

  10. For binary data, on the Data Format tab, configure the following property:
    Binary Property Description
    Binary Field Path Field that contains the binary data.
  11. For delimited data, on the Data Format tab, configure the following properties:
    Delimited Property Description
    Delimiter Format Format for delimited data:
    • Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
    • RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
    • MS Excel CSV - Microsoft Excel comma-separated file.
    • MySQL CSV - MySQL comma-separated file.
    • Tab-Separated Values - File that includes tab-separated values.
    • PostgreSQL CSV - PostgreSQL comma-separated file.
    • PostgreSQL Text - PostgreSQL text file.
    • Custom - File that uses user-defined delimiter, escape, and quote characters.
    Header Line Indicates whether to create a header line.
    Delimiter Character Delimiter character for a custom delimiter format. Select one of the available options or use Other to enter a custom character.

    You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.

    Default is the pipe character ( | ).

    Record Separator String Characters to use to separate records. Use any valid Java string literal. For example, when writing to Windows, you might use \r\n to separate records.

    Available when using a custom delimiter format.

    Escape Character Escape character for a custom delimiter format. Select one of the available options or use Other to enter a custom character.

    Default is the backslash character ( \ ).

    Quote Character Quote character for a custom delimiter format. Select one of the available options or use Other to enter a custom character.

    Default is the quotation mark character ( " ).

    Replace New Line Characters Replaces new line characters with the configured string.

    Recommended when writing data as a single line of text.

    New Line Character Replacement String to replace each new line character. For example, enter a space to replace each new line character with a space.

    Leave empty to remove the new line characters.

    Charset Character set to use when writing data.
  12. For JSON data, on the Data Format tab, configure the following properties:
    JSON Property Description
    JSON Content Method to write JSON data:
    • JSON Array of Objects - Each file includes a single array. In the array, each element is a JSON representation of each record.
    • Multiple JSON Objects - Each file includes multiple JSON objects. Each object is a JSON representation of a record.
    Charset Character set to use when writing data.
  13. For protobuf data, on the Data Format tab, configure the following properties:
    Protobuf Property Description
    Protobuf Descriptor File Descriptor file (.desc) to use. The descriptor file must be in the Data Collector resources directory, $SDC_RESOURCES.

    For more information about environment variables, see Data Collector Environment Configuration in the Data Collector documentation. For information about generating the descriptor file, see Protobuf Data Format Prerequisites.

    Message Type Fully-qualified name for the message type to use when writing data.

    Use the following format: <package name>.<message type>.

    Use a message type defined in the descriptor file.
  14. For text data, on the Data Format tab, configure the following properties:
    Text Property Description
    Text Field Path Field that contains the text data to be written. All data must be incorporated into the specified field.
    Record Separator Characters to use to separate records. Use any valid Java string literal. For example, when writing to Windows, you might use \r\n to separate records.

    By default, the destination uses \n.

    On Missing Field When a record does not include the text field, determines whether the destination reports the missing field as an error or ignores the missing field.
    Insert Record Separator if No Text When configured to ignore a missing text field, inserts the configured record separator string to create an empty line.

    When not selected, discards records without the text field.

    Charset Character set to use when writing data.
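
To illustrate the URI format shown for the URI property on the RabbitMQ tab, amqp://<host>:<port>/<virtualhost>, the following Python sketch splits a URI into its parts with the standard library. The function name `parse_amqp_uri` and the example host names are hypothetical. The sketch assumes the conventional default ports, 5672 for amqp and 5671 for amqps, and treats a URI with no path as leaving the virtual host unspecified.

```python
from urllib.parse import unquote, urlsplit

# Conventional default ports for plain and TLS AMQP connections.
DEFAULT_PORTS = {"amqp": 5672, "amqps": 5671}


def parse_amqp_uri(uri):
    """Split an AMQP URI into host, port, and virtual host (sketch)."""
    parts = urlsplit(uri)
    if parts.scheme not in DEFAULT_PORTS:
        raise ValueError(f"not an AMQP URI: {uri}")

    # The virtual host is the percent-decoded path without its leading slash.
    # A URI with no path at all leaves the virtual host unspecified.
    virtual_host = unquote(parts.path[1:]) if parts.path else None

    return {
        "host": parts.hostname,
        "port": parts.port or DEFAULT_PORTS[parts.scheme],
        "virtual_host": virtual_host,
    }


explicit = parse_amqp_uri("amqp://rabbit.example.com:5672/sales")
encoded = parse_amqp_uri("amqps://rabbit.example.com/%2F")
```

In the second example, the percent-encoded path `%2F` decodes to the virtual host named `/`, and the port falls back to the assumed amqps default.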