MongoDB Atlas CDC
The MongoDB Atlas CDC origin includes the CRUD operation type in a record header attribute so generated records can be easily processed by CRUD-enabled destinations. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.
When you configure the origin, you define connection information, such as the connection string and credentials to use. You can specify SSL/TLS properties for an SSL/TLS-enabled MongoDB cluster.
You can configure where the origin reads changes from, the initial offset, and the read preference. You can also define a custom filter and configure the origin to flatten nested structures.
You can optionally configure advanced options that determine how the origin connects to MongoDB, such as the maximum number of open connections to allow in the connection pool and the cursor type to use for capped collections.
When the pipeline stops, the origin notes where it stops reading. When the pipeline starts again, the origin continues processing from the last-saved offset by default. You can reset the origin to process all available data.
Credentials
Based on the authentication used by MongoDB, configure the MongoDB Atlas CDC origin to use no authentication, username/password authentication, or LDAP authentication. By default, no authentication is used.
- Authentication method - Specify the authentication to use with the Authentication Method property on the Credentials tab:
  - None
  - Username / Password
  - LDAP
- Connection string - If you prefer, you can specify credentials in the connection string on the Connection tab. However, specifying credentials on the Credentials tab is the recommended method.
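If you do place credentials in the connection string, MongoDB connection strings require reserved characters in the user name and password to be percent-encoded. The following is a minimal sketch of that encoding; the helper name, host, and credentials are hypothetical and used only for illustration.

```python
from urllib.parse import quote_plus


def uri_with_credentials(host: str, username: str, password: str) -> str:
    """Build a DNS seed list connection string with percent-encoded credentials.

    Hypothetical helper for illustration; it is not part of Data Collector.
    """
    return f"mongodb+srv://{quote_plus(username)}:{quote_plus(password)}@{host}/"


# Placeholder host and credentials; "@" and "/" in the password must be encoded.
uri = uri_with_credentials("server.example.com", "app_user", "p@ss/word")
print(uri)
```

Encoding the credentials keeps characters such as @ and / from being read as connection string delimiters.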
Read Preferences
You can configure the read preference that the MongoDB Atlas CDC origin uses. The read preference determines how the origin reads data from different members of the MongoDB replica set.
- Primary - Requires reading from the primary member.
- Primary Preferred - Prefers reading from the primary, but allows reads from a secondary member.
- Secondary - Requires reading from a secondary member.
- Secondary Preferred - Prefers reading from a secondary, but allows reads from a primary when necessary.
- Nearest - Reads from the member with the least network latency.
By default, the origin uses Secondary Preferred to avoid making unnecessary requests to the primary member.
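For orientation, the read preferences listed above correspond to the standard MongoDB read preference modes, which MongoDB also accepts as the readPreference connection string option. The sketch below maps the labels to those mode names. The origin sets the read preference through its stage property, so the mapping and the helper name are illustrative assumptions, not a description of how the origin passes the setting to the driver.

```python
# Labels used in this documentation mapped to MongoDB read preference mode names.
READ_PREFERENCE_MODES = {
    "Primary": "primary",
    "Primary Preferred": "primaryPreferred",
    "Secondary": "secondary",
    "Secondary Preferred": "secondaryPreferred",
    "Nearest": "nearest",
}


def read_preference_option(label: str = "Secondary Preferred") -> str:
    """Return the equivalent connection string option (hypothetical helper)."""
    return f"readPreference={READ_PREFERENCE_MODES[label]}"


print(read_preference_option())
print(read_preference_option("Nearest"))
```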
Generated Records
The MongoDB Atlas CDC origin generates records based on data from a MongoDB change stream or the MongoDB oplog, and adds CRUD-related and CDC-related record header attributes.
The structure of oplog records is unique, so when necessary, you might use processors in the pipeline to convert record structure.
For example, for insert records, record data resides in a map field named o. But for an update record, the _id field is part of the o2 map field. To merge the record data, you can use a Field Flattener to flatten the map fields and a Field Remover to remove any unnecessary fields.
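To make the shape of that merge concrete, the following sketch mimics the two steps in plain Python: it flattens the o and o2 map fields one level into dotted field names, then drops fields that are not needed. The sample update record, the dotted naming, and both function names are assumptions for illustration; in a pipeline, the Field Flattener and Field Remover processors perform this work.

```python
def flatten_map_fields(record: dict, map_fields=("o", "o2")) -> dict:
    """Lift the entries of the given map fields to the top level.

    Each lifted entry is named <map field>.<key>, for example o2._id.
    """
    flat = {k: v for k, v in record.items() if k not in map_fields}
    for name in map_fields:
        for key, value in record.get(name, {}).items():
            flat[f"{name}.{key}"] = value
    return flat


def remove_fields(record: dict, unwanted: set) -> dict:
    """Return a copy of the record without the unwanted fields."""
    return {k: v for k, v in record.items() if k not in unwanted}


# Illustrative update record: the changed data is in o, the _id is in o2.
update = {
    "op": "u",
    "o": {"$set": {"status": "shipped"}},
    "o2": {"_id": 42},
}
flat = flatten_map_fields(update)
slim = remove_fields(flat, {"op"})
print(slim)
```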
For more information about the oplog record structure, see the MongoDB documentation. The following site is also a good resource: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/.
CRUD Operation and CDC Header Attributes
The MongoDB Atlas CDC origin includes the CRUD operation type in the sdc.operation.type record header attribute.
If you use a CRUD-enabled destination in the pipeline such as JDBC Producer or Elasticsearch, the destination can use the operation type when writing to destination systems. When necessary, you can use an Expression Evaluator processor or any scripting processor to manipulate the value in the header attribute. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.
The origin uses the following values in the sdc.operation.type record header attribute:
- 1 for INSERT
- 2 for DELETE
- 3 for UPDATE
- 5 for unsupported operations, such as CMD, NOOP, or DB, which are available MongoDB operation types but not applicable to record data.
The origin also includes the following CDC record header attributes:
- op - The CRUD operation, using the following values:
  - i for INSERT
  - u for UPDATE
  - d for DELETE
- ns - The namespace, using the following format: <database>:<collection>
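The relationship between the op attribute and the sdc.operation.type attribute can be summarized in a small lookup. This is a sketch based only on the values listed above; the function name is hypothetical, and representing the codes as integers is an assumption.

```python
# op value -> sdc.operation.type code, as listed in this section.
OPERATION_CODES = {"i": 1, "d": 2, "u": 3}
UNSUPPORTED_OPERATION = 5


def operation_type(op: str) -> int:
    """Return the sdc.operation.type code for a MongoDB op value.

    Any op value other than i, u, or d is treated as unsupported (5).
    """
    return OPERATION_CODES.get(op, UNSUPPORTED_OPERATION)


for op in ("i", "u", "d", "n"):
    print(op, operation_type(op))
```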
Enabling SSL/TLS
To connect to an SSL/TLS-enabled cluster, select one of the following SSL/TLS modes on the Connection tab:
- Atlas/System CA - Connects to a MongoDB Atlas cluster. You can also use this when your certificates or keys have already been specified at the JVM level.
- Server Validation (1 Way TLS) - Connects to an SSL/TLS-enabled MongoDB Enterprise Server cluster when the client needs to validate the server certificate and does not need to prove client identity.
- Server and Client Validation (2 Way TLS) - Connects to an SSL/TLS-enabled MongoDB Enterprise Server cluster when the client needs to validate the server certificate and the server also validates the client key. This occurs when the cluster is set up to require client certificates.
Certificates and keys can be provided in the following formats:
- JKS (Java Keystore)
- PEM (text-based)
- DER (text-based)
- PKCS #7 / P7B
- PKCS #12 / P12 / PFX
- Private keys inside PEM, DER, or PKCS #12 encoded as PKCS#1 or PKCS#8
If the files are in PEM or DER plain text format, you can provide the text in the stage properties. The certificate should begin and end with text such as: -----BEGIN CERTIFICATE----- or -----END PRIVATE KEY-----.
Otherwise, you provide a path to the certificate file.
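Before pasting certificate text into the stage properties, it can help to confirm that the text has the expected PEM framing: a BEGIN line and a matching END line, each delimited by five hyphens. The check below is a minimal sketch with a hypothetical function name; it verifies only the framing, not the validity of the certificate or key.

```python
import re

# A PEM block: -----BEGIN <LABEL>-----, a body, then -----END <LABEL>-----
# with the same label on both lines.
PEM_BLOCK = re.compile(
    r"-----BEGIN ([A-Z0-9 ]+)-----\s.*?\s-----END \1-----", re.DOTALL
)


def looks_like_pem(text: str) -> bool:
    """Return True if the text contains at least one well-framed PEM block."""
    return PEM_BLOCK.search(text.strip()) is not None


# Placeholder body; a real certificate contains Base64-encoded data here.
sample = "-----BEGIN CERTIFICATE-----\nMIIBplaceholder\n-----END CERTIFICATE-----\n"
print(looks_like_pem(sample))
print(looks_like_pem("/etc/ssl/certs/ca.pem"))
```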
MongoDB Data Types
When the MongoDB Atlas CDC origin reads from MongoDB, it converts standard MongoDB data types to the following Data Collector data types.
The origin can also convert supported BSON types to Data Collector data types. For more information, see Reading BSON Types.
Standard MongoDB Type | Data Collector Type |
---|---|
Array | List |
Binary | Byte Array |
Boolean | Boolean |
Date | Date |
Double | Double |
Int32 | Integer |
Int64 | Long |
JavaScript | String |
Object | List-Map |
String | String |
Timestamp | Datetime |
Reading BSON Types
When reading from MongoDB, the MongoDB Atlas CDC origin converts standard MongoDB data types to Data Collector data types as described in MongoDB Data Types.
The origin converts supported BSON data types to Data Collector data types as well. When converting BSON data types, the origin adds a field attribute named bsonType to the converted field.
Some supported BSON data types encode additional information with the data. Where this occurs, the information is included as additional attributes for the field. For example, a BsonTimestamp can encode an ordinal value along with the date and time. When the origin reads the data, it converts the field to a Datetime field with an ordinal field attribute set to the ordinal value encoded with the data.
BSON Data Type | Data Collector Type | Field Attributes and Values |
---|---|---|
Binary | Byte Array | bsonType : Binary |
BsonDbPointer | Map field with subfields | bsonType : Bson_Db_Pointer |
BsonRegularExpression | String | |
BsonTimestamp | Datetime | |
Code | String | bsonType : Code |
CodeWithScope | String | bsonType : Code_With_Scope |
DBRef | Map field with subfields | bsonType : Db_Ref |
Decimal128 | Decimal | bsonType : Decimal128 |
Null | String with null value | bsonType : Null |
ObjectId | String containing the 24-character hexadecimal value of the Object Id | |
Symbol | String | bsonType : Symbol |
Undefined | String with null value | bsonType : Undefined |
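The BsonTimestamp case can be illustrated with the BSON encoding itself: a BSON timestamp is a 64-bit value whose high 32 bits hold seconds since the Unix epoch and whose low 32 bits hold an ordinal. The sketch below splits such a value into a datetime and an ordinal attribute. The dictionary used to model a field with attributes and the function name are assumptions for illustration, not the Data Collector record API.

```python
from datetime import datetime, timezone


def convert_bson_timestamp(raw: int) -> dict:
    """Split a 64-bit BSON timestamp into a datetime and its ordinal.

    The returned dict is a hypothetical stand-in for a Datetime field
    that carries an ordinal field attribute.
    """
    seconds = raw >> 32            # high 32 bits: seconds since the epoch
    ordinal = raw & 0xFFFFFFFF     # low 32 bits: ordinal within that second
    return {
        "value": datetime.fromtimestamp(seconds, tz=timezone.utc),
        "attributes": {"ordinal": ordinal},
    }


# Example value: 1,700,000,000 seconds since the epoch, ordinal 7.
raw_timestamp = (1_700_000_000 << 32) | 7
field = convert_bson_timestamp(raw_timestamp)
print(field["value"].isoformat(), field["attributes"])
```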
Configuring a MongoDB Atlas CDC Origin
- In the Properties panel, on the General tab, configure the following properties:
General Property | Description |
---|---|
Name | Stage name. |
Description | Optional description. |
On Record Error | Error record handling for the stage: Discard - Discards the record. Send to Error - Sends the record to the pipeline for error handling. Stop Pipeline - Stops the pipeline. |
- On the Connection tab, configure the following properties:
Connection Property | Description |
---|---|
Connection String | Connection string for MongoDB. To connect to MongoDB Atlas or Enterprise Server, you can use the following DNS seed list format: mongodb+srv://server.example.com/ To connect to a MongoDB Enterprise Server cluster, use the following standard connection format: mongodb://host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]] When connecting to a cluster, enter additional node information to ensure a connection. For more information about MongoDB connection strings, see the MongoDB documentation. |
SSL/TLS Mode | Method used to implement SSL/TLS: None - Connects to a MongoDB Enterprise Server cluster that is not enabled to use SSL/TLS. Atlas/System CA - Connects to a MongoDB Atlas cluster. You can also use this when your certificates or keys have already been specified at the JVM level. Server Validation (1 Way TLS) - Connects to an SSL/TLS-enabled MongoDB Enterprise Server cluster when the client needs to validate the server certificate and does not need to prove client identity. Server and Client Validation (2 Way TLS) - Connects to an SSL/TLS-enabled MongoDB Enterprise Server cluster when the client needs to validate the server certificate and the server also validates the client key. This occurs when the cluster is set up to require client certificates. |
SSL Invalid Host Name Allowed | Specifies whether invalid host names are allowed in SSL/TLS certificates. Available when using server validation or server and client validation. |
Certificate Mode | Mode to provide the SSL/TLS certificate: File - Use when the certificate is in a file local to Data Collector. Embedded - Use to provide the certificate text directly in stage properties. Available when using server validation or server and client validation. |
Certificate Authority | MongoDB certificate to use. Define this property based on the configured certificate mode. When using file certificate mode, specify a path to the certificate: enter an absolute path to the file, or enter an expression such as ${runtime:resourcesDirPath()}/keystore.jks to reference a file stored in the Data Collector resources directory. When using embedded certificate mode, provide the full text of the certificate to use. The text should start with -----BEGIN CERTIFICATE----- Available when using server validation or server and client validation. |
Certificate Authority Password | Password for the certificate. Specify if the certificate file is encrypted. Available when using server validation or server and client validation, and when using file certificate mode. |
Client Certificate | Client certificate to use. Define this property based on the configured certificate mode. When using file certificate mode, specify a path to the certificate: enter an absolute path to the file, or enter an expression such as ${runtime:resourcesDirPath()}/keystore.jks to reference a file stored in the Data Collector resources directory. When using embedded certificate mode, provide the full text of the certificate to use. The text should start with -----BEGIN CERTIFICATE----- Available when using server and client validation. |
Client Private Key | Path to the key file. Available when using server and client validation and file certificate mode. |
Private Key Password | Password for the private key. Specify if the private key is encrypted. Available when using server and client validation and file certificate mode. |
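The standard connection format described for the Connection String property can be assembled from its parts as follows. This is a minimal sketch: the function name, host names, database, and the replicaSet option value are placeholders chosen for illustration.

```python
from urllib.parse import urlencode


def standard_uri(hosts, database="", options=None) -> str:
    """Assemble mongodb://host1[:port1][,host2[:port2]...]/[database][?options].

    hosts is a list of (host, port) pairs; use None for a port to omit it.
    """
    host_list = ",".join(f"{host}:{port}" if port else host for host, port in hosts)
    uri = f"mongodb://{host_list}/{database}"
    if options:
        uri += "?" + urlencode(options)
    return uri


# Two nodes are listed so that a connection can be made if one is unavailable.
example = standard_uri(
    [("host1", 27017), ("host2", None)],
    database="inventory",
    options={"replicaSet": "rs0"},
)
print(example)
```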
- On the Credentials tab, configure the following properties:
Credentials Property | Description |
---|---|
Authentication Method | Authentication method to use: None, Username / Password, or LDAP. |
Username | User name for the selected authentication method. |
Password | Password for the specified user name. Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores. |
Authentication Database | Database name associated with the specified user account. Available when using username/password authentication. |
Authentication Mechanism | Authentication mechanism to use: Default - Data Collector and MongoDB negotiate to choose the encryption mechanism. SCRAM-SHA-1 - Data Collector sends SCRAM-SHA-1 credentials to MongoDB. SCRAM-SHA-256 - Data Collector sends SCRAM-SHA-256 credentials to MongoDB. |
- On the MongoDB tab, configure the following properties:
MongoDB Property | Description |
---|---|
Read Changes From | Where to read data changes from: Change Stream - Reads entries from a MongoDB change stream. For information about using change streams, see the MongoDB documentation. Oplog Collection - Reads entries from the MongoDB oplog. |
Initial Offset | Initial offset to use to begin reading. When using a date or Object ID field as the offset field, enter a timestamp in the format YYYY-MM-DD hh:mm:ss or a hexadecimal string representation of the Object ID. When using a string field, enter the string to use. |
Include Namespace | Read changes only to specified collections. If no collections are added, the origin reads all changes. Using simple or bulk edit mode, click the Add icon to add a collection, and enter the namespace of the collection with the following format: <database>.<collection> |
Operations Types | Operation types to read. Choose from the following data operations: INSERT, UPDATE, DELETE. |
Read Preference | Determines how the origin reads data from different members of the MongoDB replica set. |
Get full record for updates | Retrieves the full record for updates. When disabled, updates pass only the updated fields. |
Auto Flatten Nested Structures | Flattens fields with nested fields. Includes the path to the field in the field name, such as: <firstlevel>.<secondlevel>.<fieldname> Flattened arrays include the index in the field name, as follows: root.array[0].field1 |
Batch Size (records) | Maximum number of records allowed in a batch. |
Max Batch Wait Time | Maximum seconds the origin waits for a batch before sending an empty batch. Used only when the Capped Collection Cursor Type property is set to Tailable. |
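The field naming used by the Auto Flatten Nested Structures property can be reproduced with a short recursive function. The sketch below follows the naming shown in the property description (dots between levels, an index in brackets for array elements). The function name and sample record are hypothetical, and the handling of empty maps and arrays, which simply produce no fields here, is an assumption.

```python
def flatten(value, prefix: str = "") -> dict:
    """Flatten nested maps and lists into a single-level dict of field paths."""
    flat = {}
    if isinstance(value, dict):
        for key, child in value.items():
            path = f"{prefix}.{key}" if prefix else key
            flat.update(flatten(child, path))
    elif isinstance(value, list):
        for index, child in enumerate(value):
            flat.update(flatten(child, f"{prefix}[{index}]"))
    else:
        # Leaf value: record it under the accumulated path.
        flat[prefix] = value
    return flat


record = {"root": {"array": [{"field1": "a"}, {"field1": "b"}], "name": "x"}}
flattened = flatten(record)
for name, value in flattened.items():
    print(name, "=", value)
```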
- Optionally, click the Advanced tab to configure how the origin connects to MongoDB.
The defaults for these properties should work in most cases. If a numeric property is set to 0, then the driver default value is used.
Advanced Property | Description |
---|---|
Compression Algorithm | Compression algorithm to use to communicate with MongoDB: None, Snappy, ZLib, or ZStandard. These compression algorithms are not supported by all MongoDB versions. See the MongoDB documentation for details. Default is Snappy. |
Application Name | Name to use in MongoDB reporting, such as server logs. |
Maximum Connections | Maximum number of open connections allowed in the connection pool. |
Minimum Connections | Minimum number of open connections allowed in the connection pool. |
Max Connection Idle Time | Maximum idle time in milliseconds before a connection is removed from the connection pool. |
Max Connection Lifetime | Maximum lifetime in milliseconds for a connection in the connection pool. |
Max Connection Wait Time | Maximum time in milliseconds that a connection waits to connect. |
Socket Connect Timeout | Maximum time in milliseconds to wait for a network socket connection. |
Socket Read Timeout | Maximum time in milliseconds to wait for a read connection. |
Socket Receive Buffer Size (bytes) | Buffer size in bytes for receiving data. |
Socket Send Buffer Size (bytes) | Buffer size in bytes for sending data. |
Heartbeat Frequency | Milliseconds between Data Collector attempts to determine the current state of each server in the cluster. |
Min Heartbeat Frequency | Minimum number of milliseconds between Data Collector checks on the state of each server. |
Server Selection Timeout | Maximum time in milliseconds that Data Collector waits for server selection before throwing an exception. If you enter 0, an exception is thrown immediately if no server is available. Use a negative value to wait indefinitely. |
Local Threshold | Local threshold in milliseconds. Requests are sent to a server whose ping time is less than or equal to the server with the fastest ping time plus the local threshold value. |
Required Replica Set Name | Required replica set name to use for the cluster. |
Enable Single Mode | Connects to the first MongoDB server in the connection string. Applicable only for MongoDB Enterprise Server clusters. |
Max Number of Retries | Maximum number of times to retry the connection when the connection fails. Default is 10. |
Retry Interval (ms) | Time between retries in milliseconds. Default is 10,000. |
Capped Collection Cursor Type | Style of cursor to use for a capped collection: Normal, Tailable, or Tailable Await. |
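The Local Threshold rule can be expressed as a simple filter over measured ping times. The following sketch applies the rule as stated in the property description; the function name, server names, and ping values are hypothetical, and the actual selection is performed by the MongoDB driver.

```python
def eligible_servers(ping_ms: dict, local_threshold_ms: float) -> list:
    """Return the servers whose ping time is within the local threshold.

    A server is eligible when its ping time is less than or equal to the
    fastest ping time plus the local threshold.
    """
    fastest = min(ping_ms.values())
    return sorted(
        name for name, ping in ping_ms.items()
        if ping <= fastest + local_threshold_ms
    )


# Illustrative ping times in milliseconds.
pings = {"node-a": 5, "node-b": 12, "node-c": 30}
print(eligible_servers(pings, local_threshold_ms=15))
```

With these sample values, the cutoff is 5 + 15 = 20 milliseconds, so node-c is excluded.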