Configuring a Pipeline
Configure a pipeline to define the stream of data. After you configure the pipeline, you can start it.
A pipeline can include the following:
- A single origin stage
- Multiple processor stages
- Multiple destination stages
- Multiple executor stages
- Multiple pipeline fragments
- You can create a new pipeline in one of the following ways:
- In the top toolbar, click .
- In the Navigation panel, click Add icon. , and then click the
- Enter a pipeline name and optional description.
- Use the defaults to create a blank Data Collector pipeline, and then click Next.
By default, Control Hub selects an accessible authoring Data Collector that you have read permission on and that has the most recent reported time. To select another Data Collector, click Click here to select.
- Click Save & Open in Canvas.
A blank pipeline opens in the canvas.
- In the Properties panel, on the General tab, configure the following properties:
Pipeline Property - Description
Name - Optionally edit the name of the pipeline.
Description - Optionally edit or add a description of the pipeline.
Labels - Optional labels to assign to the pipeline. Use labels to group similar pipelines. For example, you might want to group pipelines by database schema or by the test or production environment.
Assign the label templates to pipelines that you want to use as user-defined sample pipelines.
You can use nested labels to create a hierarchy of pipeline groupings. Enter nested labels using the following format:
<label1>/<label2>/<label3>
For example, to group pipelines in the test environment by the origin system, you might add the labels Test/HDFS and Test/Elasticsearch to the appropriate pipelines.
Execution Mode - Execution mode of the pipeline:
- Standalone - A single Data Collector process runs the pipeline.
Delivery Guarantee - Determines how Data Collector handles data after an unexpected event causes the pipeline to stop running:
- At Least Once - Ensures all data is processed and written to the destination. Might result in duplicate rows.
- At Most Once - Ensures that data is not reprocessed to prevent writing duplicate data to the destination. Might result in missing rows.
Default is At Least Once.
Test Origin - A virtual origin to provide test data for data preview. Only used when the Test Origin option is selected in the Preview Configuration dialog box. To enable the use of a test origin, select the origin to access the test data, then configure the origin properties on the Test Origin tab. You can use any available origin.
Default is the Dev Raw Data Source origin.
Start Event - Determines how the start event is handled. Select one of the following options:
- Discard - Use when you don't want to use the event.
- An executor - To use the event to trigger a task, select the executor that you want to use. For more information about the executors, see Executors.
Use in standalone Data Collector pipelines only.
For more information about pipeline events, see Pipeline Event Generation.
Stop Event - Determines how the stop event is handled. Select one of the following options:
- Discard - Use when you don't want to use the event.
- An executor - To use the event to trigger a task, select the executor that you want to use. For more information about the executors, see Executors.
Use in standalone Data Collector pipelines only.
For more information about pipeline events, see Pipeline Event Generation.
Retry Pipeline on Error - Retries the pipeline upon error.
Retry Attempts - Number of retries attempted. Use -1 to retry indefinitely. The wait time between retries starts at 15 seconds and doubles until reaching five minutes.
Rate Limit (records / sec) - Maximum number of records that the pipeline can read in a second. Use 0 or no value to set no rate limit. Default is 0.
Max Runners - The maximum number of pipeline runners to use in a multithreaded pipeline. Use 0 for no limit. When set to 0, Data Collector generates up to the maximum number of threads or concurrency configured in the origin.
You can use this property to help tune pipeline performance. For more information, see Tuning Threads and Runners.
Default is 0.
Runner Idle Time (sec) - Minimum number of seconds a pipeline runner waits when idle before generating an empty batch. The number of empty batches that are generated by pipeline runners displays as the Idle Batch Count in the monitor mode runtime statistics.
Use to ensure that batches are generated periodically, even when no data needs to be processed.
Use -1 to allow pipeline runners to wait indefinitely when idle without generating empty batches.
For standalone pipelines only.
Create Failure Snapshot - Automatically creates a snapshot if the pipeline fails because of data-related errors. Can be used to troubleshoot the pipeline.
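The retry wait time described for the Retry Attempts property can be sketched as a capped doubling schedule. This is only an illustration: the function name `retry_wait_times` is hypothetical, and it assumes that a doubled value above five minutes is clamped to exactly 300 seconds, which the description above does not spell out.

```python
from itertools import islice
from typing import Iterator


def retry_wait_times(initial: int = 15, cap: int = 300) -> Iterator[int]:
    """Yield wait times in seconds: start at `initial`, double each time,
    and stop growing once `cap` (five minutes) is reached.

    Assumption: a doubled value above the cap is clamped to the cap.
    """
    wait = initial
    while True:
        yield wait
        wait = min(wait * 2, cap)


# First seven waits under the assumptions above.
schedule = list(islice(retry_wait_times(), 7))
print(schedule)  # [15, 30, 60, 120, 240, 300, 300]
```

Under these assumptions, a pipeline that retries indefinitely settles into one attempt every five minutes after the sixth retry.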
- To define runtime parameters, on the Parameters tab, click the Add icon and define the name and the default value for each parameter. You can use simple or bulk edit mode to add the parameters.
For more information, see Runtime Parameters.
- To configure notifications based on changes in pipeline state, on the Notifications tab, configure the following properties:
Notifications Property - Description
Notify on Pipeline State Changes - Sends notifications when the pipeline encounters the listed pipeline states.
Email IDs - Email addresses to receive notification when the pipeline state changes to one of the specified states. Using simple or bulk edit mode, click the Add icon to add additional addresses.
Error Information Level - Amount of information included in an email notification triggered by an error:
- All error details
- Only the error code
- Error notification with no details
Note: Error details can include sensitive information.
Webhooks - Webhook to send when the pipeline state changes to one of the specified states. Using simple or bulk edit mode, click the Add icon to add additional webhooks.
Webhook URL - URL to send the HTTP request.
Headers - Optional HTTP request headers.
HTTP Method - HTTP method. Use one of the following methods:
- GET
- PUT
- POST
- DELETE
- HEAD
Payload - Optional payload to use. Available for PUT, POST, and DELETE methods. Use any valid content type.
You can use webhook parameters in the payload to include information about the triggering event, such as the pipeline name or state. Enclose webhook parameters in double curly brackets as follows: {{PIPELINE_STATE}}.
Content Type - Optional content type of the payload. Configure this property when the content type is not declared in the request headers.
Authentication Type - Optional authentication type to include in the request. Use None, Basic, Digest, or Universal. Use Basic for Form authentication.
User Name - User name to include when using authentication.
Password - Password to include when using authentication.
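To make the double-curly-bracket syntax for webhook parameters concrete, the following sketch shows what substituting {{PIPELINE_STATE}} into a payload might look like. It is not how Data Collector performs the substitution. The helper `render_payload` is hypothetical, and `RUN_ERROR` is only a sample state value chosen for the example.

```python
import json
import re
from typing import Mapping

# Matches {{NAME}} where NAME is upper-case letters, digits, or underscores.
_PARAM = re.compile(r"\{\{([A-Z0-9_]+)\}\}")


def render_payload(template: str, values: Mapping[str, str]) -> str:
    """Replace {{PARAM}} placeholders with values; leave unknown ones as-is.

    Hypothetical helper for illustration. It does no JSON escaping, so the
    values are assumed to be safe to place inside the payload.
    """
    def substitute(match: "re.Match[str]") -> str:
        return values.get(match.group(1), match.group(0))

    return _PARAM.sub(substitute, template)


template = '{"text": "Pipeline state changed to {{PIPELINE_STATE}}"}'
rendered = render_payload(template, {"PIPELINE_STATE": "RUN_ERROR"})
print(json.loads(rendered)["text"])
```

Leaving unknown placeholders untouched is a design choice of this sketch: it makes a mistyped parameter name visible in the delivered payload instead of silently dropping it.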
- Click the Error Records tab and configure the following error handling options:
Error Records Property - Description
Error Records - Determines how to handle records that cannot be processed as expected. Use one of the following options:
- Discard - Discards error records.
- Send Response to Origin - Passes error records back to the microservice origin to be included in a response to the originating REST API client. Use in microservice pipelines only.
- Write to Amazon S3 - Writes error records to Amazon S3.
- Write to Azure Event Hub - Writes error records to the specified Microsoft Azure Event Hub.
- Write to Elasticsearch - Writes error records to the specified Elasticsearch cluster.
- Write to File - Writes error records to a file in the specified directory.
- Write to Google Cloud Storage - Writes error records to Google Cloud Storage.
- Write to Google Pub/Sub - Writes error records to Google Pub/Sub.
- Write to Kafka - Writes error records to the specified Kafka cluster.
- Write to Kinesis - Writes error records to the specified Kinesis stream.
- Write to MapR Streams - Writes error records to the specified MapR Streams cluster.
- Write to MQTT - Writes error records to the specified MQTT broker.
Error Record Policy - Determines the version of the record to use as a basis for an error record. For more information, see Error Records and Version.
- When using the Send Response to Origin option, optionally click the Error Records - Send Response to Origin tab and configure the following property:
Send Response to Origin Property - Description
Status Code - HTTP status code for the error records. Default is 500, representing an internal server error. All error records are included in the response as error records.
- When writing error records to Amazon S3, click the Error Records - Write to Amazon S3 tab and configure the following properties:
Amazon S3 Property - Description
Connection - Connection that defines the information required to connect to an external system. To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.
To create a new connection, click the Add New Connection icon. To view and edit the details of the selected connection, click the Edit Connection icon.
Authentication Method - Authentication method used to connect to Amazon Web Services (AWS):
- AWS Keys - Authenticates using an AWS access key pair.
- Instance Profile - Authenticates using an instance profile associated with the Data Collector EC2 instance.
- None - Connects to a public bucket using no authentication.
Access Key ID - AWS access key ID. Required when using AWS keys to authenticate with AWS.
Secret Access Key - AWS secret access key. Required when using AWS keys to authenticate with AWS.
Tip: To secure sensitive information such as access key pairs, you can use runtime resources or credential stores.
Assume Role - Temporarily assumes another role to authenticate with AWS.
Role ARN - Amazon resource name (ARN) of the role to assume, entered in the following format:
arn:aws:iam::<account_id>:role/<role_name>
Where <account_id> is the ID of your AWS account and <role_name> is the name of the role to assume. You must create and attach an IAM trust policy to this role that allows the role to be assumed.
Available when assuming another role.
Role Session Name - Optional name for the session created by assuming a role. Overrides the default unique identifier for the session.
Available when assuming another role.
Session Timeout - Maximum number of seconds for each session created by assuming a role. The session is refreshed if the pipeline continues to run for longer than this amount of time.
Set to a value between 3,600 seconds and 43,200 seconds.
Available when assuming another role.
Set Session Tags - Sets a session tag to record the name of the currently logged-in IBM StreamSets user that starts the pipeline or the job for the pipeline. AWS IAM verifies that the user account set in the session tag can assume the specified role.
Select only when the IAM trust policy attached to the role to be assumed uses session tags and restricts the session tag values to specific user accounts.
When cleared, the connection does not set a session tag.
Available when assuming another role.
Use Specific Region - Specify the AWS region or endpoint to connect to. When cleared, the stage uses the Amazon S3 default global endpoint, s3.amazonaws.com.
Region - AWS region to connect to. Select one of the available regions. To specify an endpoint to connect to, select Other.
Endpoint - Endpoint to connect to when you select Other for the region. Enter the endpoint name.
Use Custom Endpoint - Specify a specific signing region when connecting to a custom endpoint. When cleared, the stage uses the region specified in the endpoint.
Signing Region - AWS region used by the custom endpoint.
Bucket - Bucket to use when writing records. Enter a bucket name or define an expression that evaluates to bucket names.
When using datetime variables in the expression, be sure to configure the time basis for the stage.
Common Prefix - Common prefix that determines where objects are written.
Partition Prefix - Optional partition prefix to specify the partition to use. Use a specific partition prefix or define an expression that evaluates to a partition prefix.
When using datetime variables in the expression, be sure to configure the time basis for the stage.
Object Name Suffix - Suffix to use for object names, such as txt or json. When used, the destination adds a period and the configured suffix as follows: <object name>.<suffix>. You can include periods within the suffix, but do not start the suffix with a period. Forward slashes are not allowed.
Not available for the whole file data format.
Use Server-Side Encryption - Enables server-side encryption.
Server-Side Encryption Option - Option that Amazon S3 uses to manage the encryption keys:
- SSE-S3 - Use Amazon S3-managed keys.
- SSE-KMS - Use Amazon Web Services KMS-managed keys.
- SSE-C - Use customer-provided keys.
Default is SSE-S3.
AWS KMS Key ARN - Amazon resource name (ARN) of the AWS KMS master encryption key. Use the following format:
<arn>:<aws>:<kms>:<region>:<acct ID>:<key>/<key ID>
Used for SSE-KMS encryption only.
Object Ownership - Determines ownership of the objects written to the bucket:
- Default - Objects are written with the bucket owner enforced setting. Access control lists (ACLs) are disabled on the objects, and the bucket owner automatically owns and has full control over the objects. The bucket uses policies to define access control instead of ACLs.
- Bucket Owner Full Control - Objects are written with the bucket owner preferred setting. ACLs are enabled on the object, and the object is assigned the bucket-owner-full-control canned ACL.
Amazon S3 recommends using the default ownership to keep ACLs disabled except in unusual circumstances where you must control access for each object individually. For more information, see the Amazon S3 documentation.
Delimiter - Delimiter used by Amazon S3 to define the prefix hierarchy. Default is slash ( / ).
Encryption Context - Key-value pairs to use for the encryption context. Click Add to add key-value pairs. Used for SSE-KMS encryption only.
Customer Encryption Key - The 256-bit and Base64 encoded encryption key to use. Used for SSE-C encryption only.
Customer Encryption Key MD5 - The 128-bit and Base64 encoded MD5 digest of the encryption key according to RFC 1321. Used for SSE-C encryption only.
Data Time Zone - Time zone for the destination system. Used with the time basis to resolve datetimes in a time-based bucket or partition prefix.
Time Basis - Time basis to use for writing to a time-based bucket or partition prefix. Use one of the following expressions:
- ${time:now()} - Uses the processing time as the time basis in conjunction with the specified Data Time Zone.
- An expression that calls a field and resolves to a datetime value, such as ${record:value(<date field path>)}. Uses the time associated with the record as the time basis, adjusted for the specified Data Time Zone.
When the Bucket and Partition Prefix properties have no time component, you can ignore this property.
Default is ${time:now()}.
Object Name Prefix - Defines a prefix for object names written by the destination. By default, object names start with sdc as follows: sdc-<UTC timestamp>-<counter>.
Compress with Gzip - Compresses files with gzip before writing to Amazon S3.
Add Tags - Enables adding tags to the Amazon S3 objects that are created.
Tags - Tags to add to an object. Using simple or bulk edit mode, click Add Another to configure a tag. You can configure multiple tags. When you configure a tag, you can define a tag with just the key or specify a key and value.
Connection Timeout - Seconds to wait for a response before closing the connection.
Socket Timeout - Seconds to wait for a response to a query.
Retry Count - Maximum number of times to retry requests.
Use Proxy - Specifies whether to use a proxy to connect.
Proxy Host - Proxy host.
Proxy Port - Proxy port.
Proxy User - User name for proxy credentials.
Proxy Password - Password for proxy credentials.
Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
Proxy Domain - Optional domain name for the proxy server.
Proxy Workstation - Optional workstation for the proxy server.
Thread Pool Size for Parallel Uploads - Size of the thread pool for parallel uploads. Used when working with multiple partitions and processing large objects in multiple parts. When working with multiple partitions, setting this property to a value up to the number of partitions in use can improve performance.
For more information about this and the following properties, see the Amazon S3 TransferManager documentation.
Multipart Upload Threshold - Minimum batch size in bytes for multipart uploads.
Minimum Upload Part Size - Minimum part size in bytes for multipart uploads.
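The Role ARN format and the Session Timeout range described above can be checked before you enter the values. The sketch below is a simplified illustration. The helper names are hypothetical, the pattern accepts only the standard `aws` partition with a 12-digit account ID, and it does not handle role paths.

```python
import re
from typing import Tuple

# Simplified pattern for arn:aws:iam::<account_id>:role/<role_name>.
# Assumptions: standard "aws" partition, 12-digit account ID, no role path.
_ROLE_ARN = re.compile(r"^arn:aws:iam::(\d{12}):role/([A-Za-z0-9_+=,.@-]+)$")


def parse_role_arn(arn: str) -> Tuple[str, str]:
    """Return (account_id, role_name) or raise ValueError if the ARN does not match."""
    match = _ROLE_ARN.match(arn)
    if match is None:
        raise ValueError(f"not a role ARN in the expected format: {arn!r}")
    return match.group(1), match.group(2)


def check_session_timeout(seconds: int) -> int:
    """Accept only values in the documented range of 3,600 to 43,200 seconds."""
    if not 3600 <= seconds <= 43200:
        raise ValueError("session timeout must be between 3600 and 43200 seconds")
    return seconds


# The account ID and role name below are made-up sample values.
account_id, role_name = parse_role_arn("arn:aws:iam::123456789012:role/sdc-error-writer")
print(account_id, role_name)
print(check_session_timeout(3600))
```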
- When writing error records to Microsoft Azure Event Hub, click the Error Records - Write to Event Hub tab and configure the following properties:
Event Hub Property - Description
Namespace Name - The name of the namespace that contains the event hub that you want to use.
Event Hub Name - The event hub name.
Shared Access Policy Name - The policy name associated with the namespace. To retrieve the policy name, when logged into the Azure portal, navigate to your namespace and event hub, and then click Shared Access Policies for a list of policies.
When appropriate, you can use the default shared access key policy, RootManageSharedAccessKey.
Connection String Key - One of the connection string keys associated with the specified shared access policy. To retrieve a connection string key, after accessing the list of shared access policies, click the policy name, and then copy the Connection String - Primary Key value.
The value typically begins with "Endpoint".
- When writing error records to Elasticsearch, click the Error Records - Write to Elasticsearch tab and configure the following properties:
Elasticsearch Property - Description
Connection - Connection that defines the information required to connect to an external system. To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.
To create a new connection, click the Add New Connection icon. To view and edit the details of the selected connection, click the Edit Connection icon.
HTTP URLs - Comma-separated list of HTTP or HTTPS URLs used to connect to each Elasticsearch server in the cluster. Use the following format:
http://<host1>,http://<host2>
You can specify a port number in the URLs to override the default port defined in the HTTP Port property, as follows:
http://<host1>:<port>,http://<host2>:<port>
When a port number is defined in both this property and in the HTTP Port property, the port in this property takes precedence. For example, if you define this property as follows:
http://server1,http://server2:1234
And you define the default HTTP Port property as 9200, then server1 uses the default port of 9200 and server2 uses the port 1234.
HTTP Port - Default port number to use for URLs that do not include a port. The default HTTP port is 9200. The default HTTPS port is 443.
Use Security - Specifies whether security is enabled on the Elasticsearch cluster.
Index - Index for the generated documents. Enter an index name or an expression that evaluates to the index name. For example, if you enter customer as the index, the destination writes the document within the customer index.
If you use datetime variables in the expression, make sure to configure the time basis appropriately. For details about datetime variables, see Datetime Variables.
Mapping - Mapping type for the generated documents. Valid values depend on the following Elasticsearch versions:
- Elasticsearch 5.x - Enter the mapping type, an expression that evaluates to the mapping type, or a field that includes the mapping type. For example, if you enter user as the mapping type, the destination writes the document with a user mapping type.
- Elasticsearch 6.x or 7.x - Enter {_doc} if Elasticsearch was upgraded from version 5.x. Otherwise, enter empty curly brackets as follows: {}.
- Elasticsearch 8.x - This property is ignored when used with Elasticsearch 8.0 or later, which no longer supports mapping types.
For more information about the removal of mapping types, see the Elasticsearch documentation.
Document ID - Expression that evaluates to the ID for the generated documents. When you do not specify an ID, Elasticsearch creates an ID for each document. By default, the destination allows Elasticsearch to create the ID.
Additional HTTP Params - Additional HTTP parameters that you want to send as query string parameters to Elasticsearch. Enter the exact parameter name and value expected by Elasticsearch.
Detect Additional Nodes in Cluster - Detects additional nodes in the cluster based on the configured HTTP URLs. Selecting this property is equivalent to setting the client.transport.sniff Elasticsearch property to true.
Use only when Data Collector shares the same network as the Elasticsearch cluster. Do not use for Elastic Cloud or Docker clusters.
Time Basis - Time basis to use for writing to time-based indexes. Use one of the following expressions:
- ${time:now()} - Uses the processing time as the time basis. The processing time is the time associated with the Data Collector running the pipeline.
- An expression that calls a field and resolves to a datetime value, such as ${record:value(<date field path>)}. Uses the datetime result as the time basis.
When the Index property does not include datetime variables, you can ignore this property.
Default is ${time:now()}.
Data Time Zone - Time zone for the destination system. Used to resolve datetimes in time-based indexes.
Parent ID - Optional parent ID for the generated documents. Enter a parent ID or an expression that evaluates to the parent ID. Use to establish a parent-child relationship between documents in the same index.
Routing - Optional custom routing value for the generated documents. Enter a routing value or an expression that evaluates to the routing value. Elasticsearch routes a document to a particular shard in an index based on the routing value defined for the document. You can define a custom value for each document. If you don’t define a custom routing value, Elasticsearch uses the parent ID (if defined) or the document ID as the routing value.
Data Charset - Character encoding of the data to be processed.
Additional Properties - Extra fields to include in the action statement. Specify in JSON format. For example, you can use the _retry_on_conflict field to specify how many times an update is retried when there is a version conflict. To specify three retries, include the following:
"_retry_on_conflict" : 3
You can use record functions and delimited data record functions when configuring this property.
For more information, see the Elasticsearch documentation.
If you enabled security, configure the following security properties:
Security Property - Description
Mode - Authentication method to use:
- Basic - Authenticate with Elasticsearch user name and password. Select this option for Elasticsearch clusters outside of Amazon OpenSearch Service.
- AWS Signature V4 - Authenticate with AWS. Select this option for Elasticsearch clusters within Amazon OpenSearch Service.
User Name - Elasticsearch user name. Available when using Basic authentication.
Password - Password for the user account. Available when using Basic authentication.
Region - Amazon Web Services region that hosts the Elasticsearch domain. Available when using AWS Signature V4 authentication.
Access Key ID - AWS access key ID. Required when not using instance profile credentials. Available when using AWS Signature V4 authentication.
Secret Access Key - AWS secret access key. Required when not using instance profile credentials. Available when using AWS Signature V4 authentication.
Enable SSL - Enables the use of SSL.
SSL Truststore Path - Location of the truststore file. Configuring this property is equivalent to configuring the shield.ssl.truststore.path Elasticsearch property.
Not necessary for Elastic Cloud clusters.
SSL Truststore Password - Password for the truststore file. Configuring this property is equivalent to configuring the shield.ssl.truststore.password Elasticsearch property.
Not necessary for Elastic Cloud clusters.
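The port precedence described for the HTTP URLs and HTTP Port properties can be sketched as follows. A port in a URL wins, and the HTTP Port value fills in where the URL has none. The function `resolve_hosts` is a hypothetical helper that only illustrates the rule. It is not the destination's own parsing code.

```python
from typing import List, Optional, Tuple
from urllib.parse import urlsplit


def resolve_hosts(http_urls: str, default_port: int = 9200) -> List[Tuple[Optional[str], int]]:
    """Split a comma-separated URL list and apply the default port where a URL has none."""
    resolved = []
    for raw in http_urls.split(","):
        parts = urlsplit(raw.strip())
        # A port given in the URL takes precedence over the default port.
        port = parts.port if parts.port is not None else default_port
        resolved.append((parts.hostname, port))
    return resolved


hosts = resolve_hosts("http://server1,http://server2:1234", default_port=9200)
print(hosts)  # [('server1', 9200), ('server2', 1234)]
```

This reproduces the example above: server1 falls back to the default port of 9200, and server2 keeps port 1234.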
- When writing error records to file, click the Error Records - Write to File tab and configure the following properties:
Write to File Property - Description
Directory - Local directory for error record files.
File Prefix - Prefix used for error record files. Use to differentiate error record files from other files in the directory. Uses the prefix sdc-${sdc:id()} by default. The prefix evaluates to sdc-<Data Collector ID>. This provides default differentiation in case several Data Collectors write to the same directory.
The Data Collector ID is stored in the following file: $SDC_DATA/sdc.id.
For more information about environment variables, see Java and Security Configuration.
File Wait Time (secs) - Number of seconds Data Collector waits for error records. After that time, it creates a new error record file. You can enter a number of seconds or use the default expression to enter the time in minutes.
Max File Size (MB) - Maximum size for error files. Exceeding this size creates a new error file. Use 0 to avoid using this property.
- When writing error records to Google Cloud Storage, click the Error Records - Write to Google Cloud Storage tab and configure the following properties:
Google Cloud Storage Property - Description
Bucket - Bucket to use when writing records.
Note: The bucket name must be DNS compliant. For more information about bucket naming conventions, see the Google Cloud Storage documentation.
Common Prefix - Common prefix that determines where objects are written.
Partition Prefix - Optional partition prefix to specify the partition to use. Use a specific partition prefix or define an expression that evaluates to a partition prefix.
When using datetime variables in the expression, be sure to configure the time basis for the stage.
Data Time Zone - Time zone for the destination system. Used to resolve datetimes in a time-based partition prefix.
Time Basis - Time basis to use for writing to a time-based bucket or partition prefix. Use one of the following expressions:
- ${time:now()} - Uses the processing time as the time basis in conjunction with the specified Data Time Zone.
- An expression that calls a field and resolves to a datetime value, such as ${record:value(<date field path>)}. Uses the time associated with the record as the time basis, adjusted for the specified Data Time Zone.
When the Partition Prefix property has no time component, you can ignore this property.
Default is ${time:now()}.
Object Name Prefix - Defines a prefix for object names written by the destination. By default, object names start with "sdc" as follows: sdc-<UUID>.
Not required for the whole file data format.
Connection - Connection that defines the information required to connect to an external system. To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.
To create a new connection, click the Add New Connection icon. To view and edit the details of the selected connection, click the Edit Connection icon.
Project ID - Google Cloud project ID to use.
Credentials Provider - Provider for Google Cloud credentials:
- Default credentials provider - Uses Google Cloud default credentials.
- Service account credentials file (JSON) - Uses credentials stored in a JSON service account credentials file.
- Service account credentials (JSON) - Uses JSON-formatted credentials information from a service account credentials file.
Credentials File Path (JSON) - Path to the Google Cloud service account credentials file used to connect. The credentials file must be a JSON file. Enter a path relative to the Data Collector resources directory, $SDC_RESOURCES, or enter an absolute path.
Credentials File Content (JSON) - Contents of a Google Cloud service account credentials JSON file used to connect. Enter JSON-formatted credential information in plain text, or use an expression to call the information from runtime resources or a credential store.
- When writing error records to Google Pub/Sub, click the Error Records - Write to Google Pub/Sub tab and configure the following properties:
Google Pub/Sub Property - Description
Topic ID - Google Pub/Sub topic ID to write messages to.
Connection - Connection that defines the information required to connect to an external system. To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.
To create a new connection, click the Add New Connection icon. To view and edit the details of the selected connection, click the Edit Connection icon.
Project ID - Google Cloud project ID to use.
Credentials Provider - Provider for Google Cloud credentials:
- Default credentials provider - Uses Google Cloud default credentials.
- Service account credentials file (JSON) - Uses credentials stored in a JSON service account credentials file.
- Service account credentials (JSON) - Uses JSON-formatted credentials information from a service account credentials file.
Credentials File Path (JSON) - Path to the Google Cloud service account credentials file used to connect. The credentials file must be a JSON file. Enter a path relative to the Data Collector resources directory, $SDC_RESOURCES, or enter an absolute path.
Credentials File Content (JSON) - Contents of a Google Cloud service account credentials JSON file used to connect. Enter JSON-formatted credential information in plain text, or use an expression to call the information from runtime resources or a credential store.
Request Bytes Threshold - Size of accumulated messages that triggers sending messages as a batch. Specify in bytes. Default is 1000.
Messages Count Threshold - Number of accumulated messages that triggers sending messages as a batch. Default is 100.
Default Delay Threshold (ms) - Elapsed time since the arrival of the first message that triggers sending messages as a batch. Specify in milliseconds. Default is 1.
Batch Enabled - Select to have the destination send messages in batches. When disabled, the destination writes each message individually, ignoring threshold properties.
Max Outstanding Message Count - Number of unprocessed messages that the destination stores in memory before taking an action to control the flow of messages. You might want to control the flow of messages when the destination can read messages faster than it can write messages. Set to 0 to never control the flow based on message count. To control the message flow when using batch processing, set to a number larger than the message count threshold.
Max Outstanding Request Bytes - Number of unprocessed bytes that the destination stores in memory before taking an action to control the flow of messages. Set to 0 to never control the flow based on message size. To control the message flow when using batch processing, set to a number larger than the request bytes threshold.
Limit Exceeded Behavior - Action to take when either the count or size of unprocessed messages exceeds the specified limit. Select one of the following options:
- Throw Exception - Triggers pipeline error processing.
- Block - Stops processing new messages until stored messages have been successfully written.
- Ignore - Discards new messages until stored messages have been successfully written.
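The interaction between the batch thresholds and the flow-control limits above can be sketched as follows. This is only an illustration, not Data Collector code: the class and function names are hypothetical, and it assumes that reaching any single threshold triggers a send and that a limit counts as exceeded once the outstanding amount is strictly greater than it.

```python
from dataclasses import dataclass


@dataclass
class BatchThresholds:
    # Defaults mirror the values listed in the property descriptions above.
    request_bytes: int = 1000
    message_count: int = 100
    delay_ms: int = 1


def should_send_batch(pending_bytes: int, pending_count: int,
                      ms_since_first: float, t: BatchThresholds) -> bool:
    """Return True when any one of the three thresholds has been reached (assumed)."""
    return (pending_bytes >= t.request_bytes
            or pending_count >= t.message_count
            or ms_since_first >= t.delay_ms)


def limit_exceeded(outstanding_count: int, outstanding_bytes: int,
                   max_count: int, max_bytes: int) -> bool:
    """Return True when a flow-control limit is exceeded; a limit of 0 disables that check."""
    over_count = max_count > 0 and outstanding_count > max_count
    over_bytes = max_bytes > 0 and outstanding_bytes > max_bytes
    return over_count or over_bytes
```

Under these assumptions, a Max Outstanding Message Count of 200 with the default message count threshold of 100 leaves room for roughly two full batches in memory before the Limit Exceeded Behavior applies.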
-
When writing error records to Kafka, click the Error Records - Write
to Kafka tab and configure the following properties:
Write to Kafka Property Description
Connection Connection that defines the information required to connect to an external system. To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.
To create a new connection, click the Add New Connection icon. To view and edit the details of the selected connection, click the Edit Connection icon.
Broker URI Comma-separated list of connection strings for the Kafka brokers. Use the following format for each broker: <host>:<port>.
To ensure a pipeline can connect to Kafka in case a specified broker goes down, list as many brokers as possible.
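As a quick way to check a Broker URI value before entering it, the format described above can be validated with a short script. This is a minimal sketch: the function name and the host names in the usage note are hypothetical, and the check covers only the <host>:<port> shape, not whether the brokers are reachable.

```python
def parse_broker_uri(broker_uri: str) -> list[tuple[str, int]]:
    """Split a comma-separated broker list into (host, port) pairs."""
    brokers = []
    for entry in broker_uri.split(","):
        entry = entry.strip()
        # Split on the last colon so the port is always the final component.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected <host>:<port>, got {entry!r}")
        brokers.append((host, int(port)))
    return brokers
```

For example, `parse_broker_uri("kafka1:9092,kafka2:9092")` returns two pairs, while an entry without a port raises a `ValueError`.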
Runtime Topic Resolution Evaluates an expression at runtime to determine the topic to use for each record.
Topic Topic to use. Not available when using runtime topic resolution.
Topic Expression Expression used to determine where each record is written when using runtime topic resolution. Use an expression that evaluates to a topic name.
Topic White List List of valid topic names to write to when using runtime topic resolution. Use to avoid writing to invalid topics. Records that resolve to invalid topic names are passed to the stage for error handling. Use an asterisk (*) to allow writing to any topic name. By default, all topic names are valid.
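The effect of the topic white list on runtime topic resolution can be pictured with the following sketch. It is an illustration only: the function name is hypothetical, the topic expression is modeled as a simple field lookup on a dictionary, and raising an exception stands in for passing the record to error handling.

```python
def resolve_topic(record: dict, topic_field: str, white_list: list[str]) -> str:
    """Return the topic a record resolves to, or raise if the white list rejects it."""
    topic = str(record[topic_field])
    # An asterisk in the white list allows any topic name.
    if "*" in white_list or topic in white_list:
        return topic
    raise ValueError(f"topic {topic!r} is not in the white list")
```

With a white list of `["orders", "returns"]`, a record that resolves to `orders` is written normally, while a record that resolves to any other name is rejected.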
Kafka Configuration Additional Kafka properties to use. Using simple or bulk edit mode, click the Add icon and define the Kafka property name and value. Use the property names and values as expected by Kafka. Do not use the broker.list property.
Security Option Authentication and encryption option used to connect to the Kafka brokers:
- None (Security Protocol=PLAINTEXT) - Uses no authentication or encryption.
- SSL/TLS Encryption (Security Protocol=SSL)
- SSL/TLS Encryption and Authentication (Security Protocol=SSL)
- SASL Authentication (Security Protocol=SASL_PLAINTEXT)
- SASL Authentication on SSL/TLS (Security Protocol=SASL_SSL)
- Custom Authentication (Security Protocol=CUSTOM)
Enabling security requires completing several prerequisite tasks and configuring additional security properties, as described in Security in Kafka Stages.
Message Key Format Data format of the message key values to pass to Kafka. Ignore this property when not passing message key values to Kafka. For more information about working with Kafka message key values, see Kafka Message Keys.
Partition Strategy Strategy to use to write to partitions:
- Round Robin - Takes turns writing to different partitions.
- Random - Writes to partitions randomly.
- Expression - Uses an expression to write data to different partitions. Writes records to the partitions specified by the results of the expression.
Note: The expression results are written to a specified Kafka message key attribute, overwriting any existing values. Because this partition strategy uses Kafka message keys, you cannot use the Kafka Message Key property in the destination to pass other Kafka message keys to Kafka. If records already contain Kafka message keys that you want to pass to Kafka, use a different partition strategy.
- Default - Uses an expression to extract a partition key from the record. Writes records to partitions based on a hash of the partition key.
Partition Expression Expression to use with the default or expression partition strategy. When using the default partition strategy, specify an expression that returns the partition key from the record. The expression must evaluate to a string value.
When using the expression partition strategy, specify an expression that evaluates to the partition where you want each record written. Partition numbers start with 0. The expression must evaluate to a numeric value.
Optionally, press Ctrl + Space Bar for help with creating the expression.
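The difference between the four partition strategies can be sketched as follows. This is a simplified model rather than the stage's implementation: the function name and strategy keys are hypothetical, and the default strategy uses CRC32 purely as a stand-in hash, which is an assumption and not the hash that Kafka itself applies.

```python
import itertools
import random
import zlib
from typing import Callable


def make_partitioner(strategy: str, num_partitions: int) -> Callable[..., int]:
    """Return a function that maps an expression result to a partition number."""
    counter = itertools.count()

    def round_robin(_result=None) -> int:
        # Takes turns: 0, 1, 2, ..., then wraps around.
        return next(counter) % num_partitions

    def random_pick(_result=None) -> int:
        return random.randrange(num_partitions)

    def expression(result) -> int:
        # The expression must evaluate to a numeric partition, starting at 0.
        partition = int(result)
        if not 0 <= partition < num_partitions:
            raise ValueError(f"partition {partition} is out of range")
        return partition

    def default(result) -> int:
        # The expression returns a string key; equal keys hash to the same partition.
        key = str(result).encode("utf-8")
        return zlib.crc32(key) % num_partitions

    strategies = {
        "round_robin": round_robin,
        "random": random_pick,
        "expression": expression,
        "default": default,
    }
    return strategies[strategy]
```

The practical consequence is that the expression strategy gives direct control over placement, while the default strategy only guarantees that records with the same partition key land in the same partition.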
One Message per Batch For each batch, writes the records to each partition as a single message.
Override Stage Configurations When configurations conflict, the properties configured in the Kafka Configuration property override other properties configured in the stage.
Kafka Message Key Passes message key values stored in a record header attribute to Kafka as message keys. Enter an expression that specifies the attribute where the message keys are stored.
To pass string message keys stored in an attribute, use: ${record:attribute('<message key attribute name>')}
To pass Avro message keys stored in an attribute, use: ${avro:decode(record:attribute('avroKeySchema'),base64:decodeBytes(record:attribute('<message key attribute name>')))}
For more information about working with Kafka message keys, see Kafka Message Keys.
-
When writing error records to Kinesis, click the Error Records -
Write to Kinesis tab and configure the following
properties:
Kinesis Property Description
Connection Connection that defines the information required to connect to an external system. To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.
To create a new connection, click the Add New Connection icon. To view and edit the details of the selected connection, click the Edit Connection icon.
Authentication Method Authentication method used to connect to Amazon Web Services (AWS):
- AWS Keys - Authenticates using an AWS access key pair.
- Instance Profile - Authenticates using an instance profile associated with the Data Collector EC2 instance.
Access Key ID AWS access key ID. Required when using AWS keys to authenticate with AWS.
Secret Access Key AWS secret access key. Required when using AWS keys to authenticate with AWS.
Tip: To secure sensitive information such as access key pairs, you can use runtime resources or credential stores.
Assume Role Temporarily assumes another role to authenticate with AWS.
Role ARN Amazon resource name (ARN) of the role to assume, entered in the following format:
arn:aws:iam::<account_id>:role/<role_name>
Where <account_id> is the ID of your AWS account and <role_name> is the name of the role to assume. You must create and attach an IAM trust policy to this role that allows the role to be assumed.
Available when assuming another role.
Role Session Name Optional name for the session created by assuming a role. Overrides the default unique identifier for the session.
Available when assuming another role.
Session Timeout Maximum number of seconds for each session created by assuming a role. The session is refreshed if the pipeline continues to run for longer than this amount of time.
Set to a value between 3,600 seconds and 43,200 seconds.
Available when assuming another role.
Set Session Tags Sets a session tag to record the name of the currently logged-in IBM StreamSets user that starts the pipeline or the job for the pipeline. AWS IAM verifies that the user account set in the session tag can assume the specified role.
Select only when the IAM trust policy attached to the role to be assumed uses session tags and restricts the session tag values to specific user accounts.
When cleared, the connection does not set a session tag.
Available when assuming another role.
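The Role ARN format and the Session Timeout range described above can be checked before they are entered, as in the following sketch. The function names are hypothetical, and the pattern is deliberately loose about which characters a role name may contain, so treat it as a sanity check rather than a full validation.

```python
import re

# AWS account IDs are 12 digits; the role name part is matched loosely here.
ROLE_ARN_PATTERN = re.compile(
    r"^arn:aws:iam::(?P<account_id>\d{12}):role/(?P<role_name>\S+)$"
)


def parse_role_arn(arn: str) -> tuple[str, str]:
    """Return (account_id, role_name) for an ARN in the documented format."""
    match = ROLE_ARN_PATTERN.match(arn)
    if match is None:
        raise ValueError(f"not a role ARN: {arn!r}")
    return match["account_id"], match["role_name"]


def check_session_timeout(seconds: int) -> int:
    """Accept only values in the documented range of 3,600 to 43,200 seconds."""
    if not 3600 <= seconds <= 43200:
        raise ValueError("session timeout must be between 3600 and 43200 seconds")
    return seconds
```

For example, `parse_role_arn("arn:aws:iam::123456789012:role/sdc-writer")` returns the account ID and the role name, where both values are placeholders.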
Region AWS region to connect to. Select one of the available regions. To specify an endpoint to connect to, select Other.
Endpoint Endpoint to connect to when you select Other for the region. Enter the endpoint name.
Stream Name Kinesis stream name.
Kinesis Producer Configuration Additional Kinesis properties to use. Using simple or bulk edit mode, click the Add icon to add properties. Define the Kinesis property name and value. When you add a configuration property, enter the exact property name and the value. The stage does not validate the property names or values.
Partitioning Strategy Strategy to write data to Kinesis shards:
- Random - Generates a random partition key.
- Expression - Uses the result of an expression as the partition key.
Partition Expression Expression to generate the partition key used to pass data to different shards. Use for the expression partition strategy.
Preserve Record Order Select to preserve the order of records. Enabling this option can reduce pipeline performance.
-
When writing error records to a MapR Streams cluster, click the Error
Records - Write to MapR Streams tab and configure the following
properties:
MapR Streams Producer Property Description
Runtime Topic Resolution Evaluates an expression at runtime to determine the topic to use for each record.
Topic Topic to use. Not available when using runtime topic resolution.
Topic Expression Expression used to determine where each record is written when using runtime topic resolution. Use an expression that evaluates to a topic name.
Topic White List List of valid topic names to write to when using runtime topic resolution. Use to avoid writing to invalid topics. Records that resolve to invalid topic names are passed to the stage for error handling. Use an asterisk (*) to allow writing to any topic name. By default, all topic names are valid.
Partition Strategy Strategy to use to write to partitions:
- Round Robin - Takes turns writing to different partitions.
- Random - Writes to partitions randomly.
- Expression - Uses an expression to write data to different partitions. Writes records to the partitions specified by the results of the expression.
Note: The expression results are written to a specified Kafka message key attribute, overwriting any existing values.
- Default - Uses an expression to extract a partition key from the record. Writes records to partitions based on a hash of the partition key.
Partition Expression Expression to use with the default or expression partition strategy. When using the default partition strategy, specify an expression that returns the partition key from the record. The expression must evaluate to a string value.
When using the expression partition strategy, specify an expression that evaluates to the partition where you want each record written. Partition numbers start with 0. The expression must evaluate to a numeric value.
Optionally, press Ctrl + Space Bar for help with creating the expression.
One Message per Batch For each batch, writes the records to each partition as a single message.
MapR Streams Configuration Additional configuration properties to use. Using simple or bulk edit mode, click the Add icon and define the MapR Streams property name and value. Use the property names and values as expected by MapR.
You can use MapR Streams properties and the set of Kafka properties supported by MapR Streams.
Override Stage Configurations When configurations conflict, the properties configured in the MapR Streams Configuration property override other properties configured in the stage.
-
When writing error records to an MQTT broker, click the Error Records
- Write to MQTT tab and configure the following properties:
MQTT Property Description
Connection Connection that defines the information required to connect to an external system. To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.
To create a new connection, click the Add New Connection icon. To view and edit the details of the selected connection, click the Edit Connection icon.
Broker URL MQTT Broker URL. Enter in the following format: <tcp | ssl>://<hostname>:<port>
Use ssl for secure connections to the broker.
For example: tcp://localhost:1883
For high availability MQTT clusters without a load balancer, specify a list of brokers from the cluster, separated by commas and without spaces. The stage connects to the first available broker, trying in the order listed. For example: tcp://hostA:1883,tcp://hostB:1883,tcp://hostC:1883
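The Broker URL format and the "first available broker" behavior described above can be sketched as follows. The function names are hypothetical, and the reachability check is passed in as a callable because how the stage actually probes a broker is not described here.

```python
from urllib.parse import urlsplit


def parse_mqtt_brokers(broker_url: str) -> list[tuple[str, str, int]]:
    """Split a comma-separated broker list into (scheme, host, port) tuples."""
    brokers = []
    for entry in broker_url.split(","):
        if " " in entry:
            raise ValueError("separate brokers with commas and no spaces")
        parts = urlsplit(entry)
        if parts.scheme not in ("tcp", "ssl") or not parts.hostname or parts.port is None:
            raise ValueError(f"expected <tcp | ssl>://<hostname>:<port>, got {entry!r}")
        # Note: urlsplit returns the host name in lowercase.
        brokers.append((parts.scheme, parts.hostname, parts.port))
    return brokers


def first_available(brokers, is_reachable):
    """Return the first broker, in listed order, that the check accepts, else None."""
    for broker in brokers:
        if is_reachable(broker):
            return broker
    return None
```

Because brokers are tried in the order listed, placing the preferred broker first in the Broker URL value determines which one is used when all are healthy.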
Client ID MQTT Client ID. The ID must be unique across all clients connecting to the same broker. You can define an expression that evaluates to the client ID. For example, enter the following expression to use the unique pipeline ID as the client ID: ${pipeline:id()}
If a pipeline includes multiple MQTT stages and you want to use the unique pipeline ID as the client ID for both stages, prefix the client ID with a string like this: sub-${pipeline:id()} and pub-${pipeline:id()}
Otherwise, all stages will use the same client ID. This can cause problems, such as messages disappearing.
Topic Topic to publish to. Using simple or bulk edit mode, click the Add icon to read from additional topics.
Quality of Service Determines the quality of service level used to guarantee message delivery:
- At Most Once (0)
- At Least Once (1)
- Exactly Once (2)
For more information, see the HiveMQ documentation on quality of service levels.
Client Persistence Mechanism Determines the persistence mechanism that the destination uses to guarantee message delivery when the quality of service level is at least once or exactly once. Select one of the following options:
- Memory - Store messages in memory on the Data Collector machine until the delivery of the message is complete.
- File - Store messages in a local file on the Data Collector machine until the delivery of the message is complete.
Not used when the quality of service level is at most once.
For more information, see the HiveMQ documentation on client persistence.
Client Persistence Data Directory Local directory on the Data Collector machine where the destination temporarily stores messages in a file when you configure file persistence. The user who starts Data Collector must have read and write access to this directory.
Keep Alive Interval (secs) Maximum time in seconds to allow the connection to the MQTT broker to remain idle. If the destination publishes no messages for this amount of time, the connection is closed and the destination must reconnect to the MQTT broker. Default is 60 seconds.
Use Credentials Enables entering MQTT credentials.
Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
Username MQTT user name.
Password MQTT password.
Clean Session Enables connecting to the MQTT broker using a clean session, or a non-persistent connection. See the MQTT documentation for details about MQTT clean sessions.
Retain the Message Determines whether or not the MQTT broker retains the message last published by the destination when no MQTT client is subscribed to listen to the topic. When selected, the MQTT broker retains the last message published by the destination. Any messages published earlier are lost. When cleared, all messages published by the destination are lost.
For more information about MQTT retained messages, see http://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages.
Use TLS Enables the use of TLS.
Use Remote Truststore Enables loading the contents of the truststore from a remote credential store or from values entered in the stage properties. For more information, see Remote Keystore and Truststore.
Trusted Certificates Each PEM certificate used in the remote truststore. Enter a credential function that returns the certificate or enter the contents of the certificate. Using simple or bulk edit mode, click the Add icon to add additional certificates.
Truststore File Path to the local truststore file. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:
${runtime:resourcesDirPath()}/truststore.jks
By default, no truststore is used.
Truststore Type Type of truststore to use. Use one of the following types:
- Java Keystore File (JKS)
- PKCS #12 (p12 file)
Default is Java Keystore File (JKS).
Truststore Password Password to the truststore file. A password is optional, but recommended.
Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores.
Truststore Trust Algorithm Algorithm to manage the truststore.
Default is SunX509.
Use Default Protocols Uses the default TLSv1.2 transport layer security (TLS) protocol. To use a different protocol, clear this option.
Transport Protocols TLS protocols to use. To use a protocol other than the default TLSv1.2, click the Add icon and enter the protocol name. You can use simple or bulk edit mode to add protocols.
Note: Older protocols are not as secure as TLSv1.2.
Use Default Cipher Suites Uses a default cipher suite for the SSL/TLS handshake. To use a different cipher suite, clear this option.
Cipher Suites Cipher suites to use. To use a cipher suite that is not a part of the default set, click the Add icon and enter the name of the cipher suite. You can use simple or bulk edit mode to add cipher suites. Enter the Java Secure Socket Extension (JSSE) name for the additional cipher suites that you want to use.
-
To configure a test origin, on the Test Origin tab,
configure the origin properties.
All origin properties appear on the Test Origin tab.
For configuration details for a specific origin, see "Configuring an <origin type> Origin" in the Origins chapter.
To use a different test origin, select the origin to use in the Test Origin property on the General tab.
-
If you are using the pipeline start or stop events, configure the related event
consumer properties on the <event type> - <event
consumer> tab.
All properties for the event consumer appear on the tab.
For configuration details for a specific executor, see "Configuring an <executor type> Executor" in the Executors chapter.
To use a different event consumer, select the consumer to use in the Start Event or Stop Event properties on the General tab.
-
Use the Stage Library panel to add an origin stage. In the Properties panel,
configure the stage properties.
Or, to use a pipeline fragment that includes an origin, use the Stage Library panel to add the fragment.
For configuration details about origin stages, see Origins.
For more information about pipeline fragments, see Pipeline Fragments.
-
Use the Stage Library panel to add the next stage that you want to use, connect
the origin to the new stage, and configure the new stage.
For configuration details about processors, see Processors.
For configuration details about destinations, see Destinations.
For configuration details about executors, see Executors.
For more information about pipeline fragments, see Pipeline Fragments.
- Add additional stages as necessary.
- At any point, you can use the Preview icon to preview data to help configure the pipeline. For more information about previewing a pipeline, see the Control Hub documentation.
- Optionally, you can create metric or data alerts to track details about a pipeline run and create threshold alerts. For more information, see Rules and Alerts.
- When the pipeline is validated and complete, you can use the Check In icon to publish the pipeline, then use the Create Job icon to create a job.