Kinesis Producer
The Kinesis Producer destination writes data to Amazon Kinesis Streams. The destination can also send responses to a microservice origin when used in a microservice pipeline. For information about supported versions, see Supported Systems and Versions in the Data Collector documentation.
To write data to an Amazon Kinesis Firehose delivery system, use the Kinesis Firehose destination. To write data to Amazon S3, use the Amazon S3 destination.
When you configure Kinesis Producer, you specify the Amazon Web Services connection information for your Kinesis cluster and the data format to use. You can also configure any additional Kinesis configuration properties that you require.
You can also use a connection to configure the destination.
When you want the destination to send responses to a microservice origin within a microservice pipeline, you specify the type of response to send.
The destination has the following limits:
- Writes a maximum of 500 messages and 4.5 MB in a batch.
- Allows a maximum of 50 kb/message. Larger messages are sent to the stage for error handling.
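As a rough illustration of how the batch and message limits interact, the following Python sketch groups messages into batches and sets aside oversized messages. It is not the destination's actual batching logic. The function name plan_batches is hypothetical, and the byte values assume that 1 MB is 1024 * 1024 bytes and that "50 kb" means 50 * 1024 bytes.

```python
from typing import Iterable, List, Tuple

MAX_MESSAGES_PER_BATCH = 500
MAX_BATCH_BYTES = int(4.5 * 1024 * 1024)  # assumption: 1 MB = 1024 * 1024 bytes
MAX_MESSAGE_BYTES = 50 * 1024             # assumption: "50 kb" = 50 * 1024 bytes


def plan_batches(messages: Iterable[bytes]) -> Tuple[List[List[bytes]], List[bytes]]:
    """Group messages into batches within the limits.

    Returns (batches, oversized). Oversized messages are the ones that
    would be handed to the stage for error handling.
    """
    batches: List[List[bytes]] = []
    oversized: List[bytes] = []
    current: List[bytes] = []
    current_bytes = 0

    for message in messages:
        size = len(message)
        if size > MAX_MESSAGE_BYTES:
            oversized.append(message)
            continue
        # Start a new batch when adding this message would break a limit.
        batch_full = len(current) >= MAX_MESSAGES_PER_BATCH
        too_large = current_bytes + size > MAX_BATCH_BYTES
        if current and (batch_full or too_large):
            batches.append(current)
            current, current_bytes = [], 0
        current.append(message)
        current_bytes += size

    if current:
        batches.append(current)
    return batches, oversized
```

With small messages the count limit is reached first. With messages near the per-message maximum, the 4.5 MB size limit closes a batch well before it holds 500 messages.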
Authentication Method
You can configure the Kinesis Producer destination to authenticate with Amazon Web Services (AWS) using an instance profile or AWS access keys.
For more information about the authentication methods and details on how to configure each method, see Security in Amazon Stages.
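When the destination is configured to assume a role, the Role ARN and Session Timeout properties on the Kinesis tab expect a specific format and range. The following Python sketch shows one way to check values before entering them. The helper names and the example role name are hypothetical, the pattern assumes a 12-digit account ID, and role paths are not handled.

```python
import re

# Matches arn:aws:iam::<account_id>:role/<role_name>.
# Assumptions: 12-digit account ID, role name without a path.
ROLE_ARN_PATTERN = re.compile(
    r"^arn:aws:iam::(?P<account_id>[0-9]{12}):role/(?P<role_name>[A-Za-z0-9_+=,.@-]+)$"
)

MIN_SESSION_SECONDS = 3600
MAX_SESSION_SECONDS = 43200


def is_valid_role_arn(arn: str) -> bool:
    """Return True when the ARN follows the documented role ARN format."""
    return ROLE_ARN_PATTERN.match(arn) is not None


def build_role_arn(account_id: str, role_name: str) -> str:
    """Build a role ARN and fail early if the result is malformed."""
    arn = f"arn:aws:iam::{account_id}:role/{role_name}"
    if not is_valid_role_arn(arn):
        raise ValueError(f"Malformed role ARN: {arn}")
    return arn


def is_valid_session_timeout(seconds: int) -> bool:
    """Return True when the timeout is within 3,600 to 43,200 seconds."""
    return MIN_SESSION_SECONDS <= seconds <= MAX_SESSION_SECONDS
```

For example, build_role_arn("123456789012", "example-role") produces an ARN in the expected format, while a four-digit account ID raises a ValueError.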
Additional Kinesis Properties
You can add custom Kinesis configuration properties to the Kinesis Producer destination.
When you add a Kinesis configuration property, enter the exact property name and the value. The destination does not validate the property names or values.
Send Microservice Responses
The Kinesis Producer destination can send responses to a microservice origin when you use the destination in a microservice pipeline.
The destination can send one of the following responses:
- All successfully written records.
- Responses from the destination system. For information about the possible responses, see the documentation for the destination system.
Data Formats
The Kinesis Producer destination writes data to Kinesis based on the data format that you select.
The Kinesis Producer destination processes data formats as follows:
- Avro - The stage writes records based on the Avro schema. You can use one of the following methods to specify the location of the Avro schema definition:
  - In Pipeline Configuration - Use the schema that you provide in the stage configuration.
  - In Record Header - Use the schema in the avroSchema record header attribute.
  - Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry.
- Binary - The stage writes binary data from a single field in the record.
- Delimited - The destination writes records as delimited data. When you use this data format, the root field must be list or list-map.
- JSON - The destination writes records as JSON data. You can use one of the following formats:
  - Array - Each file includes a single array. In the array, each element is a JSON representation of each record.
  - Multiple objects - Each file includes multiple JSON objects. Each object is a JSON representation of a record.
- Protobuf - Writes one record in a message. Uses the user-defined message type and the definition of the message type in the descriptor file to generate the message.
- SDC Record - The destination writes records in the SDC Record data format.
- Text - The destination writes data from a single text field to the destination system. When you configure the stage, you select the field to use.
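The difference between the two JSON formats can be illustrated with a short Python sketch. The sample records and function names are hypothetical, and separating multiple objects with a newline is an assumption made for readability rather than a documented behavior of the destination.

```python
import json
from typing import Any, Dict, List

# Hypothetical records used only for illustration.
records: List[Dict[str, Any]] = [
    {"id": 1, "name": "first"},
    {"id": 2, "name": "second"},
]


def to_json_array(items: List[Dict[str, Any]]) -> str:
    """Array format: a single array whose elements are the records."""
    return json.dumps(items)


def to_multiple_objects(items: List[Dict[str, Any]]) -> str:
    """Multiple objects format: one JSON object per record.

    Assumption: objects are separated by a newline.
    """
    return "\n".join(json.dumps(item) for item in items)
```

A consumer of the array format parses the whole payload once, while a consumer of the multiple objects format parses each object separately.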
Configuring a Kinesis Producer Destination
Configure a Kinesis Producer destination to write data to Amazon Kinesis Streams.
-
In the Properties panel, on the General tab, configure the following properties:
- Name - Stage name.
- Description - Optional description.
- Required Fields - Fields that must include data for the record to be passed into the stage. Tip: You might include fields that the stage uses. Records that do not include all required fields are processed based on the error handling configured for the pipeline.
- Preconditions - Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions. Records that do not meet all preconditions are processed based on the error handling configured for the stage.
- On Record Error - Error record handling for the stage:
  - Discard - Discards the record.
  - Send to Error - Sends the record to the pipeline for error handling.
  - Stop Pipeline - Stops the pipeline.
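The Required Fields and Preconditions properties route failing records to different error handling. The following Python sketch models that distinction in a simplified way. Records are plain dictionaries, preconditions are Python callables standing in for expressions, and the function name, return labels, and the order of the two checks are assumptions made for illustration.

```python
from typing import Any, Callable, Dict, Iterable

Record = Dict[str, Any]
Precondition = Callable[[Record], bool]


def admit(record: Record,
          required_fields: Iterable[str],
          preconditions: Iterable[Precondition]) -> str:
    """Decide what happens to a record before the stage processes it."""
    # A required field must exist and hold data.
    if any(record.get(name) is None for name in required_fields):
        return "pipeline_error_handling"
    # Every precondition must evaluate to True.
    if not all(check(record) for check in preconditions):
        return "stage_error_handling"
    return "process"
```

For example, with a required field named "id" and a precondition that checks for a positive amount, a record without "id" goes to pipeline error handling, and a record with a negative amount goes to stage error handling.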
-
On the Kinesis tab, configure the following properties:
- Connection - Connection that defines the information required to connect to an external system. To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.
- Authentication Method - Authentication method used to connect to Amazon Web Services (AWS):
  - AWS Keys - Authenticates using an AWS access key pair.
  - Instance Profile - Authenticates using an instance profile associated with the Data Collector EC2 instance.
- Access Key ID - AWS access key ID. Required when using AWS keys to authenticate with AWS.
- Secret Access Key - AWS secret access key. Required when using AWS keys to authenticate with AWS. Tip: To secure sensitive information such as access key pairs, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.
- Assume Role - Temporarily assumes another role to authenticate with AWS.
- Role ARN - Amazon resource name (ARN) of the role to assume, entered in the following format: arn:aws:iam::<account_id>:role/<role_name>, where <account_id> is the ID of your AWS account and <role_name> is the name of the role to assume. You must create and attach an IAM trust policy to this role that allows the role to be assumed. Available when assuming another role.
- Role Session Name - Optional name for the session created by assuming a role. Overrides the default unique identifier for the session. Available when assuming another role.
- Session Timeout - Maximum number of seconds for each session created by assuming a role. The session is refreshed if the pipeline continues to run for longer than this amount of time. Set to a value between 3,600 seconds and 43,200 seconds. Available when assuming another role.
- Set Session Tags - Sets a session tag to record the name of the currently logged in StreamSets user that starts the pipeline or the job for the pipeline. AWS IAM verifies that the user account set in the session tag can assume the specified role. Select only when the IAM trust policy attached to the role to be assumed uses session tags and restricts the session tag values to specific user accounts. When cleared, the connection does not set a session tag. Available when assuming another role.
- Region - AWS region to connect to. Select one of the available regions. To specify an endpoint to connect to, select Other.
- Endpoint - Endpoint to connect to when you select Other for the region. Enter the endpoint name.
- Stream Name - Kinesis stream name.
- Kinesis Producer Configuration - Additional Kinesis properties to use. Using simple or bulk edit mode, click the Add icon to add properties. Define the Kinesis property name and value. When you add a configuration property, enter the exact property name and the value. The stage does not validate the property names or values.
- Partitioning Strategy - Strategy to write data to Kinesis shards:
  - Random - Generates a random partition key.
  - Expression - Uses the result of an expression as the partition key.
- Partition Expression - Expression to generate the partition key used to pass data to different shards. Use for the expression partition strategy.
- Preserve Record Order - Select to preserve the order of records. Enabling this option can reduce pipeline performance.
-
On the Data Format tab, configure the following property:
- Data Format - Format of data to be written. Use one of the following data formats:
  - Avro
  - Binary
  - Delimited
  - JSON
  - Protobuf
  - SDC Record
  - Text
-
For Avro data, on the Data Format tab, configure the following properties:
- Avro Schema Location - Location of the Avro schema definition to use when writing data:
  - In Pipeline Configuration - Use the schema that you provide in the stage configuration.
  - In Record Header - Use the schema in the avroSchema record header attribute. Use only when the avroSchema attribute is defined for all records.
  - Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry.
- Avro Schema - Avro schema definition used to write the data. You can optionally use the runtime:loadResource function to load a schema definition stored in a runtime resource file.
- Register Schema - Registers a new Avro schema with Confluent Schema Registry.
- Schema Registry URLs - Confluent Schema Registry URLs used to look up the schema or to register a new schema. To add a URL, click Add and then enter the URL in the following format: http://<host name>:<port number>
- Basic Auth User Info - User information needed to connect to Confluent Schema Registry when using basic authentication. Enter the key and secret from the schema.registry.basic.auth.user.info setting in Schema Registry using the following format: <key>:<secret>. Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.
- Look Up Schema By - Method used to look up the schema in Confluent Schema Registry:
  - Subject - Look up the specified Avro schema subject.
  - Schema ID - Look up the specified Avro schema ID.
- Schema Subject - Avro schema subject to look up or to register in Confluent Schema Registry. If the specified subject to look up has multiple schema versions, the stage uses the latest schema version for that subject. To use an older version, find the corresponding schema ID, and then set the Look Up Schema By property to Schema ID.
- Schema ID - Avro schema ID to look up in Confluent Schema Registry.
- Include Schema - Includes the schema in each message. Note: Omitting the schema definition can improve performance, but requires the appropriate schema management to avoid losing track of the schema associated with the data.
- Avro Compression Codec - The Avro compression type to use. When using Avro compression, do not enable other compression available in the destination.
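For the In Pipeline Configuration option, the Avro Schema property takes a schema definition written as JSON. The following Python sketch builds a small schema and serializes it to text that could be pasted into the property. The record name, namespace, and fields are hypothetical and serve only as an illustration.

```python
import json

# Hypothetical Avro record schema with one required and one optional field.
order_schema = {
    "type": "record",
    "name": "Order",
    "namespace": "com.example",
    "fields": [
        {"name": "id", "type": "long"},
        # A union with "null" listed first makes the field optional.
        {"name": "note", "type": ["null", "string"], "default": None},
    ],
}

# JSON text of the schema definition.
order_schema_json = json.dumps(order_schema, indent=2)
```

The same JSON text could instead be stored in a runtime resource file and loaded with the runtime:loadResource function mentioned above.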
-
For binary data, on the Data Format tab, configure the following property:
- Binary Field Path - Field that contains the binary data.
-
For delimited data, on the Data Format tab, configure the following properties:
- Delimiter Format - Format for delimited data:
  - Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
  - RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
  - MS Excel CSV - Microsoft Excel comma-separated file.
  - MySQL CSV - MySQL comma-separated file.
  - Tab-Separated Values - File that includes tab-separated values.
  - PostgreSQL CSV - PostgreSQL comma-separated file.
  - PostgreSQL Text - PostgreSQL text file.
  - Custom - File that uses user-defined delimiter, escape, and quote characters.
- Header Line - Indicates whether to create a header line.
- Delimiter Character - Delimiter character for a custom delimiter format. Select one of the available options or use Other to enter a custom character. You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter. Default is the pipe character ( | ).
- Record Separator String - Characters to use to separate records. Use any valid Java string literal. For example, when writing to Windows, you might use \r\n to separate records. Available when using a custom delimiter format.
- Escape Character - Escape character for a custom delimiter format. Select one of the available options or use Other to enter a custom character. Default is the backslash character ( \ ).
- Quote Character - Quote character for a custom delimiter format. Select one of the available options or use Other to enter a custom character. Default is the quotation mark character ( " ).
- Replace New Line Characters - Replaces new line characters with the configured string. Recommended when writing data as a single line of text.
- New Line Character Replacement - String to replace each new line character. For example, enter a space to replace each new line character with a space. Leave empty to remove the new line characters.
- Charset - Character set to use when writing data.
-
For JSON data, on the Data Format tab, configure the following properties:
- JSON Content - Method to write JSON data:
  - JSON Array of Objects - Each file includes a single array. In the array, each element is a JSON representation of each record.
  - Multiple JSON Objects - Each file includes multiple JSON objects. Each object is a JSON representation of a record.
- Charset - Character set to use when writing data.
-
For protobuf data, on the Data Format tab, configure the following properties:
- Protobuf Descriptor File - Descriptor file (.desc) to use. The descriptor file must be in the Data Collector resources directory, $SDC_RESOURCES. For more information about environment variables, see Data Collector Environment Configuration in the Data Collector documentation. For information about generating the descriptor file, see Protobuf Data Format Prerequisites.
- Message Type - Fully-qualified name for the message type to use when writing data. Use the following format: <package name>.<message type>. Use a message type defined in the descriptor file.
-
For text data, on the Data Format tab, configure the following properties:
- Text Field Path - Field that contains the text data to be written. All data must be incorporated into the specified field.
- Record Separator - Characters to use to separate records. Use any valid Java string literal. For example, when writing to Windows, you might use \r\n to separate records. By default, the destination uses \n.
- On Missing Field - When a record does not include the text field, determines whether the destination reports the missing field as an error or ignores the missing field.
- Insert Record Separator if No Text - When configured to ignore a missing text field, inserts the configured record separator string to create an empty line. When not selected, discards records without the text field.
- Charset - Character set to use when writing data.
-
When using the destination in a microservice pipeline, on the Response tab, configure the following properties. In non-microservice pipelines, these properties are ignored.
- Send Response to Origin - Enables sending a response to a microservice origin.
- Response Type - The response to send to a microservice origin:
  - Successfully written records.
  - Responses from the destination system.
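As background for the Partitioning Strategy property on the Kinesis tab, the following Python sketch illustrates how a partition key relates to a shard. Kinesis maps each partition key to a 128-bit integer with an MD5 hash and assigns the record to the shard whose hash key range contains that value. The function names are hypothetical, the even split of the hash range across shards is an assumption, and the use of a UUID as the random key is an illustrative choice rather than the destination's documented behavior.

```python
import hashlib
import uuid


def hash_key(partition_key: str) -> int:
    """Map a partition key to a 128-bit integer using MD5."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_for(partition_key: str, shard_count: int) -> int:
    """Return the index of the shard that would receive the key.

    Assumption: shards split the 128-bit hash key range evenly.
    """
    range_size = 2 ** 128 // shard_count
    return min(hash_key(partition_key) // range_size, shard_count - 1)


def random_partition_key() -> str:
    """Random strategy: a fresh key per record spreads records across shards."""
    return str(uuid.uuid4())
```

With the expression strategy, records that produce the same key, such as a customer ID, always map to the same shard, which keeps related records together. With the random strategy, records are spread across shards without regard to their content.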