Pulsar Consumer (Legacy)

The Pulsar Consumer (Legacy) origin reads messages from one or more topics in an Apache Pulsar cluster. For information about supported versions, see Supported Systems and Versions.

Data Collector also includes the Pulsar Consumer origin, which can use multiple threads to process data in parallel. StreamSets recommends using the Pulsar Consumer origin to read messages from an Apache Pulsar cluster. Because the Pulsar Consumer origin can process different batches simultaneously, the origin can process messages faster.

The Pulsar Consumer (Legacy) origin subscribes to Pulsar topics, processes incoming messages, and then sends acknowledgements back to Pulsar as the messages are read.

When you configure a Pulsar Consumer (Legacy) origin, you define the URL to connect to Pulsar, and you configure the Pulsar security features to use when connecting. You can also use a connection to configure the origin.

You define the subscription name and consumer name to use for the origin and the topics to subscribe to. When the pipeline starts, Pulsar creates a consumer with the specified consumer name. If the subscription and topics do not exist, Pulsar also creates the subscription and topics.

You can configure the schema used to determine compatibility with a Pulsar topic. You can also configure advanced properties as needed, such as the type of subscription to create or the initial offset to begin reading from.

The Pulsar Consumer (Legacy) origin includes record header attributes that enable you to use information about the record in pipeline processing.

For more information about Pulsar topics, subscriptions, and consumers, see the Apache Pulsar documentation.

Topics Selector

The Pulsar Consumer (Legacy) origin can subscribe to a single topic or to multiple topics. In either case, the origin uses one thread for the read.

Note: To perform parallel reads of the same topic, see Parallel Reads of a Topic.

To define the topics that the origin subscribes to, configure the Topics Selector property on the Pulsar tab.

The origin provides the following methods of subscribing to topics:

Single Topic
Subscribes to a single topic. Use the following format to specify the topic name:
{persistent|non-persistent}://<tenant>/<namespace>/<topic name>
For example, to subscribe to a persistent topic named my-sdc-topic in the my-namespace namespace within the my-tenant tenant, enter the following as the topic name:
persistent://my-tenant/my-namespace/my-sdc-topic
If you enter a topic name only, then Pulsar uses the default persistent://public/default/ location. For example, to subscribe to a persistent topic belonging to the public tenant in the default namespace, simply enter the topic name as follows:
my-sdc-topic
If the specified topic does not exist, Pulsar creates the topic when the pipeline starts.
You can use expressions to define the topic name. For example, to subscribe to a topic named with the Data Collector host name, enter the following as the topic name:
persistent://my-tenant/my-namespace/${sdc:hostname()}
Topics List
Subscribes to multiple topics defined in a list of topic names. Use the Add icon to add additional topic names. Define each topic name using the same format required for a single topic.
Topics Pattern
Subscribes to multiple topics defined by a naming pattern. Use the following format to specify the pattern:
{persistent|non-persistent}://<tenant>/<namespace>/<regular expression>
For example to subscribe to all persistent topics whose names start with sdc-, enter the following as the topic name:
persistent://my-tenant/my-namespace/sdc-.*
This pattern subscribes the origin to all topics within the specified tenant and namespace with names such as sdc-topic or sdc-data.
Important: When subscribing to multiple topics by a pattern, all topics must be in the same namespace.

For more information about defining topic names and about subscribing to multiple topics, see the Apache Pulsar documentation.

Parallel Reads of a Topic

The Pulsar Consumer (Legacy) origin can subscribe to a single topic or to multiple topics. In either case, the origin uses one thread to read from the topic or topics. To perform parallel reads from the same topic, you can configure multiple pipelines with a Pulsar Consumer origin that subscribe to the same topic.

To configure multiple origins to subscribe to the same topic, you'll need to determine whether the origins use a single shared subscription, multiple exclusive subscriptions, or multiple failover subscriptions. A shared subscription allows multiple consumers to attach to the same subscription. An exclusive subscription allows only a single consumer to attach to the subscription. A failover subscription allows multiple consumers to attach to the same subscription, but only one consumer receives messages at a time. If the initial master consumer disconnects, messages are delivered to the next consumer.

For more information about the Pulsar subscription modes, see Subscription Modes in the Pulsar documentation.

You can configure multiple Pulsar Consumer (Legacy) origins to subscribe to the same Pulsar topic in the following ways:

Single shared subscription
To configure multiple Pulsar Consumer (Legacy) origins to subscribe to the same topic using the same shared subscription, configure the following origin properties:
  • Subscription Name - On the Pulsar tab, configure each origin to use the same subscription name.
  • Topic - On the Pulsar tab, configure each origin to use the same topic name.
  • Subscription Type - On the Advanced tab, configure each origin to use the shared subscription type.
Multiple exclusive subscriptions
To configure multiple Pulsar Consumer (Legacy) origins to subscribe to the same topic using multiple exclusive subscriptions, configure the following origin properties:
  • Subscription Name - On the Pulsar tab, configure each origin to use a unique subscription name.
  • Topic - On the Pulsar tab, configure each origin to use the same topic name.
  • Subscription Type - On the Advanced tab, configure each origin to use the exclusive subscription type.
Multiple failover subscriptions
To configure multiple Pulsar Consumer (Legacy) origins to subscribe to the same topic using multiple failover subscriptions, configure the following origin properties:
  • Subscription Name - On the Pulsar tab, configure each pair of origins to use a unique subscription name.

    For example, configure the origins in pipelineA and pipelineB to use subscription1. The origin in pipelineA serves as the master consumer, and the origin in pipelineB is the next in line to receive messages if the pipelineA origin disconnects. Then, configure the origins in pipelineC and pipelineD to use subscription2.

  • Topic - On the Pulsar tab, configure each origin to use the same topic name.
  • Subscription Type - On the Advanced tab, configure each origin to use the failover subscription type.

Offset Management

The first time that a Pulsar Consumer (Legacy) origin receives messages from a topic, an offset entry is created for that subscription and topic. The offset entry is created and maintained by Pulsar.

The Pulsar Consumer (Legacy) origin begins receiving messages in the topic based on whether or not a stored offset entry exists:
No stored offset
When the subscription and topic combination does not have a previously stored offset, the Pulsar Consumer (Legacy) origin begins receiving messages based on the value of the initial offset defined on the Advanced tab of the origin.
You use the following values for the initial offset:
  • Latest - Start reading the latest available message written to the topic after the pipeline starts, ignoring any existing messages in the topic. This is the default initial offset.
  • Earliest - Start reading the earliest available message in the topic that has not been acknowledged.
Previously stored offset
When the subscription and topic combination has a previously stored offset, the Pulsar Consumer (Legacy) origin receives messages starting with the next unprocessed message after the stored offset. For example, when you stop and restart the pipeline, processing resumes from the last committed offset.

Record Header Attributes

The Pulsar Consumer (Legacy) origin includes the following information in records as record header attributes:
User-defined information in the properties field of the message
The origin includes any user-defined information in the properties field of the message - outside of the payload field - as record header attributes. When generating the record header attribute, the origin uses the same names as the user-defined key/value pairs in the properties field. For example, if the Pulsar message includes an environment key/value pair in the properties field, the origin generates a record header attribute named environment.
pulsar.topic
The origin includes the topic that the message was read from in the pulsar.topic record header attribute.

You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.

Schema Properties

The origin contains properties to specify a schema on the Schema tab and the Data Format tab:
Schema tab
The schema specified on the Schema tab determines the compatibility between the origin and a Pulsar topic.

If the origin reads from a topic in a Pulsar namespace configured to enforce schema validation, then you must specify a schema on the Schema tab. The origin passes the schema to Pulsar. Then Pulsar uses the schema to verify that the origin is compatible with a topic.

Data Format tab
The schema specified on the Data Format tab supports message processing.

If you configure the origin to read messages in Avro format, then you can specify a schema on the Data Format tab. The origin uses the specified schema to process messages.

If you specify a schema on both tabs, then specify the same schema.

Security

Configure the Pulsar Consumer (Legacy) origin to use the security features available in the Pulsar cluster.

The origin supports the following security features:

TLS transport encryption
The Pulsar cluster encrypts all traffic between the Pulsar server and the stage. The Pulsar server provides a key and certificate, which the stage uses to verify the server's identity. For details, see the Pulsar documentation on TLS transport encryption.
TLS authentication
The stage provides keys and certificates, which the Pulsar server uses to verify the stage's identity. TLS authentication requires TLS transport encryption. For details, see the Pulsar documentation on TLS authentication.
JWT authentication
The stage provides Pulsar a JSON Web Token (JWT), which identifies the stage and grants permission for some actions. JWT authentication requires TLS transport encryption. For details, see the Pulsar documentation on JWT authentication.
OAuth authentication
The stage provides Pulsar an OAuth 2.0 access token, which identifies the stage and associates the stage with a role. For details, see the Pulsar documentation on OAuth authentication.

Enabling TLS Transport Encryption

Enable TLS transport encryption to encrypt all traffic between the Pulsar server and the stage.

  1. On the Pulsar tab of the stage, set the Pulsar URL property to the secure URL for the broker service.
    Use the following format for the URL:
    pulsar+ssl://<host name>:<broker service TLS port>/
    For example:
    pulsar+ssl://pulsar.us-west.example.com:6651/
  2. On the Security tab of the stage, select Enable TLS.
  3. Store the PEM file that contains the certificate authority (CA) that signed the Pulsar cluster certificate in the Data Collector resources directory, $SDC_RESOURCES.
    For information about creating certificates for the Pulsar cluster, see the Pulsar documentation.
  4. On the Security tab of the stage, enter the name of the CA certificate PEM file in the CA Certificate PEM property.

Enabling TLS Authentication

Enable TLS authentication so that Pulsar can authenticate the stage with certificates.

  1. Enable TLS transport encryption.
  2. On the Security tab of the stage, select Enable Mutual Authentication.
  3. Create the client certificate and client private key PEM files for the stage to use.
    For information about creating client certificates for Pulsar, see the Pulsar documentation.
  4. Store the client certificate and client private key PEM files created for the stage in the Data Collector resources directory, $SDC_RESOURCES.
  5. On the Security tab of the stage, enter the name of the client files in the Client Certificate PEM and Client Key PEM properties.

Enabling JWT Authentication

Enable JWT authentication so that Pulsar can authenticate the stage with a JSON Web Token (JWT).

Before you start, contact the Pulsar administrator for a token string that represents a signed JWT for the stage.
  1. Enable TLS transport encryption.
  2. On the Security tab, select Use JWT.
  3. In the Token property, enter the token string that represents a signed JWT for the stage.

Enabling OAuth Authentication

Enable OAuth authentication so that Pulsar can authenticate the stage with an access token.

  1. Create a JSON credentials file and store in the Data Collector resources directory, $SDC_RESOURCES, or in a relative directory.
    The credentials file contains service account credentials for authenticating the stage. For example, you might create a credentials file named credentials_file.json:
    {
      "type": "client_credentials",
      "client_id": "d9ZyX97q1ef8Cr81WHVC4hFQ64vSlDK3",
      "client_secret": "on1uJ...k6F6R",
      "client_email": "1234567890-abcdefghijklmnopqrstuvwxyz@developer.gserviceaccount.com",
      "issuer_url": "https://accounts.google.com"
    }
  2. On the Security tab, select Use OAuth.
  3. In the Issuer URL property, enter the URL of the authentication provider that provides Pulsar clients with access tokens.
  4. In the Credentials URL property, enter the path of the JSON credentials file, relative to the Data Collector resources directory, $SDC_RESOURCES.
    For example, if you created the credentials_file.json file in the $SDC_RESOURCES directory, enter /credentials_file.json.
  5. In the Audience property, enter the URL that the stage uses to reach the Pulsar cluster.

Data Formats

The Pulsar Consumer (Legacy) origin processes data differently based on the data format. Pulsar Consumer (Legacy) can process the following types of data:

Avro
Generates a record for every message. Includes a precision and scale field attribute for each Decimal field.
The stage includes the Avro schema in an avroSchema record header attribute. You can use one of the following methods to specify the location of the Avro schema definition:
  • Message/Data Includes Schema - Use the schema in the message.
  • In Pipeline Configuration - Use the schema that you provide in the stage configuration.
  • Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry. Confluent Schema Registry is a distributed storage layer for Avro schemas. You can configure the stage to look up the schema in Confluent Schema Registry by the schema ID embedded in the message or by the schema ID or subject specified in the stage configuration.
Using a schema in the stage configuration or retrieving a schema from Confluent Schema Registry overrides any schema that might be included in the message and can improve performance.
Binary
Generates a record with a single byte array field at the root of the record.
When the data exceeds the user-defined maximum data size, the origin cannot process the data. Because the record is not created, the origin cannot pass the record to the pipeline to be written as an error record. Instead, the origin generates a stage error.
Datagram
Generates a record for every message. The origin can process collectd messages, NetFlow 5 and NetFlow 9 messages, and the following types of syslog messages:
  • RFC 5424
  • RFC 3164
  • Non-standard common messages, such as RFC 3339 dates with no version digit
When processing NetFlow messages, the stage generates different records based on the NetFlow version. When processing NetFlow 9, the records are generated based on the NetFlow 9 configuration properties. For more information, see NetFlow Data Processing.
Delimited
Generates a record for each delimited line.
The CSV parser that you choose determines the delimiter properties that you configure and how the stage handles parsing errors. You can specify if the data includes a header line and whether to use it. You can define the number of lines to skip before reading, the character set of the data, and the root field type to use for the generated record.
You can also configure the stage to replace a string constant with null values and to ignore control characters.
For more information about reading delimited data, see Reading Delimited Data.
JSON
Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
When an object exceeds the maximum object length defined for the origin, the origin processes the object based on the error handling configured for the stage.
Log
Generates a record for every log line.
When a line exceeds the user-defined maximum line length, the origin truncates longer lines.
You can include the processed log line as a field in the record. If the log line is truncated, and you request the log line in the record, the origin includes the truncated line.
You can define the log format or type to be read.
Protobuf
Generates a record for every protobuf message. By default, the origin assumes messages contain multiple protobuf messages.
Protobuf messages must match the specified message type and be described in the descriptor file.
When the data for a record exceeds 1 MB, the origin cannot continue processing data in the message. The origin handles the message based on the stage error handling property and continues reading the next message.
For information about generating the descriptor file, see Protobuf Data Format Prerequisites.
SDC Record
Generates a record for every record. Use to process records generated by a Data Collector pipeline using the SDC Record data format.
For error records, the origin provides the original record as read from the origin in the original pipeline, as well as error information that you can use to correct the record.
When processing error records, the origin expects the error file names and contents as generated by the original pipeline.
Text
Generates a record for each line of text or for each section of text based on a custom delimiter.
When a line or section exceeds the maximum line length defined for the origin, the origin truncates it. The origin adds a boolean field named Truncated to indicate if the line was truncated.
For more information about processing text with a custom delimiter, see Text Data Format with Custom Delimiters.
XML
Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the origin treats the XML file as a single record.
Generated records include XML attributes and namespace declarations as fields in the record by default. You can configure the stage to include them in the record as field attributes.
You can include XPath information for each parsed XML element and XML attribute in field attributes. This also places each namespace in an xmlns record header attribute.
Note: Field attributes and record header attributes are written to destination systems automatically only when you use the SDC RPC data format in destinations. For more information about working with field attributes and record header attributes, and how to include them in records, see Field Attributes and Record Header Attributes.
When a record exceeds the user-defined maximum record length, the origin skips the record and continues processing with the next record. It sends the skipped record to the pipeline for error handling.
Use the XML data format to process valid XML documents. For more information about XML processing, see Reading and Processing XML Data.
Tip: If you want to process invalid XML documents, you can try using the text data format with custom delimiters. For more information, see Processing XML Data with Custom Delimiters.

Configuring a Pulsar Consumer (Legacy) Origin

Configure a Pulsar Consumer (Legacy) origin to read messages from Apache Pulsar.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the Pulsar tab, configure the following properties:
    Pulsar Property Description
    Connection Connection that defines the information required to connect to an external system.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    To create a new connection, click the Add New Connection icon: . To view and edit the details of the selected connection, click the Edit Connection icon: .

    Pulsar URL URL to the Pulsar web service or broker service.
    If the Pulsar cluster is not enabled for TLS, enter either the web service or broker service URL in the following format:
    • Web service URL - http://<host name>:<web service port>. For example: http://pulsar.us-west.example.com:8080.
    • Broker service URL - pulsar://<host name>:<broker service port>. For example: pulsar://pulsar.us-west.example.com:6650
    If the Pulsar cluster is enabled for TLS, enter the secure broker service URL in the following format:
    pulsar+ssl://<host name>:<broker service TLS port>

    For example: pulsar+ssl://pulsar.us-west.example.com:6651

    Subscription Name Name of the subscription to create for the origin.

    Default is sdc-subscription.

    Keep Alive Interval (ms) Number of milliseconds to allow the connection to Pulsar to remain idle. After the origin receives no messages for this amount of time, the connection is closed. The origin must reconnect to Pulsar.

    Default is 30,000 milliseconds.

    Operation Timeout (ms) Number of milliseconds to allow the Pulsar consumer-create, consumer-subscribe, and consumer-unsubscribe operations to complete before marking the operations as failed.

    Default is 30,000 milliseconds.

    Consumer Name Name of the consumer to create for the origin.

    Enter the consumer name or define an expression that evaluates to the consumer name.

    Topics Selector Method that topics are subscribed to:
    • Single Topic - Subscribe to a single topic by name.
    • Topics List - Subscribe to multiple topics defined in a list of topic names.
    • Topics Pattern - Subscribe to multiple topics defined by a naming pattern.
    Topic Name of the single topic to subscribe to. Enter the topic name in the following format:
    {persistent|non-persistent}://<tenant>/<namespace>/<topic name>
    Topics List List of topic names to subscribe to. Enter each topic name in the following format:
    {persistent|non-persistent}://<tenant>/<namespace>/<topic name>

    Use the Add icon to add additional topic names. You can use simple or bulk edit mode to add additional topics.

    Topics Pattern Pattern of topic names to subscribe to. Enter the pattern in the following format:
    {persistent|non-persistent}://<tenant>/<namespace>/<regular expression>
    Max Batch Size (records) Maximum number of records processed at one time. Honors values up to the Data Collector maximum batch size.

    Default is 1000. The Data Collector default is 1000.

    Max Batch Wait Time (ms) Number of milliseconds to wait before sending a partial or empty batch.
    Produce Single Record Generates a single record for records that include multiple objects.

    When not selected, the origin generates multiple records when a record includes multiple objects.

  3. On the Schema tab, configure the following properties:
    Schema Property Description
    Schema Schema used to determine compatibility between the origin and a Pulsar topic. Select one of the following options:
    • No Schema - No schema used. Select this option when the Pulsar namespace of the topic is not configured to enforce schema validation.
    • Auto Schema - Use a generic schema created by Pulsar.
    • User-defined Schema - Use the schema specified in the Schema Info property. For the origin to connect to the Pulsar broker, the specified schema must match the topic schema.
      Note: If the origin reads from multiple topics, the specified schema must match the schema for all of the topics.
    Schema Info Schema definition to pass to Pulsar. Specify the schema using the SchemaInfo data structure, as described in the Pulsar documentation.

    Available if Schema is set to User-defined Schema.

    Important: The origin only supports Avro, JSON, and string schemas. You must specify a corresponding data format on the Data Format tab, either Avro, JSON, or Text. For Avro, specify the same schema on the Data Format tab.
  4. To enable security, click the Security tab and configure the following properties:
    Security Property Description
    Enable TLS Enables the stage to connect securely to Pulsar through TLS encryption.
    Use JWT Enables the stage to use a JSON Web Token (JWT) to authenticate with Pulsar.

    Available if TLS encryption is enabled.

    Token If JWT is enabled, JWT to pass to Pulsar.
    Enable Mutual Authentication Enables the stage to use a key and certificate to authenticate with Pulsar.

    Available if TLS encryption is enabled.

    CA Certificate PEM Path to the PEM file containing the certificate authority (CA) that signed the Pulsar cluster certificate.

    Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES.

    Client Certificate PEM If mutual authentication is enabled, path to the PEM file containing the client certificate created for Data Collector.

    Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES.

    Client Key PEM If mutual authentication is enabled, path to the PEM file containing the client private key created for Data Collector.

    Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES.

    Use OAuth Enables the stage to use an OAuth 2 access token to authenticate with Pulsar.
    Issuer URL If OAuth is enabled, URL of the authentication provider that provides Pulsar clients with access tokens.
    Credentials URL If OAuth is enabled, path to the JSON credentials file.

    Enter a path relative to the Data Collector resources directory: $SDC_RESOURCES.

    Audience If OAuth is enabled, URI of the resource server that the stage wants to access.
  5. On the Advanced tab, optionally configure advanced properties.

    The defaults for these properties should work in most cases:

    Advanced Property Description
    Subscription Type Type of subscription to create:
    • Exclusive
    • Failover
    • Shared

    Default is Exclusive.

    For information about each subscription type, see the Pulsar documentation.

    Consumer Queue Size Number of messages that Pulsar can add to the consumer queue.

    Default is 1,000 messages.

    Initial Offset Offset value to use when the pipeline starts for the first time:
    • Earliest
    • Latest

    Default is Latest.

    Pattern Auto Discovery Period (minutes) When subscribing to topics defined by a naming pattern, the number of minutes to allow the origin to locate all matching topics.

    Default is one minute.

    Consumer Priority Level Priority level assigned to the Pulsar Consumer (Legacy) origin. Consumers with a higher priority receive more messages.

    Default is 0.

    Read Compacted Read messages from compacted persistent topics instead of reading from the full message backlog of the topics.

    When enabled, the origin reads only the latest value for each key in the topic.

    For persistent topics only.

    Pulsar Configuration Properties

    Additional Pulsar configuration properties to use. Using simple or bulk edit mode, click the Add icon to add properties. Define the Pulsar property name and value.

    Use the property names and values as expected by Pulsar.

  6. On the Data Format tab, configure the following property:
    Data Format Property Description
    Data Format Type of data to be read. Use one of the following options:
    • Avro
    • Binary
    • Datagram
    • Delimited
    • JSON
    • Log
    • Protobuf
    • SDC Record
    • Text
    • XML
  7. For Avro data, on the Data Format tab, configure the following properties:
    Avro Property Description
    Avro Schema Location Location of the Avro schema definition to use when processing data:
    • Message/Data Includes Schema - Use the schema in the message.
    • In Pipeline Configuration - Use the schema provided in the stage configuration.
    • Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry.

    Using a schema in the stage configuration or in Confluent Schema Registry can improve performance.

    Avro Schema Avro schema definition used to process the data. Overrides any existing schema definitions associated with the data.

    You can optionally use the runtime:loadResource function to load a schema definition stored in a runtime resource file.

    Schema Registry URLs Confluent Schema Registry URLs used to look up the schema. To add a URL, click Add and then enter the URL in the following format:
    http://<host name>:<port number>
    Basic Auth User Info User information needed to connect to Confluent Schema Registry when using basic authentication.

    Enter the key and secret from the schema.registry.basic.auth.user.info setting in Schema Registry using the following format:

    <key>:<secret>
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
    Lookup Schema By Method used to look up the schema in Confluent Schema Registry:
    • Subject - Look up the specified Avro schema subject.
    • Schema ID - Look up the specified Avro schema ID.
    • Embedded Schema ID - Look up the Avro schema ID embedded in each message.
    Overrides any existing schema definitions associated with the message.
    Schema Subject Avro schema subject to look up in Confluent Schema Registry.

    If the specified subject has multiple schema versions, the stage uses the latest schema version for that subject. To use an older version, find the corresponding schema ID, and then set the Look Up Schema By property to Schema ID.

    Schema ID Avro schema ID to look up in Confluent Schema Registry.
  8. For binary data, on the Data Format tab, configure the following properties:
    Binary Property Description
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    Max Data Size (bytes) Maximum number of bytes in the message. Larger messages cannot be processed or written to error.
  9. For datagram data, on the Data Format tab, configure the following properties:
    Datagram Properties Description
    Datagram Packet Format Packet format of the data:
    • collectd
    • NetFlow
    • syslog
    • Raw/separated data
    TypesDB File Path Path to a user-provided types.db file. Overrides the default types.db file.

    For collectd data only.

    Auth File Path to an optional authentication file. Use an authentication file to accept signed and encrypted data.

    For collectd data only.

    Convert Hi-Res Time & Interval Converts the collectd high resolution time format interval and timestamp to UNIX time, in milliseconds.

    For collectd data only.

    Exclude Interval Excludes the interval field from output record.

    For collectd data only.

    Record Generation Mode Determines the type of values to include in the record. Select one of the following options:
    • Raw Only
    • Interpreted Only
    • Both Raw and Interpreted

    For NetFlow 9 data only.

    Max Templates in Cache The maximum number of templates to store in the template cache. For more information about templates, see Caching NetFlow 9 Templates.

    Default is -1 for an unlimited cache size.

    For NetFlow 9 data only.

    Template Cache Timeout (ms) The maximum number of milliseconds to cache an idle template. Templates unused for more than the specified time are evicted from the cache. For more information about templates, see Caching NetFlow 9 Templates.

    Default is -1 for caching templates indefinitely.

    For NetFlow 9 data only.

    Charset Character encoding of the messages to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  10. For delimited data, on the Data Format tab, configure the following properties:
    Delimited Property Description
    Header Line Indicates whether a file contains a header line, and whether to use the header line.
    Delimiter Format Type Delimiter format type. Use one of the following options:
    • Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
    • RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
    • MS Excel CSV - Microsoft Excel comma-separated file.
    • MySQL CSV - MySQL comma-separated file.
    • Tab-Separated Values - File that includes tab-separated values.
    • PostgreSQL CSV - PostgreSQL comma-separated file.
    • PostgreSQL Text - PostgreSQL text file.
    • Custom - File that uses user-defined delimiter, escape, and quote characters.
    • Multi Character Delimited - File that uses multiple user-defined characters to delimit fields and lines, and single user-defined escape and quote characters.

    Available when using the Apache Commons parser type.

    Multi Character Field Delimiter Characters that delimit fields.

    Default is two pipe characters (||).

    Available when using the Apache Commons parser with the multi-character delimiter format.

    Multi Character Line Delimiter Characters that delimit lines or records.

    Default is the newline character (\n).

    Available when using the Apache Commons parser with the multi-character delimiter format.

    Delimiter Character Delimiter character. Select one of the available options or use Other to enter a custom character.

    You can enter a Unicode control character using the format \uNNNN, where ​N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.

    Default is the pipe character ( | ).

    Available when using the Apache Commons parser with a custom delimiter format.

    Field Separator One or more characters to use as delimiter characters between columns.

    Available when using the Univocity parser.

    Escape Character Escape character.

    Available when using the Apache Commons parser with the custom or multi-character delimiter format. Also available when using the Univocity parser.

    Quote Character Quote character.

    Available when using the Apache Commons parser with the custom or multi-character delimiter format. Also available when using the Univocity parser.

    Line Separator Line separator.

    Available when using the Univocity parser.

    Allow Comments Allows commented data to be ignored for custom delimiter format.

    Available when using the Univocity parser.

    Comment Character

    Character that marks a comment when comments are enabled for custom delimiter format.

    Available when using the Univocity parser.

    Enable Comments Allows commented data to be ignored for custom delimiter format.

    Available when using the Apache Commons parser.

    Comment Marker Character that marks a comment when comments are enabled for custom delimiter format.

    Available when using the Apache Commons parser.

    Lines to Skip Number of lines to skip before reading data.
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    CSV Parser Parser to use to process delimited data:
    • Apache Commons - Provides robust parsing and a wide range of delimited format types.
    • Univocity - Can provide faster processing for wide delimited files, such as those with over 200 columns.

    Default is Apache Commons.

    Max Columns Maximum number of columns to process per record.

    Available when using the Univocity parser.

    Max Character per Column Maximum number of characters to process in each column.

    Available when using the Univocity parser.

    Skip Empty Lines Allows skipping empty lines.

    Available when using the Univocity parser.

    Allow Extra Columns Allows processing records with more columns than exist in the header line.

    Available when using the Apache Commons parser to process data with a header line.

    Extra Column Prefix Prefix to use for any additional columns. Extra columns are named using the prefix and sequential increasing integers as follows: <prefix><integer>.

    For example, _extra_1. Default is _extra_.

    Available when using the Apache Commons parser to process data with a header line while allowing extra columns.

    Max Record Length (chars) Maximum length of a record in characters. Longer records are not read.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Available when using the Apache Commons parser.

    Ignore Empty Lines Allows empty lines to be ignored.

    Available when using the Apache Commons parser with the custom delimiter format.

    Root Field Type Root field type to use:
    • List-Map - Generates an indexed list of data. Enables you to use standard functions to process data. Use for new pipelines.
    • List - Generates a record with an indexed list with a map for header and value. Requires the use of delimited data functions to process data. Use only to maintain pipelines created before 1.1.0.
    Parse NULLs Replaces the specified string constant with null values.
    NULL Constant String constant to replace with null values.
    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  11. For JSON data, on the Data Format tab, configure the following properties:
    JSON Property Description
    JSON Content Type of JSON content. Use one of the following options:
    • JSON array of objects
    • Multiple JSON objects
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    Max Object Length (chars) Maximum number of characters in a JSON object.

    Longer objects are diverted to the pipeline for error handling.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  12. For log data, on the Data Format tab, configure the following properties:
    Log Property Description
    Log Format Format of the log files. Use one of the following options:
    • Common Log Format
    • Combined Log Format
    • Apache Error Log Format
    • Apache Access Log Custom Format
    • Regular Expression
    • Grok Pattern
    • Log4j
    • Common Event Format (CEF)
    • Log Event Extended Format (LEEF)
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    Max Line Length Maximum length of a log line. The origin truncates longer lines.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Retain Original Line Determines how to treat the original log line. Select to include the original log line as a field in the resulting record.

    By default, the original line is discarded.

    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
    • When you select Apache Access Log Custom Format, use Apache log format strings to define the Custom Log Format.
    • When you select Regular Expression, enter the regular expression that describes the log format, and then map the fields that you want to include to each regular expression group.
    • When you select Grok Pattern, you can use the Grok Pattern Definition field to define custom grok patterns. You can define a pattern on each line.

      In the Grok Pattern field, enter the pattern to use to parse the log. You can use a predefined grok patterns or create a custom grok pattern using patterns defined in Grok Pattern Definition.

      For more information about defining grok patterns and supported grok patterns, see Defining Grok Patterns.

    • When you select Log4j, define the following properties:
      Log4j Property Description
      On Parse Error Determines how to handle information that cannot be parsed:
      • Skip and Log Error - Skips reading the line and logs a stage error.
      • Skip, No Error - Skips reading the line and does not log an error.
      • Include as Stack Trace - Includes information that cannot be parsed as a stack trace to the previously-read log line. The information is added to the message field for the last valid log line.
      Use Custom Log Format Allows you to define a custom log format.
      Custom Log4J Format Use log4j variables to define a custom log format.
  13. For protobuf data, on the Data Format tab, configure the following properties:
    Protobuf Property Description
    Protobuf Descriptor File Descriptor file (.desc) to use. The descriptor file must be in the Data Collector resources directory, $SDC_RESOURCES.

    For information about generating the descriptor file, see Protobuf Data Format Prerequisites. For more information about environment variables, see Java and Security Configuration.

    Message Type The fully-qualified name for the message type to use when reading data.

    Use the following format: <package name>.<message type>.

    Use a message type defined in the descriptor file.
    Delimited Messages Indicates if a message might include more than one protobuf message.
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

  14. For SDC Record data, on the Data Format tab, configure the following properties:
    SDC Record Property Description
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

  15. For text data, on the Data Format tab, configure the following properties:
    Text Property Description
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    Max Line Length Maximum number of characters allowed for a line. Longer lines are truncated.

    Adds a boolean field to the record to indicate if it was truncated. The field name is Truncated.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Use Custom Delimiter Uses custom delimiters to define records instead of line breaks.
    Custom Delimiter One or more characters to use to define records.
    Include Custom Delimiter Includes delimiter characters in the record.
    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  16. For XML data, on the Data Format tab, configure the following properties:
    XML Property Description
    Delimiter Element
    Delimiter to use to generate records. Omit a delimiter to treat the entire XML document as one record. Use one of the following:
    • An XML element directly under the root element.

      Use the XML element name without surrounding angle brackets ( < > ) . For example, msg instead of <msg>.

    • A simplified XPath expression that specifies the data to use.

      Use a simplified XPath expression to access data deeper in the XML document or data that requires a more complex access method.

      For more information about valid syntax, see Simplified XPath Syntax.

    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    Preserve Root Element Includes the root element in the generated records.

    When omitting a delimiter to generate a single record, the root element is the root element of the XML document.

    When specifying a delimiter to generate multiple records, the root element is the XML element specified as the delimiter element or is the last XML element in the simplified XPath expression specified as the delimiter element.

    Include Field XPaths Includes the XPath to each parsed XML element and XML attribute in field attributes. Also includes each namespace in an xmlns record header attribute.

    When not selected, this information is not included in the record. By default, the property is not selected.

    Note: Field attributes and record header attributes are written to destination systems automatically only when you use the SDC RPC data format in destinations. For more information about working with field attributes and record header attributes, and how to include them in records, see Field Attributes and Record Header Attributes.
    Namespaces Namespace prefix and URI to use when parsing the XML document. Define namespaces when the XML element being used includes a namespace prefix or when the XPath expression includes namespaces.

    For information about using namespaces with an XML element, see Using XML Elements with Namespaces.

    For information about using namespaces with XPath expressions, see Using XPath Expressions with Namespaces.

    Using simple or bulk edit mode, click the Add icon to add additional namespaces.

    Output Field Attributes Includes XML attributes and namespace declarations in the record as field attributes. When not selected, XML attributes and namespace declarations are included in the record as fields.
    Note: Field attributes are automatically included in records written to destination systems only when you use the SDC RPC data format in the destination. For more information about working with field attributes, see Field Attributes.

    By default, the property is not selected.

    Max Record Length (chars)

    The maximum number of characters in a record. Longer records are diverted to the pipeline for error handling.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.