MongoDB Lookup

The MongoDB Lookup processor performs lookups in MongoDB and passes all values from the returned document to a new list-map field in the record. For information about supported versions, see Supported Systems and Versions.

Use the MongoDB Lookup processor to enrich records with additional data. For example, you have multiple department documents in MongoDB that list the employees in the department. You configure the processor to use the department_ID field in the record to look up a department document, and pass all values from the matching document to a new department_employees field in the record.

When you configure the MongoDB Lookup processor, you define connection information, such as the connection string and MongoDB credentials. You can also use a connection to configure the processor. You configure the fields to look up and the field for the return values.

When a lookup results in multiple matched documents, the MongoDB Lookup processor can return values from the first matching document or return values from all matching documents in separate records.

To improve performance, you can configure the processor to locally cache the document values.

You can optionally configure advanced options that determine how the processor connects to MongoDB, including enabling SSL/TLS for the processor.

Field Mappings

When you configure the MongoDB Lookup processor, you define the document fields to look up in MongoDB. You map these document fields to fields in the record that contain the values to look up.

When you define a document field, use the dot notation to define a field in an embedded document as follows:
<embedded document>.<field name>.<embedded field name>
When you define a field in the record, reference the field as follows:
/<field name>

You can define multiple field mappings. The processor uses the configured field mappings to generate and run a find() query in MongoDB.

After defining the field mappings, define a new list-map field to store all values from the returned document.

For example, your MongoDB collection contains customer documents with the following structure:
{
  _id: 123,
  customer: {
       name: "Ed Martinez",
       status: "gold",
	phone: "123-456-7891",
	location: {
	  city: "San Francisco",
	  state: "California"
	}
  }
}

Your pipeline reads from an origin that contains customer names and cities, but you want to enrich that customer data with the customer status and phone number. When you configure the processor, you map the customer.name and customer.location.city document fields to the values stored in the name and city fields in the record. To store the lookup result, you define a new field named customer_details. The following image shows the configured field mappings and the result field:

When you run the pipeline, the processor uses the field mappings to generate and run a find() query in MongoDB. The processor passes all values from the returned document to the new result field.

Lookup Cache

To improve pipeline performance, you can configure the MongoDB Lookup processor to locally cache the document values returned from MongoDB.

The processor caches values until the cache reaches the maximum size or the expiration time. When the first limit is reached, the processor evicts values from the cache.

You can configure the following ways to evict values from the cache:
Size-based eviction
Configure the maximum number of values that the processor caches. When the maximum number is reached, the processor evicts the oldest values from the cache.
Time-based eviction
Configure the amount of time that a value can remain in the cache without being written to or accessed. When the expiration time is reached, the processor evicts the value from the cache. The eviction policy determines whether the processor measures the expiration time since the last write of the value or since the last access of the value.
For example, you set the eviction policy to expire after the last access and set the expiration time to 60 seconds. After the processor does not access a value for 60 seconds, the processor evicts the value from the cache.

When you stop the pipeline, the processor clears the cache.

Credentials

Based on the authentication used by the MongoDB server, configure the stage to use no authentication, username/password authentication, or LDAP authentication. When using username/password authentication, you can also use delegated authentication. When using LDAP authentication, you can use server-driven authentication or plain authentication.

By default, the processor uses no authentication.

To use username/password or LDAP authentication, enter the required credentials in one of the following ways:
Connection string
Enter credentials in the connection string on the MongoDB tab.
To enter credentials for username/password authentication, enter the username and password before the host name. Use the following format:
mongodb://username:password@host[:port][/[database][?options]]
To enter credentials for LDAP authentication, enter the username and password before the host name, and set the authMechanism option to PLAIN. Use the following format:
mongodb://username:password@host[:port][/[database]?authMechanism=PLAIN
Credentials tab
Select either the Username/Password or LDAP authentication type on the Credentials tab. When using LDAP authentication, you also choose between server-driven or plain authentication.
Then, you specify the username and password for the authentication type.

Read Preference

You can configure the read preference that the MongoDB Lookup processor uses.

The read preference determines how the processor reads data from different members of the MongoDB replica set.

You can use the following MongoDB read preferences:
  • Primary - Requires reading from the primary member.
  • Primary Preferred - Prefers reading from the primary, but allows reads from a secondary member.
  • Secondary - Requires reading from a secondary member.
  • Secondary Preferred - Prefers reading from a secondary, but allows reads from a primary when necessary.
  • Nearest - Reads from the member with the least network latency.

BSON Timestamp

When processing data from MongoDB version 2.6 and later, the MongoDB Lookup processor supports the MongoDB BSON Timestamp data type.

The MongoDB BSON Timestamp is a MongoDB data type that includes a timestamp and ordinal as follows:
<BSON Timestamp field name>:Timestamp(<timestamp>, <ordinal>)

The MongoDB processor converts the BSON Timestamp to a map as follows:

<BSON Timestamp field name>{MAP}:
    Timestamp{DATETIME}:<UTC timestamp>
    Ordinal{INTEGER}:<integer ordinal>
For example, a Transaction BSON timestamp of (1485449409, 1), is converted to the following Transaction map field:
"Transaction":{
    "Timestamp":Jan 26, 2016 14:50:09PM
    "Ordinal":1
}

Enabling SSL/TLS

You can enable the MongoDB Lookup processor to use SSL/TLS to connect to MongoDB.

  1. On the Advanced tab for the stage, select the SSL Enabled property.
  2. If the MongoDB certificate is signed by a private CA or not trusted by the default Java truststore, create a custom truststore file or modify a copy of the default Java truststore file to add the CA to the file. Then configure Data Collector to use the modified truststore file.

    By default, Data Collector uses the Java truststore file located in $JAVA_HOME/jre/lib/security/cacerts. If your certificate is signed by a CA that is included in the default Java truststore file, you do not need to create a truststore file and can skip this step.

    In these steps, we show how to modify the default truststore file to add an additional CA to the list of trusted CAs. If you prefer to create a custom truststore file, see the keytool documentation.

    1. Use the following command to set the JAVA_HOME environment variable:
      export JAVA_HOME=<Java home directory>
    2. Use the following command to set the SDC_CONF environment variable:
      export SDC_CONF=<Data Collector configuration directory>
      For example:
      export SDC_CONF=/streamsets-datacollector-5.6.0/etc
    3. Use the following command to copy the default Java truststore file to the Data Collector configuration directory:
      cp "${JAVA_HOME}/jre/lib/security/cacerts" "${SDC_CONF}/truststore.jks"
    4. Use the following keytool command to import the CA certificate into the truststore file:
      keytool -import -file <certificate> -trustcacerts -noprompt -alias <alias> -storepass <password> -keystore "${SDC_CONF}/truststore.jks"
    5. In Control Hub, edit the deployment. In the Configure Engine section, click Advanced Configuration. Then, click Java Configuration. Define the following options in the Java Options property:
      • javax.net.ssl.trustStore - Path to the truststore file on the Data Collector machine.
      • javax.net.ssl.trustStorePassword - Truststore password.
      For example, define the options as follows:
      -Djavax.net.ssl.trustStore=/streamsets-datacollector-5.6.0/etc/truststore.jks -Djavax.net.ssl.trustStorePassword=mypassword
    6. Save the changes to the deployment and restart all engine instances.

Configuring a MongoDB Lookup Processor

Configure a MongoDB Lookup processor to perform lookups in MongoDB.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline. Not valid for cluster pipelines.
  2. On the MongoDB tab, configure the following properties:
    MongoDB Property Description
    Connection Connection that defines the information required to connect to an external system.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    To create a new connection, click the Add New Connection icon: . To view and edit the details of the selected connection, click the Edit Connection icon: .

    Connection String
    Connection string for the MongoDB instance. Use the following format:
    mongodb://host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]
    When connecting to a cluster, enter additional node information to ensure a connection.

    If the MongoDB server uses username/password or LDAP authentication, you can include the credentials in the connection string, as described in Credentials.

    Enable Single Mode Select to connect to a single MongoDB server or node. If multiple nodes are defined in the connection string, the stage connects only to the first node.

    Use this option with care. If the stage cannot connect or the connection fails, the pipeline stops.

    Database Name of the MongoDB database.
    Collection Name of the MongoDB collection to use.
    Read Preference Determines how the processor reads data from different members of the MongoDB replica set.
  3. On the Lookup tab, configure the following properties:
    Lookup Property Description
    Document to SDC Field Mappings List of document fields to look up in MongoDB mapped to fields in the record that contain the lookup values.
    Enter the following:
    • Document Field - Name of the document field to look up. Use the dot notation to define a field in an embedded document as follows:
      <embedded document>.<field1>.<field2>
    • SDC Field - Name of the field in the record that contains the lookup value.

    Using simple or bulk edit mode, click the Add icon to create additional field mappings.

    Result Field Name of the new list-map field in the record that receives all values from the returned document.
    Multiple Values Behavior Action to take upon finding multiple matching documents:
    • First value only - Generates a single record for the return values of the first matching document.
    • Split into Multiple Records - Generates a separate record for the return values of every matching document.
    Missing Values Behavior Action to take upon finding no document to return:
    • Send to error - Sends the record to error.
    • Pass the record along the pipeline unchanged - Passes the record without a lookup return value.
    Enable Local Caching Specifies whether to locally cache the returned values.
    Maximum Entries to Cache Maximum number of values to cache. When the maximum number is reached, the processor evicts the oldest values from the cache.

    Default is -1, which means unlimited.

    Eviction Policy Type Policy used to evict values from the local cache when the expiration time has passed:
    • Expire After Last Access - Measures the expiration time since the value was last accessed by a read or a write.
    • Expire After Last Write - Measures the expiration time since the value was created, or since the value was last replaced.
    Expiration Time Amount of time that a value can remain in the local cache without being accessed or written to.

    Default is 1 second.

    Time Unit Unit of time for the expiration time.

    Default is seconds.

  4. To enter credentials separately from the MongoDB connection string, click the Credentials tab and configure the following properties:
    Credentials Description
    Authentication Type Authentication used by the MongoDB server: Username/Password or LDAP.
    Authentication Mechanism LDAP authentication method: server-driven or plain authentication.
    Username MongoDB or LDAP user name.
    Password MongoDB or LDAP password.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
    Authentication Source An optional alternate database name to perform delegated authentication.

    Available for the Username/Password option.

  5. Optionally, click the Advanced tab to configure how the processor connects to MongoDB.
    The defaults for these properties should work in most cases:
    Advanced Property Description
    Connections Per Host Maximum number of connections for each host.

    Default is 100.

    Min Connections Per Host Minimum number of connections for each host.

    Default is 0.

    Connection Timeout Maximum time in milliseconds to wait for a connection.

    Default is 10,000.

    Max Connection Idle Time Maximum time in milliseconds that a pooled connection can remain idle. When a pooled connection exceeds the idle time, the connection is closed. Use 0 to opt out of this property.

    Default is 0.

    Max Connection Lifetime Maximum time in milliseconds that a pooled connection can be active. When a pooled connection exceeds the lifetime, the connection is closed. Use 0 to opt out of this property.

    Default is 0.

    Max Wait Time Maximum time in milliseconds that a thread can wait for a connection to become available. Use 0 to opt out of this property. Use a negative value to wait indefinitely.

    Default is 120,000.

    Server Selection Timeout Maximum time in milliseconds that Data Collector waits for a server selection before throwing an exception. If you use 0, an exception is thrown immediately if no server is available. Use a negative value to wait indefinitely.

    Default is 30,000.

    Threads Allowed to Block for Connection Multiplier Multiplier that determines the maximum number of threads that can wait for a connection to become available from the pool. This number multiplied by the Connections Per Host value determines the maximum number of threads.

    Default is 5.

    Heartbeat Frequency The frequency in milliseconds at which Data Collector attempts to determine the current state of each server in the cluster.

    Default is 10,000.

    Min Heartbeat Frequency Minimum heartbeat frequency in milliseconds. Data Collector waits at least this long before checking the state of each server.

    Default is 500.

    Heartbeat Connection Timeout Maximum time in milliseconds to wait for a connection used for the cluster heartbeat.

    Default is 20,000.

    Heartbeat Socket Timeout Maximum time in milliseconds for a socket timeout for connections used for the cluster heartbeat.

    Default is 20,000.

    Local Threshold Local threshold in milliseconds. Requests are sent to a server whose ping time is less than or equal to the server with the fastest ping time plus the local threshold value.

    Default is 15.

    Required Replica Set Name Required replica set name to use for the cluster.
    Cursor Finalizer Enabled Specifies whether to enable cursor finalizers.
    Socket Keep Alive Specifies whether to enable socket keep alive.
    Socket Timeout Maximum time in milliseconds for the socket timeout. Use 0 to opt out of this property.

    Default is 0.

    SSL Enabled Enables SSL/TLS.

    If the MongoDB certificate is signed by a private CA or not trusted by the default Java truststore, you also must define the truststore file and password in Java configuration options in the deployment, as described in Enabling SSL/TLS.

    SSL Invalid Host Name Allowed Specifies whether invalid host names are allowed in SSL/TLS certificates.