Salesforce Bulk API 2.0
The Salesforce Bulk API 2.0 origin reads existing data from Salesforce using Salesforce Bulk API 2.0. To read from Salesforce with the SOAP or Bulk API, or to subscribe to notifications, use the Salesforce origin. For information about supported versions, see Supported Systems and Versions.
When you configure the Salesforce Bulk API 2.0 origin, you specify the authentication to use. You can also use a connection to configure the origin.
When processing existing data, you configure the SOQL query, offset field, and optional initial offset to use. The origin can perform a full or incremental read at specified intervals. And under certain circumstances, the origin can also process deleted records.
The Salesforce Bulk API 2.0 origin can use multiple threads to process query result sets in parallel.
By default, the origin generates Salesforce record header attributes and Salesforce field attributes that provide additional information about each record and field.
You can specify the prefix to use for Salesforce attributes, or you can disable attribute generation entirely. You can also configure other advanced options, such as disabling query validation or using mutual authentication and an HTTP proxy for the connection.
The origin can generate events for an event stream. For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.
Querying Data
The Salesforce Bulk API 2.0 origin executes a query to read existing data from Salesforce. Use the Salesforce Object Query Language (SOQL) to write the query.
The Salesforce Bulk API 2.0 origin uses an offset field and an initial offset
or start ID to determine where to start reading data within an object. By default, the
offset field is defined as the Salesforce Id
system field, which
contains a unique identifier for each record in a Salesforce object.
You can configure the maximum number of columns that the query can return and the maximum number of seconds that the origin waits for a response from the query.
Salesforce Bulk API 2.0 is an asynchronous API. The origin creates a query job, and periodically polls Salesforce until the job is complete. Salesforce returns query results in one or more result sets. Result sets can be processed in parallel by enabling multithreaded processing.
If the pipeline stops before it finishes reading all data, the Salesforce Bulk API 2.0 origin saves the last read offset value. When the pipeline starts again, the origin uses the last read offset value to continue processing from where it stopped. You can reset the origin to process all requested objects.
Unlike the Salesforce origin, the Salesforce Bulk API 2.0 origin does not stop the pipeline when an initial query is complete. To stop the pipeline automatically, enable the origin to generate events and use the Pipeline Finisher executor. For more information, see Event Generation.
In rare cases, the query returns data with a type that does not match the data type specified in the schema for a field. For example, the query might return a Float when the schema specifies an Integer. You can use the Mismatched Types Behavior property on the Advanced tab to configure how the origin handles mismatched types. The origin can retain the returned data, truncate the returned data to match the specified type, or round the returned data to match the specified type.
Bulk API 2.0 Queries
When querying existing data with version 2.0 of the Bulk API, you define the SOQL query and related properties to determine the data returned from Salesforce.
- SOQL query
-
When processing existing data, use the following query guidelines:
- In the WHERE clause, include the offset field and the offset value. The origin uses an offset field and value to determine the data that is returned. Include both in the WHERE clause of the query.
- In the WHERE clause, use the OFFSET constant to represent the
offset value.Use
${OFFSET}
to represent the offset value. For example, when you start a pipeline, the following query returns all data from the object where the data in the offset field is greater than the initial offset value:SELECT Id, Name FROM <object> WHERE <offset field> > ${OFFSET}
Note: When the offset values are strings, enclose${OFFSET}
in single quotation marks. - To avoid returning duplicate data, use the offset field as the
first field in the ORDER BY clause.Note: Using a field that is not the
Id
field in the ORDER BY clause can slow performance.
- Additional properties
-
You can configure several additional properties on the Query tab. For example:
- Offset Field - Typically the
Id
system field, the offset field should be an indexed field in the record. Default is theId
field. - Initial Offset - First offset value to use when the pipeline starts or after you reset the origin.
- Include Deleted Records - An optional property. Determines whether the SOQL query also retrieves deleted records from the Salesforce recycle bin.
- Offset Field - Typically the
Example
Let's say that you want to read all names and account numbers from the Salesforce Account object a single time.
- SOQL Query - Include the offset field and offset value in the WHERE and
ORDER BY clauses, as well as the fields to return, as
follows:
SELECT Id, Name, AccountNumber FROM Account WHERE Id > '${OFFSET}' ORDER BY Id
-
Repeat Query - Set to No Repeat to run the query a single time.
-
Initial Offset - Use the default value of fifteen zeros (
000000000000000
) for the offset value to ensure that the origin reads all records in the object. -
Offset Field - Use the default,
Id
, for the offset field.
Full and Incremental Mode
- Incremental mode
- When the origin performs an incremental query, it uses the initial offset as the offset value in the first SOQL query. As the origin completes processing the results of the first query, it saves the last offset value that it processes. Then it waits the specified query interval before performing a subsequent query.
- Full mode
- When the origin performs a full query, it runs the specified SOQL query. The origin uses the initial offset as the offset value in the SQL query each time it requests data.
Multithreaded Processing
The Salesforce Bulk API 2.0 origin can perform parallel processing and enables the creation of a multithreaded pipeline.
When you enable multithreaded processing, the Salesforce Bulk API 2.0 origin uses multiple concurrent threads based on the Number of Threads property. When you start the pipeline, the origin creates the number of threads specified in the property.
Salesforce Bulk API 2.0 query results are returned in one or more result sets, each of which can be processed in parallel up to the Number of Threads configuration setting that you specify. You can control the maximum size of a result set with the Maximum Records per Query Result Set advanced stage property. If you are working with a very large number of query results, you might experience a timeout before receiving all of the data from Salesforce. To prevent a timeout, you can use this property to split the results into smaller sets.
When you don’t provide a value for the Maximum Records per Query Result Set property, the server uses a default value based on the service.
A Salesforce Bulk API 2.0 origin thread reads each result set into one or more batches of records. Upon filling each batch, the origin passes it to an available pipeline runner.
A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors, executors, and destinations in the pipeline and handles all pipeline processing after the origin. Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time pipeline property to specify the interval or to opt out of empty batch generation.
Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But since batches are processed by different pipeline runners, the order that batches are written to destinations is not ensured.
For example, say you enable multithreaded processing and set the Number of Threads property to 5. When you start the pipeline, the origin creates five threads, and Data Collector creates a matching number of pipeline runners. Upon receiving data, the origin passes a batch to each of the pipeline runners for processing.
Each pipeline runner performs the processing associated with the rest of the pipeline. After a batch is written to pipeline destinations, the pipeline runner becomes available for another batch of data. Each batch is processed and written as quickly as possible, independent from other batches processed by other pipeline runners, so batches may be written differently from the read order.
At any given moment, the five pipeline runners can each process a batch, so this multithreaded pipeline processes up to five batches at a time. When incoming data slows, the pipeline runners sit idle, available for use as soon as the data flow increases.
For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.
Processing Deleted Records
The Salesforce Bulk API 2.0 origin can retrieve deleted records from the Salesforce recycle bin for processing.
To process deleted records, use the Include Deleted Records property on the Query tab.
Reading Custom Objects or Fields
If the origin reads custom Salesforce objects or fields, you might want to use a Field Renamer in the pipeline to rename the custom fields.
When you extend Salesforce objects, custom object and field names are appended
with the suffix __c
. For example, if you create a custom Transaction
object, Salesforce names the object Transaction__c
. The Transaction
object might contain fields named Credit_Card__c, Fare_Amount__c, and
Payment_Type__c
.
Instead of using field names appended with the suffix __c
throughout the rest of the pipeline, you can add a Field Renamer to remove the suffix
from the field names.
For more information about Salesforce custom objects, see the Salesforce documentation.
Salesforce Attributes
The Salesforce Bulk API 2.0 origin generates Salesforce record header attributes and Salesforce field attributes that provide additional information about each record and field. The origin receives these details from Salesforce.
Salesforce attributes include a user-defined prefix to differentiate the
Salesforce attributes from other attributes. The prefix is salesforce.
by default. You can change the prefix that the origin uses and you can configure the
origin not to create Salesforce attributes.
Salesforce Header Attribute
The Salesforce Bulk API 2.0 origin generates a Salesforce record header attribute to provide additional information about each record. The origin receives these details from Salesforce.
You can use the record:attribute
or
record:attributeOrDefault
functions to access the information in
the attribute.
Salesforce Header Attribute | Description |
---|---|
<Salesforce prefix>sobjectType | Provides the Salesforce source object for the record. Generated when the origin executes a query. |
For more information about record header attributes, see Record Header Attributes.
Salesforce Field Attributes
The Salesforce Bulk API 2.0 origin generates Salesforce field attributes that provide additional information about each field, such as the data type of the Salesforce field. The origin receives these details from Salesforce.
You can use the record:fieldAttribute
or
record:fieldAttributeOrDefault
functions to access the information
in the attribute.
Salesforce Field Attribute | Description |
---|---|
<Salesforce prefix>salesforceType | Provides the original Salesforce data type for the field. |
<Salesforce prefix>length | Provides the original length for all string and textarea fields. |
<Salesforce prefix>precision | Provides the original precision for all double fields. |
<Salesforce prefix>scale | Provides the original scale for all double fields. |
<Salesforce prefix>digits | Provides the maximum number of digits for all integer fields. |
For more information about field attributes, see Field Attributes.
Event Generation
The Salesforce Bulk API 2.0 origin can generate events that you can use in an event stream. When you enable event generation, the origin generates an event when it completes processing the data returned by the specified query.
- With the Pipeline Finisher executor
to stop the pipeline and transition the pipeline to a Finished state when the
origin completes processing available data.
When you restart a pipeline stopped by the Pipeline Finisher executor, the origin processes data based on how you configured the origin. For example, if you configure the origin to repeat an incremental query, the origin saves the offset when the executor stops the pipeline. When it restarts, the origin continues processing from the last-saved offset. If you configure the origin to repeat a full query, when you restart the pipeline, the origin uses the initial offset.
For an example, see Stopping a Pipeline After Processing All Available Data.
- With the Email executor to send a custom email
after receiving an event.
For an example, see Sending Email During Pipeline Processing.
-
With a destination to store information about completed queries.
For an example, see Preserving an Audit Trail of Events.
Event Record
Record Header Attribute | Description |
---|---|
sdc.event.type | Event type. Uses the following type:
|
sdc.event.version | Integer that indicates the version of the event record type. |
sdc.event.creation_timestamp | Epoch timestamp when the stage created the event. |
The no-more-data event record includes no record fields.
Changing the API Version
The Salesforce Bulk API 2.0 origin uses version 57.0.0 of the Salesforce API, by default. You can use a different Salesforce API version if you need to access functionality not present in version 57.0.0.
- On the Salesforce tab, set the API Version property to the version that you want to use.
-
Download the relevant version of the following JAR files from Salesforce Web
Services Connector (WSC):
-
WSC JAR file - force-wsc-<version>.0.0.jar
-
Partner API JAR file - force-partner-api-<version>.0.0.jar
Where
<version>
is the API version number.For information about downloading libraries from Salesforce WSC, see the Salesforce Developer documentation.
-
-
In the following Data Collector
directory, replace the default force-wsc-
57.0.0
.jar and force-partner-api-57.0.0
.jar files with the versioned JAR files that you downloaded:$SDC_DIST/streamsets-libs/streamsets-datacollector-salesforce-lib/lib/
- Restart Data Collector for the changes to take effect.
Configuring a Salesforce Bulk API 2.0 Origin
Configure a Salesforce Bulk API 2.0 origin to read data from Salesforce using Salesforce Bulk API 2.0.
-
In the Properties panel, on the General tab, configure the
following properties:
General Property Description Name Stage name. Description Optional description. Produce Events Generates event records when events occur. Use for event handling. On Record Error Error record handling for the stage: - Discard - Discards the record.
- Send to Error - Sends the record to the pipeline for error handling.
- Stop Pipeline - Stops the pipeline.
-
On the Salesforce tab, configure the following
properties:
Salesforce Property Description Connection Connection that defines the information required to connect to an external system. To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.
To create a new connection, click the Add New Connection icon: . To view and edit the details of the selected connection, click the Edit Connection icon: .
Auth Endpoint Salesforce SOAP API authentication endpoint. For example, you might enter one of the following common values: login.salesforce.com
- Use to connect to a Production or Developer Edition organization.test.salesforce.com
- Use to connect to a sandbox organization.
Default is
login.salesforce.com
.API Version Salesforce API version used to connect to Salesforce. Default is 57.0.0. If you change the version, you also must download the relevant JAR files from Salesforce Web Services Connector (WSC).
Authentication Type Authentication type to use to connect to Salesforce: - Basic Authentication - Specify a user name and password.
- Connected App with OAuth - Use an OAuth 2.0-enabled connected app to enable machine-to-machine OAuth with JWT Bearer Flow.
Username Salesforce username in the following email format: <text>@<text>.com
.When using Connected App with OAuth authentication, the user must be authorized to use the app.
Password Salesforce password.
If the Data Collector machine is outside the trusted IP range configured in your Salesforce environment, you must use a security token along with the password. Use Salesforce to generate a security token and then set this property to the password followed by the security token.
For example, if the password is
abcd
and the security token is1234
, then set this property to abcd1234. For more information on generating a security token, see Reset Your Security Token.Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.Consumer Key Consumer key from the connected app. Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.Available when using Connected App with OAuth authentication.
Private Key Private key from the public key certificate that you used with the connected app. Ensure that the key is formatted correctly, with no spaces or extra line breaks. Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.Available when using Connected App with OAuth authentication.
Max Batch Size (records) Maximum number of records processed at one time. Honors values up to the Data Collector maximum batch size. Default is 1000. The Data Collector default is 1000.
Max Batch Wait Time (ms) Number of milliseconds to wait before sending a partial or empty batch. -
To query data, on the Query tab, configure the following
properties:
Query Property Description SOQL Query SOQL query to use when reading existing data from Salesforce. Include Deleted Records Determines whether the SOQL query also retrieves deleted records from the Salesforce recycle bin. Incremental Mode Defines how the origin queries the database. Select to perform incremental queries. Clear to perform full queries. Default is incremental mode.
Offset Field Typically the Id system field, the offset field should be an indexed field in the record. Default is the Id field.
Number of Threads Number of threads the origin generates and uses for multithreaded processing. Default is 1. Query Interval Amount of time to wait between queries. Enter an expression based on a unit of time. You can use SECONDS, MINUTES, or HOURS. Default is 1 minute:
${1 * MINUTES}
.Initial Offset First offset value to use when the pipeline starts or after you reset the origin. Default is fifteen zeros:
000000000000000
.Maximum Query Columns Maximum number of columns that the query can retrieve. If the queried object has more columns, the query fails. Default is 512.
Salesforce Query Timeout Maximum number of seconds to wait for a response from Salesforce. Default is 60.
Poll Interval Interval to wait before polling for the Salesforce job status, in milliseconds. -
On the Advanced tab, configure the following
properties:
Advanced Property Description Use Proxy Specifies whether to use an HTTP proxy to connect to Salesforce. Proxy Hostname Proxy host. Proxy Port Proxy port. Proxy Requires Credentials Specifies whether the proxy requires a user name and password. Proxy Realm Authentication realm for the proxy server. Proxy Username User name for proxy credentials. Proxy Password Password for proxy credentials. Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.Create Salesforce Attributes Adds Salesforce header attributes to records and field attributes to fields. The origin creates Salesforce attributes by default. Salesforce Attribute Prefix Prefix for Salesforce attributes. Disable Query Validation Disables query validation for SOQL queries. Maximum Records per Query Result Set Defines the maximum size of a result set, in records. Use to prevent timeouts when working with large result sets. Mismatched Types Behavior Action to take on data with a data type that differs from the data type specified in the schema: - Preserve the data as returned by Salesforce.
- Truncate numeric values to match the Salesforce schema.
- Round numeric values to match the Salesforce schema.
Use Mutual Authentication When enabled in Salesforce, you can use SSL/TLS mutual authentication to connect to Salesforce.
Mutual authentication is not enabled in Salesforce by default. To enable mutual authentication, contact Salesforce.
Before enabling mutual authentication, you must store a mutual authentication certificate in the Data Collector resources directory. For more information, see Keystore and Truststore Configuration.
Use Remote Keystore Enables loading the contents of the keystore from a remote credential store or from values entered in the stage properties. Private Key Private key used in the remote keystore. Enter a credential function that returns the key or enter the contents of the key. Certificate Chain Each PEM certificate used in the remote keystore. Enter a credential function that returns the certificate or enter the contents of the certificate. Keystore File Path to the local keystore file. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:
${runtime:resourcesDirPath()}/keystore.jks
By default, no keystore is used.
Keystore Type Type of keystore to use. Use one of the following types: - Java Keystore File (JKS)
- PKCS #12 (p12 file)
Default is Java Keystore File (JKS).
Keystore Password Password to the keystore file. A password is optional, but recommended.
Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores.Keystore Key Algorithm Algorithm to manage the keystore.
Default is SunX509.