Salesforce Bulk API 2.0

The Salesforce Bulk API 2.0 destination writes data to Salesforce objects using Salesforce Bulk API 2.0. To write to Salesforce with the SOAP or Bulk API, use the Salesforce destination. For information about supported versions, see Supported Systems and Versions.

When you configure the Salesforce Bulk API 2.0 destination, you define connection information, including the API version and authentication type to use.

You specify the Salesforce object to write to by entering the object name or by defining an expression that evaluates to the object name.

You can write Salesforce platform events the same way that you write to any Salesforce object. Specify the platform event API name, such as Notification__e, instead of a Salesforce object type API name, such as Account or Widget__c.

The Salesforce Bulk API 2.0 destination can use CRUD operations defined in the sdc.operation.type record header attribute to write data. You can define a default operation for records without the header attribute or value. You can also configure how to handle records with unsupported operations. For information about Data Collector changed data processing and a list of CDC-enabled origins, see Processing Changed Data.

By default, the Salesforce Bulk API 2.0 destination writes data to a Salesforce object by matching case-sensitive field names. You can override the default field mappings by defining specific mappings that the CRUD operation requires. For update and delete operations, the incoming record must contain the record ID.

You can optionally use an HTTP proxy to connect to Salesforce. When mutual authentication is enabled in Salesforce, you can configure the destination to use it to connect.

CRUD Operation Processing

The Salesforce Bulk API 2.0 destination can insert, update, upsert, or delete data.
Tip: You can configure the destination to hard delete records that are flagged for deletion. For more information, see Hard Deleting Records.
The destination writes the records based on the CRUD operation defined in a CRUD operation header attribute or in operation-related stage properties:
CRUD operation header attribute
The destination looks for the CRUD operation in the sdc.operation.type record header attribute.
The attribute can contain one of the following numeric values:
  • 1 for INSERT
  • 2 for DELETE
  • 3 for UPDATE
  • 4 for UPSERT
If your pipeline has a CDC-enabled origin that processes changed data, the destination reads the operation type from the sdc.operation.type header attribute that the origin generates. If your pipeline has a non-CDC origin, you can use the Expression Evaluator processor or a scripting processor to define the record header attribute. For more information about Data Collector changed data processing and a list of CDC-enabled origins, see Processing Changed Data.
Operation stage properties
If there is no CRUD operation in the sdc.operation.type record header attribute, the destination uses the operation configured in the Default Operation property.
If the sdc.operation.type record header attribute contains an unsupported value, the destination takes the action configured in the Unsupported Operation Handling property. The destination can discard the record, send the record for error handling, or write the record using the default operation.
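
The resolution order described above can be sketched as follows. This is a minimal illustration in Python, not the destination's actual implementation: the function name resolve_operation and the handling labels DISCARD, SEND_TO_ERROR, and USE_DEFAULT are hypothetical stand-ins for the Default Operation and Unsupported Operation Handling properties.

```python
from typing import Optional

# Numeric codes documented for the sdc.operation.type header attribute.
OPERATION_CODES = {"1": "INSERT", "2": "DELETE", "3": "UPDATE", "4": "UPSERT"}


def resolve_operation(attributes: dict, default_operation: str,
                      unsupported_handling: str) -> Optional[str]:
    """Return the operation to apply, or None when the record is discarded.

    attributes           -- record header attributes as a plain dict
    default_operation    -- value of the Default Operation property
    unsupported_handling -- hypothetical label: DISCARD, SEND_TO_ERROR,
                            or USE_DEFAULT
    """
    raw = attributes.get("sdc.operation.type")

    # No header attribute, or an empty value: fall back to the default.
    if raw is None or str(raw).strip() == "":
        return default_operation

    operation = OPERATION_CODES.get(str(raw).strip())
    if operation is not None:
        return operation

    # The header holds a value that is not one of the supported codes.
    if unsupported_handling == "USE_DEFAULT":
        return default_operation
    if unsupported_handling == "SEND_TO_ERROR":
        # Stand-in for passing the record to error handling.
        raise ValueError(f"Unsupported operation type: {raw!r}")
    if unsupported_handling == "DISCARD":
        return None
    raise ValueError(f"Unknown handling option: {unsupported_handling!r}")
```

For example, a record with sdc.operation.type set to 4 resolves to UPSERT regardless of the default, while a record with no header attribute resolves to whatever default operation is configured.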

Hard Deleting Records

By default, when the Salesforce Bulk API 2.0 destination deletes records, the records move to the Salesforce Recycle Bin. They remain in the Recycle Bin for 15 days. During that time, you can restore those records. After 15 days, the records are purged from the Recycle Bin and can no longer be restored.

You can configure the Salesforce Bulk API 2.0 destination to bypass the Salesforce Recycle Bin and permanently delete records that are flagged for deletion. You might hard delete records to recover storage space.

To bypass the Salesforce Recycle Bin and permanently delete records, select the Hard Delete Records property on the Salesforce tab.

Note: Use this option with care, since hard deleted records cannot be recovered.
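
For context, Salesforce Bulk API 2.0 distinguishes the two behaviors at the job level: an ingest job is created with either the delete or the hardDelete operation. The sketch below builds the JSON body for such a job creation request (a POST to /services/data/vXX.X/jobs/ingest). It assumes that the Hard Delete Records property corresponds to the hardDelete operation. The helper name build_ingest_job is hypothetical, and how the destination constructs its requests internally is not documented here.

```python
from typing import Optional

# Operations that a Bulk API 2.0 ingest job accepts.
INGEST_OPERATIONS = {"insert", "update", "upsert", "delete", "hardDelete"}


def build_ingest_job(sobject: str, operation: str, hard_delete: bool = False,
                     external_id_field: Optional[str] = None) -> dict:
    """Build the request body for creating a Bulk API 2.0 ingest job."""
    op = operation.lower()

    # Assumption: the Hard Delete Records property switches delete jobs
    # to the hardDelete operation, which bypasses the Recycle Bin.
    if op == "delete" and hard_delete:
        op = "hardDelete"
    if op not in INGEST_OPERATIONS:
        raise ValueError(f"Unsupported operation: {operation!r}")

    body = {
        "object": sobject,
        "operation": op,
        "contentType": "CSV",
        "lineEnding": "LF",
    }

    # Upsert jobs must name the external ID field used for matching.
    if op == "upsert":
        if not external_id_field:
            raise ValueError("Upsert requires an external ID field")
        body["externalIdFieldName"] = external_id_field
    return body
```

Calling build_ingest_job("Account", "DELETE", hard_delete=True) yields a body whose operation is hardDelete, whereas the same call without hard_delete yields delete, so the deleted records go to the Recycle Bin.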

Field Mappings

When you configure the Salesforce Bulk API 2.0 destination, you can override the default mapping of case-sensitive field names by mapping specific fields in the record to existing fields in the Salesforce object.

To map a field, you enter the following:
  • SDC Field - Name of the field in the record that contains the data to be written.
  • Salesforce Field - API name of the existing field in the Salesforce object that receives the data. Enter a field name or enter an expression that defines the field.
Map the fields as required by the CRUD operation that the destination uses:
Delete
To delete data, map only the Salesforce record Id to delete. Create a single field mapping that maps the field in the record that contains the value of the Salesforce record Id to the Salesforce field named Id.
Insert, update, or upsert
To insert, update, or upsert data, you can create multiple field mappings. When you define the Salesforce fields, use a colon (:) or period (.) as a field separator.
For example, Parent__r:External_Id__c and Parent__r.External_Id__c are both valid Salesforce fields.
To upsert data, you also must configure the External ID Field property that specifies the external ID field in the Salesforce object to use for the upsert operation.
Note: You can also map fields to Salesforce polymorphic fields using the following Salesforce syntax: <ObjectType>:<RelationshipName>.<IndexedFieldName>. Because a polymorphic field can have more than one type of object as a parent, you must define the parent object type in the field name syntax. For example, a case field can have either a user or a queue as the parent. To set a case owner to a user through their username, specify the User:Owner.Username Salesforce field.
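
The mapping rules above can be illustrated with a short Python sketch. It is an illustration under stated assumptions, not the destination's code: map_record is a hypothetical helper, records are modeled as flat dictionaries, SDC field paths are written with a leading slash, and Salesforce field names are passed through exactly as entered. The sketch applies only the explicit mappings and does not model what happens to unmapped fields.

```python
def map_record(record: dict, mappings: list, operation: str) -> dict:
    """Apply explicit field mappings to one record.

    record    -- flat dict of field name to value (simplified record model)
    mappings  -- list of (sdc_field, salesforce_field) pairs
    operation -- insert, update, upsert, or delete (case-insensitive)
    """
    # A delete maps only the record Id: exactly one mapping, to the Id field.
    if operation.lower() == "delete":
        if [sf_field for _, sf_field in mappings] != ["Id"]:
            raise ValueError("Delete requires a single mapping to the Id field")

    row = {}
    for sdc_field, sf_field in mappings:
        key = sdc_field.lstrip("/")  # "/name" refers to the record field "name"
        if key not in record:
            raise KeyError(f"Record has no field {sdc_field!r}")
        row[sf_field] = record[key]
    return row


# Hypothetical record and mappings, for illustration only.
example = map_record(
    {"name": "Acme", "parent_ext": "P-42", "owner_login": "jdoe@example.com"},
    [
        ("/name", "Name"),
        ("/parent_ext", "Parent__r.External_Id__c"),  # relationship field
        ("/owner_login", "User:Owner.Username"),      # polymorphic field
    ],
    "insert",
)
```

Here example holds three entries keyed by Name, Parent__r.External_Id__c, and User:Owner.Username, which is the shape of one row to be written to the Salesforce object.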

Changing the API Version

Data Collector ships with version 57.0.0 of the Salesforce Web Services Connector libraries. You can use a different Salesforce API version if you need to access functionality not present in version 57.0.0.
  1. On the Salesforce tab, set the API Version property to the version that you want to use.
  2. Download the relevant version of the following JAR files from Salesforce Web Services Connector (WSC):
    • WSC JAR file - force-wsc-<version>.0.0.jar

    • Partner API JAR file - force-partner-api-<version>.0.0.jar

    Where <version> is the API version number.

    For information about downloading libraries from Salesforce WSC, see the Salesforce Developer documentation.

  3. In the following Data Collector directory, replace the default force-wsc-57.0.0.jar and force-partner-api-57.0.0.jar files with the versioned JAR files that you downloaded:
    $SDC_DIST/streamsets-libs/streamsets-datacollector-salesforce-lib/lib/
  4. Restart Data Collector for the changes to take effect.
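
As a planning aid for step 3, the following Python sketch derives the JAR file names for a given API version and lists which files to remove and which to add. It only computes paths and does not touch the file system. The helper names wsc_jar_names and plan_jar_swap are hypothetical, and the /opt/sdc location mentioned afterward is an assumed value for $SDC_DIST.

```python
from pathlib import PurePosixPath

DEFAULT_API_VERSION = 57  # version of the libraries that ship by default
LIB_SUBDIR = "streamsets-libs/streamsets-datacollector-salesforce-lib/lib"


def wsc_jar_names(api_version: int) -> list:
    """Return the WSC and Partner API JAR file names for an API version."""
    return [
        f"force-wsc-{api_version}.0.0.jar",
        f"force-partner-api-{api_version}.0.0.jar",
    ]


def plan_jar_swap(sdc_dist: str, new_version: int) -> dict:
    """List the default JARs to remove and the downloaded JARs to add."""
    lib_dir = PurePosixPath(sdc_dist) / LIB_SUBDIR
    return {
        "remove": [str(lib_dir / name)
                   for name in wsc_jar_names(DEFAULT_API_VERSION)],
        "add": [str(lib_dir / name)
                for name in wsc_jar_names(new_version)],
    }
```

For instance, plan_jar_swap("/opt/sdc", 58) lists force-wsc-57.0.0.jar and force-partner-api-57.0.0.jar for removal and the corresponding 58.0.0 files as additions, all under the Salesforce library directory.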

Configuring a Salesforce Bulk API 2.0 Destination

Configure a Salesforce Bulk API 2.0 destination to write to Salesforce using Salesforce Bulk API 2.0.
  1. In the Properties panel, on the General tab, configure the following properties:
    General Property - Description
    Name - Stage name.
    Description - Optional description.
    Required Fields - Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions - Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error - Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the Salesforce tab, configure the following properties:
    Salesforce Property - Description
    Auth Endpoint - Salesforce SOAP API authentication endpoint. For example, you might enter one of the following common values:
    • login.salesforce.com - Use to connect to a Production or Developer Edition organization.
    • test.salesforce.com - Use to connect to a sandbox organization.

    Default is login.salesforce.com.

    API Version - Salesforce API version used to connect to Salesforce.

    Default is 57.0.0. If you change the version, you also must download the relevant JAR files from Salesforce Web Services Connector (WSC).

    Authentication Type - Authentication type to use to connect to Salesforce:
    • Basic Authentication - Specify a user name and password.
    • Connected App with OAuth - Use an OAuth 2.0-enabled connected app to enable machine-to-machine OAuth with JWT Bearer Flow.
    Username - Salesforce username in the following email format: <text>@<text>.com.

    When using Connected App with OAuth authentication, the user must be authorized to use the app.

    Password - Salesforce password.

    If the Data Collector machine is outside the trusted IP range configured in your Salesforce environment, you must use a security token along with the password. Use Salesforce to generate a security token and then set this property to the password followed by the security token.

    For example, if the password is abcd and the security token is 1234, then set this property to abcd1234. For more information on generating a security token, see Reset Your Security Token.

    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
    Consumer Key - Consumer key from the connected app.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.

    Available when using Connected App with OAuth authentication.

    Private Key - Private key from the public key certificate that you used with the connected app. Ensure that the key is formatted correctly, with no spaces or extra line breaks.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.

    Available when using Connected App with OAuth authentication.

    SObject Type - Salesforce object to write to.

    Enter the name of an object, such as Account. Or define an expression that evaluates to the object name.

    For example, if the pipeline reads from the Salesforce origin, the origin generates a Salesforce record header attribute named salesforce.sobjectType. This header attribute provides the source object for the record. To write to the same Salesforce object, enter the following expression for this property:
    ${record:attribute('salesforce.sobjectType')}
    Default Operation - Default CRUD operation to perform if the sdc.operation.type record header attribute is not set.
    Unsupported Operation Handling - Action to take when the CRUD operation type defined in the sdc.operation.type record header attribute is not supported:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Use Default Operation - Writes the record to the destination system using the default operation.
    External ID Field - External ID field in the Salesforce object to use for upsert operations.

    Enter the Salesforce field name, for example Customer_Id__c. Or enter an expression that defines the field, such as ${record:value('/ExternalIdField')}.

    Field Mapping - Optional mapping of fields in the record to existing fields in the Salesforce object.

    By default, the destination writes data to the Salesforce object by matching case-sensitive field names. To override the default mappings, map the fields as required by the CRUD operation that the destination uses.

    Using simple or bulk edit mode, click the Add icon to create additional field mappings.

    Hard Delete Records - Permanently deletes records immediately, bypassing the Salesforce Recycle Bin.

    Use this option with care, since hard deleted records cannot be restored.

    Salesforce Query Timeout - Maximum number of seconds to wait for a response from Salesforce.

    Default is 60.

  3. On the Advanced tab, configure the following properties:
    Advanced Property - Description
    Use Proxy - Specifies whether to use an HTTP proxy to connect to Salesforce.
    Proxy Hostname - Proxy host.
    Proxy Port - Proxy port.
    Proxy Requires Credentials - Specifies whether the proxy requires a user name and password.
    Proxy Realm - Authentication realm for the proxy server.
    Proxy Username - User name for proxy credentials.
    Proxy Password - Password for proxy credentials.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
    Use Mutual Authentication - When enabled in Salesforce, you can use SSL/TLS mutual authentication to connect to Salesforce.

    Mutual authentication is not enabled in Salesforce by default. To enable mutual authentication, contact Salesforce.

    Before enabling mutual authentication, you must store a mutual authentication certificate in the Data Collector resources directory. For more information, see Keystore and Truststore Configuration.

    Use Remote Keystore - Enables loading the contents of the keystore from a remote credential store or from values entered in the stage properties.
    Private Key - Private key used in the remote keystore. Enter a credential function that returns the key or enter the contents of the key.
    Certificate Chain - Each PEM certificate used in the remote keystore. Enter a credential function that returns the certificate or enter the contents of the certificate.

    Using simple or bulk edit mode, click the Add icon to add additional certificates.

    Keystore File - Path to the local keystore file. Enter an absolute path to the file or enter the following expression to define the file stored in the Data Collector resources directory:

    ${runtime:resourcesDirPath()}/keystore.jks

    By default, no keystore is used.

    Keystore Type - Type of keystore to use. Use one of the following types:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Keystore Password - Password to the keystore file. A password is optional, but recommended.

    Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores.
    Keystore Key Algorithm - Algorithm to manage the keystore.

    Default is SunX509.