Couchbase

The Couchbase origin reads JSON documents from Couchbase Server and generates a record for each document in the bucket. Couchbase Server is a distributed NoSQL document-oriented database. The Couchbase origin can process objects in parallel with multiple threads. For information about supported versions, see Supported Systems and Versions.

When you configure the Couchbase origin, you enter connection information, such as the nodes and bucket to connect to, as well as timeout properties for the connection. Optionally, you can enable TLS for the connection. You also enter information to authenticate with Couchbase Server. You can also use a connection to configure the origin.

When a pipeline stops, the Couchbase origin notes where it stops reading. When the pipeline starts again, the origin continues processing from where it stopped by default. You can reset the origin to process all requested data.

The origin can generate events for an event stream. For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Prerequisites

Connecting to a Couchbase Server bucket requires that the bucket have a primary index, which makes the bucket queryable.

Before you configure the Couchbase origin to connect to a bucket, you must create a primary index for that bucket. For information on creating a primary index, see the Couchbase documentation.

Offset

When adding new documents to a bucket read by the Couchbase origin, be sure to add them to the end of the configured read order.

The origin uses Couchbase offset capabilities, which uses position in a bucket to determine where to start using data within the bucket. Adding new documents before the offset can result in those documents not being read by the Couchbase origin and other documents being read again.

Event Generation

The Couchbase origin can generate events that you can use in an event stream. When you enable event generation, the origin generates an event when it completes processing the data returned by the specified query.

Couchbase events can be used in any logical way. For example:

  • With the Pipeline Finisher executor to stop the pipeline and transition the pipeline to a Finished state when the origin completes processing available data.

    When you restart a pipeline stopped by the Pipeline Finisher executor, the origin continues processing from the last-saved offset unless you reset the origin.

    For an example, see Stopping a Pipeline After Processing All Available Data.

  • With the Email executor to send a custom email after receiving an event.

    For an example, see Sending Email During Pipeline Processing.

For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Event Record

Event records generated by the Couchbase origin have the following event-related record header attributes:
Record Header Attribute Description
sdc.event.type Event type. Uses the following type:
  • no-more-data - Generated when the origin completes processing all data returned by the queries for all buckets.
  • no-more-bucket-data - Generated when the origin completes processing all data returned by the queries for a single bucket.
sdc.event.version Integer that indicates the version of the event record type.
sdc.event.creation_timestamp Epoch timestamp when the stage created the event.

The Couchbase origin can generate the following event records:

no-more-data
The Couchbase origin generates a no-more-data event record when the origin completes processing all data returned by the queries for all buckets.
The origin queries each bucket in turn for new data to process and generates the no-more-data event record only after discovering that there is no additional data to process for any of the buckets. As a result, when processing multiple buckets, generation of the no-more-data event might not occur. For example, if one bucket has additional data, then after processing the data, the origin queries all buckets again for additional data. The origin generates the no-more-data event only after all buckets report no additional data in the same cycle.

The no-more-data event record generated by the origin has the sdc.event.type record header attribute set to no-more-data and does not include any additional fields.

no-more-bucket-data
The Couchbase origin generates a no-more-bucket-data event record when the origin completes processing all data returned by the queries for a single bucket.

The no-more-bucket-data event record generated by the origin has the sdc.event.type record header attribute set to no-more-bucket-data and does not include any additional fields.

Multithreaded Processing

The Couchbase origin uses multiple concurrent threads to process data based on the Number of Threads property.

As the pipeline runs, each thread connects to the origin system, creates a batch of data, and passes the batch to an available pipeline runner. A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors, executors, and destinations in the pipeline and handles all pipeline processing after the origin.

Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time pipeline property to specify the interval or to opt out of empty batch generation.

Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But since batches are processed by different pipeline runners, the order that batches are written to destinations is not ensured.

For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.

Configuring a Couchbase Origin

Configure a Couchbase origin to read data from Couchbase Server.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Stage Library Library version that you want to use.
    Produce Events Generates event records when events occur. Use for event handling.
    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the Couchbase tab, configure the following properties:
    Couchbase Property Description
    Connection Connection that defines the information required to connect to an external system.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    To create a new connection, click the Add New Connection icon: . To view and edit the details of the selected connection, click the Edit Connection icon: .

    Node List Comma-separated list of one or more nodes in a Couchbase cluster.
    Where Conditions to include in the query.

    Click Add to add one or more conditions that specify the data to include in the query. You can enter [document].[field] to specify a document field or enter meta().[metadata_name] to specify metadata.

    For example, you could enter customer.age > 50 to include data for customers over 50 in a document named customer.

    If you do not add any conditions, the origin reads the entire bucket.

    Order By Document field to base the read order on, expressed as [document].[field].

    For example, you could enter customer.age for the origin to read data in the customer document by age, or customer.date to read the same data by date.

    Bucket Name of an existing Couchbase bucket to connect to.
    Scope Name of an existing Couchbase scope. To use the default scope for the bucket, enter _default.

    If you are using Couchbase SDK 2.x, you must use the default scope.

    For more information on scopes, see the Couchbase documentation.

    Default is _default.

    Collection Name of an existing Couchbase collection. To use the default collection for the bucket, enter _default.

    If you are using Couchbase SDK 2.x, you must use the default collection.

    For more information on collections, see the Couchbase documentation.

    Default is _default.

    Key-Value Timeout (ms) Maximum number of milliseconds allowed to execute each key-value operation.
    Connect Timeout (ms) Maximum number of milliseconds allowed to connect to Couchbase Server.
    Disconnect Timeout (ms) Maximum number of milliseconds allowed to gracefully close a connection.
    Advanced Environment Settings

    Client settings for connections with Couchbase Server. For available settings, see the Couchbase Java SDK documentation.

    Advanced environment settings do not apply to Couchbase SDK 3.x. Instead, you can use Java configuration options in the deployment. For information on using Java configuration options for Data Collector, see Java and Security Configuration.

    Queries per Second Maximum number of queries to run in a second across all buckets. Use 0 for no limit.

    Default is 10.

    Max Batch Size (records) Maximum number of records processed at one time. Honors values up to the Data Collector maximum batch size.

    Default is 1000. The Data Collector default is 1000.

    Number of Threads Number of threads the origin generates and uses for multithreaded processing.
  3. On the Credentials tab, configure the following properties:
    Credentials Property Description
    Authentication Mode Authentication to use to connect to Couchbase Server.
    Choose one of the following authentication modes based on the version of Couchbase you are using:
    • Bucket Authentication - Use for Couchbase 4.x.
    • User Authentication - Use for Couchbase 5.x and later.
    Bucket Password Couchbase bucket password.

    Required when Authentication Mode is set to Bucket Authentication.

    User Name Couchbase user name.

    Required when Authentication Mode is set to User Authentication.

    Password Couchbase password.

    Required when Authentication Mode is set to User Authentication.

    Use TLS Enables the use of TLS.
    Use Remote Keystore Enables loading the contents of the keystore from a remote credential store or from values entered in the stage properties. For more information, see Remote Keystore and Truststore.
    Private Key Private key used in the remote keystore. Enter a credential function that returns the key or enter the contents of the key.
    Certificate Chain Each PEM certificate used in the remote keystore. Enter a credential function that returns the certificate or enter the contents of the certificate.
    Keystore File

    Path to the local keystore file. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:

    ${runtime:resourcesDirPath()}/keystore.jks

    By default, no keystore is used.

    Keystore Type Type of keystore to use. Use one of the following types:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Keystore Password

    Password to the keystore file. A password is optional, but recommended.

    Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores.
    Keystore Key Algorithm

    Algorithm to manage the keystore.

    Default is SunX509.

    Use Remote Truststore Enables loading the contents of the truststore from a remote credential store or from values entered in the stage properties. For more information, see Remote Keystore and Truststore.
    Trusted Certificates Each PEM certificate used in the remote truststore. Enter a credential function that returns the certificate or enter the contents of the certificate.

    Using simple or bulk edit mode, click the Add icon to add additional certificates.

    Truststore File

    Path to the local truststore file. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:

    ${runtime:resourcesDirPath()}/truststore.jks

    By default, no truststore is used.

    Truststore Type
    Type of truststore to use. Use one of the following types:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Truststore Password

    Password to the truststore file. A password is optional, but recommended.

    Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores.
    Truststore Trust Algorithm

    Algorithm to manage the truststore.

    Default is SunX509.