Salesforce Bulk API 2.0

The Salesforce Bulk API 2.0 origin reads existing data from Salesforce using Salesforce Bulk API 2.0. To read from Salesforce with the SOAP or Bulk API, or to subscribe to notifications, use the Salesforce origin. For information about supported versions, see Supported Systems and Versions.

When you configure the Salesforce Bulk API 2.0 origin, you specify the authentication to use. You can also use a connection to configure the origin.

When processing existing data, you configure the SOQL query, offset field, and optional initial offset to use. The origin can perform a full or incremental read at specified intervals. And under certain circumstances, the origin can also process deleted records.

The Salesforce Bulk API 2.0 origin can use multiple threads to process query result sets in parallel.

By default, the origin generates Salesforce record header attributes and Salesforce field attributes that provide additional information about each record and field.

You can specify the prefix to use for Salesforce attributes, or you can disable attribute generation entirely. You can also configure other advanced options, such as disabling query validation or using mutual authentication and an HTTP proxy for the connection.

The origin can generate events for an event stream. For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Querying Data

The Salesforce Bulk API 2.0 origin executes a query to read existing data from Salesforce. Use the Salesforce Object Query Language (SOQL) to write the query.

The Salesforce Bulk API 2.0 origin uses an offset field and an initial offset or start ID to determine where to start reading data within an object. By default, the offset field is defined as the Salesforce Id system field, which contains a unique identifier for each record in a Salesforce object.

You can configure the maximum number of columns that the query can return and the maximum number of seconds that the origin waits for a response from the query.

Salesforce Bulk API 2.0 is an asynchronous API. The origin creates a query job, and periodically polls Salesforce until the job is complete. Salesforce returns query results in one or more result sets. Result sets can be processed in parallel by enabling multithreaded processing.

If the pipeline stops before it finishes reading all data, the Salesforce Bulk API 2.0 origin saves the last read offset value. When the pipeline starts again, the origin uses the last read offset value to continue processing from where it stopped. You can reset the origin to process all requested objects.

Unlike the Salesforce origin, the Salesforce Bulk API 2.0 origin does not stop the pipeline when an initial query is complete. To stop the pipeline automatically, enable the origin to generate events and use the Pipeline Finisher executor. For more information, see Event Generation.

In rare cases, the query returns data with a type that does not match the data type specified in the schema for a field. For example, the query might return a Float when the schema specifies an Integer. You can use the Mismatched Types Behavior property on the Advanced tab to configure how the origin handles mismatched types. The origin can retain the returned data, truncate the returned data to match the specified type, or round the returned data to match the specified type.

Bulk API 2.0 Queries

When querying existing data with version 2.0 of the Bulk API, you define the SOQL query and related properties to determine the data returned from Salesforce.

Use the following guidelines:
SOQL query
When processing existing data, use the following query guidelines:
  • In the WHERE clause, include the offset field and the offset value. The origin uses an offset field and value to determine the data that is returned. Include both in the WHERE clause of the query.
  • In the WHERE clause, use the OFFSET constant to represent the offset value.
    Use ${OFFSET} to represent the offset value. For example, when you start a pipeline, the following query returns all data from the object where the data in the offset field is greater than the initial offset value:
    SELECT Id, Name FROM <object> WHERE <offset field> > ${OFFSET}
    Note: When the offset values are strings, enclose ${OFFSET} in single quotation marks.
  • To avoid returning duplicate data, use the offset field as the first field in the ORDER BY clause.
    Note: Using a field that is not the Id field in the ORDER BY clause can slow performance.
Note that Bulk API 2.0 does not support queries with any of the following:
  • GROUP BY, OFFSET, or TYPEOF clauses
  • Aggregate functions such as COUNT()
  • Date functions in GROUP BY clauses (date functions in WHERE clauses are supported)
  • Compound address fields or compound geolocation fields, although their component fields are supported in queries
The complete SOQL query should use the following syntax:
SELECT <offset field>, <field1>, <field2>, ... FROM <object> WHERE <offset field> > ${OFFSET} ORDER BY <offset field>

If you specify SELECT * FROM <object> in the SOQL query, the origin expands * to all fields in the Salesforce object that are accessible to the configured user. Note that the origin adds components of compound fields to the query, rather than adding the compound fields themselves. For example, the origin adds BillingStreet, BillingCity, etc., rather than adding BillingAddress. Similarly, it adds Location__Latitude__s and Location__Longitude__s rather than Location__c.

When necessary, you can configure the origin to skip validating the query. Skip query validation when you know that the query is valid but it does not match validation requirements. For example, you must disable query validation if you omit the ORDER BY clause. You might omit the ORDER BY clause to improve performance on a large query. To disable query validation, use the Disable Query Validation property on the Advanced tab.

Additional properties
You can configure several additional properties on the Query tab. For example:
  • Offset Field - Typically the Id system field, the offset field should be an indexed field in the record. Default is the Id field.
  • Initial Offset - First offset value to use when the pipeline starts or after you reset the origin.
  • Include Deleted Records - An optional property. Determines whether the SOQL query also retrieves deleted records from the Salesforce recycle bin.

Example

Let's say that you want to read all names and account numbers from the Salesforce Account object a single time.

To process the data, you configure the following properties on the Query tab:
  • SOQL Query - Include the offset field and offset value in the WHERE and ORDER BY clauses, as well as the fields to return, as follows:
    SELECT Id, Name, AccountNumber FROM Account WHERE Id > '${OFFSET}' ORDER BY Id
  • Repeat Query - Set to No Repeat to run the query a single time.

  • Initial Offset - Use the default value of fifteen zeros (000000000000000) for the offset value to ensure that the origin reads all records in the object.

  • Offset Field - Use the default, Id, for the offset field.

Full and Incremental Mode

The Salesforce Bulk API 2.0 origin can perform queries in two modes:
Incremental mode
When the origin performs an incremental query, it uses the initial offset as the offset value in the first SOQL query. As the origin completes processing the results of the first query, it saves the last offset value that it processes. Then it waits the specified query interval before performing a subsequent query.
When the origin performs a subsequent query, it returns data based on the last-saved offset. You can reset the origin to use the initial offset value.
Use incremental mode for append-only objects or when you do not need to capture changes to older records. By default, the origin uses incremental mode.
Full mode
When the origin performs a full query, it runs the specified SOQL query. The origin uses the initial offset as the offset value in the SQL query each time it requests data.
When the origin completes processing the results of the full query, it waits the specified query interval, and then performs the same query again. Use full mode to capture all record updates.
Tip: If you want to process the results from a single full query and then stop the pipeline, you can enable the origin to generate events and use the Pipeline Finisher to stop the pipeline automatically. For more information, see Event Generation.

Multithreaded Processing

The Salesforce Bulk API 2.0 origin can perform parallel processing and enables the creation of a multithreaded pipeline.

When you enable multithreaded processing, the Salesforce Bulk API 2.0 origin uses multiple concurrent threads based on the Number of Threads property. When you start the pipeline, the origin creates the number of threads specified in the property.

Salesforce Bulk API 2.0 query results are returned in one or more result sets, each of which can be processed in parallel up to the Number of Threads configuration setting that you specify. You can control the maximum size of a result set with the Maximum Records per Query Result Set advanced stage property. If you are working with a very large number of query results, you might experience a timeout before receiving all of the data from Salesforce. To prevent a timeout, you can use this property to split the results into smaller sets.

When you don’t provide a value for the Maximum Records per Query Result Set property, the server uses a default value based on the service.

A Salesforce Bulk API 2.0 origin thread reads each result set into one or more batches of records. Upon filling each batch, the origin passes it to an available pipeline runner.

A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors, executors, and destinations in the pipeline and handles all pipeline processing after the origin. Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time pipeline property to specify the interval or to opt out of empty batch generation.

Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But since batches are processed by different pipeline runners, the order that batches are written to destinations is not ensured.

For example, say you enable multithreaded processing and set the Number of Threads property to 5. When you start the pipeline, the origin creates five threads, and Data Collector creates a matching number of pipeline runners. Upon receiving data, the origin passes a batch to each of the pipeline runners for processing.

Each pipeline runner performs the processing associated with the rest of the pipeline. After a batch is written to pipeline destinations, the pipeline runner becomes available for another batch of data. Each batch is processed and written as quickly as possible, independent from other batches processed by other pipeline runners, so batches may be written differently from the read order.

At any given moment, the five pipeline runners can each process a batch, so this multithreaded pipeline processes up to five batches at a time. When incoming data slows, the pipeline runners sit idle, available for use as soon as the data flow increases.

For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.

Processing Deleted Records

The Salesforce Bulk API 2.0 origin can retrieve deleted records from the Salesforce recycle bin for processing.

To process deleted records, use the Include Deleted Records property on the Query tab.

Reading Custom Objects or Fields

If the origin reads custom Salesforce objects or fields, you might want to use a Field Renamer in the pipeline to rename the custom fields.

When you extend Salesforce objects, custom object and field names are appended with the suffix __c. For example, if you create a custom Transaction object, Salesforce names the object Transaction__c. The Transaction object might contain fields named Credit_Card__c, Fare_Amount__c, and Payment_Type__c.

Instead of using field names appended with the suffix __c throughout the rest of the pipeline, you can add a Field Renamer to remove the suffix from the field names.

For more information about Salesforce custom objects, see the Salesforce documentation.

Salesforce Attributes

The Salesforce Bulk API 2.0 origin generates Salesforce record header attributes and Salesforce field attributes that provide additional information about each record and field. The origin receives these details from Salesforce.

Salesforce attributes include a user-defined prefix to differentiate the Salesforce attributes from other attributes. The prefix is salesforce. by default. You can change the prefix that the origin uses and you can configure the origin not to create Salesforce attributes.

Salesforce Header Attribute

The Salesforce Bulk API 2.0 origin generates a Salesforce record header attribute to provide additional information about each record. The origin receives these details from Salesforce.

You can use the record:attribute or record:attributeOrDefault functions to access the information in the attribute.

The Salesforce Bulk API 2.0 origin can provide the following Salesforce header attribute:
Salesforce Header Attribute Description
<Salesforce prefix>sobjectType Provides the Salesforce source object for the record.

Generated when the origin executes a query.

For more information about record header attributes, see Record Header Attributes.

Salesforce Field Attributes

The Salesforce Bulk API 2.0 origin generates Salesforce field attributes that provide additional information about each field, such as the data type of the Salesforce field. The origin receives these details from Salesforce.

You can use the record:fieldAttribute or record:fieldAttributeOrDefault functions to access the information in the attribute.

The Salesforce Bulk API 2.0 origin can provide the following Salesforce field attributes:
Salesforce Field Attribute Description
<Salesforce prefix>salesforceType Provides the original Salesforce data type for the field.
<Salesforce prefix>length Provides the original length for all string and textarea fields.
<Salesforce prefix>precision Provides the original precision for all double fields.
<Salesforce prefix>scale Provides the original scale for all double fields.
<Salesforce prefix>digits Provides the maximum number of digits for all integer fields.

For more information about field attributes, see Field Attributes.

Event Generation

The Salesforce Bulk API 2.0 origin can generate events that you can use in an event stream. When you enable event generation, the origin generates an event when it completes processing the data returned by the specified query.

Salesforce events can be used in any logical way. For example:
  • With the Pipeline Finisher executor to stop the pipeline and transition the pipeline to a Finished state when the origin completes processing available data.

    When you restart a pipeline stopped by the Pipeline Finisher executor, the origin processes data based on how you configured the origin. For example, if you configure the origin to repeat an incremental query, the origin saves the offset when the executor stops the pipeline. When it restarts, the origin continues processing from the last-saved offset. If you configure the origin to repeat a full query, when you restart the pipeline, the origin uses the initial offset.

    For an example, see Stopping a Pipeline After Processing All Available Data.

  • With the Email executor to send a custom email after receiving an event.

    For an example, see Sending Email During Pipeline Processing.

Event Record

Event records generated by the Salesforce Bulk API 2.0 origin have the following event-related record header attributes:
Record Header Attribute Description
sdc.event.type Event type. Uses the following type:
  • no-more-data - Generated when the origin completes processing all data returned by a query.
sdc.event.version Integer that indicates the version of the event record type.
sdc.event.creation_timestamp Epoch timestamp when the stage created the event.

The no-more-data event record includes no record fields.

Changing the API Version

The Salesforce Bulk API 2.0 origin uses version 57.0.0 of the Salesforce API, by default. You can use a different Salesforce API version if you need to access functionality not present in version 57.0.0.

  1. On the Salesforce tab, set the API Version property to the version that you want to use.
  2. Download the relevant version of the following JAR files from Salesforce Web Services Connector (WSC):
    • WSC JAR file - force-wsc-<version>.0.0.jar

    • Partner API JAR file - force-partner-api-<version>.0.0.jar

    Where <version> is the API version number.

    For information about downloading libraries from Salesforce WSC, see the Salesforce Developer documentation.

  3. In the following Data Collector directory, replace the default force-wsc-57.0.0.jar and force-partner-api-57.0.0.jar files with the versioned JAR files that you downloaded:
    $SDC_DIST/streamsets-libs/streamsets-datacollector-salesforce-lib/lib/
  4. Restart Data Collector for the changes to take effect.

Configuring a Salesforce Bulk API 2.0 Origin

Configure a Salesforce Bulk API 2.0 origin to read data from Salesforce using Salesforce Bulk API 2.0.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Produce Events Generates event records when events occur. Use for event handling.
    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the Salesforce tab, configure the following properties:
    Salesforce Property Description
    Connection Connection that defines the information required to connect to an external system.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    To create a new connection, click the Add New Connection icon: . To view and edit the details of the selected connection, click the Edit Connection icon: .

    Auth Endpoint Salesforce SOAP API authentication endpoint. For example, you might enter one of the following common values:
    • login.salesforce.com - Use to connect to a Production or Developer Edition organization.
    • test.salesforce.com - Use to connect to a sandbox organization.

    Default is login.salesforce.com.

    API Version Salesforce API version used to connect to Salesforce.

    Default is 57.0.0. If you change the version, you also must download the relevant JAR files from Salesforce Web Services Connector (WSC).

    Authentication Type Authentication type to use to connect to Salesforce:
    • Basic Authentication - Specify a user name and password.
    • Connected App with OAuth - Use an OAuth 2.0-enabled connected app to enable machine-to-machine OAuth with JWT Bearer Flow.
    Username Salesforce username in the following email format: <text>@<text>.com.

    When using Connected App with OAuth authentication, the user must be authorized to use the app.

    Password

    Salesforce password.

    If the Data Collector machine is outside the trusted IP range configured in your Salesforce environment, you must use a security token along with the password. Use Salesforce to generate a security token and then set this property to the password followed by the security token.

    For example, if the password is abcd and the security token is 1234, then set this property to abcd1234. For more information on generating a security token, see Reset Your Security Token.

    Available when using Basic Authentication.

    Consumer Key Consumer key from the connected app.

    Available when using Connected App with OAuth authentication.

    Private Key Private key from the public key certificate that you used with the connected app.

    Ensure that the key is formatted correctly, with no spaces or extra line breaks.

    Available when using Connected App with OAuth authentication.

    Max Batch Size (records) Maximum number of records processed at one time. Honors values up to the Data Collector maximum batch size.

    Default is 1000. The Data Collector default is 1000.

    Max Batch Wait Time (ms) Number of milliseconds to wait before sending a partial or empty batch.
  3. To query data, on the Query tab, configure the following properties:
    Query Property Description
    SOQL Query SOQL query to use when reading existing data from Salesforce.
    Include Deleted Records Determines whether the SOQL query also retrieves deleted records from the Salesforce recycle bin.
    Incremental Mode Defines how the origin queries the database. Select to perform incremental queries. Clear to perform full queries.

    Default is incremental mode.

    Offset Field Typically the Id system field, the offset field should be an indexed field in the record.

    Default is the Id field.

    Number of Threads Number of threads the origin generates and uses for multithreaded processing. Default is 1.
    Query Interval Amount of time to wait between queries. Enter an expression based on a unit of time. You can use SECONDS, MINUTES, or HOURS.

    Default is 1 minute: ${1 * MINUTES}.

    Initial Offset First offset value to use when the pipeline starts or after you reset the origin.

    Default is fifteen zeros: 000000000000000.

    Maximum Query Columns Maximum number of columns that the query can retrieve. If the queried object has more columns, the query fails.

    Default is 512.

    Salesforce Query Timeout Maximum number of seconds to wait for a response from Salesforce.

    Default is 60.

    Poll Interval Interval to wait before polling for the Salesforce job status, in milliseconds.
  4. On the Advanced tab, configure the following properties:
    Advanced Property Description
    Use Proxy Specifies whether to use an HTTP proxy to connect to Salesforce.
    Proxy Hostname Proxy host.
    Proxy Port Proxy port.
    Proxy Requires Credentials Specifies whether the proxy requires a user name and password.
    Proxy Realm Authentication realm for the proxy server.
    Proxy Username User name for proxy credentials.
    Proxy Password Password for proxy credentials.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
    Create Salesforce Attributes Adds Salesforce header attributes to records and field attributes to fields. The origin creates Salesforce attributes by default.
    Salesforce Attribute Prefix Prefix for Salesforce attributes.
    Disable Query Validation Disables query validation for SOQL queries.
    Maximum Records per Query Result Set Defines the maximum size of a result set, in records. Use to prevent timeouts when working with large result sets.
    Mismatched Types Behavior Action to take on data with a data type that differs from the data type specified in the schema:
    • Preserve the data as returned by Salesforce.
    • Truncate numeric values to match the Salesforce schema.
    • Round numeric values to match the Salesforce schema.
    Use Mutual Authentication

    When enabled in Salesforce, you can use SSL/TLS mutual authentication to connect to Salesforce.

    Mutual authentication is not enabled in Salesforce by default. To enable mutual authentication, contact Salesforce.

    Before enabling mutual authentication, you must store a mutual authentication certificate in the Data Collector resources directory. For more information, see Keystore and Truststore Configuration.

    Use Remote Keystore Enables loading the contents of the keystore from a remote credential store or from values entered in the stage properties.
    Private Key Private key used in the remote keystore. Enter a credential function that returns the key or enter the contents of the key.
    Certificate Chain Each PEM certificate used in the remote keystore. Enter a credential function that returns the certificate or enter the contents of the certificate.
    Keystore File

    Path to the local keystore file. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:

    ${runtime:resourcesDirPath()}/keystore.jks

    By default, no keystore is used.

    Keystore Type Type of keystore to use. Use one of the following types:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Keystore Password Password to the keystore file. A password is optional, but recommended.
    Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores.
    Keystore Key Algorithm Algorithm to manage the keystore.

    Default is SunX509.