Salesforce Bulk API 2.0 Lookup

Supported pipeline types:
  • Data Collector

The Salesforce Bulk API 2.0 Lookup processor performs lookups on a Salesforce object using Salesforce Bulk API 2.0. To perform lookups with the SOAP or Bulk API, use the Salesforce Lookup processor. For information about supported versions, see Supported Systems and Versions.

Use the Salesforce Bulk API 2.0 Lookup processor to enrich records with additional data. For example, you can configure the processor to use an account_number field as the field to look up account name values in a Salesforce object, and pass the values to a new account_name output field.

When a lookup results in multiple matches, the Salesforce Bulk API 2.0 Lookup processor can return the first matching value or return all matching values in separate records.

When you configure the Salesforce Bulk API 2.0 Lookup processor, you define connection information that the processor uses to connect to Salesforce, including the Salesforce API version and the authentication to use.

You specify a SOQL query, the output fields for returned values, and the multiple match behavior. You can optionally define a default value to use for fields with missing values and set the behavior for fields with missing values and no default value.

You can configure the maximum number of columns that the lookup query can return and the maximum number of seconds that the processor waits for a response from the query.

You can configure the processor to retrieve deleted records from the Salesforce recycle bin. You can also configure the processor to locally cache the lookup values to improve performance.

You can optionally use an HTTP proxy to connect to Salesforce. When enabled in Salesforce, you can also configure the processor to use mutual authentication.

The processor generates Salesforce field attributes that provide additional information about each field.

Lookup Cache

To improve pipeline performance, you can configure the Salesforce Bulk API 2.0 Lookup processor to locally cache the values returned from a Salesforce object.

The processor caches values until the cache reaches the maximum size or the expiration time. When the first limit is reached, the processor evicts values from the cache.

You can configure the following ways to evict values from the cache:
Size-based eviction
Configure the maximum number of values that the processor caches. When the maximum number is reached, the processor evicts the oldest values from the cache.
Time-based eviction
Configure the amount of time that a value can remain in the cache without being written to or accessed. When the expiration time is reached, the processor evicts the value from the cache. The eviction policy determines whether the processor measures the expiration time since the last write of the value or since the last access of the value.
For example, you set the eviction policy to expire after the last access and set the expiration time to 60 seconds. After the processor does not access a value for 60 seconds, the processor evicts the value from the cache.

When you stop the pipeline, the processor clears the cache.

Salesforce Attributes

The Salesforce Bulk API 2.0 Lookup processor generates Salesforce field attributes that provide additional information about each field. The processor receives these details from Salesforce.

Salesforce attributes include a user-defined prefix to differentiate the Salesforce attributes from other attributes. The prefix is salesforce. by default. You can change the prefix that the processor uses and you can configure the processor not to create Salesforce attributes.

You can use the record:fieldAttribute or record:fieldAttributeOrDefault functions to access the information in the attribute.

The Salesforce Bulk API 2.0 Lookup processor can provide the following Salesforce field attributes:
Salesforce Field Attribute Description
<Salesforce prefix>salesforceType Provides the original Salesforce data type for the field.
<Salesforce prefix>length Provides the original length for all string and textarea fields.
<Salesforce prefix>precision Provides the original precision for all double fields.
<Salesforce prefix>scale Provides the original scale for all double fields.
<Salesforce prefix>digits Provides the maximum number of digits for all integer fields.

For more information about field attributes, see Field Attributes.

Changing the API Version

Data Collector ships with version 57.0.0 of the Salesforce Web Services Connector libraries. You can use a different Salesforce API version if you need to access functionality not present in version 57.0.0.

  1. On the Salesforce tab, set the API Version property to the version that you want to use.
  2. Download the relevant version of the following JAR files from Salesforce Web Services Connector (WSC):
    • WSC JAR file - force-wsc-<version>.0.0.jar

    • Partner API JAR file - force-partner-api-<version>.0.0.jar

    Where <version> is the API version number.

    For information about downloading libraries from Salesforce WSC, see the Salesforce Developer documentation.

  3. In the following Data Collector directory, replace the default force-wsc-57.0.0.jar and force-partner-api-57.0.0.jar files with the versioned JAR files that you downloaded:
    $SDC_DIST/streamsets-libs/streamsets-datacollector-salesforce-lib/lib/
  4. Restart Data Collector for the changes to take effect.

Configuring a Salesforce Bulk API 2.0 Lookup Processor

Configure a Salesforce Lookup processor to perform lookups in a Salesforce object using Salesforce Bulk API 2.0.
  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline. Not valid for cluster pipelines.
  2. On the Salesforce tab, configure the following properties:
    Salesforce Property Description
    Auth Endpoint Salesforce SOAP API authentication endpoint. For example, you might enter one of the following common values:
    • login.salesforce.com - Use to connect to a Production or Developer Edition organization.
    • test.salesforce.com - Use to connect to a sandbox organization.

    Default is login.salesforce.com.

    API Version Salesforce API version used to connect to Salesforce.

    Default is 57.0.0. If you change the version, you also must download the relevant JAR files from Salesforce Web Services Connector (WSC).

    Authentication Type Authentication type to use to connect to Salesforce:
    • Basic Authentication - Specify a user name and password.
    • Connected App with OAuth - Use an OAuth 2.0-enabled connected app to enable machine-to-machine OAuth with JWT Bearer Flow.
    Username Salesforce username in the following email format: <text>@<text>.com.

    When using Connected App with OAuth authentication, the user must be authorized to use the app.

    Password

    Salesforce password.

    If the Data Collector machine is outside the trusted IP range configured in your Salesforce environment, you must use a security token along with the password. Use Salesforce to generate a security token and then set this property to the password followed by the security token.

    For example, if the password is abcd and the security token is 1234, then set this property to abcd1234. For more information on generating a security token, see Reset Your Security Token.

    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
    Consumer Key Consumer key from the connected app.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.

    Available when using Connected App with OAuth authentication.

    Private Key Private key from the public key certificate that you used with the connected app. Ensure that the key is formatted correctly, with no spaces or extra line breaks.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.

    Available when using Connected App with OAuth authentication.

  3. On the Lookup tab, configure the following properties:
    Lookup Property Description
    SOQL Query SOQL query to use to look up data in Salesforce. Use the following syntax for the query:
    SELECT <field1 name>, <field2 name> FROM <object name> WHERE <field3 name> <operator> <expression>
    Include Deleted Records Determines whether the SOQL query also retrieves deleted records from the Salesforce recycle bin.
    Field Mappings Use to override the default field mappings. By default, Salesforce fields are written to Data Collector fields of the same name.
    Enter the following:
    • Salesforce Field - Name of the Salesforce field that contains the lookup value. Enter a field name or enter an expression that defines the field name.
    • SDC Field - Name of the field in the record that receives the lookup value. You can specify an existing field or a new field. If the field does not exist, the processor creates the field.
    • Default Value - Optional default value to use when the query does not return a value for the field. If the query returns no value and this property is not defined, the processor handles the record based on the Missing Values Behavior property.
    • Data Type - Data type to use for SDC Field. Required when you specify a default value. The processor uses the Salesforce data type by default.

    Using simple or bulk edit mode, click the Add icon to create additional field mappings.

    Multiple Values Behavior Action to take upon finding multiple matching values:
    • First value only - Returns the first value.
    • Split into multiple records - Returns every matching value in a separate record.
    Maximum Query Columns Maximum number of columns that the query can retrieve. If the queried object has more columns, the query fails.

    Default is 512.

    Salesforce Query Timeout Maximum number of seconds to wait for a response from Salesforce.

    Default is 60.

    Missing Values Behavior Action to take upon finding no return values in fields with no default value defined:
    • Send to error - Sends the record to error.
    • Pass the record along the pipeline unchanged - Passes the record without a lookup return value.
    Poll Interval Interval to wait before polling for the Salesforce job status, in milliseconds.
    Enable Local Caching Specifies whether to locally cache the returned values.
    Maximum Entries to Cache Maximum number of values to cache. When the maximum number is reached, the processor evicts the oldest values from the cache.

    Default is -1, which means unlimited.

    Eviction Policy Type Policy used to evict values from the local cache when the expiration time has passed:
    • Expire After Last Access - Measures the expiration time since the value was last accessed by a read or a write.
    • Expire After Last Write - Measures the expiration time since the value was created, or since the value was last replaced.
    Expiration Time Amount of time that a value can remain in the local cache without being accessed or written to.

    Default is 1 second.

    Time Unit Unit of time for the expiration time.

    Default is seconds.

  4. On the Advanced tab, configure the following properties:
    Advanced Property Description
    Use Proxy Specifies whether to use an HTTP proxy to connect to Salesforce.
    Proxy Hostname Proxy host.
    Proxy Port Proxy port.
    Proxy Requires Credentials Specifies whether the proxy requires a user name and password.
    Proxy Realm Authentication realm for the proxy server.
    Proxy Username User name for proxy credentials.
    Proxy Password Password for proxy credentials.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
    Create Salesforce Attributes Adds Salesforce field attributes to fields. The processor creates Salesforce attributes by default.
    Salesforce Attribute Prefix Prefix for Salesforce attributes.
    Use Mutual Authentication

    When enabled in Salesforce, you can use SSL/TLS mutual authentication to connect to Salesforce.

    Mutual authentication is not enabled in Salesforce by default. To enable mutual authentication, contact Salesforce.

    Before enabling mutual authentication, you must store a mutual authentication certificate in the Data Collector resources directory. For more information, see Keystore and Truststore Configuration.

    Use Remote Keystore Enables loading the contents of the keystore from a remote credential store or from values entered in the stage properties.
    Private Key Private key used in the remote keystore. Enter a credential function that returns the key or enter the contents of the key.
    Certificate Chain Each PEM certificate used in the remote keystore. Enter a credential function that returns the certificate or enter the contents of the certificate.

    Using simple or bulk edit mode, click the Add icon to add additional certificates.

    Keystore File

    Path to the local keystore file. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:

    ${runtime:resourcesDirPath()}/keystore.jks

    By default, no keystore is used.

    Keystore Type Type of keystore to use. Use one of the following types:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Keystore Password

    Password to the keystore file. A password is optional, but recommended.

    Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores.
    Keystore Key Algorithm

    Algorithm to manage the keystore.

    Default is SunX509.