Databricks Query

The Databricks Query executor runs one or more Spark SQL queries on Databricks each time it receives an event record. Use the executor as part of an event stream in the pipeline. For information about supported versions, see Supported Systems and Versions.

For example, you might use the Databricks Query executor to run a Spark SQL query that executes the VACUUM command to remove leftover files when the pipeline generates a pipeline stop event.

The Databricks Query executor uses a JDBC connection to connect to the Databricks cluster. When you configure the executor, you specify the JDBC URL and credentials to use to connect to the Databricks cluster, and then you define the Spark SQL queries to run.

When needed, you also define the connection information that the executor uses to connect to the storage location in Amazon S3 or Azure Data Lake Storage Gen2.

You can also configure the executor to generate events for another event stream. For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Before you use the Databricks Query executor, you must complete a prerequisite task.

Prerequisite

Before configuring the Databricks Query executor, prepare your Databricks cluster.

In Databricks, configure and start your Databricks cluster, generate a personal access token, and locate the JDBC URL used to access the cluster.

Tip: In Databricks, you can locate the JDBC URL for your cluster on the JDBC/ODBC tab in the cluster configuration details.

Spark SQL Queries

You define one or more Spark SQL queries that the Databricks Query executor runs in Databricks each time it receives an event record.

The Databricks Query executor waits for each query to complete before continuing with the next query for the same event record. It also waits for all queries to complete before starting the queries for the next event record. Depending on the speed of the pipeline and the complexity of the queries, the wait for query completion can slow pipeline performance.

When running multiple queries for an event record, the executor skips the subsequent queries if a query fails.

You can include the following elements in each query that you define for the executor:

Spark SQL
Use Spark SQL syntax in a query, such as:
COPY INTO <table identifier>
FROM <location>
FILEFORMAT = <format type>

You can use any Spark SQL syntax that is valid for Databricks, as described in the Databricks documentation.

StreamSets expression language functions
You can include a subset of the functions provided with the StreamSets expression language in a query. For example, when you define the location to copy the data from, you might use the record:value() function to define the location as follows:
FROM 's3a://${record:value('/bucket')}/${record:value('/objectKey')}'

When entering the query in the executor, press Ctrl + Space Bar to view the list of valid functions you can use.

Storage Connection

The Databricks Query executor can connect to a storage location before running Spark SQL queries.

Configure one of the following storage locations:
None
When using the executor to run a Spark SQL query that does not require connecting to a storage location, set the storage location to None.
Or, when using the executor to run a query that requires connecting to a storage location and the Databricks cluster has the necessary Amazon S3 or Azure storage connection information configured, then set the storage location to None. In this case, the Databricks cluster connects to the storage location when it runs the Spark SQL query.
Amazon S3
When using the executor to run a Spark SQL query that requires connecting to an Amazon S3 storage location, you can specify the Amazon S3 connection information in the executor properties. Any connection information specified in the executor properties takes precedence over the connection information configured in the Databricks cluster.
After selecting Amazon S3 as the storage location, you specify the AWS access key pair that the executor uses to connect to Amazon S3.
ADLS Gen2
When using the executor to run a Spark SQL query that requires connecting to an Azure Data Lake Storage Gen2 storage location, you can specify the Azure connection information in the executor properties. Any connection information specified in the executor properties takes precedence over the connection information configured in the Databricks cluster.
After selecting ADLS Gen2 as the storage location, you configure the executor to use the appropriate authentication method to connect to Azure Data Lake Storage Gen2.

ADLS Gen2 Authentication Information

When you configure the executor to connect to an ADLS Gen2 storage location, you select the authentication method that the executor uses to connect to Azure Data Lake Storage Gen2.

Select one of the following authentication methods:

OAuth 2.0
Connections made with OAuth 2.0 authentication require the following information:
  • Application ID - Application ID for the Azure Active Directory Data Collector application. Also known as the client ID.

    For information on accessing the application ID from the Azure portal, see the Azure documentation.

  • Auth Token Endpoint - OAuth 2.0 token endpoint for the Azure Active Directory v1.0 application for Data Collector. For example: https://login.microsoftonline.com/<uuid>/oauth2/token.
  • Application Key - Authentication key or client secret for the Azure Active Directory application. Also known as the client secret.

    For information on accessing the application key from the Azure portal, see the Azure documentation.

Shared Key
Connections made with Shared Key authentication require the following information:
  • Account FQDN - Fully qualified domain name of the Azure Data Lake Storage Gen2 account. For example: <account-name>.dfs.core.windows.net.
  • Account Shared Key - Shared access key that Azure generated for the storage account.

    For more information on accessing the shared access key from the Azure portal, see the Azure documentation.

Event Generation

The Databricks Query executor can generate events that you can use in an event stream. When you enable event generation, the executor generates events for each successful or failed query.

Databricks Query executor events can be used in any logical way. For example:

For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Event Records

Event records generated by the Databricks Query executor have the following event-related record header attributes. Record header attributes are stored as String values.
Record Header Attribute Description
sdc.event.type Event type. Uses the following event types:
  • successful-query - Generated after a query successfully completes.
  • failed-query - Generated after a query fails.
sdc.event.version Integer that indicates the version of the event record type.
sdc.event.creation_timestamp Epoch timestamp when the stage created the event.
The Databricks Query executor can generate the following types of event records:
Successful query

The executor generates a successful-query event record after successfully completing a query.

Successful-query event records have the sdc.event.type record header attribute set to sucessful-query and include the following fields:
Event Field Name Description
query Query completed.
query-result Number of rows affected by query. Included if the Include Query Result Count in Events property is selected.
Failed query

The executor generates a failed-query event record after failing to complete a query.

Failed-query event records have the sdc.event.type record header attribute set to failed-query and include the following field:
Event Field Name Description
query Query attempted.

Configuring a Databricks Query Executor

Configure a Databricks Query executor to run a Spark SQL query on Databricks upon receiving an event.

Before you use the executor, you must complete the prerequisite task.
  1. In the properties pane, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Produce Events Generates event records when events occur. Use for event handling.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the JDBC tab, configure the following properties:
    JDBC Property Description
    JDBC URL JDBC URL used to connect to the Databricks cluster.

    Enter in the following format: jdbc:databricks://<server_hostname>:443/default;transportMode=http :ssl=1;httpPath=sql/protocolv1/o/0/xxxx-xxxxxx-xxxxxxxx;AuthMech=3;

    Tip: In Databricks, you can locate the JDBC URL for your cluster on the JDBC/ODBC tab in the cluster configuration details. As a best practice, remove the PWD parameter from the URL, and then enter the personal access token value in the User Token property on the Credentials tab.
    Use Credentials Enables entering credentials on the Credentials tab. Select when you do not include credentials in the JDBC URL.
    Spark SQL Query One or more Spark SQL queries to run each time the executor receives an event record.

    Using simple or bulk edit mode, click the Add icon to add additional queries.

    The executor processes multiple queries in order, and waits for each query to complete before continuing to the next query.

    Additional JDBC Configuration Properties Additional JDBC configuration properties to use. To add properties, click Add and define the JDBC property name and value.

    Use the property names and values as expected by JDBC.

    Include Query Result Count in Events Includes the number of rows impacted by a query in generated event records.
  3. To enter credentials separately from the JDBC URL, on the Credentials tab, configure the following properties:
    Credentials Property Description
    Username Enter token.
    User Token Enter your personal access token used to connect to the Databricks cluster.
    Tip: To secure sensitive information such as tokens, you can use runtime resources or credential stores.
    Note: The Legacy Drivers tab is not supported at this time.
  4. On the Advanced tab, optionally configure advanced properties.

    The defaults for these properties should work in most cases:

    Advanced Property Description
    Maximum Pool Size Maximum number of connections to create.

    Default is 1. The recommended value is 1.

    Minimum Idle Connections Minimum number of connections to create and maintain. To define a fixed connection pool, set to the same value as Maximum Pool Size.

    Default is 1.

    Connection Timeout (seconds) Maximum time to wait for a connection. Use a time constant in an expression to define the time increment.
    Default is 30 seconds, defined as follows:
    ${30 * SECONDS}
    Idle Timeout (seconds) Maximum time to allow a connection to idle. Use a time constant in an expression to define the time increment.

    Use 0 to avoid removing any idle connections.

    When the entered value is close to or more than the maximum lifetime for a connection, Data Collector ignores the idle timeout.

    Default is 10 minutes, defined as follows:
    ${10 * MINUTES}
    Max Connection Lifetime (seconds) Maximum lifetime for a connection. Use a time constant in an expression to define the time increment.

    Use 0 to set no maximum lifetime.

    When a maximum lifetime is set, the minimum valid value is 30 minutes.

    Default is 30 minutes, defined as follows:
    ${30 * MINUTES}
    Transaction Isolation Transaction isolation level used to connect to the database.

    Default is the default transaction isolation level set for the database. You can override the database default by setting the level to any of the following:

    • Read committed
    • Read uncommitted
    • Repeatable read
    • Serializable
    Init Query SQL query to perform immediately after the stage connects to the database. Use to set up the database session as needed.

    The query is performed after each connection to the database. If the stage disconnects from the database during the pipeline run, for example if a network timeout occurrs, the stage performs the query again when it reconnects to the database.

    Enable Parallel Queries Runs insert and delete queries in parallel to improve throughput. When enabled, the executor groups all queries from the batch into insert, delete, or other queries. The executor then runs all insert queries, followed by all other queries, and then all delete queries. It runs insert and delete queries simultaneously on all the configured connections to the database, but runs other queries serially.

    When disabled, the executor runs each query serially in the order that it arrives.

  5. On the Storage tab, configure the following properties:
    Storage Property Description
    Storage Location Storage location to connect to and copy or merge data from:
    • None - Does not connect to a storage location.

      Select when not copying or merging data from a storage location. Or, select when copying or merging data from a storage location and the Databricks cluster has the necessary storage information configured.

    • Amazon S3 - Connects to an Amazon S3 storage location.

      Select when copying or merging data from Amazon S3 and you need to specify the connection information in the executor properties.

    • ADLS Gen2 - Connects to an Azure Data Lake Storage Gen2 storage location.

      Select when copying or merging data from Azure Data Lake Storage Gen2 and you need to specify the connection information in the executor properties.

    AWS Access Key AWS access key ID.

    Available when using the Amazon S3 storage location.

    AWS Secret Key AWS secret access key.
    Available when using the Amazon S3 storage location.
    Tip: To secure sensitive information such as access key pairs, you can use runtime resources or credential stores.
    Azure Authentication Method Authentication method used to connect to Azure:
    • OAuth 2.0
    • Shared Key

    Available when using the ADLS Gen2 storage location.

    Application ID Application ID for the Azure Active Directory Data Collector application. Also known as the client ID.

    For information on accessing the application ID from the Azure portal, see the Azure documentation.

    Available when using the OAuth 2.0 authentication method for Azure.

    Application Key Authentication key or client secret for the Azure Active Directory application. Also known as the client secret.

    For information on accessing the application ID from the Azure portal, see the Azure documentation.

    Available when using the OAuth 2.0 authentication method for Azure.

    Auth Token Endpoint OAuth 2.0 token endpoint for the Azure Active Directory v1.0 application for Data Collector. For example: https://login.microsoftonline.com/<uuid>/oauth2/token.

    Available when using the OAuth 2.0 authentication method for Azure.

    Account FQDN Fully qualified domain name of the Azure Data Lake Storage Gen2 account. For example: <account-name>.dfs.core.windows.net.

    Available when using the Shared Key authentication method for Azure.

    Account Shared Key Shared access key that Azure generated for the storage account.

    For more information on accessing the shared access key from the Azure portal, see the Azure documentation.

    Available when using the Shared Key authentication method for Azure.

  6. On the Resilience tab, configure the following properties:
    Resilience Property Description
    Max Connection Attempts Maximum number of attempts to establish a connection to the data source.

    Default is 3.

    Wait Duration Time in seconds to wait between attempts to establish a connection to the data source.

    Default is 10 seconds.