Elasticsearch

Supported pipeline types:
  • Data Collector

The Elasticsearch destination writes data to an Elasticsearch cluster, including Elastic Cloud clusters and Amazon OpenSearch Service clusters (formerly Amazon Elasticsearch Service). For information about supported versions, see Supported Systems and Versions.

The destination uses the Elasticsearch HTTP module to access the Bulk API and write each record to Elasticsearch as a document.
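
As an illustration of what a Bulk API request carries, the following Python sketch serializes records as newline-delimited JSON, one action line followed by one document line per record. It is not the destination's implementation; the function name build_bulk_body and the sample records are hypothetical.

```python
import json

def build_bulk_body(records, index):
    """Serialize records as a newline-delimited JSON body for the Bulk API.

    Hypothetical helper for illustration only; not part of Data Collector.
    """
    lines = []
    for record in records:
        # Action line: "index" adds or replaces a document in the target index.
        lines.append(json.dumps({"index": {"_index": index}}))
        # Source line: the record itself becomes the document.
        lines.append(json.dumps(record))
    # The Bulk API expects the body to end with a newline character.
    return "\n".join(lines) + "\n"

body = build_bulk_body([{"name": "Ana"}, {"name": "Raj"}], "customer")
print(body)
```

A client would send this body in a POST request to the _bulk endpoint with the Content-Type header set to application/x-ndjson.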

When you configure the Elasticsearch destination, you configure the HTTP URLs used to connect to the Elasticsearch cluster and specify whether security is enabled on the cluster. When Data Collector shares the same network as the Elasticsearch cluster, you can enter one or more node URLs and have the destination automatically detect additional Elasticsearch nodes on the cluster.

The Elasticsearch destination can use CRUD operations defined in the sdc.operation.type record header attribute to write data. You can define a default operation for records without the header attribute or value. You can also configure how to handle records with unsupported operations. For information about Data Collector change data processing and a list of CDC-enabled origins, see Processing Changed Data.

You can add advanced Elasticsearch properties as needed.

Security

When security is enabled for the Elasticsearch cluster, you must specify the authentication method:
Basic
Use Basic authentication for Elasticsearch clusters outside of Amazon OpenSearch Service. With Basic authentication, the stage passes the Elasticsearch user name and password.
AWS Signature V4
Use AWS Signature V4 authentication for Elasticsearch clusters within Amazon OpenSearch Service. The stage must sign HTTP requests with Amazon Web Services credentials. For details, see the Amazon OpenSearch Service documentation. Use one of the following methods to sign with AWS credentials:
Instance profile
When the execution engine (Data Collector or Transformer) runs on an Amazon EC2 instance that has an associated instance profile, the engine uses the instance profile credentials to automatically authenticate with AWS.
To use an instance profile, do not configure the Access Key ID and Secret Access Key properties.
For more information about associating an instance profile with an EC2 instance, see the Amazon EC2 documentation.
AWS access key pair
When the execution engine does not run on an Amazon EC2 instance or when the EC2 instance doesn’t have an instance profile, you must specify the Access Key ID and Secret Access Key properties.
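
The following Python sketch illustrates two ideas from this section: how a Basic authentication header is formed from a user name and password, and how leaving the access key properties empty selects instance profile credentials. Both function names and the sample credentials are hypothetical, and the sketch does not perform AWS Signature V4 signing.

```python
import base64

def basic_auth_header(user, password):
    """Build an HTTP Basic Authorization header (hypothetical helper)."""
    # Basic authentication sends base64("user:password") in the header.
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}

def aws_credential_source(access_key_id=None, secret_access_key=None):
    """Report which AWS credentials would be used (illustration only)."""
    # Assumption: an empty key pair means "fall back to the instance profile".
    if access_key_id and secret_access_key:
        return "access key pair"
    return "instance profile"

header = basic_auth_header("elastic", "example-password")
print(sorted(header))                         # ['Authorization']
print(aws_credential_source())                # instance profile
print(aws_credential_source("AKIDEXAMPLE", "secret"))  # access key pair
```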

Time Basis and Time-Based Indexes

The time basis is the time used by the Elasticsearch destination to write records to time-based indexes. When indexes have no time component, you can ignore the time basis property.

You can use the time of processing or the time associated with the data as the time basis.

For example, say you define the Index property using the following datetime variables:
logs-${YYYY()}-${MM()}-${DD()}

If you use the time of processing as the time basis, the destination writes records to indexes based on when it processes each record. If you use the time associated with the data, such as a transaction timestamp, then the destination writes records to the indexes based on that timestamp.

You can use the following times as the time basis:
Processing Time
When you use processing time as the time basis, the destination resolves the datetime variables in the Index property with the time that it processes each record, and writes to the resulting index. To use the processing time as the time basis, use the following expression:
${time:now()}
This is the default time basis.
Record Time
When you use the time associated with a record as the time basis, you specify a date field in the record. The destination writes data to indexes based on the datetimes associated with the records.
To use a time associated with the record, use an expression that calls a field and resolves to a datetime value, such as ${record:value("/Timestamp")}.
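
To make the difference concrete, the following Python sketch resolves the logs-${YYYY()}-${MM()}-${DD()} pattern against each time basis. It only mimics the behavior described above: the resolve_index function and the sample record are hypothetical, and the sketch assumes that the month and day variables are zero-padded to two digits.

```python
from datetime import datetime, timezone

def resolve_index(time_basis):
    """Mimic logs-${YYYY()}-${MM()}-${DD()} for a given time basis."""
    # Assumption: month and day are zero-padded to two digits.
    return time_basis.strftime("logs-%Y-%m-%d")

# A record whose event happened just before midnight ...
record = {"Timestamp": datetime(2023, 12, 31, 23, 59, tzinfo=timezone.utc)}
# ... but that the pipeline processes a few minutes later.
processing_time = datetime(2024, 1, 1, 0, 5, tzinfo=timezone.utc)

print(resolve_index(processing_time))      # logs-2024-01-01
print(resolve_index(record["Timestamp"]))  # logs-2023-12-31
```

With processing time, late-arriving records land in the index for the day they are processed. With record time, they land in the index for the day the event occurred.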

Document IDs

When appropriate, you can specify an expression that defines the document ID. When you do not specify an expression, Elasticsearch generates an ID for each document.

When you configure the destination to perform create, update, or delete operations, you must define the document ID.

For example, to perform updates for documents with IDs based on the EmployeeID field, define the write operation as update and define the Document ID as follows: ${record:value('/EmployeeID')}.
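
As a rough illustration of why update operations need a document ID, the following Python sketch builds the pair of Bulk API lines for an update keyed on the EmployeeID field. The update_action function and the employees index name are hypothetical, and the sketch is not the destination's implementation.

```python
import json

def update_action(record, index, id_field="EmployeeID"):
    """Build the two Bulk API lines for a partial update (illustration only)."""
    doc_id = record.get(id_field)
    if doc_id is None:
        # An update must target an existing document, so an ID is required.
        raise ValueError(f"record has no {id_field} field to use as document ID")
    action = {"update": {"_index": index, "_id": str(doc_id)}}
    # For updates, the Bulk API expects the changed fields under "doc".
    payload = {"doc": record}
    return json.dumps(action) + "\n" + json.dumps(payload) + "\n"

lines = update_action({"EmployeeID": 1042, "title": "Engineer"}, "employees")
print(lines)
```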

You can also define a parent ID for each document to establish a parent/child relationship between documents in the same index.

CRUD Operation Processing

The Elasticsearch destination can create, update, delete, or index data. The destination writes the records based on the CRUD operation defined in a CRUD operation header attribute or in operation-related stage properties.

The destination uses the header attribute and stage properties as follows:

CRUD operation header attribute
The destination looks for the CRUD operation in the sdc.operation.type record header attribute.
The attribute can contain one of the following numeric values:
  • 1 for CREATE, the equivalent of INSERT
  • 2 for DELETE
  • 3 for UPDATE
  • 4 for INDEX, the equivalent of UPSERT
  • 8 for UPDATE with doc_as_upsert, the equivalent of MERGE
If your pipeline has a CDC-enabled origin that processes changed data, the destination simply reads the operation type from the sdc.operation.type header attribute that the origin generates. If your pipeline has a non-CDC origin, you can use the Expression Evaluator processor or a scripting processor to define the record header attribute. For more information about Data Collector changed data processing and a list of CDC-enabled origins, see Processing Changed Data.
Operation stage properties
If there is no CRUD operation in the sdc.operation.type record header attribute, the destination uses the operation configured in the Default Operation property.
If the sdc.operation.type record header attribute contains an unsupported value, the destination takes the action configured in the Unsupported Operation Handling property. The destination can discard the record, send the record for error handling, or write the record using the default operation.
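
The decision logic described above can be sketched in Python as follows. This is a simplified model, not Data Collector code: the resolve_operation function, its return values, and the option names discard, send_to_error, and use_default are hypothetical, and the index default is an assumption made for the example.

```python
# Numeric sdc.operation.type values accepted by the destination.
OPERATIONS = {
    1: "create",
    2: "delete",
    3: "update",
    4: "index",
    8: "update with doc_as_upsert",
}

def resolve_operation(header_value, default_operation="index",
                      unsupported_handling="send_to_error"):
    """Return (outcome, operation) for one record (illustration only)."""
    # No header attribute or empty value: fall back to the default operation.
    if header_value is None or header_value == "":
        return ("write", default_operation)
    try:
        code = int(header_value)
    except (TypeError, ValueError):
        code = None
    if code in OPERATIONS:
        return ("write", OPERATIONS[code])
    # Unsupported value: apply the configured handling.
    if unsupported_handling == "discard":
        return ("discard", None)
    if unsupported_handling == "use_default":
        return ("write", default_operation)
    return ("error", None)

print(resolve_operation("3"))   # ('write', 'update')
print(resolve_operation(None))  # ('write', 'index')
print(resolve_operation("5"))   # ('error', None)
```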

Configuring an Elasticsearch Destination

Configure an Elasticsearch destination to write data to an Elasticsearch cluster.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Stage Library Library version that you want to use.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline. Not valid for cluster pipelines.
  2. On the Elasticsearch tab, configure the following properties:
    Elasticsearch Property Description
    HTTP URLs Comma-separated list of HTTP or HTTPS URLs used to connect to each Elasticsearch server in the cluster. Use the following format:

    http://<host1>,http://<host2>

    You can specify a port number in the URLs to override the default port defined in the HTTP Port property, as follows:

    http://<host1>:<port>,http://<host2>:<port>

    When a port number is defined in both this property and in the HTTP Port property, the port in this property takes precedence. For example, if you define this property as follows:

    http://server1,http://server2:1234

    And you define the default HTTP Port property as 9200, then server1 uses the default port of 9200 and server2 uses the port 1234.

    HTTP Port Default port number to use for URLs that do not include a port.

    The default HTTP port is 9200. The default HTTPS port is 443.

    Use Security Specifies whether security is enabled on the Elasticsearch cluster.
    Index Index for the generated documents. Enter an index name or an expression that evaluates to the index name.

    For example, if you enter customer as the index, the destination writes the document within the customer index.

    If you use datetime variables in the expression, make sure to configure the time basis appropriately. For details about datetime variables, see Datetime Variables.

    Mapping Mapping type for the generated documents. Valid values depend on the Elasticsearch version:
    • Elasticsearch 5.x - Enter the mapping type, an expression that evaluates to the mapping type, or a field that includes the mapping type. For example, if you enter user as the mapping type, the destination writes the document with a user mapping type.
    • Elasticsearch 6.x or 7.x - Enter {_doc} if Elasticsearch was upgraded from version 5.x. Otherwise, enter empty curly brackets as follows: {}.
    • Elasticsearch 8.x - This property is ignored when used with Elasticsearch 8.0 or later, which no longer supports mapping types.

    For more information about the removal of mapping types, see the Elasticsearch documentation.

    Document ID Expression that evaluates to the ID for the generated documents. When you do not specify an ID, Elasticsearch creates an ID for each document.

    By default, the destination allows Elasticsearch to create the ID.

    Additional HTTP Params Additional HTTP parameters that you want to send as query string parameters to Elasticsearch. Enter the exact parameter name and value expected by Elasticsearch.
    Detect Additional Nodes in Cluster Detects additional nodes in the cluster based on the configured HTTP URLs.

    Selecting this property is equivalent to setting the client.transport.sniff Elasticsearch property to true.

    Use only when Data Collector shares the same network as the Elasticsearch cluster. Do not use for Elastic Cloud or Docker clusters.

    Time Basis Time basis to use for writing to time-based indexes. Use one of the following expressions:
    • ${time:now()} - Uses the processing time as the time basis. The processing time is the time associated with the Data Collector running the pipeline.
    • An expression that calls a field and resolves to a datetime value, such as ${record:value(<date field path>)}. Uses the datetime result as the time basis.

    When the Index property does not include datetime variables, you can ignore this property.

    Default is ${time:now()}.

    Data Time Zone Time zone for the destination system. Used to resolve datetimes in time-based indexes.
    Parent ID Optional parent ID for the generated documents. Enter a parent ID or an expression that evaluates to the parent ID.

    Use to establish a parent-child relationship between documents in the same index.

    Routing Optional custom routing value for the generated documents. Enter a routing value or an expression that evaluates to the routing value.

    Elasticsearch routes a document to a particular shard in an index based on the routing value defined for the document. You can define a custom value for each document. If you don’t define a custom routing value, Elasticsearch uses the parent ID (if defined) or the document ID as the routing value.

    Data Charset Character encoding of the data to be processed.

    Default Operation Default CRUD operation to perform if the sdc.operation.type record header attribute is not set.
    Unsupported Operation Handling Action to take when the CRUD operation type defined in the sdc.operation.type record header attribute is not supported:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Use Default Operation - Writes the record to the destination system using the default operation.
    Additional Properties Extra fields to include in the action statement. Specify in JSON format.

    For example, you can use the _retry_on_conflict field to specify how many times an update is retried when there is a version conflict. To specify three retries, include the following:

    "_retry_on_conflict" : 3

    You can use record functions and delimited data record functions when configuring this property.

    For more information, see the Elasticsearch documentation.

  3. If you enabled security, on the Security tab, configure the following properties:
    Security Property Description
    Mode Authentication method to use:
    • Basic - Authenticate with Elasticsearch user name and password. Select this option for Elasticsearch clusters outside of Amazon OpenSearch Service.
    • AWS Signature V4 - Authenticate with AWS. Select this option for Elasticsearch clusters within Amazon OpenSearch Service.
    User Name Elasticsearch user name.

    Available when using Basic authentication.

    Password Password for the user account.

    Available when using Basic authentication.

    Region Amazon Web Services region that hosts the Elasticsearch domain.

    Available when using AWS Signature V4 authentication.

    Access Key ID AWS access key ID. Required when not using instance profile credentials.

    Available when using AWS Signature V4 authentication.

    Secret Access Key AWS secret access key. Required when not using instance profile credentials.

    Available when using AWS Signature V4 authentication.

    Enable SSL Enables the use of SSL.
    SSL Truststore Path Location of the truststore file.

    Configuring this property is equivalent to configuring the shield.ssl.truststore.path Elasticsearch property.

    Not necessary for Elastic Cloud clusters.

    SSL Truststore Password Password for the truststore file.

    Configuring this property is equivalent to configuring the shield.ssl.truststore.password Elasticsearch property.

    Not necessary for Elastic Cloud clusters.
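
The port precedence rule for the HTTP URLs and HTTP Port properties in step 2 can be illustrated with a short Python sketch. The resolve_nodes function is hypothetical and only models the rule that a port in a URL overrides the default port. It is not how the destination parses the property.

```python
from urllib.parse import urlsplit

def resolve_nodes(http_urls, default_port=9200):
    """Resolve (scheme, host, port) for each URL in a comma-separated list."""
    nodes = []
    for url in http_urls.split(","):
        parts = urlsplit(url.strip())
        # A port given in the URL takes precedence over the default port.
        port = parts.port if parts.port is not None else default_port
        nodes.append((parts.scheme, parts.hostname, port))
    return nodes

print(resolve_nodes("http://server1,http://server2:1234"))
# [('http', 'server1', 9200), ('http', 'server2', 1234)]
```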