Databricks Delta Lake
The Databricks Delta Lake destination writes data to one or more Delta Lake tables on Databricks. For information about supported versions, see Supported Systems and Versions.
Use the Databricks Delta Lake destination for the following use cases:
- Bulk load new data into Delta Lake tables
- Build a pipeline that bulk loads new data into Delta Lake tables on Databricks. When processing new data, the destination uses the COPY command to load data into Delta Lake tables. For a detailed solution of how to design this pipeline, see Bulk Loading Data into a Delta Lake Table.
- Merge changed data into Delta Lake tables
- Build a pipeline that reads change data capture (CDC) data from a database and replicates the changes to Delta Lake tables on Databricks. When processing CDC data, the destination uses the MERGE command to load data into Delta Lake tables. For a detailed solution of how to design this pipeline, see Merging Changed Data into a Delta Lake Table.
The Databricks Delta Lake destination first stages the pipeline data in text files in Amazon S3, Azure Data Lake Storage Gen2, or Google Cloud Storage. Then, the destination sends the COPY or MERGE command to Databricks to process the staged files.
The Databricks Delta Lake destination uses a JDBC URL to connect to the Databricks cluster. When you configure the destination, you specify the JDBC URL and credentials to use to connect to the cluster. You can also use a connection to configure the destination. In addition, you define the information that the destination uses to connect to the staging location.
You specify the Delta Lake tables to write the data to. The destination writes data from record fields to table columns based on matching names. You can configure the destination to compensate for data drift by creating new columns in existing database tables when new fields appear in records or by creating new database tables. When the destination compensates for data drift by creating new tables, you can configure it to partition the tables based on specified fields in the record.
You can configure the root field for the row, and any first-level fields that you want to exclude from the record. You can also configure the destination to replace missing fields or fields with invalid data types with user-defined default values and to replace newline characters in string fields with a specified character. You can specify the quoting mode, define quote and escape characters, and configure the destination to trim spaces.
The Databricks Delta Lake destination can use CRUD operations defined in the sdc.operation.type record header attribute to write data. For information about Data Collector change data processing and a list of CDC-enabled origins, see Processing Changed Data.
Before you use the Databricks Delta Lake destination, you must complete a prerequisite task. The destination is available in the Databricks Enterprise stage library.
Prerequisite
Prepare the Databricks Cluster
Before you configure the Databricks Delta Lake destination, prepare your Databricks cluster.
In Databricks, configure and start your Databricks cluster, generate a personal access token, and locate the JDBC URL used to access the cluster.
- When using Amazon S3 as the staging location, see this Databricks article.
- When using Azure Data Lake Storage Gen2 as the staging location, see this Azure Databricks article.
- When using Google Cloud Storage as the staging location, see this Databricks article.
Load Methods
The Databricks Delta Lake destination can load data to Delta Lake tables using the following methods:
- COPY command for new data
- The COPY command, the default load method, performs a bulk synchronous load to Delta Lake, treating all records as INSERTs. Use this method to write new data to Delta Lake tables. For an illustration of this kind of load, see the sketch after this list.
- MERGE command for CDC data
- Instead of treating all records as INSERTs, the MERGE command inserts, updates, upserts, and deletes data in Delta Lake tables as appropriate for each record. Use this method to write change data capture (CDC) data to Delta Lake tables using CRUD operations.
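The exact statement that the destination generates is internal to the stage, but conceptually the COPY load corresponds to a Databricks SQL COPY INTO statement run against the staged files. The following is a minimal sketch only; the bucket, prefix, table name, and format options are assumptions, not values taken from the destination:

```sql
-- Minimal sketch of a COPY-style load; bucket, prefix, table name,
-- and format options are hypothetical, not generated by the destination.
COPY INTO sales.orders
FROM 's3://my-staging-bucket/sdc-staging/'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'false', 'delimiter' = ',');
```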
Use the recommended guidelines in Performance Optimization to optimize for performance and cost-effectiveness.
Defining the CRUD Operation for CDC Data
When you configure the Databricks Delta Lake destination to use the MERGE command to load CDC data, the destination can insert, update, upsert, or delete data. The destination determines the operation to perform for each record based on the sdc.operation.type record header attribute. The destination performs operations based on the following numeric values:
- 1 for INSERT
- 2 for DELETE
- 3 for UPDATE
- 4 for UPSERT
If your pipeline includes a CRUD-enabled origin that processes changed data, the destination simply reads the operation type from the sdc.operation.type header attribute that the origin generates. If your pipeline uses a non-CDC origin, you can use the Expression Evaluator or a scripting processor to define the record header attribute. For more information about Data Collector changed data processing and a list of CDC-enabled origins, see Processing Changed Data.
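To make the operation codes concrete, the MERGE load conceptually maps them onto Delta Lake MERGE clauses along the following lines. This is a sketch only; the table, key column, and op column names are hypothetical and not what the destination actually generates:

```sql
-- Sketch of how operation codes map to MERGE clauses; table, key column,
-- and op column names are hypothetical.
MERGE INTO sales.customers AS target
USING staged_changes AS source
  ON target.customer_id = source.customer_id
WHEN MATCHED AND source.op = 2 THEN DELETE                  -- 2 for DELETE
WHEN MATCHED AND source.op IN (3, 4) THEN UPDATE SET *      -- 3 for UPDATE, 4 for UPSERT
WHEN NOT MATCHED AND source.op IN (1, 4) THEN INSERT *      -- 1 for INSERT, 4 for UPSERT
```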
Specifying Tables
You can use the Databricks Delta Lake destination to write to one or more tables. The destination writes data from record fields to the table columns based on matching names.
- Single table
- To write data to a single table, enter the name of the database and table using the following format: <database_name>.<table_name>
- Multiple tables
- To write data to multiple tables, specify a field in the record that defines the database and tables to write to.
Use the Table Name property on the Databricks Delta Lake tab to specify the tables to write to.
Enabling Data Drift Handling
The Databricks Delta Lake destination can automatically compensate for changes in column or table requirements, also known as data drift.
- Create new columns
- The destination can create new columns in Delta Lake tables when new fields appear in records. For example, if a record suddenly includes a new Address2 field, the destination creates a new Address2 column in the target table.
- Create new tables
- When data drift handling is enabled, you can also configure the destination to create new tables as needed. For example, say the destination writes data to tables based on the region name in the Region field. When a new SW-3 region shows up in a record, the destination creates a new SW-3 table in Delta Lake and writes the record to the new table.
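Conceptually, these drift actions are equivalent to Databricks SQL statements like the following sketch. The database, table, and column definitions continue the examples above and are hypothetical:

```sql
-- Sketch only: what column and table creation for data drift amount to conceptually.
-- Database, table, and column definitions are hypothetical.
ALTER TABLE sales.orders ADD COLUMNS (Address2 STRING);

CREATE TABLE IF NOT EXISTS sales.`SW-3` (
  name   STRING,
  region STRING
) USING DELTA;
```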
Partitioning Tables
When you enable data drift handling and automatic table creation, you can configure the Databricks Delta Lake destination to partition the tables and write to those partitions.
When partitioning tables, you specify the name of the table to partition and the name of the record field to become the partition column.
When appropriate, you can configure the destination to use the same partition column for all created tables. To do this, use an asterisk ( * ) for the Table property. Then, the destination uses the specified field as a partition column for all tables created by the destination.
For example, the following configuration uses the country field as the partition column for all tables, and the region field for the customers table:
- Table: *, Partition Column: country
- Table: customers, Partition Column: region
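When the destination creates the customers table under this configuration, the result is conceptually a partitioned Delta table. The following sketch is illustrative; the non-partition columns are hypothetical:

```sql
-- Sketch only: a table partitioned on the region field; other columns are hypothetical.
CREATE TABLE IF NOT EXISTS sales.customers (
  customer_id BIGINT,
  name        STRING,
  country     STRING,
  region      STRING
) USING DELTA
PARTITIONED BY (region);
```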
Performance Optimization
Use the following tips to optimize for performance and cost-effectiveness when using the Databricks Delta Lake destination:
- Increase the batch size
- The maximum batch size is determined by the origin in the pipeline and typically has a default value of 1,000 records. To take advantage of the Databricks bulk loading abilities when writing to Delta Lake tables using the COPY or MERGE commands, increase the maximum batch size in the pipeline origin to 20,000-50,000 records. Be sure to increase the Data Collector Java heap size, as needed.
- Use multiple threads
- When writing to Delta Lake tables using the COPY command, you can use multiple threads to improve performance by including a multithreaded origin in the pipeline. When Data Collector resources allow, using multiple threads enables processing multiple batches of data concurrently. As with increasing the batch size, when using multiple threads, you should make sure that the Data Collector Java heap size is sized appropriately.
- Enable additional connections to Databricks
- When writing to multiple Delta Lake tables using the COPY or MERGE commands, increase the number of connections that the Databricks Delta Lake destination makes to Databricks. Each additional connection allows the destination to write to an additional table concurrently.
Staging Location
The Databricks Delta Lake destination first stages the pipeline data in text files in a specified location. Then, the destination sends the COPY or MERGE command to Databricks to process the staged files.
- Amazon S3
- After selecting Amazon S3 as the staging location, specify the existing S3 bucket to stage the files to. You also specify the credentials that the destination uses to connect to Amazon S3.
- ADLS Gen2
- After selecting ADLS Gen2 as the staging location, specify the name of the existing Azure account and storage container to stage the files to. You then configure the destination to use the appropriate authentication method to connect to Azure Data Lake Storage Gen2.
- Google Cloud Storage
- After selecting Google Cloud Storage as the staging location, specify the project ID and an existing Google Cloud Storage bucket to use. You also define credential provider details.
For all staging locations, you specify the stage file name prefix and whether the destination removes a staged file after its data is written to Delta Lake tables.
Amazon S3 Credentials
When you configure the destination to connect to an Amazon S3 staging location, the destination must pass credentials to Amazon Web Services.
- Instance profile
- When the execution Data Collector runs on an Amazon EC2 instance that has an associated instance profile, Data Collector uses the instance profile credentials to automatically authenticate with AWS.
- AWS access key pair
- When the execution Data Collector does not run on an Amazon EC2 instance or when the EC2 instance doesn’t have an instance profile, you can connect using an AWS access key pair. When using an AWS access key pair, you specify the access key ID and secret access key to use.
Tip: To secure sensitive information such as access key pairs, you can use runtime resources or credential stores.
ADLS Gen2 Authentication Information
When you configure the destination to connect to an ADLS Gen2 staging location, you select the authentication method that the destination uses to connect to Azure Data Lake Storage Gen2.
- OAuth 2.0
- Connections made with OAuth 2.0 authentication require the following information:
  - Application ID - Application ID for the Azure Active Directory Data Collector application. Also known as the client ID. For information on accessing the application ID from the Azure portal, see the Azure documentation.
  - Application Key - Authentication key or client secret for the Azure Active Directory application. Also known as the client secret. For information on accessing the application key from the Azure portal, see the Azure documentation.
  - Auth Token Endpoint - OAuth 2.0 token endpoint for the Azure Active Directory v1.0 application for Data Collector. For example: https://login.microsoftonline.com/<uuid>/oauth2/token.
- Shared Key
- Connections made with Shared Key authentication require the following information:
  - Account Shared Key - Shared access key that Azure generated for the storage account. For more information on accessing the shared access key from the Azure portal, see the Azure documentation.
Google Cloud Credentials
Before staging data on Google Cloud Storage, the Databricks Delta Lake destination must pass credentials to Google Cloud. You can provide credentials using one of the following options:
- Google Cloud default credentials
- Credentials in a file
- Credentials in a stage property
For details on how to configure each option, see Security in Google Cloud Stages.
Row Generation
When writing a record to a table, the Databricks Delta Lake destination includes all record fields in the resulting row, by default. The destination uses the root field, /, as the basis for the resulting row.
You can configure the Row Field property on the Data tab to specify a map or list-map field in the record as the basis for the row. The resulting record includes only the data from the specified map or list-map field and excludes all other record data. Use this functionality when the data that you want to write to Delta Lake tables exists in a single map or list-map field within the record.
When you want to use the root field but do not want to include all fields in the resulting row, configure the destination to ignore specified first-level fields.
For example, say the destination uses the root field, /, as the basis for the resulting row, and a record contains the following fields:
{
    "name": "Jane Smith",
    "id": "557",
    "address": {
        "street": "101 3rd St",
        "city": "Huntsville",
        "state": "NC",
        "zipcode": "27023"
    }
}
The destination treats the address map field as a field with an invalid data type and processes the record as an error record by default. You can configure the destination to ignore the field and process the remaining record data, as described in Missing Fields and Fields with Invalid Types.
Missing Fields and Fields with Invalid Types
By default, the destination treats records with missing fields or with invalid data types in fields as error records.
You can configure the destination to ignore missing fields or fields with invalid data types, replacing the data in the field with an empty value.
The default for each data type is \N, which represents an empty value in Delta Lake. You can specify a different default value to use for each data type on the Data Advanced tab. For example, you might define the default value for a missing String field or a String field with an invalid data type as none or not_applicable.
To configure the destination to ignore missing fields and fields with invalid data types, select the Ignore Missing Fields and the Ignore Fields with Invalid Types properties on the Data Advanced tab.
Databricks Data Types
The Databricks Delta Lake destination converts Data Collector data types into Databricks data types before writing data to Delta Lake tables.
When you configure the destination to compensate for data drift, you can also configure the destination to create all new columns as String. However, by default, the destination converts record data to the appropriate data type.
The destination does not support nested Data Collector data types: List, List-Map, and Map. By default, the destination treats a record with a field containing an invalid data type as an error record. You can configure the destination to ignore fields with invalid data types, as described in Missing Fields and Fields with Invalid Types.
The destination converts the following Data Collector data types into these Databricks data types:
Data Collector Data Type | Databricks Data Type |
---|---|
Boolean | Boolean |
Byte | Tinyint |
Byte_Array | Binary |
Char | String |
Date | Date |
Datetime | Timestamp |
Decimal | Decimal |
Double | Double |
Float | Float |
Integer | Int |
Long | Bigint |
Short | Smallint |
String | String |
Time | Timestamp |
Zoned_Datetime | Date |
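As a concrete illustration of these mappings, a table created by the destination for a record with Long, Decimal, Datetime, Boolean, and String fields would use column types along the following lines. The table and column names are hypothetical:

```sql
-- Sketch only: hypothetical table and column names illustrating the type mappings above.
CREATE TABLE IF NOT EXISTS sales.events (
  event_id   BIGINT,         -- from a Long field
  amount     DECIMAL(18, 2), -- from a Decimal field
  created_at TIMESTAMP,      -- from a Datetime field
  is_active  BOOLEAN,        -- from a Boolean field
  notes      STRING          -- from a String field
) USING DELTA;
```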
Configuring a Databricks Delta Lake Destination
Configure a Databricks Delta Lake destination to write data to one or more Delta Lake tables on Databricks. Before you configure the destination, be sure to complete the prerequisite task.