Couchbase
The Couchbase origin reads JSON documents from Couchbase Server and generates a record for each document in the bucket. Couchbase Server is a distributed NoSQL document-oriented database. The Couchbase origin can use multiple threads to process data in parallel. For information about supported versions, see Supported Systems and Versions.
When you configure the Couchbase origin, you enter connection information, such as the nodes and bucket to connect to, as well as timeout properties for the connection. Optionally, you can enable TLS for the connection. You also enter information to authenticate with Couchbase Server.
When a pipeline stops, the Couchbase origin notes where it stops reading. When the pipeline starts again, the origin continues processing from where it stopped by default. You can reset the origin to process all requested data.
The origin can generate events for an event stream. For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.
Prerequisites
Connecting to a Couchbase Server bucket requires that the bucket have a primary index, which makes the bucket queryable.
Before you configure the Couchbase origin to connect to a bucket, you must create a primary index for that bucket. For information on creating a primary index, see the Couchbase documentation.
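As a minimal sketch, the primary index can be created with a N1QL statement like the following, run from the Couchbase Query Workbench or the cbq shell. The bucket name is a placeholder; replace it with the bucket that the origin reads.

```sql
-- Minimal sketch: create the primary index that makes the bucket queryable.
-- `my-bucket` is a placeholder bucket name.
CREATE PRIMARY INDEX ON `my-bucket`;
```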
Offset
When adding new documents to a bucket read by the Couchbase origin, be sure to add them to the end of the configured read order.
The origin tracks an offset based on document position in the configured read order and uses that position to determine where to resume reading within the bucket. Adding new documents before the current offset can result in the origin skipping those documents and reading other documents again. For example, if the read order is based on customer.date, a document added with a date earlier than the current offset might never be read.
Event Generation
The Couchbase origin can generate events that you can use in an event stream. When you enable event generation, the origin generates an event when it completes processing the data returned by the specified query.
Couchbase events can be used in any logical way. For example:
- With the Pipeline Finisher executor to stop the pipeline and transition the pipeline to a Finished state when the origin completes processing available data.
  When you restart a pipeline stopped by the Pipeline Finisher executor, the origin continues processing from the last-saved offset unless you reset the origin.
  For an example, see Stopping a Pipeline After Processing All Available Data. A brief precondition sketch also follows this list.
- With the Email executor to send a custom email after receiving an event.
  For an example, see Sending Email During Pipeline Processing.
- With a destination to store information about completed queries.
  For an example, see Preserving an Audit Trail of Events.
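As a sketch of the Pipeline Finisher pattern above: the Pipeline Finisher executor is commonly configured with a precondition so that it acts only on the no-more-data event, for example `${record:eventType() == 'no-more-data'}`. This expression illustrates the common pattern rather than a required setting; see Stopping a Pipeline After Processing All Available Data for the full walkthrough.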
For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.
Event Record
Event records generated by the Couchbase origin have the following event-related record header attributes:

| Record Header Attribute | Description |
|---|---|
| sdc.event.type | Event type. Uses one of the following types: no-more-data or no-more-bucket-data. |
| sdc.event.version | Integer that indicates the version of the event record type. |
| sdc.event.creation_timestamp | Epoch timestamp when the stage created the event. |
The Couchbase origin can generate the following event records:
- no-more-data
  The Couchbase origin generates a no-more-data event record when the origin completes processing all data returned by the queries for all buckets.
- no-more-bucket-data
  The Couchbase origin generates a no-more-bucket-data event record when the origin completes processing all data returned by the queries for a single bucket.
  The no-more-bucket-data event record generated by the origin has the sdc.event.type record header attribute set to no-more-bucket-data and does not include any additional fields.
Multithreaded Processing
The Couchbase origin uses multiple concurrent threads to process data based on the Number of Threads property.
As the pipeline runs, each thread connects to the origin system, creates a batch of data, and passes the batch to an available pipeline runner. A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors, executors, and destinations in the pipeline and handles all pipeline processing after the origin.
Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time pipeline property to specify the interval or to opt out of empty batch generation.
Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But since batches are processed by different pipeline runners, the order that batches are written to destinations is not ensured.
For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.
Configuring a Couchbase Origin
Configure a Couchbase origin to read data from Couchbase Server.
- In the Properties panel, on the General tab, configure the following properties:

  | General Property | Description |
  |---|---|
  | Name | Stage name. |
  | Description | Optional description. |
  | Stage Library | Library version that you want to use. |
  | Produce Events | Generates event records when events occur. Use for event handling. |
  | On Record Error | Error record handling for the stage: Discard - Discards the record. Send to Error - Sends the record to the pipeline for error handling. Stop Pipeline - Stops the pipeline. |
- On the Couchbase tab, configure the following properties:

  | Couchbase Property | Description |
  |---|---|
  | Node List | Comma-separated list of one or more nodes in a Couchbase cluster. |
  | Where | Conditions to include in the query. Click Add to add one or more conditions that specify the data to include in the query. You can enter `[document].[field]` to specify a document field or enter `meta().[metadata_name]` to specify metadata. For example, you could enter `customer.age > 50` to include data for customers over 50 in a document named `customer`. If you do not add any conditions, the origin reads the entire bucket. See the query sketch that follows this table. |
  | Order By | Document field to base the read order on, expressed as `[document].[field]`. For example, you could enter `customer.age` for the origin to read data in the `customer` document by age, or `customer.date` to read the same data by date. |
  | Bucket | Name of an existing Couchbase bucket to connect to. |
  | Scope | Name of an existing Couchbase scope. To use the default scope for the bucket, enter `_default`. If you are using Couchbase SDK 2.x, you must use the default scope. For more information on scopes, see the Couchbase documentation. Default is `_default`. |
  | Collection | Name of an existing Couchbase collection. To use the default collection for the bucket, enter `_default`. If you are using Couchbase SDK 2.x, you must use the default collection. For more information on collections, see the Couchbase documentation. Default is `_default`. |
  | Key-Value Timeout (ms) | Maximum number of milliseconds allowed to execute each key-value operation. |
  | Connect Timeout (ms) | Maximum number of milliseconds allowed to connect to Couchbase Server. |
  | Disconnect Timeout (ms) | Maximum number of milliseconds allowed to gracefully close a connection. |
  | Advanced Environment Settings | Client settings for connections with Couchbase Server. For available settings, see the Couchbase Java SDK documentation. Advanced environment settings do not apply to Couchbase SDK 3.x. Instead, you can use the SDC_JAVA_OPTS environment variable. For information on using environment variables for Data Collector, see Data Collector Environment Configuration. |
  | Queries per Second | Maximum number of queries to run in a second across all buckets. Use 0 for no limit. Default is 10. |
  | Max Batch Size (records) | Maximum number of records processed at one time. Honors values up to the Data Collector maximum batch size. Default is 1000. The Data Collector default maximum batch size is also 1000. |
  | Number of Threads | Number of threads the origin generates and uses for multithreaded processing. |
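  The Where and Order By properties describe how the origin filters and orders the documents it reads. As a rough sketch only, not the exact query that the origin generates, the example values above behave approximately like the following N1QL query, where `travel-data` is a placeholder bucket name:

  ```sql
  -- Illustrative sketch only: approximates how the example Where and Order By
  -- values filter and order documents. `travel-data` is a placeholder bucket name.
  SELECT meta().id, *
  FROM `travel-data`
  WHERE customer.age > 50
  ORDER BY customer.age;
  ```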
- On the Credentials tab, configure the following properties:

  | Credentials Property | Description |
  |---|---|
  | Authentication Mode | Authentication to use to connect to Couchbase Server. Choose one of the following authentication modes based on the version of Couchbase you are using: Bucket Authentication - Use for Couchbase 4.x. User Authentication - Use for Couchbase 5.x and later. |
  | Bucket Password | Couchbase bucket password. Required when Authentication Mode is set to Bucket Authentication. |
  | User Name | Couchbase user name. Required when Authentication Mode is set to User Authentication. |
  | Password | Couchbase password. Required when Authentication Mode is set to User Authentication. |
  | Use TLS | Enables the use of TLS. |
  | Use Remote Keystore | Enables loading the contents of the keystore from a remote credential store or from values entered in the stage properties. For more information, see Remote Keystore and Truststore. |
  | Private Key | Private key used in the remote keystore. Enter a credential function that returns the key or enter the contents of the key. |
  | Certificate Chain | Each PEM certificate used in the remote keystore. Enter a credential function that returns the certificate or enter the contents of the certificate. Using simple or bulk edit mode, click the Add icon to add additional certificates. |
  | Keystore File | Path to the local keystore file. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory: `${runtime:resourcesDirPath()}/keystore.jks`. By default, no keystore is used. |
  | Keystore Type | Type of keystore to use. Use one of the following types: Java Keystore File (JKS) or PKCS #12 (p12 file). Default is Java Keystore File (JKS). |
  | Keystore Password | Password to the keystore file. A password is optional, but recommended. Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores. |
  | Keystore Key Algorithm | Algorithm to manage the keystore. Default is SunX509. |
  | Use Remote Truststore | Enables loading the contents of the truststore from a remote credential store or from values entered in the stage properties. For more information, see Remote Keystore and Truststore. |
  | Trusted Certificates | Each PEM certificate used in the remote truststore. Enter a credential function that returns the certificate or enter the contents of the certificate. Using simple or bulk edit mode, click the Add icon to add additional certificates. |
  | Truststore File | Path to the local truststore file. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory: `${runtime:resourcesDirPath()}/truststore.jks`. By default, no truststore is used. |
  | Truststore Type | Type of truststore to use. Use one of the following types: Java Keystore File (JKS) or PKCS #12 (p12 file). Default is Java Keystore File (JKS). |
  | Truststore Password | Password to the truststore file. A password is optional, but recommended. Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores. |
  | Truststore Trust Algorithm | Algorithm to manage the truststore. Default is SunX509. |