RabbitMQ Consumer

Supported pipeline types:
  • Data Collector

The RabbitMQ Consumer origin reads AMQP messages from a single RabbitMQ queue. For information about supported versions, see Supported Systems and Versions.

When you configure the origin, you specify the information needed to connect to RabbitMQ client. You define a queue and the bindings to use. You can use multiple bindings to read messages from one or more exchanges. You can also configure SSL/TLS properties, including default transport protocols and cipher suites.

You can configure the origin to generate a single record for each RabbitMQ message. By default, if a message includes multiple objects, the origin generates a record for each object.

You can add custom configuration options and define connection credentials as needed.

When a pipeline stops, the RabbitMQ Consumer origin notes where it stops reading. When the pipeline starts again, the origin continues processing from where it stopped by default. You can reset the origin to process all requested data.

Queue

RabbitMQ Consumer reads messages from a specified queue.

When you configure the origin, you define the queue name and other properties. If a queue of the specified name does not exist, RabbitMQ creates the queue based on the properties that you defined.

If the queue does exist but the properties are incorrect, error messages display when trying to connect.

Record Header Attributes

The RabbitMQ Consumer origin includes information from RabbitMQ basic properties in the record header attributes of the generated record. When the origin processes Avro data, it includes the Avro schema in an avroSchema record header attribute.

You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.

The origin includes information in the following record header attributes when available:
  • avroSchema - When processing Avro data, provides the Avro schema.
  • appId - Provides information from the RabbitMQ app-id property.
  • contentType - Provides information from the RabbitMQ content-type property.
  • contentEncoding - Provides information from the RabbitMQ content-encoding property.
  • correlationId - Provides information from the RabbitMQ correlation-id property.
  • deliveryMode - Provides information from the RabbitMQ delivery-mode property.
  • deliveryTag - Provides information from the RabbitMQ delivery-tag property.
  • exchange - Provides information from the RabbitMQ exchange-name property.
  • expiration - Provides information from the RabbitMQ expiration property.
  • messageId - Provides information from the RabbitMQ message-id property.
  • messageType - Provides information from the RabbitMQ type property.
  • priority - Provides information from the RabbitMQ priority property.
  • redelivered - Provides information from the RabbitMQ redelivered property.
  • replyTo - Provides information from the RabbitMQ reply-to property.
  • routingKey - Provides information from the RabbitMQ routing-key property.
  • timestamp - Provides information from the RabbitMQ timestamp property.
  • userId - Provides information from the RabbitMQ user-id property.

Data Formats

The RabbitMQ Consumer origin processes data differently based on the data format. The origin can process the following types of data:

Avro
Generates a record for every message. Includes a precision and scale field attribute for each Decimal field.
The stage includes the Avro schema in an avroSchema record header attribute. You can use one of the following methods to specify the location of the Avro schema definition:
  • Message/Data Includes Schema - Use the schema in the message.
  • In Pipeline Configuration - Use the schema that you provide in the stage configuration.
  • Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry. Confluent Schema Registry is a distributed storage layer for Avro schemas. You can configure the stage to look up the schema in Confluent Schema Registry by the schema ID embedded in the message or by the schema ID or subject specified in the stage configuration.
Using a schema in the stage configuration or retrieving a schema from Confluent Schema Registry overrides any schema that might be included in the message and can improve performance.
Binary
Generates a record with a single byte array field at the root of the record.
When the data exceeds the user-defined maximum data size, the origin cannot process the data. Because the record is not created, the origin cannot pass the record to the pipeline to be written as an error record. Instead, the origin generates a stage error.
Delimited
Generates a record for each delimited line.
The CSV parser that you choose determines the delimiter properties that you configure and how the stage handles parsing errors. You can specify if the data includes a header line and whether to use it. You can define the number of lines to skip before reading, the character set of the data, and the root field type to use for the generated record.
You can also configure the stage to replace a string constant with null values and to ignore control characters.
For more information about reading delimited data, see Reading Delimited Data.
JSON
Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
When an object exceeds the maximum object length defined for the origin, the origin processes the object based on the error handling configured for the stage.
Log
Generates a record for every log line.
When a line exceeds the user-defined maximum line length, the origin truncates longer lines.
You can include the processed log line as a field in the record. If the log line is truncated, and you request the log line in the record, the origin includes the truncated line.
You can define the log format or type to be read.
Protobuf
Generates a record for every protobuf message. By default, the origin assumes messages contain multiple protobuf messages.
Protobuf messages must match the specified message type and be described in the descriptor file.
When the data for a record exceeds 1 MB, the origin cannot continue processing data in the message. The origin handles the message based on the stage error handling property and continues reading the next message.
For information about generating the descriptor file, see Protobuf Data Format Prerequisites.
SDC Record
Generates a record for every record. Use to process records generated by a Data Collector pipeline using the SDC Record data format.
For error records, the origin provides the original record as read from the origin in the original pipeline, as well as error information that you can use to correct the record.
When processing error records, the origin expects the error file names and contents as generated by the original pipeline.
Text
Generates a record for each line of text or for each section of text based on a custom delimiter.
When a line or section exceeds the maximum line length defined for the origin, the origin truncates it. The origin adds a boolean field named Truncated to indicate if the line was truncated.
For more information about processing text with a custom delimiter, see Text Data Format with Custom Delimiters.
XML
Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the origin treats the XML file as a single record.
Generated records include XML attributes and namespace declarations as fields in the record by default. You can configure the stage to include them in the record as field attributes.
You can include XPath information for each parsed XML element and XML attribute in field attributes. This also places each namespace in an xmlns record header attribute.
Note: Field attributes and record header attributes are written to destination systems automatically only when you use the SDC RPC data format in destinations. For more information about working with field attributes and record header attributes, and how to include them in records, see Field Attributes and Record Header Attributes.
When a record exceeds the user-defined maximum record length, the origin skips the record and continues processing with the next record. It sends the skipped record to the pipeline for error handling.
Use the XML data format to process valid XML documents. For more information about XML processing, see Reading and Processing XML Data.
Tip: If you want to process invalid XML documents, you can try using the text data format with custom delimiters. For more information, see Processing XML Data with Custom Delimiters.

Configuring a RabbitMQ Consumer Origin

Configure a RabbitMQ Consumer origin to read messages from a RabbitMQ queue.
  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the RabbitMQ tab, configure the following properties:
    RabbitMQ Property Description
    URI RabbitMQ URI.

    Typically uses the following format:

    amqp://<host>:<port>/<virtualhost>

    Consumer Tag RabbitMQ consumer tag. Leave empty to use an automatically generated consumer tag.
    Use Credentials Enables the use of RabbitMQ credentials.
    One Record per Message Generates a single record for each RabbitMQ message.

    When not selected, the origin generates a record for each object in the message.

    Additional Client Configuration Additional RabbitMQ client configuration properties to use. To add properties, click Add and define the RabbitMQ client property name and value.

    Use the property names and values as expected by RabbitMQ.

    Max Batch Size (records) Maximum number of records processed at one time. Honors values up to the Data Collector maximum batch size.

    Default is 1000. The Data Collector default is 1000.

    Max Batch Wait Time (ms) Number of milliseconds to wait before sending a partial or empty batch.
  3. On the Credentials tab, enter the RabbitMQ credentials to use if you enabled credentials.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
  4. On the Queue tab, configure the following queue properties:
    These properties directly correspond to RabbitMQ properties. For more information, see the RabbitMQ documentation.
    Queue Property Description
    Name Name of the queue to use or create.
    Durable Creates a durable queue when selected.

    The stage creates a durable queue by default.

    Exclusive Creates an exclusive queue when selected. When exclusive, the queue allows only the Data Collector to use it.

    The stage creates an exclusive queue by default.

    Auto-Delete Automatically deletes a queue after all consumers unsubscribe.

    When used with an exclusive queue, the queue is automatically deleted when you stop the pipeline.

    Declaration Properties Additional queue declaration properties to use.
  5. On the Exchange tab, optionally configure the following binding properties for the bindings that you want to use. When no bindings are configured, the default exchange is used.
    These properties directly correspond to RabbitMQ properties. For more information, see the RabbitMQ documentation.
    Exchange Property Description
    Name Binding name.
    Type Binding type.
    Durable Creates a durable exchange.
    Auto-Delete Automatically deletes an exchange when all queues are finished using it.
    Routing Key Routing key.

    Leave empty to default to the queue name.

    Declaration Properties Additional exchange properties to use.
    Binding Properties Additional binding properties to use.
  6. To use SSL/TLS, click the TLS tab and configure the following properties:
    TLS Property Description
    Use TLS Enables the use of TLS.
    Use Remote Keystore Enables loading the contents of the keystore from a remote credential store or from values entered in the stage properties. For more information, see Remote Keystore and Truststore.
    Private Key Private key used in the remote keystore. Enter a credential function that returns the key or enter the contents of the key.
    Certificate Chain Each PEM certificate used in the remote keystore. Enter a credential function that returns the certificate or enter the contents of the certificate.

    Using simple or bulk edit mode, click the Add icon to add additional certificates.

    Keystore File

    Path to the local keystore file. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:

    ${runtime:resourcesDirPath()}/keystore.jks

    By default, no keystore is used.

    Keystore Type Type of keystore to use. Use one of the following types:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Keystore Password

    Password to the keystore file. A password is optional, but recommended.

    Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores.
    Keystore Key Algorithm

    Algorithm to manage the keystore.

    Default is SunX509.

    Use Remote Truststore Enables loading the contents of the truststore from a remote credential store or from values entered in the stage properties. For more information, see Remote Keystore and Truststore.
    Trusted Certificates Each PEM certificate used in the remote truststore. Enter a credential function that returns the certificate or enter the contents of the certificate.

    Using simple or bulk edit mode, click the Add icon to add additional certificates.

    Truststore File

    Path to the local truststore file. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:

    ${runtime:resourcesDirPath()}/truststore.jks

    By default, no truststore is used.

    Truststore Type
    Type of truststore to use. Use one of the following types:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Truststore Password

    Password to the truststore file. A password is optional, but recommended.

    Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores.
    Truststore Trust Algorithm

    Algorithm to manage the truststore.

    Default is SunX509.

    Use Default Protocols Uses the default TLSv1.2 transport layer security (TLS) protocol. To use a different protocol, clear this option.
    Transport Protocols TLS protocols to use. To use a protocol other than the default TLSv1.2, click the Add icon and enter the protocol name. You can use simple or bulk edit mode to add protocols.
    Note: Older protocols are not as secure as TLSv1.2.
    Use Default Cipher Suites Uses a default cipher suite for the SSL/TLS handshake. To use a different cipher suite, clear this option.
    Cipher Suites Cipher suites to use. To use a cipher suite that is not a part of the default set, click the Add icon and enter the name of the cipher suite. You can use simple or bulk edit mode to add cipher suites.

    Enter the Java Secure Socket Extension (JSSE) name for the additional cipher suites that you want to use.

  7. Optionally configure advanced options on the Advanced tab.
    These properties directly correspond to RabbitMQ properties. For more information, see the RabbitMQ documentation.
    Generally, you should use the defaults for these properties:
    Advanced Property Description
    Automatic Recovery Enabled Determines whether the origin attempts to reestablish a connection.
    Network Recovery Interval Milliseconds to wait before attempting to reestablish a network connection.

    Default is 5000.

    Connection Timeout (ms) Milliseconds for the connection to establish. Use 0 to opt out of a connection timeout.

    Default is 0.

    Handshake Timeout (ms) Milliseconds for the handshake to complete.
    Shutdown Timeout (ms) Milliseconds for the shutdown to complete.
    Heartbeat Timeout (secs) Seconds to wait for a heartbeat from RabbitMQ to verify that the origin is up and the connection still available.

    Use 0 to avoid requesting heartbeats. Default is 0.

    Maximum Frame Size (bytes) Maximum frame size in bytes. Use for performance tuning.

    Setting a larger value can improve throughput. Setting a smaller value can improve latency.

    Use 0 for no limit. Default is 0.

    Maximum Channel Number Maximum number of channels allowed.
  8. On the Data Format tab, configure the following property:
    Data Format Property Description
    Data Format Type of data to be read. Use one of the following data formats:
    • Avro
    • Binary
    • Delimited
    • JSON
    • Log
    • Protobuf
    • SDC Record
    • Text
    • XML
  9. For Avro data, on the Data Format tab, configure the following properties:
    Avro Property Description
    Avro Schema Location Location of the Avro schema definition to use when processing data:
    • Message/Data Includes Schema - Use the schema in the message.
    • In Pipeline Configuration - Use the schema provided in the stage configuration.
    • Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry.

    Using a schema in the stage configuration or in Confluent Schema Registry can improve performance.

    Avro Schema Avro schema definition used to process the data. Overrides any existing schema definitions associated with the data.

    You can optionally use the runtime:loadResource function to load a schema definition stored in a runtime resource file.

    Schema Registry URLs Confluent Schema Registry URLs used to look up the schema. To add a URL, click Add and then enter the URL in the following format:
    http://<host name>:<port number>
    Basic Auth User Info User information needed to connect to Confluent Schema Registry when using basic authentication.

    Enter the key and secret from the schema.registry.basic.auth.user.info setting in Schema Registry using the following format:

    <key>:<secret>
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
    Lookup Schema By Method used to look up the schema in Confluent Schema Registry:
    • Subject - Look up the specified Avro schema subject.
    • Schema ID - Look up the specified Avro schema ID.
    • Embedded Schema ID - Look up the Avro schema ID embedded in each message.
    Overrides any existing schema definitions associated with the message.
    Schema Subject Avro schema subject to look up in Confluent Schema Registry.

    If the specified subject has multiple schema versions, the stage uses the latest schema version for that subject. To use an older version, find the corresponding schema ID, and then set the Look Up Schema By property to Schema ID.

    Schema ID Avro schema ID to look up in Confluent Schema Registry.
  10. For binary data, on the Data Format tab and configure the following property:
    Binary Property Description
    Max Data Size (bytes) Maximum number of bytes in the message. Larger messages cannot be processed or written to error.
  11. For delimited data, on the Data Format tab, configure the following properties:
    Delimited Property Description
    Delimiter Format Type Delimiter format type. Use one of the following options:
    • Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
    • RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
    • MS Excel CSV - Microsoft Excel comma-separated file.
    • MySQL CSV - MySQL comma-separated file.
    • Tab-Separated Values - File that includes tab-separated values.
    • PostgreSQL CSV - PostgreSQL comma-separated file.
    • PostgreSQL Text - PostgreSQL text file.
    • Custom - File that uses user-defined delimiter, escape, and quote characters.
    • Multi Character Delimited - File that uses multiple user-defined characters to delimit fields and lines, and single user-defined escape and quote characters.

    Available when using the Apache Commons parser type.

    Header Line Indicates whether a file contains a header line, and whether to use the header line.
    Delimiter Character Delimiter character. Select one of the available options or use Other to enter a custom character.

    You can enter a Unicode control character using the format \uNNNN, where ​N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.

    Default is the pipe character ( | ).

    Available when using the Apache Commons parser with a custom delimiter format.

    Multi Character Field Delimiter Characters that delimit fields.

    Default is two pipe characters (||).

    Available when using the Apache Commons parser with the multi-character delimiter format.

    Multi Character Line Delimiter Characters that delimit lines or records.

    Default is the newline character (\n).

    Available when using the Apache Commons parser with the multi-character delimiter format.

    Escape Character Escape character.

    Available when using the Apache Commons parser with the custom or multi-character delimiter format. Also available when using the Univocity parser.

    Quote Character Quote character.

    Available when using the Apache Commons parser with the custom or multi-character delimiter format. Also available when using the Univocity parser.

    Enable Comments Allows commented data to be ignored for custom delimiter format.

    Available when using the Apache Commons parser.

    Comment Marker Character that marks a comment when comments are enabled for custom delimiter format.

    Available when using the Apache Commons parser.

    Lines to Skip Number of lines to skip before reading data.
    Allow Extra Columns Allows processing records with more columns than exist in the header line.

    Available when using the Apache Commons parser to process data with a header line.

    Extra Column Prefix Prefix to use for any additional columns. Extra columns are named using the prefix and sequential increasing integers as follows: <prefix><integer>.

    For example, _extra_1. Default is _extra_.

    Available when using the Apache Commons parser to process data with a header line while allowing extra columns.

    Max Record Length (chars) Maximum length of a record in characters. Longer records are not read.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Available when using the Apache Commons parser.

    Ignore Empty Lines Allows empty lines to be ignored.

    Available when using the Apache Commons parser with the custom delimiter format.

    Root Field Type Root field type to use:
    • List-Map - Generates an indexed list of data. Enables you to use standard functions to process data. Use for new pipelines.
    • List - Generates a record with an indexed list with a map for header and value. Requires the use of delimited data functions to process data. Use only to maintain pipelines created before 1.1.0.
    Parse NULLs Replaces the specified string constant with null values.
    NULL Constant String constant to replace with null values.
    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  12. For JSON data, on the Data Format tab, configure the following properties:
    JSON Property Description
    JSON Content Type of JSON content. Use one of the following options:
    • JSON array of objects
    • Multiple JSON objects
    Max Object Length (chars) Maximum number of characters in a JSON object.

    Longer objects are diverted to the pipeline for error handling.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  13. For log data, on the Data Format tab, configure the following properties:
    Log Property Description
    Log Format Format of the log files. Use one of the following options:
    • Common Log Format
    • Combined Log Format
    • Apache Error Log Format
    • Apache Access Log Custom Format
    • Regular Expression
    • Grok Pattern
    • Log4j
    • Common Event Format (CEF)
    • Log Event Extended Format (LEEF)
    Max Line Length Maximum length of a log line. The origin truncates longer lines.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Retain Original Line Determines how to treat the original log line. Select to include the original log line as a field in the resulting record.

    By default, the original line is discarded.

    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
    • When you select Apache Access Log Custom Format, use Apache log format strings to define the Custom Log Format.
    • When you select Regular Expression, enter the regular expression that describes the log format, and then map the fields that you want to include to each regular expression group.
    • When you select Grok Pattern, you can use the Grok Pattern Definition field to define custom grok patterns. You can define a pattern on each line.

      In the Grok Pattern field, enter the pattern to use to parse the log. You can use a predefined grok patterns or create a custom grok pattern using patterns defined in Grok Pattern Definition.

      For more information about defining grok patterns and supported grok patterns, see Defining Grok Patterns.

    • When you select Log4j, define the following properties:
      Log4j Property Description
      On Parse Error Determines how to handle information that cannot be parsed:
      • Skip and Log Error - Skips reading the line and logs a stage error.
      • Skip, No Error - Skips reading the line and does not log an error.
      • Include as Stack Trace - Includes information that cannot be parsed as a stack trace to the previously-read log line. The information is added to the message field for the last valid log line.
      Use Custom Log Format Allows you to define a custom log format.
      Custom Log4J Format Use log4j variables to define a custom log format.
  14. For protobuf data, on the Data Format tab, configure the following properties:
    Protobuf Property Description
    Protobuf Descriptor File Descriptor file (.desc) to use. The descriptor file must be in the Data Collector resources directory, $SDC_RESOURCES.

    For information about generating the descriptor file, see Protobuf Data Format Prerequisites. For more information about environment variables, see Data Collector Environment Configuration.

    Message Type The fully-qualified name for the message type to use when reading data.

    Use the following format: <package name>.<message type>.

    Use a message type defined in the descriptor file.
    Delimited Messages Indicates if a message might include more than one protobuf message.
  15. For text data, on the Data Format tab, configure the following properties:
    Text Property Description
    Max Line Length Maximum number of characters allowed for a line. Longer lines are truncated.

    Adds a boolean field to the record to indicate if it was truncated. The field name is Truncated.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Use Custom Delimiter Uses custom delimiters to define records instead of line breaks.
    Custom Delimiter One or more characters to use to define records.
    Include Custom Delimiter Includes delimiter characters in the record.
    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  16. For XML data, on the Data Format tab, configure the following properties:
    XML Property Description
    Delimiter Element
    Delimiter to use to generate records. Omit a delimiter to treat the entire XML document as one record. Use one of the following:
    • An XML element directly under the root element.

      Use the XML element name without surrounding angle brackets ( < > ) . For example, msg instead of <msg>.

    • A simplified XPath expression that specifies the data to use.

      Use a simplified XPath expression to access data deeper in the XML document or data that requires a more complex access method.

      For more information about valid syntax, see Simplified XPath Syntax.

    Preserve Root Element Includes the root element in the generated records.

    When omitting a delimiter to generate a single record, the root element is the root element of the XML document.

    When specifying a delimiter to generate multiple records, the root element is the XML element specified as the delimiter element or is the last XML element in the simplified XPath expression specified as the delimiter element.

    Include Field XPaths Includes the XPath to each parsed XML element and XML attribute in field attributes. Also includes each namespace in an xmlns record header attribute.

    When not selected, this information is not included in the record. By default, the property is not selected.

    Note: Field attributes and record header attributes are written to destination systems automatically only when you use the SDC RPC data format in destinations. For more information about working with field attributes and record header attributes, and how to include them in records, see Field Attributes and Record Header Attributes.
    Namespaces Namespace prefix and URI to use when parsing the XML document. Define namespaces when the XML element being used includes a namespace prefix or when the XPath expression includes namespaces.

    For information about using namespaces with an XML element, see Using XML Elements with Namespaces.

    For information about using namespaces with XPath expressions, see Using XPath Expressions with Namespaces.

    Using simple or bulk edit mode, click the Add icon to add additional namespaces.

    Output Field Attributes Includes XML attributes and namespace declarations in the record as field attributes. When not selected, XML attributes and namespace declarations are included in the record as fields.
    Note: Field attributes are automatically included in records written to destination systems only when you use the SDC RPC data format in the destination. For more information about working with field attributes, see Field Attributes.

    By default, the property is not selected.

    Max Record Length (chars)

    The maximum number of characters in a record. Longer records are diverted to the pipeline for error handling.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.