Pulsar Producer

The Pulsar Producer destination writes data to topics in an Apache Pulsar cluster. The destination attaches to one or more topics and publishes messages to a Pulsar broker for processing. For information about supported versions, see Supported Systems and Versions in the Data Collector documentation.

When you configure a Pulsar Producer destination, you define the URL to connect to Pulsar, and you configure the Pulsar security features to use when connecting. You can also use a connection to configure the destination.

You define the topic to publish messages to. To publish to multiple topics, include an expression in the configured topic name. In addition, you define the schema that Pulsar uses to validate the messages that the destination writes to the topic.

You can configure advanced properties as needed, such as the partition or compression type to use when publishing messages or whether the destination publishes messages asynchronously or synchronously.

For more information about Pulsar topics and producers, see the Apache Pulsar documentation.
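
As a rough illustration of how one configured topic name can fan out to several topics, the sketch below substitutes a record field into a topic template and expands a bare topic name to its fully qualified form. It only imitates the single expression form `${record:value("/<field>")}` with a regular expression and is not the Data Collector expression evaluator. The function names and the sample record are hypothetical, and the default prefix is taken from the Topic property description.

```python
import re

# Default location that applies when only a topic name is given.
DEFAULT_PREFIX = "persistent://public/default/"

# Matches only the ${record:value("/field")} form; other expressions are out of scope.
_FIELD_EXPR = re.compile(r'\$\{record:value\("/([^"]+)"\)\}')


def resolve_topic(template: str, record: dict) -> str:
    """Replace each record:value expression with the matching field value."""
    return _FIELD_EXPR.sub(lambda m: str(record[m.group(1)]), template)


def full_topic_name(name: str) -> str:
    """Expand a bare topic name; leave fully qualified names untouched."""
    if "://" in name:
        return name
    return DEFAULT_PREFIX + name


# Hypothetical record whose my-topic field selects the target topic.
record = {"my-topic": "orders"}
template = 'persistent://my-tenant/my-namespace/${record:value("/my-topic")}'
print(resolve_topic(template, record))
print(full_topic_name("my-sdc-topic"))
```

Records with different values in the field resolve to different topics, which is how a single destination can publish to more than one topic.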

Schema Properties

The destination contains properties to specify a schema on the Schema tab and the Data Format tab:
Schema tab
The schema specified on the Schema tab is used to validate messages written to Pulsar.

If the destination writes to a topic in a Pulsar namespace configured to enforce schema validation, then you must specify a schema on the Schema tab. The destination passes the schema to Pulsar. Then Pulsar uses the schema to validate written messages.

Data Format tab
The schema specified on the Data Format tab supports message processing.

If you configure the destination to write messages in Avro or XML format, then you can specify a schema on the Data Format tab. The destination uses the specified schema to format and write messages.

If you specify a schema on both tabs, then specify the same schema.
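
One way to check that two Avro definitions are the same before entering them on both tabs is to compare them as parsed JSON, so that whitespace and key order do not matter. This is a minimal sketch: it assumes you have already extracted the plain Avro definition from whatever structure each tab holds, it does not implement Avro's canonical form, and the function name and sample schemas are hypothetical.

```python
import json


def same_avro_schema(schema_tab: str, data_format_tab: str) -> bool:
    """Return True when both definitions parse to equal JSON values."""
    return json.loads(schema_tab) == json.loads(data_format_tab)


# Hypothetical schemas that differ only in whitespace and key order.
a = '{"type":"record","name":"User","fields":[{"name":"id","type":"long"}]}'
b = '{ "name": "User", "type": "record", "fields": [ {"type": "long", "name": "id"} ] }'
print(same_avro_schema(a, b))
```

Field order inside the `fields` array still matters in this comparison, because JSON arrays are ordered.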

Security

Configure the Pulsar Producer destination to use the security features available in the Pulsar cluster.

The destination supports the following security features:

TLS transport encryption
The Pulsar cluster encrypts all traffic between the Pulsar server and the stage. The Pulsar server provides a key and certificate, which the stage uses to verify the server's identity. For details, see the Pulsar documentation on TLS transport encryption.
TLS authentication
The stage provides keys and certificates, which the Pulsar server uses to verify the stage's identity. TLS authentication requires TLS transport encryption. For details, see the Pulsar documentation on TLS authentication.
JWT authentication
The stage provides Pulsar a JSON Web Token (JWT), which identifies the stage and grants permission for some actions. JWT authentication requires TLS transport encryption. For details, see the Pulsar documentation on JWT authentication.
OAuth authentication
The stage provides Pulsar an OAuth 2.0 access token, which identifies the stage and associates the stage with a role. For details, see the Pulsar documentation on OAuth authentication.

Enabling TLS Transport Encryption

Enable TLS transport encryption to encrypt all traffic between the Pulsar server and the stage.

  1. On the Pulsar tab of the stage, set the Pulsar URL property to the secure URL for the broker service.
    Use the following format for the URL:
    pulsar+ssl://<host name>:<broker service TLS port>/
    For example:
    pulsar+ssl://pulsar.us-west.example.com:6651/
  2. On the Security tab of the stage, select Enable TLS.
  3. Store the PEM file that contains the certificate authority (CA) that signed the Pulsar cluster certificate in the Data Collector resources directory, $SDC_RESOURCES.
    For information about creating certificates for the Pulsar cluster, see the Pulsar documentation.
  4. On the Security tab of the stage, enter the name of the CA certificate PEM file in the CA Certificate PEM property.
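
The URL format in step 1 can be sketched as a small helper that builds the secure broker URL and checks that a given URL uses the TLS scheme. The function names are hypothetical. The host and port come from the example above.

```python
from urllib.parse import urlsplit


def secure_broker_url(host: str, tls_port: int) -> str:
    """Build a broker service URL in the pulsar+ssl format."""
    return f"pulsar+ssl://{host}:{tls_port}/"


def uses_tls(url: str) -> bool:
    """Return True when the URL uses the pulsar+ssl scheme."""
    return urlsplit(url).scheme == "pulsar+ssl"


url = secure_broker_url("pulsar.us-west.example.com", 6651)
print(url, uses_tls(url))
```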

Enabling TLS Authentication

Enable TLS authentication so that Pulsar can authenticate the stage with certificates.

  1. Enable TLS transport encryption.
  2. On the Security tab of the stage, select Enable Mutual Authentication.
  3. Create the client certificate and client private key PEM files for the stage to use.
    For information about creating client certificates for Pulsar, see the Pulsar documentation.
  4. Store the client certificate and client private key PEM files created for the stage in the Data Collector resources directory, $SDC_RESOURCES.
  5. On the Security tab of the stage, enter the name of the client files in the Client Certificate PEM and Client Key PEM properties.
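
The PEM properties accept either an absolute path or a path relative to the resources directory. The sketch below shows that resolution rule in isolation. The function name, the directory `/opt/sdc/resources`, and the file names are hypothetical stand-ins for your own `$SDC_RESOURCES` value and files.

```python
from pathlib import PurePosixPath


def resolve_pem(value: str, resources_dir: str) -> str:
    """Keep absolute paths as-is; resolve others against the resources directory."""
    path = PurePosixPath(value)
    if path.is_absolute():
        return str(path)
    return str(PurePosixPath(resources_dir) / path)


# Hypothetical resources directory and file names.
print(resolve_pem("client-cert.pem", "/opt/sdc/resources"))
print(resolve_pem("/etc/pulsar/client-key.pem", "/opt/sdc/resources"))
```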

Enabling JWT Authentication

Enable JWT authentication so that Pulsar can authenticate the stage with a JSON Web Token (JWT).

Before you start, contact the Pulsar administrator for a token string that represents a signed JWT for the stage.
  1. Enable TLS transport encryption.
  2. On the Security tab, select Use JWT.
  3. In the Token property, enter the token string that represents a signed JWT for the stage.
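
A signed JWT consists of three base64url-encoded segments separated by periods: header, payload, and signature. Before you paste a token into the Token property, it can help to inspect the payload and confirm that it identifies the stage as expected. The sketch below decodes the payload without verifying the signature, so it is a sanity check only. The function name is hypothetical, and which claim Pulsar reads (often `sub`) depends on the cluster configuration.

```python
import base64
import json


def jwt_claims(token: str) -> dict:
    """Decode the payload segment of a JWT without verifying the signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected three dot-separated segments")
    payload = parts[1]
    # base64url segments omit padding; restore it before decoding.
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))
```

For example, `jwt_claims(token).get("sub")` returns the subject claim when the token includes one.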

Enabling OAuth Authentication

Enable OAuth authentication so that Pulsar can authenticate the stage with an access token.

  1. Create a JSON credentials file and store it in the Data Collector resources directory, $SDC_RESOURCES, or in a relative directory.
    The credentials file contains service account credentials for authenticating the stage. For example, you might create a credentials file named credentials_file.json:
    {
      "type": "client_credentials",
      "client_id": "d9ZyX97q1ef8Cr81WHVC4hFQ64vSlDK3",
      "client_secret": "on1uJ...k6F6R",
      "client_email": "1234567890-abcdefghijklmnopqrstuvwxyz@developer.gserviceaccount.com",
      "issuer_url": "https://accounts.google.com"
    }
  2. On the Security tab, select Use OAuth.
  3. In the Issuer URL property, enter the URL of the authentication provider that provides Pulsar clients with access tokens.
  4. In the Credentials URL property, enter the path of the JSON credentials file, relative to the Data Collector resources directory, $SDC_RESOURCES.
    For example, if you created the credentials_file.json file in the $SDC_RESOURCES directory, enter /credentials_file.json.
  5. In the Audience property, enter the URL that the stage uses to reach the Pulsar cluster.
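
A quick check of the credentials file before starting the pipeline can catch a missing field early. The sketch below parses the JSON and reports which expected keys are absent or empty. It assumes that `client_id` and `client_secret` are the fields a client-credentials file cannot do without. That assumption, the function name, and the sample values are illustrative only.

```python
import json

# Assumption: these two fields are the minimum a client-credentials file needs.
REQUIRED_KEYS = ("client_id", "client_secret")


def missing_keys(text: str) -> list:
    """Return the required keys that are absent or empty in the credentials JSON."""
    creds = json.loads(text)
    return [key for key in REQUIRED_KEYS if not creds.get(key)]


# Hypothetical file content with placeholder values.
sample = '{"type": "client_credentials", "client_id": "example-id", "client_secret": "example-secret"}'
print(missing_keys(sample))
```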

Data Formats

The Pulsar Producer destination writes data to Pulsar based on the data format that you select. You can use the following data formats:

Avro
The stage writes records based on the Avro schema. You can use one of the following methods to specify the location of the Avro schema definition:
  • In Pipeline Configuration - Use the schema that you provide in the stage configuration.
  • In Record Header - Use the schema included in the avroSchema record header attribute.
  • Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry. The Confluent Schema Registry is a distributed storage layer for Avro schemas. You can configure the destination to look up the schema in the Confluent Schema Registry by the schema ID or subject.

    If using the Avro schema in the stage or in the record header attribute, you can optionally configure the stage to register the Avro schema with the Confluent Schema Registry. You can also optionally include the schema definition in the message. Omitting the schema definition can improve performance, but requires the appropriate schema management to avoid losing track of the schema associated with the data.

You can include the Avro schema in the output.
You can also compress data with an Avro-supported compression codec. When using Avro compression, avoid configuring any other compression properties in the stage.
Binary
The stage writes binary data from a single field in the record.
Delimited
The destination writes records as delimited data. When you use this data format, the root field must be list or list-map.
You can use the following delimited format types:
  • Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
  • RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
  • MS Excel CSV - Microsoft Excel comma-separated file.
  • MySQL CSV - MySQL comma-separated file.
  • Tab-Separated Values - File that includes tab-separated values.
  • PostgreSQL CSV - PostgreSQL comma-separated file.
  • PostgreSQL Text - PostgreSQL text file.
  • Custom - File that uses user-defined delimiter, escape, and quote characters.
  • Multi Character Delimited - File that uses multiple user-defined characters to delimit fields and lines, and single user-defined escape and quote characters.
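
To illustrate what a list-map root field looks like as delimited output, the sketch below writes one record as a single delimited line, with an optional header line. Python's csv module stands in for the destination's own writer here, so quoting details can differ from what the destination produces. The function name and sample record are hypothetical.

```python
import csv
import io


def to_delimited(record: dict, delimiter: str = ",", header: bool = False) -> str:
    """Write an ordered field map as one delimited line, with an optional header."""
    buf = io.StringIO()
    writer = csv.writer(buf, delimiter=delimiter, lineterminator="\n")
    if header:
        writer.writerow(record.keys())
    writer.writerow(record.values())
    return buf.getvalue()


# Hypothetical record; the comma in the name forces quoting for comma-separated output.
record = {"id": 1, "name": "Ada, Countess"}
print(to_delimited(record))
print(to_delimited(record, delimiter="|", header=True))
```
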
JSON
The destination writes records as JSON data. You can use one of the following formats:
  • Array - Each file includes a single array. In the array, each element is a JSON representation of each record.
  • Multiple objects - Each file includes multiple JSON objects. Each object is a JSON representation of a record.
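
The difference between the two JSON formats can be sketched as follows. The example assumes that multiple objects are separated by a newline, which may not match the destination's exact output. The function names and sample records are hypothetical.

```python
import json


def as_array(records: list) -> str:
    """Serialize all records as one JSON array."""
    return json.dumps(records)


def as_multiple_objects(records: list) -> str:
    """Serialize each record as its own JSON object, one per line (assumed separator)."""
    return "\n".join(json.dumps(record) for record in records)


records = [{"id": 1}, {"id": 2}]
print(as_array(records))
print(as_multiple_objects(records))
```
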
Protobuf
The destination writes one record in each message. It uses the user-defined message type and the definition of the message type in the descriptor file to generate the message.
For information about generating the descriptor file, see Protobuf Data Format Prerequisites.
SDC Record
The destination writes records in the SDC Record data format.
Text
The destination writes data from a single text field to the destination system. When you configure the stage, you select the field to use.
You can configure the characters to use as record separators. By default, the destination uses a UNIX-style line ending (\n) to separate records.
When a record does not contain the selected text field, the destination can report the missing field as an error or ignore the missing field. By default, the destination reports an error.
When configured to ignore a missing text field, the destination can discard the record or write the record separator characters to create an empty line for the record. By default, the destination discards the record.
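
The missing-field options described above combine as in the following sketch. It is a simplified model rather than the destination's implementation: it assumes the record separator is appended after each written record, and it collects error records in a list instead of passing them to error handling. The function and parameter names are hypothetical.

```python
def write_text(records, field, on_missing="error", insert_separator=False, separator="\n"):
    """Return the written text and the records reported as errors."""
    written = []
    errors = []
    for record in records:
        if field in record:
            written.append(str(record[field]) + separator)
        elif on_missing == "error":
            # Default behavior: report the record as an error.
            errors.append(record)
        elif insert_separator:
            # Ignore the missing field but keep an empty line for the record.
            written.append(separator)
        # Otherwise the record is discarded.
    return "".join(written), errors


records = [{"text": "a"}, {"other": 1}, {"text": "b"}]
print(write_text(records, "text"))
print(write_text(records, "text", on_missing="ignore", insert_separator=True))
```
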
XML
The destination creates a valid XML document for each record. The destination requires the record to have a single root field that contains the rest of the record data. For details and suggestions for how to accomplish this, see Record Structure Requirement.

The destination can include indentation to produce human-readable documents. It can also validate that the generated XML conforms to the specified schema definition. Records with invalid schemas are handled based on the error handling configured for the destination.
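
The single-root requirement for XML can be pictured with a small sketch that accepts a record only when it has exactly one top-level field and then nests the remaining data under it. It handles a flat map under the root only and is not the destination's XML generator. The function names and the sample record are hypothetical.

```python
import xml.etree.ElementTree as ET


def has_single_root(record: dict) -> bool:
    """Return True when the record has exactly one top-level field."""
    return len(record) == 1


def record_to_xml(record: dict) -> str:
    """Render a single-root record with flat child fields as an XML string."""
    if not has_single_root(record):
        raise ValueError("record must have exactly one root field")
    root_name, fields = next(iter(record.items()))
    root = ET.Element(root_name)
    for name, value in fields.items():
        ET.SubElement(root, name).text = str(value)
    return ET.tostring(root, encoding="unicode")


print(record_to_xml({"order": {"id": 7, "status": "new"}}))
```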

Configuring a Pulsar Producer

Configure a Pulsar Producer destination to write data to Pulsar topics.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property - Description
    Name - Stage name.
    Description - Optional description.
    Required Fields - Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions - Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error - Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the Pulsar tab, configure the following properties:
    Pulsar Property - Description
    Connection - Connection that defines the information required to connect to an external system.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    Pulsar URL - URL to the Pulsar web service or broker service.
    If the Pulsar cluster is not enabled for TLS, enter either the web service or broker service URL in the following format:
    • Web service URL - http://<host name>:<web service port>. For example: http://pulsar.us-west.example.com:8080.
    • Broker service URL - pulsar://<host name>:<broker service port>. For example: pulsar://pulsar.us-west.example.com:6650
    If the Pulsar cluster is enabled for TLS, enter the secure broker service URL in the following format:
    pulsar+ssl://<host name>:<broker service TLS port>

    For example: pulsar+ssl://pulsar.us-west.example.com:6651

    Topic - Name of the topic to publish messages to. Enter the topic name in the following format:
    {persistent|non-persistent}://<tenant>/<namespace>/<topic name>
    For example, to publish to a persistent topic named my-sdc-topic in the my-namespace namespace within the my-tenant tenant, enter the following as the topic name:
    persistent://my-tenant/my-namespace/my-sdc-topic
    If you enter a topic name only, then Pulsar uses the default persistent://public/default/ location. For example, to publish to a persistent topic belonging to the public tenant in the default namespace, simply enter the topic name as follows:
    my-sdc-topic

    If the specified topic does not exist, Pulsar creates the topic when the pipeline starts.

    You can use expressions to define the topic name. For example, if the my-topic field in the record contains the topic name, enter the following as the topic name:
    persistent://my-tenant/my-namespace/${record:value("/my-topic")}

    Keep Alive Interval (ms) - Number of milliseconds that the connection to Pulsar can remain idle. If the destination publishes no messages for this amount of time, the connection closes and the destination must reconnect to Pulsar.

    Default is 30,000 milliseconds.

    Operation Timeout (ms) - Number of milliseconds to allow the Pulsar Producer-create operation to complete before marking the operation as failed.

    Default is 30,000 milliseconds.

  3. On the Schema tab, configure the following properties:
    Schema Property - Description
    Schema - Schema that Pulsar uses to validate the messages that the destination writes to a topic. The destination passes the schema to Pulsar when attaching to a topic. Select one of the following options:
    • No Schema - Use no schema. Select this option when the Pulsar namespace of the topic is not configured to enforce schema validation.
    • Auto Schema - Use a generic schema created by Pulsar to validate messages.
    • User-defined Schema - Use the schema specified in the Schema Info property. Valid for the Avro and Text data formats.
      Note: If the destination writes to multiple topics, Pulsar uses this same schema to validate the messages sent to any of the topics.
    • Schema in Record Headers - Use the schema from the record header attribute specified in the Header Attribute property. Valid for the Avro data format.

    Schema Info - Schema definition to pass to Pulsar. Specify the schema using the SchemaInfo data structure, as described in the Pulsar documentation.

    Available if Schema is set to User-defined Schema.

    Important: The destination only supports Avro and string schemas. You must specify a corresponding data format on the Data Format tab, either Avro or Text. For Avro, specify the same schema on the Data Format tab.

    Header Attribute - Record header attribute that contains the schema to pass to Pulsar. Default value is avroSchema. You can use the Schema Generator processor to generate the attribute.

    Available if Schema is set to Schema in Record Headers.

    Important: This option only supports an Avro schema. You must specify the Avro data format and the same schema on the Data Format tab.
  4. To enable security, click the Security tab and configure the following properties:
    Security Property - Description
    Enable TLS - Enables the stage to connect securely to Pulsar through TLS encryption.
    Use JWT - Enables the stage to use a JSON Web Token (JWT) to authenticate with Pulsar.

    Available if TLS encryption is enabled.

    Token - If JWT is enabled, JWT to pass to Pulsar.
    Enable Mutual Authentication - Enables the stage to use a key and certificate to authenticate with Pulsar.

    Available if TLS encryption is enabled.

    CA Certificate PEM - Path to the PEM file containing the certificate authority (CA) that signed the Pulsar cluster certificate.

    Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES.

    Client Certificate PEM - If mutual authentication is enabled, path to the PEM file containing the client certificate created for Data Collector.

    Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES.

    Client Key PEM - If mutual authentication is enabled, path to the PEM file containing the client private key created for Data Collector.

    Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES.

    Use OAuth - Enables the stage to use an OAuth 2 access token to authenticate with Pulsar.
    Issuer URL - If OAuth is enabled, URL of the authentication provider that provides Pulsar clients with access tokens.
    Credentials URL - If OAuth is enabled, path to the JSON credentials file.

    Enter a path relative to the Data Collector resources directory: $SDC_RESOURCES.

    Audience - If OAuth is enabled, URI of the resource server that the stage wants to access.
  5. On the Advanced tab, optionally configure advanced properties.

    The defaults for these properties should work in most cases:

    Advanced Property - Description
    Partition Type - Partition type to use when publishing messages to a topic:
    • Single
    • Round Robin

    Default is Single.

    Hashing Scheme - Hashing scheme to use when selecting which partition to write messages to.
    Message Key - Message key used to compute the hash for partitioning. Enter the key or enter an expression that evaluates to the key.
    Compression Type - Type of compression to apply to the published messages:
    • None
    • LZ4
    • ZLIB

    Default is None.

    Async Send - Enables the destination to publish messages asynchronously. Clear to publish messages synchronously.

    For more information about the available send modes, see the Apache Pulsar documentation.

    Default is enabled.

    Max Pending Messages - When sending messages asynchronously, the maximum number of messages that can wait in the queue for an acknowledgement from the Pulsar broker.

    Default is 1,000.

    Enable Batching - When sending messages asynchronously, enables sending a batch of messages in a single request. Clear to send a single message in each request.

    Default is enabled.

    Max Batch Size (messages) - When sending messages asynchronously and batching is enabled, the maximum number of messages to include in a batch.

    Default is 2,000.

    Batch Max Publish Latency (ms) - When sending messages asynchronously and batching is enabled, the maximum number of milliseconds to wait before sending the next batch.

    Default is 1,000 milliseconds.

    Pulsar Configuration Properties - Additional Pulsar configuration properties to use. Using simple or bulk edit mode, click the Add icon to add properties. Define the Pulsar property name and value.

    Use the property names and values as expected by Pulsar.

  6. On the Data Format tab, configure the following property:
    Data Format Property - Description
    Data Format - Type of data to be written. Use one of the following options:
    • Avro
    • Binary
    • Delimited
    • JSON
    • Protobuf
    • SDC Record
    • Text
    • XML
  7. For Avro data, on the Data Format tab, configure the following properties:
    Avro Property - Description
    Avro Schema Location - Location of the Avro schema definition to use when writing data:
    • In Pipeline Configuration - Use the schema that you provide in the stage configuration.
    • In Record Header - Use the schema in the avroSchema record header attribute. Use only when the avroSchema attribute is defined for all records.
    • Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry.

    Avro Schema - Avro schema definition used to write the data.

    You can optionally use the runtime:loadResource function to load a schema definition stored in a runtime resource file.

    Register Schema - Registers a new Avro schema with Confluent Schema Registry.
    Schema Registry URLs - Confluent Schema Registry URLs used to look up the schema or to register a new schema. To add a URL, click Add and then enter the URL in the following format:
    http://<host name>:<port number>

    Basic Auth User Info - User information needed to connect to Confluent Schema Registry when using basic authentication.

    Enter the key and secret from the schema.registry.basic.auth.user.info setting in Schema Registry using the following format:

    <key>:<secret>
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.

    Look Up Schema By - Method used to look up the schema in Confluent Schema Registry:
    • Subject - Look up the specified Avro schema subject.
    • Schema ID - Look up the specified Avro schema ID.

    Schema Subject - Avro schema subject to look up or to register in Confluent Schema Registry.

    If the specified subject to look up has multiple schema versions, the stage uses the latest schema version for that subject. To use an older version, find the corresponding schema ID, and then set the Look Up Schema By property to Schema ID.

    Schema ID - Avro schema ID to look up in Confluent Schema Registry.
    Include Schema - Includes the schema in each message.
    Note: Omitting the schema definition can improve performance, but requires the appropriate schema management to avoid losing track of the schema associated with the data.

    Avro Compression Codec - The Avro compression type to use.

    When using Avro compression, do not enable other compression available in the destination.

  8. For binary data, on the Data Format tab, configure the following property:
    Binary Property - Description
    Binary Field Path - Field that contains the binary data.
  9. For delimited data, on the Data Format tab, configure the following properties:
    Delimited Property - Description
    Delimiter Format - Format for delimited data:
    • Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
    • RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
    • MS Excel CSV - Microsoft Excel comma-separated file.
    • MySQL CSV - MySQL comma-separated file.
    • Tab-Separated Values - File that includes tab-separated values.
    • PostgreSQL CSV - PostgreSQL comma-separated file.
    • PostgreSQL Text - PostgreSQL text file.
    • Custom - File that uses user-defined delimiter, escape, and quote characters.

    Header Line - Indicates whether to create a header line.
    Delimiter Character - Delimiter character for a custom delimiter format. Select one of the available options or use Other to enter a custom character.

    You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.

    Default is the pipe character ( | ).

    Record Separator String - Characters to use to separate records. Use any valid Java string literal. For example, when writing to Windows, you might use \r\n to separate records.

    Available when using a custom delimiter format.

    Escape Character - Escape character for a custom delimiter format. Select one of the available options or use Other to enter a custom character.

    Default is the backslash character ( \ ).

    Quote Character - Quote character for a custom delimiter format. Select one of the available options or use Other to enter a custom character.

    Default is the quotation mark character ( " ).

    Replace New Line Characters - Replaces new line characters with the configured string.

    Recommended when writing data as a single line of text.

    New Line Character Replacement - String to replace each new line character. For example, enter a space to replace each new line character with a space.

    Leave empty to remove the new line characters.

    Charset - Character set to use when writing data.
  10. For JSON data, on the Data Format tab, configure the following property:
    JSON Property - Description
    JSON Content - Method to write JSON data:
    • JSON Array of Objects - Each file includes a single array. In the array, each element is a JSON representation of each record.
    • Multiple JSON Objects - Each file includes multiple JSON objects. Each object is a JSON representation of a record.

    Charset - Character set to use when writing data.
  11. For protobuf data, on the Data Format tab, configure the following properties:
    Protobuf Property - Description
    Protobuf Descriptor File - Descriptor file (.desc) to use. The descriptor file must be in the Data Collector resources directory, $SDC_RESOURCES.

    For more information about environment variables, see Data Collector Environment Configuration in the Data Collector documentation. For information about generating the descriptor file, see Protobuf Data Format Prerequisites.

    Message Type - Fully-qualified name for the message type to use when writing data.

    Use the following format: <package name>.<message type>.

    Use a message type defined in the descriptor file.
  12. For text data, on the Data Format tab, configure the following properties:
    Text Property - Description
    Text Field Path - Field that contains the text data to be written. All data must be incorporated into the specified field.
    Record Separator - Characters to use to separate records. Use any valid Java string literal. For example, when writing to Windows, you might use \r\n to separate records.

    By default, the destination uses \n.

    On Missing Field - When a record does not include the text field, determines whether the destination reports the missing field as an error or ignores the missing field.
    Insert Record Separator if No Text - When configured to ignore a missing text field, inserts the configured record separator string to create an empty line.

    When not selected, discards records without the text field.

    Charset - Character set to use when writing data.
  13. For XML data, on the Data Format tab, configure the following properties:
    XML Property - Description
    Pretty Format - Adds indentation to make the resulting XML document easier to read. Increases the record size accordingly.
    Validate Schema - Validates that the generated XML conforms to the specified schema definition. Records with invalid schemas are handled based on the error handling configured for the destination.
    Important: Regardless of whether you validate the XML schema, the destination requires the record in a specific format. For more information, see Record Structure Requirement.
    XML Schema - The XML schema to use to validate records.