MongoDB Oplog

Supported pipeline types:
  • Data Collector

The MongoDB Oplog origin reads entries from a MongoDB Oplog. For information about supported versions, see Supported Systems and Versions.

MongoDB stores information about changes to the database in a local capped collection called an Oplog. The Oplog contains information about changes in data as well as changes in the database. The MongoDB Oplog origin can read any operation written to the Oplog.

Use the MongoDB Oplog origin to capture changes in data or the database. To read data from a MongoDB collection, use the MongoDB origin. To read from MongoDB Atlas, use the MongoDB Atlas origin.

The MongoDB Oplog origin includes the CRUD operation type in a record header attribute so generated records can be easily processed by CRUD-enabled destinations. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.

When you configure the MongoDB Oplog origin, you configure connection information, such as the connection string and MongoDB credentials. You can define a timestamp and ordinal to specify where to start reading. You also specify the operations to process and the read preference.

You can optionally configure advanced options that determine how the origin connects to MongoDB, including enabling SSL/TLS.

When a pipeline stops, the MongoDB Oplog origin notes where it stops reading. When the pipeline starts again, the origin continues processing from where it stopped by default. You can reset the origin to process all requested data.

Credentials

Based on the authentication used by the MongoDB server, configure the stage to use no authentication, username/password authentication, or LDAP authentication. When using username/password authentication, you can also use delegated authentication. When using LDAP authentication, you can use server-driven authentication or plain authentication.

By default, the origin uses no authentication.

To use username/password or LDAP authentication, enter the required credentials in one of the following ways:
Connection string
Enter credentials in the connection string on the MongoDB tab.
To enter credentials for username/password authentication, enter the username and password before the host name. Use the following format:
mongodb://username:password@host[:port][/[database][?options]]
To enter credentials for LDAP authentication, enter the username and password before the host name, and set the authMechanism option to PLAIN. Use the following format:
mongodb://username:password@host[:port]/[database]?authMechanism=PLAIN
Credentials tab
Select either the Username/Password or LDAP authentication type on the Credentials tab. When using LDAP authentication, you also choose between server-driven or plain authentication.
Then, you specify the username and password for the authentication type.
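
As a minimal sketch of the connection string format described above, the following shell functions assemble a string with embedded credentials. The function names and the sample user, password, host, and database values are hypothetical. The sketch percent-encodes the username and password because MongoDB connection strings expect reserved characters such as : / ? # [ ] @ in credentials to be percent-encoded. It handles ASCII input only.

```shell
# Percent-encode every character outside the RFC 3986 unreserved set.
# ASCII input only; multibyte characters are not handled in this sketch.
urlencode() {
  s=$1
  out=
  while [ -n "$s" ]; do
    rest=${s#?}          # everything after the first character
    c=${s%"$rest"}       # the first character
    case $c in
      [a-zA-Z0-9._~-]) out="$out$c" ;;
      *) out="$out$(printf '%%%02X' "'$c")" ;;
    esac
    s=$rest
  done
  printf '%s\n' "$out"
}

# Build mongodb://username:password@host[:port]/[database][?options]
# $1 = username, $2 = password, $3 = host[:port], $4 = database, $5 = options
mongodb_uri() {
  uri="mongodb://$(urlencode "$1"):$(urlencode "$2")@$3/$4"
  if [ -n "$5" ]; then
    uri="$uri?$5"
  fi
  printf '%s\n' "$uri"
}

# Hypothetical values for illustration only.
mongodb_uri 'sdc_user' 'p@ss:w/rd' 'mongo1.example.com:27017' 'local' ''
mongodb_uri 'ldap_user' 'secret' 'mongo1.example.com:27017' '' 'authMechanism=PLAIN'
```

The first call shows username/password authentication and the second shows LDAP authentication with the authMechanism option set to PLAIN.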

Oplog Timestamp and Ordinal

When you start the pipeline, the MongoDB Oplog origin starts reading from the beginning of the Oplog by default. You can configure a timestamp and ordinal to specify where you want to start the processing.

MongoDB Oplog entries include a timestamp field named "ts" that consists of a timestamp and ordinal as follows:
"ts": Timestamp(<timestamp>, <ordinal>)

The timestamp is the number of seconds since the Unix epoch, such as 1412180887.

The ordinal is an integer counter used to differentiate between entries that occur in the same second.

You can use a timestamp and ordinal to specify where to begin reading from the Oplog. When you use a timestamp, you must also define an ordinal.

For more information about the Oplog timestamp field, see the MongoDB documentation.
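
When choosing a timestamp, it can help to convert between a calendar time and epoch seconds. The following sketch assumes GNU date, since the -d option behaves differently on BSD and macOS. The function names are illustrative only.

```shell
# Convert epoch seconds to a UTC date and time (GNU date syntax).
epoch_to_utc() {
  date -u -d "@$1" '+%Y-%m-%dT%H:%M:%SZ'
}

# Convert a UTC date and time to epoch seconds (GNU date syntax).
utc_to_epoch() {
  date -u -d "$1 UTC" '+%s'
}

epoch_to_utc 1412180887              # prints 2014-10-01T16:28:07Z
utc_to_epoch '2014-10-01 16:28:07'   # prints 1412180887
```

For example, an Oplog entry with "ts": Timestamp(1412180887, 1) corresponds to a timestamp of 1412180887 and an ordinal of 1.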

Read Preference

You can configure the read preference that the MongoDB Oplog origin uses. The read preference determines how the origin reads data from different members of the MongoDB replica set.

You can use the following MongoDB read preferences:
  • Primary - Requires reading from the primary member.
  • Primary Preferred - Prefers reading from the primary, but allows reads from a secondary member.
  • Secondary - Requires reading from a secondary member.
  • Secondary Preferred - Prefers reading from a secondary, but allows reads from a primary when necessary.
  • Nearest - Reads from the member with the least network latency.

By default, the origin uses Secondary Preferred to avoid making unnecessary requests to the primary member.

Generated Records

The MongoDB Oplog origin generates records based on data from the MongoDB Oplog and adds CRUD and CDC related record header attributes.

The structure of Oplog records is unique, so you might need to use processors in the pipeline to convert the record structure.

For example, for an insert record, the record data resides in a map field named "o". For an update record, however, the _id field is part of a map field named "o2". To merge the record data, you can use a Field Flattener to flatten the map fields and a Field Remover to remove any unnecessary fields.

For more information about the Oplog record structure, see the MongoDB documentation. The following site is also a good resource: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/.

CRUD Operation and CDC Header Attributes

The MongoDB Oplog origin includes the CRUD operation type in the sdc.operation.type record header attribute.

If you use a CRUD-enabled destination in the pipeline such as JDBC Producer or Elasticsearch, the destination can use the operation type when writing to destination systems. When necessary, you can use an Expression Evaluator processor or any scripting processor to manipulate the value in the header attribute. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.

The MongoDB Oplog origin uses the following values in the sdc.operation.type record header attribute to represent the operation type:
  • 1 for INSERT
  • 2 for DELETE
  • 3 for UPDATE
  • 5 for unsupported operations, such as CMD, NOOP, or DB, which are available MongoDB operation types but not applicable to record data.
The MongoDB Oplog origin can also include the following header attributes when the information is relevant:
  • op - The CRUD operation using the following values:
    • i for INSERT
    • u for UPDATE
    • d for DELETE
  • ns - The namespace, using the following format: <database>:<tablename>.
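
The relationship between the op attribute and the sdc.operation.type attribute can be sketched as a simple lookup. This only illustrates the values listed above and is not the origin's implementation. The function name is hypothetical, and the c code used in the loop is the Oplog code for a command, included here as an example of an unsupported operation.

```shell
# Map an Oplog op code to the value used in sdc.operation.type.
op_to_sdc_operation_type() {
  case $1 in
    i) echo 1 ;;  # INSERT
    d) echo 2 ;;  # DELETE
    u) echo 3 ;;  # UPDATE
    *) echo 5 ;;  # unsupported operations, such as CMD, NOOP, or DB
  esac
}

# Print the mapping for a few op codes.
for op in i u d c; do
  printf '%s -> %s\n' "$op" "$(op_to_sdc_operation_type "$op")"
done
```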

Enabling SSL/TLS

You can enable the MongoDB Oplog origin to use SSL/TLS to connect to MongoDB.

  1. On the Advanced tab for the stage, select the SSL Enabled property.
  2. If the MongoDB certificate is signed by a private CA or not trusted by the default Java truststore, create a custom truststore file or modify a copy of the default Java truststore file to add the CA to the file. Then configure Data Collector to use the modified truststore file.

    By default, Data Collector uses the Java truststore file located in $JAVA_HOME/jre/lib/security/cacerts. If your certificate is signed by a CA that is included in the default Java truststore file, you do not need to create a truststore file and can skip this step.

    In these steps, we show how to modify the default truststore file to add an additional CA to the list of trusted CAs. If you prefer to create a custom truststore file, see the keytool documentation.
    Note: If Data Collector is already configured to use a custom truststore file to enable HTTPS or for secure connections to an LDAP server, then simply add this additional CA to the same modified truststore file.
    1. Use the following command to set the JAVA_HOME environment variable:
      export JAVA_HOME=<Java home directory>
    2. Use the following command to set the SDC_CONF environment variable:
      export SDC_CONF=<Data Collector configuration directory>
      For example, for an RPM installation use:
      export SDC_CONF=/etc/sdc
    3. Use the following command to copy the default Java truststore file to the Data Collector configuration directory:
      cp "${JAVA_HOME}/jre/lib/security/cacerts" "${SDC_CONF}/truststore.jks"
    4. Use the following keytool command to import the CA certificate into the truststore file:
      keytool -import -file <certificate> -trustcacerts -noprompt -alias <alias> -storepass <password> -keystore "${SDC_CONF}/truststore.jks"
    5. Define the following options in the SDC_JAVA_OPTS environment variable:
      • javax.net.ssl.trustStore - Path to the truststore file on the Data Collector machine.
      • javax.net.ssl.trustStorePassword - Truststore password.

      Modify environment variables using the method required by your installation type.

      For example, define the options as follows:
      export SDC_JAVA_OPTS="${SDC_JAVA_OPTS} -Djavax.net.ssl.trustStore=/etc/sdc/truststore.jks -Djavax.net.ssl.trustStorePassword=mypassword -Xmx1024m -Xms1024m -server -XX:-OmitStackTraceInFastThrow"

      Or to avoid saving the password in the export command, save the password in a text file and then define the truststore password option as follows: -Djavax.net.ssl.trustStorePassword=$(cat passwordfile.txt)

      Then ensure that the password file is readable only by the user executing the export command.

    6. Restart Data Collector to enable the changes.
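
After completing these steps, you might want to confirm that the CA certificate was imported and that the password file is protected. The following sketch uses the keytool -list command and chmod. The function names and their arguments are illustrative, not part of Data Collector.

```shell
# Restrict a password file so that only its owner can read or write it.
# $1 = path to the password file
protect_password_file() {
  chmod 600 "$1"
}

# List a single alias from the truststore to confirm that the import succeeded.
# keytool prints the entry, or reports an error if the alias does not exist.
# $1 = truststore path, $2 = alias, $3 = file that contains the truststore password
verify_ca_import() {
  keytool -list -keystore "$1" -alias "$2" -storepass "$(cat "$3")"
}
```

For example, assuming the alias and password file names from the steps above, you could run protect_password_file passwordfile.txt and then verify_ca_import "${SDC_CONF}/truststore.jks" <alias> passwordfile.txt.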

Configuring a MongoDB Oplog Origin

Configure a MongoDB Oplog origin to read data from a MongoDB Oplog.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property - Description
    Name - Stage name.
    Description - Optional description.
    On Record Error - Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the MongoDB tab, configure the following properties:
    MongoDB Property - Description
    Connection String - Connection string for the MongoDB instance. Use the following format:
    mongodb://host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]
    When connecting to a cluster, enter additional node information to ensure a connection.

    If the MongoDB server uses username/password or LDAP authentication, you can include the credentials in the connection string, as described in Credentials.

    Enable Single Mode - Select to connect to a single MongoDB server or node. If multiple nodes are defined in the connection string, the stage connects only to the first node.

    Use this option with care. If the stage cannot connect or the connection fails, the pipeline stops.

    Collection - Name of the MongoDB Oplog collection to use, typically oplog.rs.

    The collection name must start with "oplog".

    Initial Timestamp - Timestamp in the Oplog ts field at which to begin reading data.

    Use the MongoDB timestamp format, the seconds since the Unix epoch. For more information about the Oplog timestamp, see the MongoDB documentation.

    Use -1 to opt out of this property and read from the beginning of the Oplog.

    Ordinal - Integer ordinal to specify the entry to use within a set of identical timestamps.

    Required when you specify an initial timestamp. Use -1 to opt out of this property.

    Operation Types - Operations to process. The origin reads the selected operations. Choose from the following data operations:
    • INSERT
    • UPDATE
    • DELETE
    You can also choose from the following non-data operations:
    • NOOP
    • CMD
    • DB

    For more information about the Oplog operation types, see the MongoDB documentation.

    Batch Size (records) - Maximum number of records allowed in a batch.
    Max Batch Wait Time - Amount of time the origin will wait to fill a batch before sending an empty batch.
    Read Preference - Determines how the origin reads data from different members of the MongoDB replica set.
  3. To enter credentials separately from the MongoDB connection string, click the Credentials tab and configure the following properties:
    Credentials Property - Description
    Authentication Type - Authentication used by the MongoDB server: Username/Password or LDAP.
    Authentication Mechanism - LDAP authentication method: server-driven or plain authentication.
    Username - MongoDB or LDAP user name.
    Password - MongoDB or LDAP password.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
    Authentication Source - Optional alternate database name to use for delegated authentication.

    Available for the Username/Password option.

  4. Optionally, click the Advanced tab to configure how the origin connects to MongoDB.
    The defaults for these properties should work in most cases:
    Advanced Property - Description
    Connections Per Host - Maximum number of connections for each host.

    Default is 100.

    Min Connections Per Host - Minimum number of connections for each host.

    Default is 0.

    Connection Timeout - Maximum time in milliseconds to wait for a connection.

    Default is 10,000.

    Max Connection Idle Time - Maximum time in milliseconds that a pooled connection can remain idle. When a pooled connection exceeds the idle time, the connection is closed. Use 0 to opt out of this property.

    Default is 0.

    Max Connection Lifetime - Maximum time in milliseconds that a pooled connection can be active. When a pooled connection exceeds the lifetime, the connection is closed. Use 0 to opt out of this property.

    Default is 0.

    Max Wait Time - Maximum time in milliseconds that a thread can wait for a connection to become available. Use 0 to opt out of this property. Use a negative value to wait indefinitely.

    Default is 120,000.

    Server Selection Timeout - Maximum time in milliseconds that Data Collector waits for a server selection before throwing an exception. If you use 0, an exception is thrown immediately if no server is available. Use a negative value to wait indefinitely.

    Default is 30,000.

    Threads Allowed to Block for Connection Multiplier - Multiplier that determines the maximum number of threads that can wait for a connection to become available from the pool. This number multiplied by the Connections Per Host value determines the maximum number of threads.

    Default is 5.

    Heartbeat Frequency - Frequency in milliseconds at which Data Collector attempts to determine the current state of each server in the cluster.

    Default is 10,000.

    Min Heartbeat Frequency - Minimum heartbeat frequency in milliseconds. Data Collector waits at least this long before checking the state of each server.

    Default is 500.

    Heartbeat Connection Timeout - Maximum time in milliseconds to wait for a connection used for the cluster heartbeat.

    Default is 20,000.

    Heartbeat Socket Timeout - Maximum time in milliseconds for a socket timeout for connections used for the cluster heartbeat.

    Default is 20,000.

    Local Threshold - Local threshold in milliseconds. Requests are sent to a server whose ping time is less than or equal to the server with the fastest ping time plus the local threshold value.

    Default is 15.

    Required Replica Set Name - Required replica set name to use for the cluster.
    Cursor Finalizer Enabled - Specifies whether to enable cursor finalizers.
    Socket Keep Alive - Specifies whether to enable socket keep alive.
    Socket Timeout - Maximum time in milliseconds for the socket timeout. Use 0 to opt out of this property.

    Default is 0.

    SSL Enabled - Enables SSL/TLS.

    If the MongoDB certificate is signed by a private CA or not trusted by the default Java truststore, you also must define the truststore file and password in the SDC_JAVA_OPTS environment variable, as described in Enabling SSL/TLS.

    SSL Invalid Host Name Allowed - Specifies whether invalid host names are allowed in SSL/TLS certificates.
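
Two of the advanced properties above involve simple arithmetic, which the following sketch works through. It only illustrates the descriptions of the Threads Allowed to Block for Connection Multiplier and Local Threshold properties and is not how Data Collector or the MongoDB driver implements them. The function names, server names, and ping times are hypothetical.

```shell
# Maximum number of threads that can wait for a pooled connection:
# multiplier x Connections Per Host.
# $1 = multiplier, $2 = Connections Per Host
max_waiting_threads() {
  echo $(( $1 * $2 ))
}

# Print the servers whose ping time is less than or equal to the
# fastest ping time plus the local threshold.
# $1 = local threshold in milliseconds
# stdin: one "<server> <ping_ms>" pair per line
eligible_servers() {
  awk -v threshold="$1" '
    { name[NR] = $1; ping[NR] = $2 + 0
      if (NR == 1 || ping[NR] < min) min = ping[NR] }
    END { for (i = 1; i <= NR; i++)
            if (ping[i] <= min + threshold) print name[i] }
  '
}

# With the defaults, 5 x 100 = 500 threads can wait for a connection.
max_waiting_threads 5 100

# With the default threshold of 15, the fastest ping is 10 ms, so servers
# with a ping time of up to 25 ms are eligible: mongo1 and mongo2.
printf 'mongo1 10\nmongo2 20\nmongo3 40\n' | eligible_servers 15
```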