Elasticsearch

The Elasticsearch origin is a multithreaded origin that reads data from an Elasticsearch cluster, including Elastic Cloud clusters and Amazon OpenSearch Service clusters (formerly Amazon Elasticsearch Service). For information about supported versions, see Supported Systems and Versions.

The origin generates a record for each Elasticsearch document.

When you configure the Elasticsearch origin, you configure the HTTP URLs used to connect to the Elasticsearch cluster and specify whether security is enabled on the cluster. When Data Collector shares the same network as the Elasticsearch cluster, you can enter one or more node URLs and automatically detect additional Elasticsearch nodes on the cluster.

You configure the origin to run in batch or incremental mode. The Elasticsearch origin maintains the last-saved offset only when it runs in incremental mode.

The origin uses the Elasticsearch scroll API to run a query that you define. A query can retrieve large numbers of documents from Elasticsearch. This allows the origin to run a single query and then read multiple batches of data from the scroll until no results are left. You configure a scroll timeout that defines the amount of time that the search context remains valid.

When you configure the Elasticsearch origin, you specify the maximum number of slices to split the scroll into. The number of slices determines how many threads the origin uses to read the data.

Security

When security is enabled for the Elasticsearch cluster, you must specify the authentication method:
Basic
Use Basic authentication for Elasticsearch clusters outside of Amazon OpenSearch Service. With Basic authentication, the stage passes the Elasticsearch user name and password.
AWS Signature V4
Use AWS Signature V4 authentication for Elasticsearch clusters within Amazon OpenSearch Service. The stage must sign HTTP requests with Amazon Web Services credentials. For details, see the Amazon OpenSearch Service documentation. Use one of the following methods to sign with AWS credentials:
Instance profile
When the execution engine - Data Collector or Transformer - runs on an Amazon EC2 instance that has an associated instance profile, the engine uses the instance profile credentials to automatically authenticate with AWS.
To use an instance profile, do not configure the Access Key ID and Secret Access Key properties.
For more information about associating an instance profile with an EC2 instance, see the Amazon EC2 documentation.
AWS access key pair
When the execution engine does not run on an Amazon EC2 instance or when the EC2 instance doesn’t have an instance profile, you must specify the Access Key ID and Secret Access Key properties.

Batch and Incremental Mode

The Elasticsearch origin can read data in the following modes:

Batch mode
In batch mode, the origin reads all data returned from the Elasticsearch query, and then the pipeline stops. By default, the origin reads in batch mode.
In batch mode, the origin does not maintain the last-saved offset. When the pipeline stops or when Elasticsearch clears the scroll, the origin does not note where it stops reading. When the origin continues reading after a pipeline restart or after Elasticsearch resets the scroll, the origin processes all requested documents again. This can cause the origin to process duplicate data.
Elasticsearch can clear the scroll for any of the following reasons:
  • The scroll timeout value is exceeded.
  • Due to a network issue, the pipeline loses the connection to Elasticsearch for more than 30 seconds.
  • The pipeline stops when the Delete Scroll on Pipeline Stop property is enabled.
To ensure that the origin does not process duplicate data, configure the origin to run in incremental mode so that the origin does save the offset.
Incremental mode
In incremental mode, the origin performs an initial read of all the data returned from the Elasticsearch query, then the origin periodically runs the query again to check for new data. In incremental mode, the pipeline runs continuously so it can repeat the query at regular intervals.
To use incremental mode, you define the amount of time that the origin waits between queries, and you configure the offset field and initial offset value.
The origin uses an offset field and initial offset value to determine where to start reading data. By default, the offset field is defined as a field named timestamp. You can define any Elasticsearch field as the offset field, though you'll most likely want to use a date field. The initial offset value is a value within the offset field where you want the Elasticsearch origin to start reading.
Include both the offset field and the offset value in the Elasticsearch query.
After the initial read, the origin saves the last read offset value. When the origin runs the next incremental query, the origin uses the last read offset value to continue processing from where it stopped.
In incremental mode, the origin maintains the last-saved offset. When the pipeline stops or when Elasticsearch clears the scroll, the origin notes where it stops reading. When the origin continues reading after a pipeline restart or after Elasticsearch resets the scroll, the origin continues processing from where it stopped. When restarting the pipeline, you can reset the origin to process all requested documents.

Query

Define the query that the origin uses to return data from Elasticsearch. You can define any valid Elasticsearch query.

The default query is valid for batch mode and returns all documents from the Elasticsearch cluster:
{
  "query": {
    "match_all": {}
  }
}

You can optionally specify an Elasticsearch index or mapping type to define the scope of the query in either batch or incremental mode.

For example, let's say that you specify the customer index, do not specify a mapping type, configure the origin to use batch mode, and use the default query. The query properties are configured like so:

When you run the pipeline, the query returns all documents within the customer index.

Incremental Mode Query

If you configure the origin to use incremental mode, you must include both the offset field and the offset value in the Elasticsearch query. Use ${OFFSET} to represent the offset value.

For example, let's say that you do not define an index or mapping type, configure the origin to use incremental mode, and configure the timestamp field as the offset field. You use the Elasticsearch date math expression now-1d/d to set the initial offset value to one day before the current time. You include the offset field and offset value in the query to determine where to start reading data. The query properties are configured like so:

When you run the pipeline, the query returns all documents that have a timestamp field with a value greater than one day before the current time. The query sorts the results by timestamp.

Scroll Timeout

An Elasticsearch query can retrieve large numbers of documents from a single search request. This allows the Elasticsearch origin to run a single query, and then read multiple batches of data from the scroll until no results are left. An Elasticsearch scroll functions like a cursor in a traditional database.

To run an Elasticsearch query using the scroll API, you must set a scroll timeout. The scroll timeout tells Elasticsearch how long it should keep the search context alive. The scroll timeout must be long enough for a single batch to be fully read. When the origin reads another batch from the results, the scroll timeout is reset.

Enter the scroll timeout using Elasticsearch duration time units. For example, enter 1m to set a one minute timeout, or enter 1h to set a one hour timeout.

Optionally, you can configure the origin to delete the scroll when the pipeline stops. When configured, Elasticsearch clears the scroll as soon as the pipeline stops, rather than waiting for the scroll timeout value to be exceeded. Keeping a scroll window open consumes Elasticsearch resources. To free up those resources, configure the origin to clear the scroll when it is no longer being used.

If Elasticsearch clears the scroll because the scroll timeout value is exceeded or because the origin is configured to delete the scroll when the pipeline stops, the origin must run the query again. In batch mode, the origin reads all data returned from the query which can result in duplicate data being processed. In incremental mode, the origin begins reading from the last-saved offset.

Multithreaded Processing

The Elasticsearch origin performs parallel processing and enables the creation of a multithreaded pipeline.

The Elasticsearch origin uses multiple concurrent threads based on the Number of Slices property. Each thread connects to the origin system, creates a batch of data, and passes the batch to an available pipeline runner.

A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors, executors, and destinations in the pipeline and handles all pipeline processing after the origin. Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time pipeline property to specify the interval or to opt out of empty batch generation.

Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But since batches are processed by different pipeline runners, the order that batches are written to destinations is not ensured.

For example, the origin runs an Elasticsearch query that returns a large number of documents. To decrease processing time, you want to split the scroll into multiple slices and process each slice in parallel. You set the Number of Slices property to 5. When you start the pipeline, the origin splits the scroll into five slices, then the origin creates five threads, and Data Collector creates a matching number of pipeline runners. Upon receiving data, the origin passes a batch to each of the pipeline runners for processing.

At any given moment, the five pipeline runners can each process a batch, so this multithreaded pipeline processes up to five batches at a time. When incoming data slows, the pipeline runners sit idle, available for use as soon as the data flow increases.

For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.

Configuring an Elasticsearch Origin

Configure an Elasticsearch origin to read data from an Elasticsearch cluster.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Stage Library Library version that you want to use.
    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the Elasticsearch tab, configure the following properties:
    Elasticsearch Property Description
    HTTP URLs Comma-separated list of HTTP or HTTPS URLs used to connect to each Elasticsearch server in the cluster. Use the following format:

    http://<host1>,http://<host2>

    You can specify a port number in the URLs to override the default port defined in the HTTP Port property, as follows:

    http://<host1>:<port>,http://<host2>:<port>

    When a port number is defined in both this property and in the HTTP Port property, the port in this property takes precedence. For example, if you define this property as follows:

    http://server1,http://server2:1234

    And you define the default HTTP Port property as 9200, then server1 uses the default port of 9200 and server2 uses the port 1234.

    HTTP Port Default port number to use for URLs that do not include a port.

    The default HTTP port is 9200. The default HTTPS port is 443.

    Use Security Specifies whether security is enabled on the Elasticsearch cluster.
    Index Optional index to define the scope of the query. Enter an index name or an expression that evaluates to the index name.

    For example, if you enter customer as the index, the query returns documents within the customer index.

    Mapping Optional mapping type to define the scope of the query. Enter the mapping type or an expression that evaluates to the mapping type.

    For example, if you enter user as the mapping type, the query returns documents within the user mapping type.

    This property is ignored when used with Elasticsearch 8.0 or later, which no longer supports mapping types.

    Incremental Mode Defines how the origin queries Elasticsearch. Select to perform incremental queries. Clear to perform a batch query.

    Default is batch mode.

    Query Interval Amount of time that the origin waits between incremental queries. Enter an expression based on a unit of time. You can use SECONDS, MINUTES, or HOURS.

    Required in incremental mode.

    Default is 1 hour: ${1 * HOURS}.

    Offset Field Field to use for the initial offset value.

    Required in incremental mode.

    Initial Offset Offset value to use when the pipeline starts. Enter a constant, an Elasticsearch date math expression, or a Data Collector expression.

    Required in incremental mode.

    Query Query to read from Elasticsearch. You can define any valid Elasticsearch query.
    Additional HTTP Params Additional HTTP parameters that you want to send as query string parameters to Elasticsearch. Enter the exact parameter name and value expected by Elasticsearch.
    Detect Additional Nodes in Cluster Detects additional nodes in the cluster based on the configured HTTP URLs.

    Selecting this property is the equivalent to setting the client.transport.sniff Elasticsearch property to true.

    Use only when Data Collector shares the same network as the Elasticsearch cluster. Do not use for Elastic Cloud or Docker clusters.

    Scroll Timeout Maximum amount of time to keep the search context alive.

    Enter the timeout using Elasticsearch duration time units. For example, enter 1m to set a one minute timeout.

    Delete Scroll on Pipeline Stop Delete the scroll when the pipeline stops.
    Max Batch Size Maximum number of records to include in a batch. Honors values up to the Data Collector maximum batch size.

    Default is 1000. The Data Collector default is 1000.

    Number of Slices Number of slices to split the Elasticsearch scroll into.

    This property determines how many threads the origin generates and uses for multithreaded processing.

  3. If you enabled security, on the Security tab, configure the following properties:
    Security Property Description
    Mode Authentication method to use:
    • Basic - Authenticate with Elasticsearch user name and password. Select this option for Elasticsearch clusters outside of Amazon OpenSearch Service.
    • AWS Signature V4 - Authenticate with AWS. Select this option for Elasticsearch clusters within Amazon OpenSearch Service.
    User Name Elasticsearch user name.

    Available when using Basic authentication.

    Password Password for the user account.

    Available when using Basic authentication.

    Region Amazon Web Services region that hosts the Elasticsearch domain.

    Available when using AWS Signature V4 authentication.

    Access Key ID AWS access key ID. Required when not using instance profile credentials.

    Available when using AWS Signature V4 authentication.

    Secret Access Key AWS secret access key. Required when not using instance profile credentials.

    Available when using AWS Signature V4 authentication.

    Enable SSL Enables the use of SSL.
    SSL Truststore Path Location of the truststore file.

    Configuring this property is the equivalent to configuring the shield.ssl.truststore.path Elasticsearch property.

    Not necessary for Elastic Cloud clusters.

    SSL Truststore Password Password for the truststore file.

    Configuring this property is the equivalent to configuring the shield.ssl.truststore.password Elasticsearch property.

    Not necessary for Elastic Cloud clusters.