MongoDB

The MongoDB origin reads data from MongoDB. For information about supported versions, see Supported Systems and Versions.

The origin generates a record for every MongoDB document that it reads. To read from MongoDB Atlas, use the MongoDB Atlas origin. To read change data capture information from the MongoDB Oplog, use the MongoDB Oplog origin.

The MongoDB origin reads from capped and uncapped collections. When you configure the MongoDB origin, you define connection information, such as the connection string and MongoDB credentials. You can also use a connection to configure the origin. You also configure the offset field, collection type, and initial offset. These properties determine how the origin queries the database.

When the pipeline stops, the MongoDB origin notes where it stops reading. When the pipeline starts again, the origin continues processing from the last-saved offset by default. You can reset the origin to process all requested data.

You can optionally configure advanced options that determine how the origin connects to MongoDB, including enabling SSL/TLS for the origin.

The origin can generate events for an event stream. For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Credentials

Based on the authentication used by the MongoDB server, configure the stage to use no authentication, username/password authentication, or LDAP authentication. When using username/password authentication, you can also use delegated authentication. When using LDAP authentication, you can use server-driven authentication or plain authentication.

By default, the origin uses no authentication.

To use username/password or LDAP authentication, enter the required credentials in one of the following ways:
Connection string
Enter credentials in the connection string on the MongoDB tab.
To enter credentials for username/password authentication, enter the username and password before the host name. Use the following format:
mongodb://username:password@host[:port][/[database][?options]]
To enter credentials for LDAP authentication, enter the username and password before the host name, and set the authMechanism option to PLAIN. Use the following format:
mongodb://username:password@host[:port]/[database]?authMechanism=PLAIN
Credentials tab
Select either the Username/Password or LDAP authentication type on the Credentials tab. When using LDAP authentication, you also choose between server-driven or plain authentication.
Then, you specify the username and password for the authentication type.
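
The connection string formats above can be assembled programmatically. The following is a minimal sketch, assuming Python and only its standard library. The helper name, host, database, and credentials are hypothetical. It percent-encodes the username and password so that characters such as @ or : are not mistaken for URI delimiters:

```python
from urllib.parse import quote_plus


def build_mongodb_uri(username, password, host, port=27017, database="", options=None):
    """Build a MongoDB connection string with percent-encoded credentials."""
    # Characters such as '@', ':' and '/' in credentials must be percent-encoded
    # so that they are not read as URI delimiters.
    credentials = f"{quote_plus(username)}:{quote_plus(password)}"
    uri = f"mongodb://{credentials}@{host}:{port}/{database}"
    if options:
        query = "&".join(f"{key}={value}" for key, value in options.items())
        uri = f"{uri}?{query}"
    return uri


# Username/password authentication (hypothetical host and credentials).
print(build_mongodb_uri("appUser", "p@ss:word", "mongo.example.com", database="sales"))

# LDAP authentication with the PLAIN mechanism.
print(build_mongodb_uri("ldapUser", "secret", "mongo.example.com",
                        database="sales", options={"authMechanism": "PLAIN"}))
```

The resulting strings can be pasted into the Connection String property. To keep passwords out of the pipeline definition, enter credentials on the Credentials tab instead.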

Offset Field and Initial Offset

The MongoDB origin uses the offset field to track the data to read. By default, the origin uses the _id field as the offset field.

You can also use a nested offset field, such as o._id, or any Object ID, date, or string field. Results are not guaranteed when you use any field other than the default _id field.

When you use a date or Object ID field, specify a timestamp to use as the initial offset. Object ID fields include an embedded timestamp that the origin uses to determine where in the collection to begin reading. When you define the initial offset for a date or Object ID field, use the following format:
YYYY-MM-DD HH:mm:ss

When you use a string field, specify the initial string to use as the initial offset.
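
To illustrate how a timestamp relates to an Object ID, the sketch below converts an initial offset in the format above into the lowest Object ID value that carries that timestamp, and back again. It relies on the documented ObjectId layout (12 bytes, of which the first 4 are a big-endian count of seconds since the epoch). The function names are hypothetical, interpreting the offset as UTC is an assumption, and this is not necessarily how the origin builds its query:

```python
from datetime import datetime, timezone


def initial_offset_to_object_id_hex(initial_offset):
    """Return the lowest ObjectId (as hex) whose embedded timestamp equals the offset."""
    # Parse the YYYY-MM-DD HH:mm:ss format; treating it as UTC is an assumption.
    parsed = datetime.strptime(initial_offset, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    seconds = int(parsed.timestamp())
    # 4-byte big-endian timestamp followed by 8 zeroed bytes.
    return seconds.to_bytes(4, "big").hex() + "00" * 8


def object_id_hex_to_datetime(object_id_hex):
    """Read the embedded creation time back out of an ObjectId hex string."""
    seconds = int(object_id_hex[:8], 16)
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


boundary = initial_offset_to_object_id_hex("2015-01-01 00:00:00")
print(boundary)                             # 54a48e000000000000000000
print(object_id_hex_to_datetime(boundary))  # 2015-01-01 00:00:00+00:00
```

Any document whose Object ID sorts at or after the boundary value was created at or after the initial offset.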

Note: If you change the offset field type for the origin after the pipeline runs and then stops, you must reset the origin before you can run the pipeline again.

Read Preference

You can configure the read preference that the MongoDB origin uses. The read preference determines how the origin reads data from different members of the MongoDB replica set.

You can use the following MongoDB read preferences:
  • Primary - Requires reading from the primary member.
  • Primary Preferred - Prefers reading from the primary, but allows reads from a secondary member.
  • Secondary - Requires reading from a secondary member.
  • Secondary Preferred - Prefers reading from a secondary, but allows reads from a primary when necessary.
  • Nearest - Reads from the member with the least network latency.

By default, the origin uses Secondary Preferred to avoid making unnecessary requests to the primary member.
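
The routing rules listed above can be modeled as a small selection function. This is a simplified sketch, not the MongoDB driver's actual server selection. The member dictionaries and the function name are hypothetical, and Nearest is reduced to picking the single lowest-latency member, as the list describes it:

```python
def eligible_members(members, read_preference):
    """Return the replica set members that a read may be sent to.

    Each member is a dict with 'name', 'role' ('primary' or 'secondary'),
    and 'latency_ms'. This structure is hypothetical.
    """
    primaries = [m for m in members if m["role"] == "primary"]
    secondaries = [m for m in members if m["role"] == "secondary"]
    if read_preference == "Primary":
        return primaries
    if read_preference == "Primary Preferred":
        return primaries or secondaries   # fall back to secondaries
    if read_preference == "Secondary":
        return secondaries
    if read_preference == "Secondary Preferred":
        return secondaries or primaries   # fall back to the primary
    if read_preference == "Nearest":
        return sorted(members, key=lambda m: m["latency_ms"])[:1]
    raise ValueError(f"Unknown read preference: {read_preference}")


replica_set = [
    {"name": "mongo1", "role": "primary", "latency_ms": 12},
    {"name": "mongo2", "role": "secondary", "latency_ms": 4},
    {"name": "mongo3", "role": "secondary", "latency_ms": 30},
]

for preference in ("Primary", "Secondary Preferred", "Nearest"):
    names = [m["name"] for m in eligible_members(replica_set, preference)]
    print(preference, "->", names)
```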

Event Generation

The MongoDB origin can generate events when it completes processing all available data and the configured batch wait time has elapsed.

MongoDB origin events can be used in any logical way. For example:
  • With the Pipeline Finisher executor to stop the pipeline and transition the pipeline to a Finished state when the origin completes processing available data.

    When you restart a pipeline stopped by the Pipeline Finisher executor, the origin continues processing from the last-saved offset unless you reset the origin.

    For an example, see Stopping a Pipeline After Processing All Available Data.

  • With a destination to store event information.

    For an example, see Preserving an Audit Trail of Events.

For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Event Records

Event records generated by the MongoDB origin have the following event-related record header attributes. Record header attributes are stored as String values:
  • sdc.event.type - Event type. Uses the following event type:
      • no-more-data - Generated after the origin completes processing all available documents and the number of seconds configured for Max Batch Wait Time has elapsed.

        The origin must create at least one record for the pipeline before generating the event record.

  • sdc.event.version - Integer that indicates the version of the event record type.
  • sdc.event.creation_timestamp - Epoch timestamp when the stage created the event.

The MongoDB origin can generate the following event record:

no-more-data
The MongoDB origin generates a no-more-data event record when the origin completes processing all available documents and the number of seconds configured for Max Batch Wait Time elapses without any new documents appearing to be processed.

If no data is available for processing when the pipeline starts, the origin generates the event record only after creating at least one record for the pipeline.

No-more-data event records generated by the origin have the sdc.event.type set to no-more-data and include the following fields:
  • record-count - Number of records successfully generated since the pipeline started or since the last no-more-data event was created.
  • error-count - Number of error records generated since the pipeline started or since the last no-more-data event was created.
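
As a rough illustration, a no-more-data event record can be pictured as a set of header attributes plus two fields. The dictionary layout, the helper function, and all values below are hypothetical. Only the attribute and field names come from the descriptions above:

```python
def is_no_more_data_event(record):
    """Return True when the record header marks a no-more-data event."""
    return record["header"].get("sdc.event.type") == "no-more-data"


# Illustrative event record. Header attributes are stored as String values.
event = {
    "header": {
        "sdc.event.type": "no-more-data",
        "sdc.event.version": "1",                         # example value
        "sdc.event.creation_timestamp": "1485449409000",  # example value
    },
    "fields": {
        "record-count": 1250,  # records generated since the last event
        "error-count": 3,      # error records generated since the last event
    },
}

if is_no_more_data_event(event):
    counts = event["fields"]
    print(f"Processed {counts['record-count']} records, {counts['error-count']} errors")
```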

BSON Timestamp

When processing data from MongoDB version 2.6 and later, the MongoDB origin supports the MongoDB BSON Timestamp data type.

The MongoDB BSON Timestamp is a MongoDB data type that includes a timestamp and ordinal as follows:
<BSON Timestamp field name>:Timestamp(<timestamp>, <ordinal>)

The MongoDB origin converts the BSON Timestamp to a map as follows:

<BSON Timestamp field name>{MAP}:
    Timestamp{DATETIME}:<UTC timestamp>
    Ordinal{INTEGER}:<integer ordinal>
For example, a Transaction BSON Timestamp of (1485449409, 1) is converted to the following Transaction map field:
"Transaction":{
    "Timestamp":Jan 26, 2017 16:50:09
    "Ordinal":1
}
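
The conversion can be reproduced with a few lines of Python. This is a sketch of the mapping only, assuming the first component of the BSON Timestamp is seconds since the epoch. The function name is hypothetical and the code is not the origin's implementation:

```python
from datetime import datetime, timezone


def bson_timestamp_to_map(seconds, ordinal):
    """Convert the two BSON Timestamp components into a map-like dict."""
    return {
        # The first component is seconds since the epoch, interpreted as UTC.
        "Timestamp": datetime.fromtimestamp(seconds, tz=timezone.utc),
        # The second component orders operations within the same second.
        "Ordinal": ordinal,
    }


transaction = bson_timestamp_to_map(1485449409, 1)
print(transaction["Timestamp"].isoformat())  # 2017-01-26T16:50:09+00:00
print(transaction["Ordinal"])                # 1
```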

Enabling SSL/TLS

You can enable the MongoDB origin to use SSL/TLS to connect to MongoDB.

  1. On the Advanced tab for the stage, select the SSL Enabled property.
  2. If the MongoDB certificate is signed by a private CA or not trusted by the default Java truststore, create a custom truststore file or modify a copy of the default Java truststore file to add the CA to the file. Then configure Data Collector to use the modified truststore file.

    By default, Data Collector uses the Java truststore file located in $JAVA_HOME/jre/lib/security/cacerts. If your certificate is signed by a CA that is included in the default Java truststore file, you do not need to create a truststore file and can skip this step.

    The following steps show how to modify the default truststore file to add an additional CA to the list of trusted CAs. If you prefer to create a custom truststore file, see the keytool documentation.

    1. Use the following command to set the JAVA_HOME environment variable:
      export JAVA_HOME=<Java home directory>
    2. Use the following command to set the SDC_CONF environment variable:
      export SDC_CONF=<Data Collector configuration directory>
      For example:
      export SDC_CONF=/streamsets-datacollector-5.6.0/etc
    3. Use the following command to copy the default Java truststore file to the Data Collector configuration directory:
      cp "${JAVA_HOME}/jre/lib/security/cacerts" "${SDC_CONF}/truststore.jks"
    4. Use the following keytool command to import the CA certificate into the truststore file:
      keytool -import -file <certificate> -trustcacerts -noprompt -alias <alias> -storepass <password> -keystore "${SDC_CONF}/truststore.jks"
    5. In Control Hub, edit the deployment. In the Configure Engine section, click Advanced Configuration. Then, click Java Configuration. Define the following options in the Java Options property:
      • javax.net.ssl.trustStore - Path to the truststore file on the Data Collector machine.
      • javax.net.ssl.trustStorePassword - Truststore password.
      For example, define the options as follows:
      -Djavax.net.ssl.trustStore=/streamsets-datacollector-5.6.0/etc/truststore.jks -Djavax.net.ssl.trustStorePassword=mypassword
    6. Save the changes to the deployment and restart all engine instances.

Configuring a MongoDB Origin

Configure a MongoDB origin to read data from MongoDB.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Produce Events Generates event records when events occur. Use for event handling.
    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the MongoDB tab, configure the following properties:
    MongoDB Property Description
    Connection Connection that defines the information required to connect to an external system.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    To create a new connection, click the Add New Connection icon. To view and edit the details of the selected connection, click the Edit Connection icon.

    Connection String

    Connection string for the MongoDB instance. Use the following format:
    mongodb://host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]
    When connecting to a cluster, enter additional node information to ensure a connection.
    If the MongoDB server uses username/password or LDAP authentication, you can include the credentials in the connection string, as described in Credentials.
    Enable Single Mode Select to connect to a single MongoDB server or node. If multiple nodes are defined in the connection string, the stage connects only to the first node.

    Use this option with care. If the stage cannot connect or the connection fails, the pipeline stops.

    Database Name of the MongoDB database.
    Collection Name of the MongoDB collection to use.
    Initial Offset Initial offset to use to begin reading. When using a date or Object ID field as the offset field, enter a timestamp with the following format: YYYY-MM-DD HH:mm:ss.

    When using a string field, enter the string to use.

    Default is: 2015-01-01 00:00:00.

    Offset Field Type Data type of the offset field:
    • ObjectId - Use for an Object ID field.
    • Date - Use for a date field.
    • String - Use for a string field.

    Default is ObjectId.

    Offset Field Field to use to track reads. Default is the _id field.

    You can use a nested offset field, such as o._id. You can also use any Object ID, date, or string field. Results are not guaranteed for anything but the _id field.

    Capped Collection The collection is capped. Clear this option to read an uncapped collection.
    Batch Size (records) Maximum number of records allowed in a batch.
    Max Batch Wait Time Number of seconds the origin waits to fill a batch before sending an empty batch.
    Read Preference Determines how the origin reads data from different members of the MongoDB replica set.
  3. To enter credentials separately from the MongoDB connection string, click the Credentials tab and configure the following properties:
    Credentials Description
    Authentication Type Authentication used by the MongoDB server: Username/Password or LDAP.
    Authentication Mechanism LDAP authentication method: server-driven or plain authentication.
    Username MongoDB or LDAP user name.
    Password MongoDB or LDAP password.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
    Authentication Source Optional alternate database to use for delegated authentication.

    Available for the Username/Password option.

  4. Optionally, click the Advanced tab to configure how the origin connects to MongoDB.
    The defaults for these properties should work in most cases:
    Advanced Property Description
    Connections Per Host Maximum number of connections for each host.

    Default is 100.

    Min Connections Per Host Minimum number of connections for each host.

    Default is 0.

    Connection Timeout Maximum time in milliseconds to wait for a connection.

    Default is 10,000.

    Max Connection Idle Time Maximum time in milliseconds that a pooled connection can remain idle. When a pooled connection exceeds the idle time, the connection is closed. Use 0 to opt out of this property.

    Default is 0.

    Max Connection Lifetime Maximum time in milliseconds that a pooled connection can be active. When a pooled connection exceeds the lifetime, the connection is closed. Use 0 to opt out of this property.

    Default is 0.

    Max Wait Time Maximum time in milliseconds that a thread can wait for a connection to become available. Use 0 to opt out of this property. Use a negative value to wait indefinitely.

    Default is 120,000.

    Server Selection Timeout Maximum time in milliseconds that Data Collector waits for a server selection before throwing an exception. If you use 0, an exception is thrown immediately if no server is available. Use a negative value to wait indefinitely.

    Default is 30,000.

    Threads Allowed to Block for Connection Multiplier Multiplier that determines the maximum number of threads that can wait for a connection to become available from the pool. This number multiplied by the Connections Per Host value determines the maximum number of threads.

    Default is 5.

    Heartbeat Frequency The frequency in milliseconds at which Data Collector attempts to determine the current state of each server in the cluster.

    Default is 10,000.

    Min Heartbeat Frequency Minimum heartbeat frequency in milliseconds. Data Collector waits at least this long before checking the state of each server.

    Default is 500.

    Heartbeat Connection Timeout Maximum time in milliseconds to wait for a connection used for the cluster heartbeat.

    Default is 20,000.

    Heartbeat Socket Timeout Maximum time in milliseconds for a socket timeout for connections used for the cluster heartbeat.

    Default is 20,000.

    Local Threshold Local threshold in milliseconds. Requests are sent only to servers whose ping time is less than or equal to the fastest server's ping time plus the local threshold value.

    Default is 15.

    Required Replica Set Name Required replica set name to use for the cluster.
    Cursor Finalizer Enabled Specifies whether to enable cursor finalizers.
    Socket Keep Alive Specifies whether to enable socket keep alive.
    Socket Timeout Maximum time in milliseconds for the socket timeout. Use 0 to opt out of this property.

    Default is 0.

    SSL Enabled Enables SSL/TLS.

    If the MongoDB certificate is signed by a private CA or not trusted by the default Java truststore, you also must define the truststore file and password in Java configuration options in the deployment, as described in Enabling SSL/TLS.

    SSL Invalid Host Name Allowed Specifies whether invalid host names are allowed in SSL/TLS certificates.
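
Two of the advanced properties describe small calculations: Threads Allowed to Block for Connection Multiplier and Local Threshold. The sketch below works through both using the default values listed above. The function names and the ping times are hypothetical, and the code only illustrates the arithmetic, not the driver's behavior:

```python
def max_waiting_threads(connections_per_host=100, multiplier=5):
    """Maximum number of threads that can wait for a pooled connection."""
    return connections_per_host * multiplier


def servers_within_local_threshold(ping_times_ms, local_threshold_ms=15):
    """Names of servers whose ping time is within the threshold of the fastest."""
    fastest = min(ping_times_ms.values())
    return sorted(
        name for name, ping in ping_times_ms.items()
        if ping <= fastest + local_threshold_ms
    )


# With the defaults, 100 connections x multiplier 5 allows 500 waiting threads.
print(max_waiting_threads())  # 500

# The fastest ping is 4 ms, so servers at or below 4 + 15 = 19 ms qualify.
print(servers_within_local_threshold({"mongo1": 4, "mongo2": 12, "mongo3": 40}))
# ['mongo1', 'mongo2']
```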