Databricks Query
The Databricks Query executor runs one or more Spark SQL queries on Databricks each time it receives an event record. Use the executor as part of an event stream in the pipeline. For information about supported versions, see Supported Systems and Versions.
For example, you might use the Databricks Query executor to run a Spark SQL query that executes the VACUUM command to remove leftover files when the pipeline generates a pipeline stop event.
The Databricks Query executor uses a JDBC connection to connect to the Databricks cluster. When you configure the executor, you specify the JDBC URL and credentials to use to connect to the Databricks cluster, and then you define the Spark SQL queries to run.
When needed, you also define the connection information that the executor uses to connect to the storage location in Amazon S3 or Azure Data Lake Storage Gen2.
You can also configure the executor to generate events for another event stream. For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.
Before you use the Databricks Query executor, you must complete a prerequisite task.
Prerequisite
Before configuring the Databricks Query executor, prepare your Databricks cluster.
In Databricks, configure and start your Databricks cluster, generate a personal access token, and locate the JDBC URL used to access the cluster.
Spark SQL Queries
You define one or more Spark SQL queries that the Databricks Query executor runs in Databricks each time it receives an event record.
The Databricks Query executor waits for each query to complete before continuing with the next query for the same event record. It also waits for all queries to complete before starting the queries for the next event record. Depending on the speed of the pipeline and the complexity of the queries, the wait for query completion can slow pipeline performance.
When running multiple queries for an event record, the executor skips the subsequent queries if a query fails.
You can include the following elements in each query that you define for the executor:
- Spark SQL
-
Use Spark SQL syntax in a query, such as:
COPY INTO <table identifier> FROM <location> FILEFORMAT = <format type>
You can use any Spark SQL syntax that is valid for Databricks, as described in the Databricks documentation.
- StreamSets expression language functions
-
You can include a subset of the functions provided with the StreamSets expression language in a query. For example, when you define the location to copy the data from, you might use the
record:value()
function to define the location as follows:FROM 's3a://${record:value('/bucket')}/${record:value('/objectKey')}'
When entering the query in the executor, press Ctrl + Space Bar to view the list of valid functions you can use.
Storage Connection
The Databricks Query executor can connect to a storage location before running Spark SQL queries.
- None
- When using the executor to run a Spark SQL query that does not require connecting to a storage location, set the storage location to None.
- Amazon S3
- When using the executor to run a Spark SQL query that requires connecting to an Amazon S3 storage location, you can specify the Amazon S3 connection information in the executor properties. Any connection information specified in the executor properties takes precedence over the connection information configured in the Databricks cluster.
- ADLS Gen2
- When using the executor to run a Spark SQL query that requires connecting to an Azure Data Lake Storage Gen2 storage location, you can specify the Azure connection information in the executor properties. Any connection information specified in the executor properties takes precedence over the connection information configured in the Databricks cluster.
ADLS Gen2 Authentication Information
When you configure the executor to connect to an ADLS Gen2 storage location, you select the authentication method that the executor uses to connect to Azure Data Lake Storage Gen2.
Select one of the following authentication methods:
- OAuth 2.0
- Connections made with OAuth 2.0 authentication require the following
information:
- Application ID - Application ID for the Azure Active Directory Data Collector
application. Also known as the client ID.
For information on accessing the application ID from the Azure portal, see the Azure documentation.
- Auth Token Endpoint - OAuth 2.0 token endpoint for the Azure Active
Directory v1.0 application for Data Collector. For example:
https://login.microsoftonline.com/<uuid>/oauth2/token.
- Application Key - Authentication key or client secret
for the Azure Active Directory application. Also known as the
client secret.
For information on accessing the application key from the Azure portal, see the Azure documentation.
- Application ID - Application ID for the Azure Active Directory Data Collector
application. Also known as the client ID.
- Shared Key
-
Connections made with Shared Key authentication require the following information:
- Account FQDN - Fully qualified domain name of the Azure Data Lake
Storage Gen2 account. For example:
<account-name>.dfs.core.windows.net
. - Account Shared Key - Shared access key that Azure
generated for the storage account.
For more information on accessing the shared access key from the Azure portal, see the Azure documentation.
- Account FQDN - Fully qualified domain name of the Azure Data Lake
Storage Gen2 account. For example:
Event Generation
The Databricks Query executor can generate events that you can use in an event stream. When you enable event generation, the executor generates events for each successful or failed query.
- With the Email executor to send a custom email
after receiving an event.
For an example, see Sending Email During Pipeline Processing.
- With a destination to store event information.
For an example, see Preserving an Audit Trail of Events.
For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.
Event Records
Record Header Attribute | Description |
---|---|
sdc.event.type | Event type. Uses the following event types:
|
sdc.event.version | Integer that indicates the version of the event record type. |
sdc.event.creation_timestamp | Epoch timestamp when the stage created the event. |
- Successful query
-
The executor generates a successful-query event record after successfully completing a query.
Successful-query event records have thesdc.event.type
record header attribute set tosucessful-query
and include the following fields:Event Field Name Description query Query completed. query-result Number of rows affected by query. Included if the Include Query Result Count in Events property is selected. - Failed query
-
The executor generates a failed-query event record after failing to complete a query.
Failed-query event records have thesdc.event.type
record header attribute set tofailed-query
and include the following field:Event Field Name Description query Query attempted.
Configuring a Databricks Query Executor
Configure a Databricks Query executor to run a Spark SQL query on Databricks upon receiving an event.
-
In the properties pane, on the General tab, configure the
following properties:
General Property Description Name Stage name. Description Optional description. Produce Events Generates event records when events occur. Use for event handling. Required Fields Fields that must include data for the record to be passed into the stage. Tip: You might include fields that the stage uses.Records that do not include all required fields are processed based on the error handling configured for the pipeline.
Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions. Records that do not meet all preconditions are processed based on the error handling configured for the stage.
On Record Error Error record handling for the stage: - Discard - Discards the record.
- Send to Error - Sends the record to the pipeline for error handling.
- Stop Pipeline - Stops the pipeline.
-
On the JDBC tab, configure the following properties:
JDBC Property Description JDBC URL JDBC URL used to connect to the Databricks cluster. Enter in the following format:
jdbc:databricks://<server_hostname>:443/default;transportMode=http
:ssl=1;httpPath=sql/protocolv1/o/0/xxxx-xxxxxx-xxxxxxxx;AuthMech=3;
Tip: In Databricks, you can locate the JDBC URL for your cluster on the JDBC/ODBC tab in the cluster configuration details. As a best practice, remove thePWD
parameter from the URL, and then enter the personal access token value in the User Token property on the Credentials tab.Use Credentials Enables entering credentials on the Credentials tab. Select when you do not include credentials in the JDBC URL. Spark SQL Query One or more Spark SQL queries to run each time the executor receives an event record. Using simple or bulk edit mode, click the Add icon to add additional queries.
The executor processes multiple queries in order, and waits for each query to complete before continuing to the next query.
Additional JDBC Configuration Properties Additional JDBC configuration properties to use. To add properties, click Add and define the JDBC property name and value. Use the property names and values as expected by JDBC.
Include Query Result Count in Events Includes the number of rows impacted by a query in generated event records. -
To enter credentials separately from the JDBC URL, on the
Credentials tab, configure the following
properties:
Credentials Property Description Username Enter token. User Token Enter your personal access token used to connect to the Databricks cluster. Tip: To secure sensitive information such as tokens, you can use runtime resources or credential stores.Note: The Legacy Drivers tab is not supported at this time. -
On the Advanced tab, optionally configure advanced
properties.
The defaults for these properties should work in most cases:
Advanced Property Description Maximum Pool Size Maximum number of connections to create. Default is 1. The recommended value is 1.
Minimum Idle Connections Minimum number of connections to create and maintain. To define a fixed connection pool, set to the same value as Maximum Pool Size. Default is 1.
Connection Timeout (seconds) Maximum time to wait for a connection. Use a time constant in an expression to define the time increment. Default is 30 seconds, defined as follows:${30 * SECONDS}
Idle Timeout (seconds) Maximum time to allow a connection to idle. Use a time constant in an expression to define the time increment. Use 0 to avoid removing any idle connections.
When the entered value is close to or more than the maximum lifetime for a connection, Data Collector ignores the idle timeout.
Default is 10 minutes, defined as follows:${10 * MINUTES}
Max Connection Lifetime (seconds) Maximum lifetime for a connection. Use a time constant in an expression to define the time increment. Use 0 to set no maximum lifetime.
When a maximum lifetime is set, the minimum valid value is 30 minutes.
Default is 30 minutes, defined as follows:${30 * MINUTES}
Transaction Isolation Transaction isolation level used to connect to the database. Default is the default transaction isolation level set for the database. You can override the database default by setting the level to any of the following:
- Read committed
- Read uncommitted
- Repeatable read
- Serializable
Init Query SQL query to perform immediately after the stage connects to the database. Use to set up the database session as needed. The query is performed after each connection to the database. If the stage disconnects from the database during the pipeline run, for example if a network timeout occurrs, the stage performs the query again when it reconnects to the database.
Enable Parallel Queries Runs insert and delete queries in parallel to improve throughput. When enabled, the executor groups all queries from the batch into insert, delete, or other queries. The executor then runs all insert queries, followed by all other queries, and then all delete queries. It runs insert and delete queries simultaneously on all the configured connections to the database, but runs other queries serially. When disabled, the executor runs each query serially in the order that it arrives.
-
On the Storage tab, configure the following
properties:
Storage Property Description Storage Location Storage location to connect to and copy or merge data from: - None - Does not connect to a storage
location.
Select when not copying or merging data from a storage location. Or, select when copying or merging data from a storage location and the Databricks cluster has the necessary storage information configured.
- Amazon S3 - Connects to an Amazon S3 storage
location.
Select when copying or merging data from Amazon S3 and you need to specify the connection information in the executor properties.
- ADLS Gen2 - Connects to an Azure Data Lake Storage
Gen2 storage location.
Select when copying or merging data from Azure Data Lake Storage Gen2 and you need to specify the connection information in the executor properties.
AWS Access Key AWS access key ID. Available when using the Amazon S3 storage location.
AWS Secret Key AWS secret access key. Available when using the Amazon S3 storage location.Tip: To secure sensitive information such as access key pairs, you can use runtime resources or credential stores.Azure Authentication Method Authentication method used to connect to Azure: - OAuth 2.0
- Shared Key
Available when using the ADLS Gen2 storage location.
Application ID Application ID for the Azure Active Directory Data Collector application. Also known as the client ID. For information on accessing the application ID from the Azure portal, see the Azure documentation.
Available when using the OAuth 2.0 authentication method for Azure.
Application Key Authentication key or client secret for the Azure Active Directory application. Also known as the client secret. For information on accessing the application ID from the Azure portal, see the Azure documentation.
Available when using the OAuth 2.0 authentication method for Azure.
Auth Token Endpoint OAuth 2.0 token endpoint for the Azure Active Directory v1.0 application for Data Collector. For example: https://login.microsoftonline.com/<uuid>/oauth2/token.
Available when using the OAuth 2.0 authentication method for Azure.
Account FQDN Fully qualified domain name of the Azure Data Lake Storage Gen2 account. For example: <account-name>.dfs.core.windows.net
.Available when using the Shared Key authentication method for Azure.
Account Shared Key Shared access key that Azure generated for the storage account. For more information on accessing the shared access key from the Azure portal, see the Azure documentation.
Available when using the Shared Key authentication method for Azure.
- None - Does not connect to a storage
location.
-
On the Resilience tab, configure the following
properties:
Resilience Property Description Max Connection Attempts Maximum number of attempts to establish a connection to the data source. Default is 3.
Wait Duration Time in seconds to wait between attempts to establish a connection to the data source. Default is 10 seconds.