MongoDB Atlas CDC

Supported pipeline types:
  • Data Collector

The MongoDB Atlas CDC origin reads changes from a MongoDB Atlas change stream or oplog. For information about supported versions, see Supported Systems and Versions.

The MongoDB Atlas CDC origin includes the CRUD operation type in a record header attribute so generated records can be easily processed by CRUD-enabled destinations. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.

When you configure the origin, you define connection information, such as the connection string and credentials to use. You can specify SSL/TLS properties for an SSL/TLS-enabled MongoDB cluster.

You can configure where the origin reads changes from, initial offset, and read preference. You can define a custom filter and configure the origin to flatten nested structures.

You can optionally configure advanced options that determine how the origin connects to MongoDB, such as the maximum number of open connections to allow in the connection pool and the cursor type to use for capped connections.

When the pipeline stops, the origin notes where it stops reading. When the pipeline starts again, the origin continues processing from the last-saved offset by default. You can reset the origin to process all available data.

Credentials

Based on the authentication used by MongoDB, configure the MongoDB Atlas CDC origin to use no authentication, username/password authentication, or LDAP authentication. By default, no authentication is used.

To use username/password or LDAP authentication, enter the required credentials in one of the following ways:
Authentication method
Specify the authentication to use with the Authentication Method property on the Credentials tab:
  • None
  • Username / Password
  • LDAP
Then, define the username and password for username/password or LDAP authentication.
When using username/password authentication, you also specify the authentication mechanism to use. You can also specify an authentication database.
Connection string
If you prefer, you can specify credentials in the connection string on the Connection tab. However, specifying credentials on the Credentials tab is the recommended method.
To enter credentials for username/password authentication for self-managed clusters, enter the username and password before the host name. Use the following format:
mongodb://username:password@host[:port][/[database][?options]]
To enter credentials for MongoDB Atlas, specify the URL from your Atlas cluster settings.

Read Preferences

You can configure the read preference that the MongoDB Atlas CDC origin uses. The read preference determines how the origin reads data from different members of the MongoDB replica set.

You can use the following MongoDB read preferences:
  • Primary - Requires reading from the primary member.
  • Primary Preferred - Prefers reading from the primary, but allows reads from a secondary member.
  • Secondary - Requires reading from a secondary member.
  • Secondary Preferred - Prefers reading from a secondary, but allows reads from a primary when necessary.
  • Nearest - Reads from the member with the least network latency.

By default, the origin uses Secondary Preferred to avoid making unnecessary requests to the primary member.

Generated Records

The MongoDB Atlas CDC origin generates records based on data from a MongoDB change stream or the MongoDB oplog and adds CRUD and CDC related record header attributes.

The structure of oplog records is unique, so when necessary, you might use processors in the pipeline to convert record structure.

For example, for insert records, record data resides in a map field named o. But for an update record, the _id field is part of the o2 map field. To merge the record data, you can use a Field Flattener to flatten the map fields and a Field Remover to remove any unnecessary fields.

For more information about the oplog record structure, see the MongoDB documentation. The following site is also a good resource: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/.

CRUD Operation and CDC Header Attributes

The MongoDB Atlas CDC origin includes the CRUD operation type in the sdc.operation.type record header attribute.

If you use a CRUD-enabled destination in the pipeline such as JDBC Producer or Elasticsearch, the destination can use the operation type when writing to destination systems. When necessary, you can use an Expression Evaluator processor or any scripting processor to manipulate the value in the header attribute. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.

The MongoDB Atlas CDC origin uses the following values in the sdc.operation.type record header attribute to represent the operation type:
  • 1 for INSERT
  • 2 for DELETE
  • 3 for UPDATE
  • 5 for unsupported operations, such as CMD, NOOP, or DB, which are available MongoDB operation types but not applicable to record data.
The MongoDB Atlas CDC origin can also include the following header attributes when the information is relevant:
  • op - The CRUD operation using the following values:
    • i for INSERT
    • u for UPDATE
    • d for DELETE
  • ns - The namespace, using the following format: <database>:<collection>.

Enabling SSL/TLS

By default, the MongoDB Atlas CDC origin does not use SSL/TLS. If the cluster is enabled to use SSL/TLS, then you can connect using one of the following methods:
  • Atlas/System CA - Connects to a MongoDB Atlas cluster. You can also use this when your certificates or keys have already been specified at the JVM level.
  • Server Validation (1 Way TLS) - Connects to an SSL/TLS-enabled MongoDB Enterprise Server cluster when the client needs to validate the server certificate and does not need to prove client identity.
  • Server and Client Validation (2 Way TLS) - Connects to an SSL/TLS-enabled MongoDB Enterprise Server cluster when the client needs to validate the server certificate and the server also validates the client key. This occurs when the cluster is set up to require client certificates.
Note: Server validation and server and client validation require configuring additional properties that provide the required information. Both options require obtaining the certificate file for the cluster in one of the valid formats. Server and client validation also requires generating or obtaining the public certificate and private key file for Data Collector.
You can specify certificates and keys in the following formats:
  • JKS (Java Keystore)
  • PEM (text-based)
  • DER (text-based)
  • PKCS #7 / P7B
  • PKCS #12 / P12 / PFX
  • Private keys inside PEM, DER, or PKCS #12 encoded as PKCS#1 or PKCS#8

If the files are in PEM or DER plain text format, you can provide the text in the stage properties. The certificate should begin and end with text such as: —BEGIN CERTIFICATE— or —END PRIVATE KEY—. Otherwise, you provide a path to the certificate file.

MongoDB Data Types

When the MongoDB Atlas CDC origin reads from MongoDB, it converts standard MongoDB data types to the following Data Collector data types.

The origin can also convert supported BSON types to Data Collector data types. For more information, see Reading BSON Types.

The following table describes how standard MongoDB data types are converted to Data Collector types:
Standard MongoDB Type Data Collector Type
Array List
Binary Byte Array
Boolean Boolean
Date Date
Double Double
Int32 Integer
Int64 Long
JavaScript String
Object List-Map
String String
Timestamp Datetime

Reading BSON Types

When reading from MongoDB, the MongoDB Atlas origin converts standard MongoDB data types to Data Collector data types as described in MongoDB Data Types.

The origin converts supported BSON data types to Data Collector data types as well. When converting BSON data types, the origin adds a field attribute named bsonType to the converted field.

Some supported BSON data types encode additional information with the data. Where this occurs, the information is included as additional attributes for the field. For example, a BsonTimestamp can encode an ordinal value along with the date and time. When the origin reads the data, it converts the field to a Datetime field with an ordinal field attribute set to the ordinal value encoded with the data.

The following table lists supported MongoDB BSON data types, the Data Collector types they convert to, and any additional field attributes included with the conversion:
BSON Data Type Data Collector Type Field Attributes and Values
Binary Byte Array bsonType: Binary
BsonDbPointer Map field with the following subfields:
  • database
  • collection
  • id - String containing the 24-character hexadecimal value of the ID
bsonType: Bson_Db_Pointer
BsonRegularExpression String
  • bsonType: Bson_Regular_Expression
  • options: Options provided with the regular expression
BsonTimestamp Datetime
  • bsonType: Bson_Timestamp2
  • seconds: Unix epoch time in seconds
  • ordinal: Ordinal value encoded in the timestamp

Code String bsonType: Code
CodeWithScope String bsonType: Code_With_Scope
DBRef Map field with the following subfields:
  • database
  • collection
  • id - String containing the 24-character hexadecimal value of the ID
bsonType: Db_Ref
Decimal128 Decimal bsonType: Decimal128
Null String with null value bsonType: Null
ObjectId String containing the 24-character hexadecimal value of the Object Id
  • bsonType: Object_Id
  • timestamp: Unix epoch time of the Object Id
  • date: Date represented by the Object Id in yyyy-MM-dd HH:mm:ss format

Symbol String bsonType: Symbol
Undefined String with null value bsonType: Undefined
Note: The MinKey and MaxKey BSON data types are not supported at this time.

Configuring a MongoDB Atlas CDC Origin

Configure a MongoDB Atlas CDC origin to read changes from a MongoDB Atlas change stream or oplog.
  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the Connection tab, configure the following properties:
    Connection Property Description
    Connection String
    Connection string for MongoDB. To connect to MongoDB Atlas or Enterprise Server, you can use the following DNS seed list format:
    mongodb+srv://server.example.com/

    To connect to a MongoDB Enterprise Server cluster, use the following standard connection format:

    mongodb://host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]

    When connecting to a cluster, enter additional node information to ensure a connection.

    For more information about MongoDB connection strings, see the MongoDB documentation.

    SSL/TLS Mode Method used to implement SSL/TLS:
    • None - Connects to a MongoDB Enterprise Server cluster that is not enabled to use SSL/TLS.
    • Atlas/System CA - Connects to a MongoDB Atlas cluster. You can also use this when your certificates or keys have already been specified at the JVM level.
    • Server Validation (1 Way TLS) - Connects to an SSL/TLS-enabled MongoDB Enterprise Server cluster when the client needs to validate the server certificate and does not need to prove client identity.
    • Server and Client Validation (2 Way TLS) - Connects to an SSL/TLS-enabled MongoDB Enterprise Server cluster when the client needs to validate the server certificate and the server also validates the client key. This occurs when the cluster is set up to require client certificates.
    SSL Invalid Host Name Allowed Specifies whether invalid host names are allowed in SSL/TLS certificates.

    Available when using server validation or server and client validation.

    Certificate Mode Mode to provide the SSL/TLS certificate:
    • File - Use when the certificate is in a file local to Data Collector.
    • Embedded - Use to provide the certificate text directly in stage properties.

    Available when using server validation or server and client validation.

    Certificate Authority MongoDB certificate to use. Define this property based on the configured certificate mode:
    • When using file certificate mode, specify a path to the certificate. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:

      ${runtime:resourcesDirPath()}/keystore.jks

    • When using the embedded certificate mode, provide the full text of the certificate to use. The text should start with ---BEGIN CERTIFICATE---.

    Available when using server validation or server and client validation.

    Certificate Authority Password Password for the certificate. Specify if the certificate file is encrypted.

    Available when using server validation or server and client validation, and when using file certificate mode.

    Client Certificate Client certificate to use. Define this property based on the configured certificate mode:
    • When using file certificate mode, specify a path to the certificate. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:

      ${runtime:resourcesDirPath()}/keystore.jks

    • When using the embedded certificate mode, provide the full text of the certificate to use. The text should start with ---BEGIN CERTIFICATE---.

    Available when using server and client validation.

    Client Private Key Path to the key file.

    Available when using server and client validation and file certificate mode.

    Private Key Password Password for the private key. Specify if the private key is encrypted.

    Available when using server and client validation and file certificate mode.

  3. On the Credentials tab, configure the following properties:
    Credentials Property Description
    Authentication Method Authentication method to use:
    • None
    • Username / Password
    • LDAP
    Username User name for the selected authentication method.
    Password Password for the specified user name.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
    Authentication Database Database name associated with the specified user account.

    Available when using username/password authentication.

    Authentication Mechanism Authentication mechanism to use:
    • Default - Data Collector and MongoDB negotiate to choose the encryption mechanism.
    • SCRAM-SHA-1 - Data Collector sends SCRAM-SHA-1 credentials to MongoDB.
    • SCRAM-SHA-256 - Data Collector sends SCRAM-SHA-1 credentials to MongoDB.
  4. On the MongoDB tab, configure the following properties:
    MongoDB Property Description
    Read Changes From Where to read data changes from:
    • Change Stream - Reads entries from a MongoDB change stream. For information about using change streams, see the MongoDB documentation.
    • Oplog Collection - Reads entries from the MongoDB oplog.
    Initial Offset Initial offset to use to begin reading. When using a date or Object ID field as the offset field, enter a timestamp with the following format or a hexadecimal string representation of the Object ID:

    YYYY-MM-DD hh:mm:ss

    When using a string field, enter the string to use.

    Include Namespace Read changes only to specified collections. If no collections are added, the origin reads all changes.

    Using simple or bulk edit mode, click the Add icon to add a collection, and enter the namespace of the collection with the following format:

    <database>.<collection>

    Operations Types Operation types to read.
    Choose from the following data operations:
    • INSERT
    • UPDATE
    • DELETE
    Read Preference Determines how the origin reads data from different members of the MongoDB replica set.
    Get full record for updates Retrieves the full record for updates. When disabled, updates pass only the updated fields.
    Auto Flatten Nested Structures Flattens fields with nested fields. Includes the path to the field in the field name, such as:

    <firstlevel>.<secondlevel>.<fieldname>

    Flattened arrays include the index in the field name, as follows:

    root.array[0].field1

    Batch Size (records) Maximum number of records allowed in a batch.
    Max Batch Wait Time Maximum seconds the origin waits for a batch before sending an empty batch.

    Used only when the Capped Collection Cursor Type property is set to Tailable.

  5. Optionally, click the Advanced tab to configure how the origin connects to MongoDB.

    The defaults for these properties should work in most cases. If a numeric property is set to 0, then the driver default value is used.

    Advanced Property Description
    Compression Algorithm Compression algorithm to use to communicate with MongoDB:
    • None
    • Snappy
    • ZLib
    • ZStandard

    These compression algorithms are not supported by all MongoDB versions. See the MongoDB documentation for details.

    Default is Snappy.

    Application Name Name to use in MongoDB reporting, such as server logs.
    Maximum Connections Maximum number of open connections allowed in the connection pool.
    Minimum Connections Minimum number of open connections allowed in the connection pool.
    Max Connection Idle Time Maximum idle time in milliseconds before a connection is removed from the connection pool.
    Max Connection Lifetime Maximum lifetime in milliseconds for a connection in the connection pool.
    Max Connection Wait Time Maximum time in milliseconds that a connection waits to connect.
    Socket Connect Timeout Maximum time in milliseconds to wait for a network socket connection.
    Socket Read Timeout Maximum time in milliseconds to wait for a read connection.
    Socket Receive Buffer Size (bytes) Buffer size in bytes for receiving data.
    Socket Send Buffer Size (bytes) Buffer size in bytes for sending data.
    Heartbeat Frequency Milliseconds between Data Collector attempts to determine the current state of each server in the cluster.
    Min Heartbeat Frequency Minimum number of milliseconds between Data Collector checks on the state of each server.
    Server Selection Timeout Maximum time in milliseconds that Data Collector waits for server selection before throwing an exception. If you enter 0, an exception is thrown immediately if no server is available. Use a negative value to wait indefinitely.
    Local Threshold Local threshold in milliseconds. Requests are sent to a server whose ping time is less than or equal to the server with the fastest ping time plus the local threshold value.
    Required Replica Set Name Required replica set name to use for the cluster.
    Enable Single Mode Connects to the first MongoDB server in the connection string.

    Applicable only for MongoDB Enterprise Server clusters.

    Max Number of Retries Maximum number of times to retry the connection when the connection fails.

    Default is 10.

    Retry Interval (ms) Time between retries in milliseconds.

    Default is 10,000.

    Capped Collection Cursor Type Style of cursor to use for a capped collection:
    • Normal
    • Tailable
    • Tailable Await