MongoDB

The MongoDB destination writes data to MongoDB. For information about supported versions, see Supported Systems and Versions.

To write data, the MongoDB destination requires records to include a CRUD operation record header attribute. The CRUD operation header attribute indicates the operation to perform for each record. You can also enable upserts for Update and Replace records. For information about Data Collector change data processing and a list of CDC-enabled origins, see Processing Changed Data.

When you configure the MongoDB destination, you define connection information, such as the connection string and MongoDB credentials. You can also use a connection to configure the destination. You configure the database, and collection, and write concern to use.

To replace and update records, you must specify a unique key field and can optionally enable an upsert flag. When you do not specify a unique key field, replace and update records are sent to the stage for error handling.

You can optionally configure advanced options that determine how the destination connects to MongoDB, including enabling SSL/TLS for the destination.

Credentials

Based on the authentication used by the MongoDB server, configure the stage to use no authentication, username/password authentication, or LDAP authentication. When using username/password authentication, you can also use delegated authentication. When using LDAP authentication, you can use server-driven authentication or plain authentication.

By default, the destination uses no authentication.

To use username/password or LDAP authentication, enter the required credentials in one of the following ways:
Connection string
Enter credentials in the connection string on the MongoDB tab.
To enter credentials for username/password authentication, enter the username and password before the host name. Use the following format:
mongodb://username:password@host[:port][/[database][?options]]
To enter credentials for LDAP authentication, enter the username and password before the host name, and set the authMechanism option to PLAIN. Use the following format:
mongodb://username:password@host[:port][/[database]?authMechanism=PLAIN
Credentials tab
Select either the Username/Password or LDAP authentication type on the Credentials tab. When using LDAP authentication, you also choose between server-driven or plain authentication.
Then, you specify the username and password for the authentication type.

Define the CRUD Operation

To write to MongoDB, ensure that the CRUD operation record header attribute is defined for each record earlier in the pipeline. Records without an operation record header attribute are sent to error.

To update and replace records, you must specify a unique key field. You can also enable upserts for update and replace records.

Note that when performing a DELETE operation, the destination deletes a maximum of one matching document in MongoDB. It does not delete all matching documents, as is sometimes possible with MongoDB.

To write records to MongoDB, make sure records include the following CRUD operation record header attribute:
sdc.operation.type
The MongoDB destination uses the CRUD operation in the sdc.operation.type record header attribute when writing to MongoDB. The MongoDB destination supports the following values for the sdc.operation.type attribute:
  • 1 for INSERT
  • 2 for DELETE
  • 3 for UPDATE
  • 7 for REPLACE
If your pipeline has a CRUD-enabled origin that processes changed data, the destination simply reads the operation type from the sdc.operation.type header attribute that the origin generates. If your pipeline has a non-CDC origin, you can use the Expression Evaluator processor or a scripting processor to define the record header attribute. For more information about Data Collector changed data processing and a list of CDC-enabled origins, see Processing Changed Data.

Performing Upserts

You can configure the destination to perform upserts. When you enable upserts, the destination inserts records for Update and Replace records when it does not find an existing record to update or replace.

By default, the destination does not perform upserts. If the destination does not find an existing record for a record flagged for update or replace, it does not write the record to MongoDB.

For more information about MongoDB operations and the upsert flag, see the MongoDB documentation.

Enabling SSL/TLS

You can enable the MongoDB destination to use SSL/TLS to connect to MongoDB.

  1. On the Advanced tab for the stage, select the SSL Enabled property.
  2. If the MongoDB certificate is signed by a private CA or not trusted by the default Java truststore, create a custom truststore file or modify a copy of the default Java truststore file to add the CA to the file. Then configure Data Collector to use the modified truststore file.

    By default, Data Collector uses the Java truststore file located in $JAVA_HOME/jre/lib/security/cacerts. If your certificate is signed by a CA that is included in the default Java truststore file, you do not need to create a truststore file and can skip this step.

    In these steps, we show how to modify the default truststore file to add an additional CA to the list of trusted CAs. If you prefer to create a custom truststore file, see the keytool documentation.

    1. Use the following command to set the JAVA_HOME environment variable:
      export JAVA_HOME=<Java home directory>
    2. Use the following command to set the SDC_CONF environment variable:
      export SDC_CONF=<Data Collector configuration directory>
      For example:
      export SDC_CONF=/streamsets-datacollector-5.6.0/etc
    3. Use the following command to copy the default Java truststore file to the Data Collector configuration directory:
      cp "${JAVA_HOME}/jre/lib/security/cacerts" "${SDC_CONF}/truststore.jks"
    4. Use the following keytool command to import the CA certificate into the truststore file:
      keytool -import -file <certificate> -trustcacerts -noprompt -alias <alias> -storepass <password> -keystore "${SDC_CONF}/truststore.jks"
    5. In Control Hub, edit the deployment. In the Configure Engine section, click Advanced Configuration. Then, click Java Configuration. Define the following options in the Java Options property:
      • javax.net.ssl.trustStore - Path to the truststore file on the Data Collector machine.
      • javax.net.ssl.trustStorePassword - Truststore password.
      For example, define the options as follows:
      -Djavax.net.ssl.trustStore=/streamsets-datacollector-5.6.0/etc/truststore.jks -Djavax.net.ssl.trustStorePassword=mypassword
    6. Save the changes to the deployment and restart all engine instances.
For more information about environment variables, see Java and Security Configuration.

Configuring a MongoDB Destination

Configure a MongoDB destination to write to MongoDB.
Important: Ensure that all records routed to the destination include a CDC record header attribute. For more information, see Define the CRUD Operation.
  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error
    Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the MongoDB tab, configure the following properties:
    MongoDB Property Description
    Connection Connection that defines the information required to connect to an external system.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    To create a new connection, click the Add New Connection icon: . To view and edit the details of the selected connection, click the Edit Connection icon: .

    Connection String
    Connection string for the MongoDB instance. Use the following format:
    mongodb://host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]
    When connecting to a cluster, enter additional node information to ensure a connection.

    If the MongoDB server uses username/password or LDAP authentication, you can include the credentials in the connection string, as described in Credentials.

    Database MongoDB database name.
    Collection MongoDB collection name.
    Unique Key Field Field in the record to be used to update and replace records. When not set, records flagged for update or replace are sent to stage for error handling.
    Upsert Inserts records flagged for update or replace when the record does not exist in the database.
    Write Concern The acknowledgement level requested from the destination system.

    For details about write concern levels, see the MongoDB documentation.

  3. To enter credentials separately from the MongoDB connection string, click the Credentials tab and configure the following properties:
    Credentials Description
    Authentication Type Authentication used by the MongoDB server: Username/Password or LDAP.
    Authentication Mechanism LDAP authentication method: server-driven or plain authentication.
    Username MongoDB or LDAP user name.
    Password MongoDB or LDAP password.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
    Authentication Source An optional alternate database name to perform delegated authentication.

    Available for the Username/Password option.

  4. Optionally, click the Advanced tab to configure how the destination connects to MongoDB.
    The defaults for these properties should work in most cases:
    Advanced Property Description
    Connections Per Host Maximum number of connections for each host.

    Default is 100.

    Min Connections Per Host Minimum number of connections for each host.

    Default is 0.

    Connection Timeout Maximum time in milliseconds to wait for a connection.

    Default is 10,000.

    Max Connection Idle Time Maximum time in milliseconds that a pooled connection can remain idle. When a pooled connection exceeds the idle time, the connection is closed. Use 0 to opt out of this property.

    Default is 0.

    Max Connection Lifetime Maximum time in milliseconds that a pooled connection can be active. When a pooled connection exceeds the lifetime, the connection is closed. Use 0 to opt out of this property.

    Default is 0.

    Max Wait Time Maximum time in milliseconds that a thread can wait for a connection to become available. Use 0 to opt out of this property. Use a negative value to wait indefinitely.

    Default is 120,000.

    Server Selection Timeout Maximum time in milliseconds that Data Collector waits for a server selection before throwing an exception. If you use 0, an exception is thrown immediately if no server is available. Use a negative value to wait indefinitely.

    Default is 30,000.

    Threads Allowed to Block for Connection Multiplier Multiplier that determines the maximum number of threads that can wait for a connection to become available from the pool. This number multiplied by the Connections Per Host value determines the maximum number of threads.

    Default is 5.

    Heartbeat Frequency The frequency in milliseconds at which Data Collector attempts to determine the current state of each server in the cluster.

    Default is 10,000.

    Min Heartbeat Frequency Minimum heartbeat frequency in milliseconds. Data Collector waits at least this long before checking the state of each server.

    Default is 500.

    Heartbeat Connection Timeout Maximum time in milliseconds to wait for a connection used for the cluster heartbeat.

    Default is 20,000.

    Heartbeat Socket Timeout Maximum time in milliseconds for a socket timeout for connections used for the cluster heartbeat.

    Default is 20,000.

    Local Threshold Local threshold in milliseconds. Requests are sent to a server whose ping time is less than or equal to the server with the fastest ping time plus the local threshold value.

    Default is 15.

    Required Replica Set Name Required replica set name to use for the cluster.
    Cursor Finalizer Enabled Specifies whether to enable cursor finalizers.
    Socket Keep Alive Specifies whether to enable socket keep alive.
    Socket Timeout Maximum time in milliseconds for the socket timeout. Use 0 to opt out of this property.

    Default is 0.

    SSL Enabled Enables SSL/TLS.

    If the MongoDB certificate is signed by a private CA or not trusted by the default Java truststore, you also must define the truststore file and password in Java configuration options in the deployment, as described in Enabling SSL/TLS.

    SSL Invalid Host Name Allowed Specifies whether invalid host names are allowed in SSL/TLS certificates.