Pipeline Configuration
Configure a pipeline to define the stream of data. After you configure the pipeline, you can start the pipeline.
Pipeline Property | Description |
---|---|
Title | Title of the pipeline. Data Collector uses the alphanumeric characters entered for the pipeline title as a prefix for the generated pipeline ID. For example, if you enter “My Pipeline *&%&^^ 123” as the pipeline title, then the pipeline ID has the following value: MyPipeline123tad9f592-5f02-4695-bb10-127b2e41561c. You can edit the pipeline title. However, because the pipeline ID is used to identify the pipeline, any changes to the pipeline title are not reflected in the pipeline ID. |
Description | Optional description of the pipeline. |
Labels | Optional labels to assign to the pipeline. Use labels to group similar pipelines. For example, you might want to group pipelines by database schema or by the test or production environment. You can use nested labels to create a hierarchy of pipeline groupings. Enter nested labels using the following format: <label1>/<label2>/<label3>. For example, you might want to group pipelines in the test environment by the origin system. You add the labels Test/HDFS and Test/Elasticsearch to the appropriate pipelines. |
Execution Mode | Execution mode of the pipeline. |
Delivery Guarantee | Determines how Data Collector handles data after an unexpected event causes the pipeline to stop running: At Least Once ensures that all data is processed but might result in duplicate records, while At Most Once ensures that data is processed no more than once but might result in missing records. Default is At Least Once. |
Start Event | Determines how the start event is handled. Use in standalone pipelines only. For more information about pipeline events, see Pipeline Event Generation. |
Stop Event | Determines how the stop event is handled. Use in standalone pipelines only. For more information about pipeline events, see Pipeline Event Generation. |
Retry Pipeline on Error | Retries the pipeline upon error. |
Retry Attempts | Number of retries attempted. Use -1 to retry indefinitely. The wait time between retries starts at 15 seconds and doubles until reaching five minutes. |
Max Pipeline Memory | Maximum amount of memory for the pipeline to use. Used only when the Data Collector monitor.memory configuration property is set to true. You can enter a numeric value or edit the default expression to use a percentage of the Data Collector Java heap size. Default is 65% of the Data Collector Java heap size: ${jvm:maxMemoryMB() * 0.65} |
On Memory Exceeded | Action to take when the pipeline memory reaches the configured Max Pipeline Memory. |
Rate Limit (records / sec) | Maximum number of records that the pipeline can read in a second. Use 0 or no value to set no rate limit. Default is 0. |
Max Runners | The maximum number of pipeline runners to use in a multithreaded pipeline. Use 0 for no limit. When set to 0, Data Collector generates up to the maximum number of threads or concurrency configured in the origin. You can use this property to help tune pipeline performance. For more information, see Tuning Threads and Runners. Default is 0. |
Create Failure Snapshot | Automatically creates a snapshot if the pipeline fails because of data-related errors. Can be used to troubleshoot the pipeline. |
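The Title property above notes that the pipeline ID is built from the alphanumeric characters of the title plus a generated suffix. The following Python sketch illustrates that behavior under stated assumptions: the exact character filtering and the format of the generated suffix are not documented here, so treat the regex and the UUID suffix as placeholders.

```python
import re
import uuid

def illustrate_pipeline_id(title: str) -> str:
    """Rough illustration of how a pipeline ID could be derived from a title.

    Assumption: only ASCII letters and digits from the title are kept, and a
    generated unique suffix (shown here as a UUID) is appended.
    """
    prefix = re.sub(r"[^A-Za-z0-9]", "", title)  # keep alphanumeric characters only
    return f"{prefix}{uuid.uuid4()}"             # append a generated unique suffix

# "My Pipeline *&%&^^ 123" -> something like "MyPipeline123<generated suffix>"
print(illustrate_pipeline_id("My Pipeline *&%&^^ 123"))
```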
Notifications Property | Description |
---|---|
Notify on Pipeline State Changes | Sends notifications when the pipeline encounters the listed pipeline states. |
Email IDs | Email addresses to receive notification when the pipeline state changes to one of the specified states. Using simple or bulk edit mode, click the Add icon to add additional addresses. |
Webhooks | Webhook to send when the pipeline state changes to one of the specified states. Using simple or bulk edit mode, click the Add icon to add additional webhooks. |
Webhook URL | URL to send the HTTP request. |
Headers | Optional HTTP request headers. |
HTTP Method | HTTP method to use for the request. |
Payload | Optional payload to use. Available for PUT, POST, and DELETE methods. Use any valid content type. You can use webhook parameters in the payload to include information about the triggering event, such as the pipeline name or state. Enclose webhook parameters in double curly brackets as follows: {{PIPELINE_STATE}}. |
Content Type | Optional content type of the payload. Configure this property when the content type is not declared in the request headers. |
Authentication Type | Optional authentication type to include in the request. Use None, Basic, Digest, or Universal. Use Basic for Form authentication. |
User Name | User name to include when using authentication. |
Password | Password to include when using authentication. |
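The Payload property above allows webhook parameters such as {{PIPELINE_STATE}} in the request body. As a rough illustration of the mechanism only, not Data Collector's internal code, the following Python sketch substitutes a parameter into a JSON payload template and posts it to a webhook URL. The URL, headers, payload, and state value shown are placeholders.

```python
import requests  # assumes the requests package is installed

# Hypothetical webhook settings, mirroring the properties in the table above.
webhook_url = "https://example.com/hooks/pipeline-status"   # placeholder Webhook URL
headers = {"Content-Type": "application/json"}              # optional request headers
payload_template = '{"text": "Pipeline state changed to {{PIPELINE_STATE}}"}'

def fire_webhook(pipeline_state: str) -> None:
    # Replace the double-curly-bracket parameter with the triggering event's value.
    body = payload_template.replace("{{PIPELINE_STATE}}", pipeline_state)
    response = requests.post(webhook_url, data=body, headers=headers, timeout=10)
    response.raise_for_status()  # surface HTTP errors from the receiving endpoint

fire_webhook("RUN_ERROR")  # example state value
```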
Error Records Property | Description |
---|---|
Error Records | Determines how to handle records that cannot be processed as expected. The tables that follow describe the properties for each error record handling option. |
Error Record Policy | Determines the version of the record to use as a basis for an error record. For more information, see Error Records and Version. |
Write to Pipeline Property | Description |
---|---|
SDC RPC Connection | Connection information for the destination pipeline to continue processing data. Use the following format: <host>:<port>. Use a single RPC connection for each destination pipeline. Using simple or bulk edit mode, add additional connections as needed. Use the port number when you configure the SDC RPC origin that receives the data. |
SDC RPC ID | User-defined ID to allow the destination to pass data to an SDC RPC origin. Use this ID in all SDC RPC origins to process data from the destination. |
Retries Per Batch | Number of times the destination tries to write a batch to the SDC RPC origin. When the destination cannot write the batch within the configured number of retries, it fails the batch. Default is 3. |
Back Off Period | Milliseconds to wait before retrying writing a batch to the SDC RPC origin. The value that you enter increases exponentially after each retry, until it reaches the maximum wait time of 5 minutes. For example, if you set the back off period to 10, the destination attempts the first retry after waiting 10 milliseconds, attempts the second retry after waiting 100 milliseconds, and attempts the third retry after waiting 1,000 milliseconds. Set to 0 to retry immediately. Default is 0. |
Connection Timeout (ms) | Milliseconds to establish a connection to the SDC RPC origin. The destination retries the connection based on the Retries Per Batch property. Default is 5000 milliseconds. |
Read Timeout (ms) | Milliseconds to wait for the SDC RPC origin to read data from a batch. The destination retries the write based on the Retries Per Batch property. Default is 2000 milliseconds. |
Use Compression | Enables the destination to use compression to pass data to the SDC RPC origin. Enabled by default. |
Use TLS | Enables the use of TLS. |
Truststore File | Path to the truststore file. Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES. For more information about environment variables, see Data Collector Environment Configuration. By default, no truststore is used. |
Truststore Type | Type of truststore to use. Default is Java Keystore File (JKS). |
Truststore Password | Password to the truststore file. A password is optional, but recommended. Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores. |
Truststore Trust Algorithm | The algorithm used to manage the truststore. Default is SunX509. |
Use Default Protocols | Determines the transport layer security (TLS) protocol to use. The default protocol is TLSv1.2. To use a different protocol, clear this option. |
Transport Protocols | The TLS protocols to use. To use a protocol other than the default TLSv1.2, click the Add icon and enter the protocol name. You can use simple or bulk edit mode to add protocols. Note: Older protocols are not as secure as TLSv1.2. |
Use Default Cipher Suites | Determines the cipher suite to use when performing the SSL/TLS handshake. Data Collector provides a set of cipher suites that it can use by default. For a full list, see Cipher Suites. |
Cipher Suites | Cipher suites to use. To use a cipher suite that is not a part of the default set, click the Add icon and enter the name of the cipher suite. You can use simple or bulk edit mode to add cipher suites. Enter the Java Secure Socket Extension (JSSE) name for the additional cipher suites that you want to use. |
Elasticsearch Property | Description |
---|---|
Cluster HTTP URI | HTTP URI used to connect to the cluster. Use the following format: <host>:<port> |
Additional HTTP Params | Additional HTTP parameters that you want to send as query string parameters to Elasticsearch. Enter the exact parameter name and value expected by Elasticsearch. |
Detect Additional Nodes in Cluster | Detects additional nodes in the cluster based on the configured Cluster URI. Selecting this property is equivalent to setting the client.transport.sniff Elasticsearch property to true. Use only when the Data Collector shares the same network as the Elasticsearch cluster. Do not use for Elastic Cloud or Docker clusters. |
Use Security | Specifies whether security is enabled on the Elasticsearch cluster. |
Time Basis | Time basis to use for writing to time-based indexes. When the Index property does not include datetime variables, you can ignore this property. Default is ${time:now()}. |
Data Time Zone | Time zone for the destination system. Used to resolve datetimes in time-based indexes. |
Index | Index for the generated documents. Enter an index name or an expression that evaluates to the index name. For example, if you enter customer as the index, the destination writes the document within the customer index. If you use datetime variables in the expression, make sure to configure the time basis appropriately. For details about datetime variables, see Datetime Variables. |
Mapping | Mapping type for the generated documents. Enter the mapping type, an expression that evaluates to the mapping type, or a field that includes the mapping type. For example, if you enter user as the mapping type, the destination writes the document with a user mapping type. |
Document ID | Expression that evaluates to the ID for the generated documents. When you do not specify an ID, Elasticsearch creates an ID for each document. By default, the destination allows Elasticsearch to create the ID. |
Parent ID | Optional parent ID for the generated documents. Enter a parent ID or an expression that evaluates to the parent ID. Use to establish a parent-child relationship between documents in the same index. |
Routing | Optional custom routing value for the generated documents. Enter a routing value or an expression that evaluates to the routing value. Elasticsearch routes a document to a particular shard in an index based on the routing value defined for the document. You can define a custom value for each document. If you don't define a custom routing value, Elasticsearch uses the parent ID (if defined) or the document ID as the routing value. |
Data Charset | Character encoding of the data to be processed. |
Security Property | Description |
---|---|
Security Username/Password | Elasticsearch username and password. Enter the username and password using the following syntax: <username>:<password>. Tip: To secure sensitive information such as usernames and passwords, you can use runtime resources or credential stores. |
SSL Truststore Path | Location of the truststore file. Configuring this property is equivalent to configuring the shield.ssl.truststore.path Elasticsearch property. Not necessary for Elastic Cloud clusters. |
SSL Truststore Password | Password for the truststore file. Configuring this property is equivalent to configuring the shield.ssl.truststore.password Elasticsearch property. Not necessary for Elastic Cloud clusters. |
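The Index and Time Basis properties above work together: datetime variables in the index expression are resolved against the time basis. The following Python sketch shows the general idea; the index pattern, field values, and strftime formatting are illustrative only and do not use Data Collector's expression language.

```python
from datetime import datetime, timezone

def resolve_index(pattern: str, time_basis: datetime) -> str:
    """Resolve a time-based index name from a strftime-style pattern.

    Data Collector uses its own datetime variables rather than strftime;
    this only illustrates how a time basis selects the target index.
    """
    return time_basis.strftime(pattern)

# Processing-time basis, comparable in spirit to the ${time:now()} default.
print(resolve_index("logs-%Y-%m-%d", datetime.now(timezone.utc)))

# A record-time basis would instead pass a timestamp taken from the record.
record_time = datetime(2023, 11, 5, tzinfo=timezone.utc)
print(resolve_index("logs-%Y-%m-%d", record_time))  # logs-2023-11-05
```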
Write to File Property | Description |
---|---|
Directory | Local directory for error record files. |
File Prefix | Prefix used for error record files. Use to differentiate error record files from other files in the directory. Uses the prefix sdc-${sdc:id()} by default. The prefix evaluates to sdc-<Data Collector ID>. This provides default differentiation in case several Data Collectors write to the same directory. The Data Collector ID is stored in the following file: $SDC_DATA/sdc.id. For more information about environment variables, see Data Collector Environment Configuration. |
File Wait Time (secs) | Number of seconds Data Collector waits for error records. After that time, it creates a new error record file. You can enter a number of seconds or use the default expression to enter the time in minutes. |
Max File Size (MB) | Maximum size for error files. Exceeding this size creates a new error file. Use 0 to avoid using this property. |
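The default File Prefix, sdc-${sdc:id()}, resolves to sdc- followed by the Data Collector ID read from $SDC_DATA/sdc.id. A minimal Python sketch of that resolution, assuming the SDC_DATA environment variable is set:

```python
import os
from pathlib import Path

# Assumes SDC_DATA points at the Data Collector data directory.
sdc_data = Path(os.environ["SDC_DATA"])
sdc_id = (sdc_data / "sdc.id").read_text().strip()

# Default error file prefix: sdc-<Data Collector ID>
print(f"sdc-{sdc_id}")
```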
Write to Kafka Property | Description |
---|---|
Broker URI | Connection string for the Kafka broker. Use the following format: <host>:<port>. To ensure a connection, enter a comma-separated list of additional broker URIs. |
Runtime Topic Resolution | Evaluates an expression at runtime to determine the topic to use for each record. |
Topic Expression | Expression used to determine where each record is written when using runtime topic resolution. Use an expression that evaluates to a topic name. |
Topic White List | List of valid topic names to write to when using runtime topic resolution. Use to avoid writing to invalid topics. Records that resolve to invalid topic names are passed to the stage for error handling. Use an asterisk (*) to allow writing to any topic name. By default, all topic names are valid. |
Topic | Topic to use. Not available when using runtime topic resolution. |
Partition Strategy | Strategy used to write to partitions. |
Partition Expression | Expression to use with the default or expression partition strategy. When using the default partition strategy, specify an expression that returns the partition key from the record. The expression must evaluate to a string value. When using the expression partition strategy, specify an expression that evaluates to the partition where you want each record written. Partition numbers start with 0. The expression must evaluate to a numeric value. Optionally, press Ctrl + Space Bar for help with creating the expression. |
One Message per Batch | For each batch, writes the records to each partition as a single message. |
Kafka Configuration | Additional Kafka properties to use. Using simple or bulk edit mode, click the Add icon and define the Kafka property name and value. Use the property names and values as expected by Kafka. Do not use the broker.list property. For information about enabling secure connections to Kafka, see Enabling Security. |
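The Partition Expression property above distinguishes two cases: with the default partition strategy the expression returns a string partition key, and with the expression partition strategy it returns a partition number directly. A rough Python sketch of the difference; the hash function and record field shown are stand-ins, not the partitioner Kafka actually uses.

```python
import zlib

NUM_PARTITIONS = 6  # example partition count for the topic

def partition_from_key(partition_key: str) -> int:
    """Default strategy: a string key is hashed to choose a partition.
    zlib.crc32 is only a stand-in for Kafka's real partitioner."""
    return zlib.crc32(partition_key.encode("utf-8")) % NUM_PARTITIONS

def partition_from_expression(record: dict) -> int:
    """Expression strategy: the expression itself evaluates to a numeric
    partition, starting at 0. Here, a hypothetical 'region_code' field."""
    return int(record["region_code"]) % NUM_PARTITIONS

print(partition_from_key("customer-42"))
print(partition_from_expression({"region_code": 3}))
```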
MapR Streams Producer Property | Description |
---|---|
Runtime Topic Resolution | Evaluates an expression at runtime to determine the topic to use for each record. |
Topic | Topic to use. Not available when using runtime topic resolution. |
Topic Expression | Expression used to determine where each record is written when using runtime topic resolution. Use an expression that evaluates to a topic name. |
Topic White List | List of valid topic names to write to when using runtime topic resolution. Use to avoid writing to invalid topics. Records that resolve to invalid topic names are passed to the stage for error handling. Use an asterisk (*) to allow writing to any topic name. By default, all topic names are valid. |
Partition Strategy | Strategy used to write to partitions. |
Partition Expression | Expression to use with the default or expression partition strategy. When using the default partition strategy, specify an expression that returns the partition key from the record. The expression must evaluate to a string value. When using the expression partition strategy, specify an expression that evaluates to the partition where you want each record written. Partition numbers start with 0. The expression must evaluate to a numeric value. Optionally, press Ctrl + Space Bar for help with creating the expression. |
One Message per Batch | For each batch, writes the records to each partition as a single message. |
MapR Streams Configuration | Additional configuration properties to use. Using simple or bulk edit mode, click the Add icon and define the MapR Streams property name and value. Use the property names and values as expected by MapR. You can use MapR Streams properties and the set of Kafka properties supported by MapR Streams. |
MQTT Property | Description |
---|---|
Broker URL | MQTT broker URL. Enter in the following format: <tcp | ssl>://<hostname>:<port>. Use ssl for secure connections to the broker. For example: tcp://localhost:1883 |
Client ID | MQTT client ID. The ID must be unique across all clients connecting to the same broker. You can define an expression that evaluates to the client ID. For example, you can enter the following expression to use the unique pipeline ID as the client ID: ${pipeline:id()} |
Topic | Topic to publish to. Using simple or bulk edit mode, click the Add icon to publish to additional topics. |
Quality of Service | Determines the quality of service level used to guarantee message delivery: at most once (QoS 0), at least once (QoS 1), or exactly once (QoS 2). For more information, see the HiveMQ documentation on quality of service levels. |
Client Persistence Mechanism | Determines the persistence mechanism that the destination uses to guarantee message delivery when the quality of service level is at least once or exactly once. Not used when the quality of service level is at most once. For more information, see the HiveMQ documentation on client persistence. |
Client Persistence Data Directory | Local directory on the Data Collector machine where the destination temporarily stores messages in a file when you configure file persistence. The user who starts Data Collector must have read and write access to this directory. |
Keep Alive Interval (secs) | Maximum time in seconds to allow the connection to the MQTT broker to remain idle. After the destination publishes no messages for this amount of time, the connection is closed. The destination must reconnect to the MQTT broker. Default is 60 seconds. |
Use Credentials | Enables entering MQTT credentials. Tip: To secure sensitive information such as usernames and passwords, you can use runtime resources or credential stores. |
Username | MQTT user name. |
Password | MQTT password. |
Retain the Message | Determines whether or not the MQTT broker retains the message last published by the destination when no MQTT client is subscribed to listen to the topic. When selected, the MQTT broker retains the last message published by the destination. Any messages published earlier are lost. When cleared, all messages published by the destination are lost. For more information about MQTT retained messages, see http://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages. |
Use TLS | Enables the use of TLS. |
Truststore File | Path to the truststore file. Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES. For more information about environment variables, see Data Collector Environment Configuration. By default, no truststore is used. |
Truststore Type | Type of truststore to use. Default is Java Keystore File (JKS). |
Truststore Password | Password to the truststore file. A password is optional, but recommended. Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores. |
Truststore Trust Algorithm | The algorithm used to manage the truststore. Default is SunX509. |
Use Default Protocols | Determines the transport layer security (TLS) protocol to use. The default protocol is TLSv1.2. To use a different protocol, clear this option. |
Transport Protocols | The TLS protocols to use. To use a protocol other than the default TLSv1.2, click the Add icon and enter the protocol name. You can use simple or bulk edit mode to add protocols. Note: Older protocols are not as secure as TLSv1.2. |
Use Default Cipher Suites | Determines the cipher suite to use when performing the SSL/TLS handshake. Data Collector provides a set of cipher suites that it can use by default. For a full list, see Cipher Suites. |
Cipher Suites | Cipher suites to use. To use a cipher suite that is not a part of the default set, click the Add icon and enter the name of the cipher suite. You can use simple or bulk edit mode to add cipher suites. Enter the Java Secure Socket Extension (JSSE) name for the additional cipher suites that you want to use. |
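The MQTT properties above map closely onto the options of a standalone MQTT client. As a point of comparison only, not how Data Collector is implemented, here is a paho-mqtt sketch (1.x-style API) that publishes with an explicit client ID, QoS level, keep-alive interval, and retained message flag, plus a CA certificate as a rough analogue of the truststore settings. All hostnames, paths, credentials, and topic names are placeholders.

```python
import paho.mqtt.client as mqtt  # assumes paho-mqtt 1.x is installed

# Placeholder values mirroring the MQTT properties in the table above.
client = mqtt.Client(client_id="my-pipeline-id")            # Client ID (unique per broker)
client.username_pw_set("mqtt-user", "mqtt-password")         # Use Credentials / Username / Password
client.tls_set(ca_certs="/path/to/ca.pem")                   # rough analogue of the truststore file
client.connect("broker.example.com", 8883, keepalive=60)     # Broker URL + Keep Alive Interval (secs)

# Quality of Service and Retain the Message map to the publish call.
client.publish("sdc/errors", payload="record payload", qos=1, retain=True)
client.disconnect()
```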
Yarn Cluster Property | Description |
---|---|
Worker Count | Number of workers used in a Cluster Yarn Streaming pipeline. Use to limit the number of workers spawned for processing. Default is 0, which spawns one worker for every partition in the topic. |
Worker Java Options | Additional Java properties for the pipeline. Separate properties with a space. Several properties are set by default; changing the default properties is not recommended. You can add any valid Java property. |
Launcher Env Configuration | Additional configuration properties for the cluster launcher. Using simple or bulk edit mode, click the Add icon and define the property name and value. |
Worker Memory (MB) | Maximum amount of memory allocated to each Data Collector worker in the cluster. Default is 1024 MB. |
Extra Spark Configuration | For Cluster Yarn Streaming pipelines, you can configure additional Spark configurations to pass to the spark-submit script. Enter the Spark configuration name and the value to use. The specified configurations are passed to the spark-submit script as follows: spark-submit --conf <key>=<value> For example, to limit the off-heap memory allocated to each executor, you can use the spark.yarn.executor.memoryOverhead configuration and set it to the number of MB that you want to use. Data Collector does not validate the property names or values. For details on additional Spark configurations that you can use, see the Spark documentation for the Spark version that you are using. |
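Extra Spark Configuration entries are passed to spark-submit as --conf <key>=<value> pairs. The following Python sketch shows that mapping; the configuration key comes from the example in the table above, while the value and the assembled argument list are illustrative rather than the exact command Data Collector runs.

```python
# Example entries, as they might be configured in Extra Spark Configuration.
extra_spark_conf = {
    "spark.yarn.executor.memoryOverhead": "1024",  # MB of off-heap memory per executor (example value)
}

def to_spark_submit_args(conf: dict) -> list:
    """Turn key/value pairs into the --conf <key>=<value> arguments
    appended to the spark-submit invocation."""
    args = []
    for key, value in conf.items():
        args.extend(["--conf", f"{key}={value}"])
    return args

print(["spark-submit"] + to_spark_submit_args(extra_spark_conf))
# ['spark-submit', '--conf', 'spark.yarn.executor.memoryOverhead=1024']
```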
For information on configuring a pipeline to aggregate statistics for Control Hub, see Pipeline Statistics.
For details on the Email executor, see Configuring an Email Executor.
For details on the HDFS File Metadata executor, see Configuring an HDFS File Metadata Executor.
For details on the Hive Query executor, see Configuring a Hive Query Executor.
For details on the JDBC Query executor, see Configuring a JDBC Query Executor.
For details on the MapReduce executor, see Configuring a MapReduce Executor.
For details on the Shell executor, see Configuring a Shell Executor.
For details on the Spark executor, see Configuring a Spark Executor.
For details on writing to another pipeline, see Configuring an SDC RPC Destination.
For configuration details about destinations, see Destinations.
For configuration details about executors, see Executors.