Snowflake

The Snowflake destination writes data to one or more tables in a Snowflake database. You can use the Snowflake destination to write to any accessible Snowflake database, including those hosted on Amazon S3, Google Cloud Storage, Microsoft Azure, and private Snowflake installations. For information about supported versions, see Supported Systems and Versions in the Data Collector documentation.

Tip: To write large files to Snowflake with no pipeline processing, try using the Snowflake File Uploader with the Snowflake executor for better performance.

The Snowflake destination stages CSV files to either an internal Snowflake stage or an external stage in Amazon S3, Google Cloud Storage, or Microsoft Azure. Then, the destination sends a command to Snowflake to process the staged files.

You can use the Snowflake destination to write new data or change data capture (CDC) data to Snowflake. When processing new data, the destination can load data to Snowflake using the COPY command or Snowpipe. When processing CDC data, the destination uses the MERGE command.

The Snowflake destination writes data from record fields to table columns based on matching names. The destination can compensate for data drift by creating new columns and tables in Snowflake when new fields and table references appear in records.

When you configure the Snowflake destination, you specify the Snowflake region, account and connection information and the number of connections to use to write to Snowflake. You can specify an organization name to use and define additional Snowflake connection properties, as needed. You can also use a connection to configure the destination.

You configure the Snowflake warehouse, database, schema, and the tables to use. You specify the load method, error handling, and staging details, and optionally define advanced properties for Amazon S3 or Microsoft Azure.

You can optionally enable data drift. You can have the destination create new tables, as needed. You can also specify whether to create all new columns as Varchar instead of inferring the type, and whether to create Decimal columns for decimal data.

You can configure the root field for the row, and any first-level fields that you want to exclude from the record. You can also configure the destination to replace missing fields or fields containing invalid data types with the specified default values, and to replace newline characters in string fields with a specified character. You can specify the quoting mode, define quote and escape characters, and configure the destination to trim spaces. When processing CDC data, you specify the primary key location.

Before you use the Snowflake destination, you must complete several prerequisite tasks.

Note: StreamSets recommends configuring the Snowflake warehouse to auto-resume upon receiving new queries.
Tip: If you change the destination table schema manually rather than enabling data drift handling, restart the pipeline to allow the destination to discover schema changes.

Sample Use Cases

The following use cases show how you can replicate a database or offload a Hadoop data lake to Snowflake.
Tip: For additional use cases for the Snowflake destination, review the sample Snowflake pipelines included in the StreamSets Data Collector pipeline library. Download the sample pipelines and then import them into Data Collector. Review the sample pipelines or use them as a starting point to write data to Snowflake.

Here are a couple of common scenarios for using the Snowflake destination:

Replicating a database

Say you want to replicate data being written to five tables in an Oracle database schema. You want to write both existing data and incoming CDC data to Snowflake.

To do this, you create two pipelines, one to load the existing data and one to process incoming data, as follows:
  • First pipeline for replicating data - The first pipeline uses the multithreaded JDBC Multitable Consumer origin to read from the tables that you want to replicate. To take advantage of Snowflake’s bulk load abilities, you configure the origin to use a very large batch size - somewhere between 20,000 and 50,000 records per batch. You set the number of threads to five to read data from all five tables concurrently, and you increase the connection pool size to five to allow writing to five tables in Snowflake concurrently.

    In the pipeline, you use as many processors as needed to process the data. Then, you configure the Snowflake destination to load the data into Snowflake.

    If you wanted the data to be immediately available after being written to Snowflake, you would use the default COPY command load method. But since you can tolerate a bit of latency, you use the faster, cheaper Snowpipe to load the data. Using Snowpipe requires performing some prerequisite steps in Snowflake.

    After the initial load is complete, you stop the first pipeline and start the second pipeline to process incoming CDC data.

  • Second pipeline for CDC data - In the second pipeline, you use the Oracle CDC Client origin and the Snowflake destination. You configure this origin to use a very large batch size as well, somewhere between 20,000 and 50,000 records per batch.

    In the destination, you select the Processing CDC Data (Use MERGE) property to perform CRUD operations when writing to Snowflake. This results in the destination using the MERGE command to load data into Snowflake. You specify a field in the records that contains the table name to use when writing to Snowflake, and you define the key columns for each table or configure the destination to query Snowflake for the key columns.

    To improve performance, you also increase the connection pool size. For more information, see Performance Optimization.

Offloading from Hadoop
Say you have a Hadoop data lake that you want to move into Snowflake. In this case, you only need one pipeline that includes the multithreaded Hadoop FS Standalone origin, all of the processors that you need, and the Snowflake destination.
In the Snowflake destination, you could use Snowpipe if you can afford a little latency in the availability of the data after it's written. But Snowpipe only writes to tables that already exist. To allow the destination to create new tables to compensate for drifting data, you use the default COPY command to load data. As mentioned earlier, you can use large batches, multiple threads, and multiple connections to optimize pipeline performance.
Since the data in the data lake is probably less structured than typical database data, configure the following data drift and advanced data properties in the destination to smooth out the transition:
  • Enable the Data Drift property to allow creating new columns in tables when a new field appears.
  • Enable the Table Auto Create property to create new tables as needed.
  • To avoid generating unnecessary error records:
    • Use the Ignore Missing Fields property to replace missing data with default values.
    • Use the Ignore Fields with Invalid Types property to replace data of invalid data types with default values.
    • Define the default values to use for each Snowflake data type.
    • Replace newline characters in string fields with a specified character.

Prerequisites

Before you configure the Snowflake destination, complete the following prerequisites:

  1. Create an internal or external Snowflake stage.
  2. To use the COPY command to load new data, complete the COPY prerequisites.
  3. To use Snowpipe to load new data, complete the Snowpipe prerequisites.
  4. To use the MERGE command to load CDC data, complete the MERGE prerequisites.

Create a Snowflake Stage

Before using the destination in a pipeline, you must create a Snowflake internal or external stage.

The Snowflake destination stages CSV files to either an internal Snowflake stage or an external stage in Amazon S3, Google Cloud Storage, or Microsoft Azure. Then, the destination sends a command to Snowflake to process the staged files.

To use an external stage, create the external stage with the cloud service provider that hosts your Snowflake warehouse.

Create one of the following Snowflake stages, as appropriate:
Snowflake internal stage
You can stage data in Snowflake internal user stages or named stages. Do not use internal table stages.
User stages are created by default for each user. For steps on how to create a named stage, see CREATE STAGE in the Snowflake SQL command reference documentation.
You can use the default Snowflake configuration for both user and named stages.
For more information about Snowflake stages, see the Snowflake documentation.
Amazon S3 external stage
To stage data in an Amazon S3 external stage, create a Snowflake external stage in a bucket in the same S3 region that hosts your Snowflake virtual warehouse. For example, if your Snowflake warehouse is in AWS US West, then create the Snowflake external stage in a bucket in the AWS US West region.
When you create a Snowflake external stage, you specify a URL that defines the name and location for the stage. Include a trailing slash in the URL to ensure that Snowflake loads all staged data. You might also include a prefix in the stage name to indicate that the external stage is for Data Collector.

For example, the following URL creates an external stage named sdc-externalstage in s3://mybucket/ and ensures that all staged data loads to Snowflake:

s3://mybucket/sdc-externalstage/
You can create an S3 stage using the Snowflake web interface or SQL. For more information, see the Snowflake documentation.
Google Cloud Storage external stage
To stage data in a Google Cloud Storage external stage, create a Snowflake storage integration in Google Cloud Storage. This is a multistep process described in the Snowflake documentation that ends with creating a Snowflake external stage. Be sure to complete all required steps.
Snowflake supports Regional Storage and Multi-Regional Storage accounts only.
When you create a Snowflake external stage, you specify a URL that defines the name and location for the stage. Include a trailing slash in the URL to ensure that Snowflake loads all staged data. You might also include a prefix in the stage name to indicate that the external stage is for Data Collector.

For example, the following URL creates an external stage named sdc-externalstage in gcs://mybucket/ and loads all staged data to Snowflake:

gcs://mybucket/sdc-externalstage/
Microsoft Azure external stage
To stage data in a Microsoft Azure external stage, complete the following tasks:
  1. Configure Snowflake authentication for the Microsoft Azure Blob Storage container that you want to use.

    You can use an SAS token or an Azure account name and key for authentication. For information about configuring SAS token authentication, see the Snowflake documentation.

  2. Create a Snowflake external stage in the container.

    When you create a Snowflake external stage, you specify a URL that defines the name and location for the stage. Include a trailing slash in the URL to ensure that Snowflake loads all staged data. You might also include a prefix in the stage name to indicate that the external stage is for Data Collector.

    For example, the following URL creates an external stage named sdc-externalstage in azure://myaccount.blob.core.windows.net/mycontainer/load/ and loads all staged data to Snowflake:
    azure://myaccount.blob.core.windows.net/mycontainer/load/sdc-externalstage/

    You can create an Azure stage using the Snowflake web interface or SQL. For more information, see Creating an Azure Stage in the Snowflake documentation.
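The trailing-slash convention used in the stage URLs above can be sketched as a small helper. This is only an illustration: the function name is hypothetical, and the default stage name simply reuses the sdc-externalstage example from this section.

```python
def external_stage_url(location, stage_name="sdc-externalstage"):
    """Join a bucket or container location and a stage name, ending with '/'."""
    # Strip slashes at the join point, then always end with exactly one slash.
    return f"{location.rstrip('/')}/{stage_name.strip('/')}/"


print(external_stage_url("s3://mybucket/"))
print(external_stage_url("azure://myaccount.blob.core.windows.net/mycontainer/load"))
```

The helper returns the same URL whether or not the location already ends with a slash.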

AWS Credentials

When the Snowflake destination stages data on Amazon S3, it must pass credentials to Amazon Web Services.

Use one of the following methods to pass AWS credentials:
Instance profile
When the execution Data Collector runs on an Amazon EC2 instance that has an associated instance profile, Data Collector uses the instance profile credentials to automatically authenticate with AWS.
To use an instance profile, you enable the Use Instance Profile property on the Staging tab.
For more information about associating an instance profile with an EC2 instance, see the Amazon EC2 documentation.
AWS access key pair
When the execution Data Collector does not run on an Amazon EC2 instance or when the EC2 instance doesn’t have an instance profile, you can connect with an AWS access key pair. To connect with an AWS access key pair, you specify the access key ID and secret access key on the Staging tab.
Tip: To secure sensitive information such as access key pairs, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.
Note: To stage data on Amazon S3, the role or access key pair that you use must have the permissions needed to write to Amazon S3, including s3:GetBucketLocation and s3:PutObject.
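As an illustration of the note above, the following sketch builds a minimal IAM policy document that grants the two permissions it names. The bucket name mybucket is a placeholder. Because the note says the required permissions include these two, treat the result as a starting point rather than a complete policy.

```python
import json


def staging_policy(bucket):
    """Build an IAM policy document with the two S3 permissions named above."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # Bucket-level permission
                "Effect": "Allow",
                "Action": ["s3:GetBucketLocation"],
                "Resource": f"arn:aws:s3:::{bucket}",
            },
            {   # Object-level permission for writing staged files
                "Effect": "Allow",
                "Action": ["s3:PutObject"],
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
        ],
    }


# "mybucket" is a placeholder bucket name.
print(json.dumps(staging_policy("mybucket"), indent=2))
```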

Google Cloud Storage Credentials

Before staging data on Google Cloud Storage, the Snowflake destination must pass credentials to Google Cloud.

You can provide credentials using one of the following options:
  • Google Cloud default credentials
  • Credentials in a file
  • Credentials in a stage property

For details on how to configure each option, see Security in Google Cloud Stages.

COPY Prerequisites

When processing new data, you can configure the destination to use the COPY command to load data to Snowflake tables.

Using the COPY command to load data requires a role with one of the following sets of access privileges:

  • Required privileges when using an internal Snowflake stage:
    Object Type              | Privilege
    Internal Snowflake stage | READ, WRITE
    Table                    | SELECT, INSERT
  • Required privileges when using an external stage:
    Object Type    | Privilege
    External stage | USAGE
    Table          | SELECT, INSERT

If necessary, create a custom role in Snowflake and grant the role the required access privileges. Then in Snowflake, assign the custom role as the default role for the Snowflake user account specified in the destination. Or in the destination, specify the custom role that overrides the user's default role.

For more information about defining a role for the destination, see Define a Role.
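The privilege sets above can be turned into GRANT statements for a custom role. The sketch below generates them for one table and one stage. The role, table, and stage names are hypothetical, and the MERGE entry mirrors the table privileges listed in MERGE Prerequisites. A role usually also needs access to the containing database, schema, and warehouse, which this sketch does not cover.

```python
# Privileges are taken from the prerequisite tables; all object names are illustrative.
TABLE_PRIVILEGES = {
    "COPY": ["SELECT", "INSERT"],
    "MERGE": ["SELECT", "INSERT", "UPDATE", "DELETE"],
}
STAGE_PRIVILEGES = {
    "internal": ["READ", "WRITE"],
    "external": ["USAGE"],
}


def grant_statements(load_method, stage_kind, role, table, stage):
    """Return the GRANT statements a role needs for one table and one stage."""
    stage_privs = ", ".join(STAGE_PRIVILEGES[stage_kind])
    table_privs = ", ".join(TABLE_PRIVILEGES[load_method])
    return [
        f"GRANT {stage_privs} ON STAGE {stage} TO ROLE {role};",
        f"GRANT {table_privs} ON TABLE {table} TO ROLE {role};",
    ]


# Hypothetical role, table, and stage names.
for statement in grant_statements(
    "COPY", "internal", "SDC_ROLE", "MYDB.PUBLIC.SALES", "MYDB.PUBLIC.SDC_STAGE"
):
    print(statement)
```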

Snowpipe Prerequisites

When processing new data, you can use Snowpipe, the Snowflake continuous ingestion engine, to load data to Snowflake tables.

Before using Snowpipe, complete the following prerequisites:
  1. In Snowflake, create a pipe for Snowpipe to use to load data.

    For more information about creating a pipe, see the Snowflake documentation.

  2. In Snowflake, generate a private key PEM and a public key PEM.

    For details about key-pair authentication, see the Snowflake documentation. You do not need to generate JSON Web Tokens (JWT) as described in Step 5 of the Snowflake instructions.

    When you configure the destination, you specify the private key PEM and password and the public key PEM.

  3. In Snowflake, assign the public key to the Snowflake user account configured in the destination.

    You can use the Snowflake console or the ALTER USER command.

  4. Optionally, to secure the private key PEM and password, use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.
  5. In Snowflake, create a custom role and grant the role the access privileges required to use Snowpipe.

    For a list of the required Snowpipe access privileges, see the Snowflake documentation.

  6. Then in Snowflake, assign the custom role as the default role for the Snowflake user account specified in the destination. Or in the destination, specify the custom role that overrides the user's default role.

    For more information about defining a role for the destination, see Define a Role.

MERGE Prerequisites

When processing CDC data, you can configure the destination to use the MERGE command to load data to Snowflake tables.

Using the MERGE command to load data requires a role with one of the following sets of access privileges:

  • Required privileges when using an internal Snowflake stage:
    Object Type              | Privilege
    Internal Snowflake stage | READ, WRITE
    Table                    | SELECT, INSERT, UPDATE, DELETE
  • Required privileges when using an external stage:
    Object Type    | Privilege
    External stage | USAGE
    Table          | SELECT, INSERT, UPDATE, DELETE

If necessary, create a custom role in Snowflake and grant the role the required access privileges. Then in Snowflake, assign the custom role as the default role for the Snowflake user account specified in the destination. Or in the destination, specify the custom role that overrides the user's default role.

For more information about defining a role for the destination, see Define a Role.

Implementation Notes

Note the following implementation requirements when working with the Snowflake destination:

  • StreamSets recommends configuring the Snowflake warehouse to auto-resume upon receiving new queries.
  • If you change the destination table schema manually rather than enabling data drift handling, restart the pipeline to allow the destination to discover schema changes.

Load Methods

The Snowflake destination can load data to Snowflake using the following methods:
COPY command for new data
The COPY command, the default load method, performs a bulk synchronous load to Snowflake, treating all records as INSERTS. Use this method to write new data to Snowflake tables.
The COPY command provides real-time access to data as it is written. It does, however, incur Snowflake warehouse usage fees, which are rounded up to the hour at this time. Use the recommended guidelines to optimize for performance and cost-effectiveness.
Since the COPY command is the default load method, you do not need to configure the Snowflake destination to use this command.
Snowpipe for new data
Snowpipe, the Snowflake continuous ingestion service, performs an asynchronous load to Snowflake, treating all records as INSERTS. Use this method to write new data to Snowflake tables. When needed, you can configure the destination to use a custom Snowflake endpoint.
Snowpipe provides slightly delayed access to data, typically under one minute. Snowpipe incurs Snowflake fees for only the resources used to perform the write.
Before using Snowpipe, perform the prerequisite steps. Also, use the recommended guidelines to optimize for performance and cost-effectiveness.
To use Snowpipe to load new data, enable the Use Snowpipe property on the Snowflake tab of the destination. Then, configure the properties on the Snowpipe tab.
MERGE command for CDC data
Like the COPY command, the MERGE command performs a bulk synchronous load to Snowflake. But instead of treating all records as INSERTS, it inserts, updates, upserts, and deletes records as appropriate. Use this method to write change data capture (CDC) data to Snowflake tables using CRUD operations.
Also like the COPY command, the MERGE command provides real-time access to data as it is written. And, it incurs Snowflake warehouse usage fees, which are rounded up to the hour at this time.
Use the recommended guidelines to optimize for performance and cost-effectiveness.
Important: To maintain the original order of data, do not use multiple threads or cluster execution mode when processing CDC data.
To use the MERGE command to load CDC data, select the Processing CDC Data (Use MERGE) property on the Data tab of the destination. Then, configure the Primary Key Location property to indicate the primary key columns to use.

For more information about Snowpipe or the COPY or MERGE commands, see the Snowflake documentation.
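To see why the original order of CDC data matters, consider the following simplified model. It is not how the destination or the MERGE command is implemented. It applies insert, update, upsert, and delete operations to an in-memory table keyed by primary key, and the operation labels and sample rows are illustrative.

```python
def apply_cdc(changes):
    """Apply (operation, key, row) changes in order and return the final table."""
    table = {}
    for op, key, row in changes:
        if op == "INSERT":
            table[key] = dict(row)
        elif op == "UPDATE":
            if key in table:  # an update to a missing row changes nothing
                table[key].update(row)
        elif op == "UPSERT":
            table.setdefault(key, {}).update(row)
        elif op == "DELETE":
            table.pop(key, None)
        else:
            raise ValueError(f"unknown operation: {op}")
    return table


ordered = [
    ("INSERT", 1, {"name": "Ada", "city": "London"}),
    ("UPDATE", 1, {"city": "Paris"}),
    ("DELETE", 1, None),
]
# The same changes, but the DELETE is processed first.
reordered = [ordered[2], ordered[0], ordered[1]]

print(apply_cdc(ordered))
print(apply_cdc(reordered))
```

In the original order the row ends up deleted. When the delete is processed first, the row survives with the updated city, which is the kind of divergence that multiple threads can introduce.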

Primary Key Location

When you configure the Snowflake destination to write CDC data using the MERGE command, you must provide primary key information. You can configure the destination to access primary key information in the following ways:
Query Snowflake
The destination queries Snowflake for the primary key columns for each table that it writes to. Use this option only when primary key columns are defined in the Snowflake tables.
Use JDBC record header attribute
The destination uses primary key columns stored in the jdbc.primaryKeySpecification record header attribute.
Several origins, such as the Oracle CDC Client origin and the PostgreSQL CDC Client origin, populate the record header attribute automatically. You can also use a processor, such as the Expression Evaluator, to populate the header attribute within the pipeline.
When the destination is enabled for data drift handling, the destination updates the primary key columns in the target table based on the primary keys specified in the header attribute. For more information, see Creating Tables.
Use specified columns for each table
The destination uses the primary key columns that you specify in the Table Key Columns properties.
Note: When key columns are known, entering them manually is more efficient than the other two options.

Error Handling

You can configure Snowflake error handling in the Snowflake destination. The error handling properties determine how the on_error option is defined in the Snowflake SQL query.

The destination provides different error handling properties depending on the load method that you use:
COPY or MERGE command
Use the Error Behavior property on the Snowflake tab when you use the COPY or MERGE command to load data to Snowflake. When you use Snowpipe, the Error Behavior property is ignored.

The Error Behavior property provides the following error handling options:

  • Default - Does not set the on_error option. When not explicitly set, the Snowflake default on_error behavior is Abort Statement.
  • Continue - Ignores errors and continues processing.
  • Skip File - When encountering errors, skips writing the batch.
    When you use this option, you also configure a Skip File On Error property to specify when to skip the file:
    • First - After discovering the first error record.
    • Number - After discovering the specified number of error records in the batch.
    • Percentage - After discovering the specified percentage of error records in the batch.
  • Abort Statement - Skips writing the batch upon the first error.
Snowpipe
You can use the Snowpipe Error Behavior property on the Snowpipe tab when you enable the Use Snowpipe and the Snowpipe Auto Create properties on the Snowflake tab. The destination cannot manage Snowpipe error handling when the destination does not create the Snowpipe.
The Snowpipe Error Behavior property provides the following error handling options:
  • Default - Does not set the on_error option. When not explicitly set, the Snowflake default on_error behavior for autocreated Snowpipes is Skip File.
  • Continue - Ignores errors and continues processing.
  • Skip File - When encountering errors, skips writing the batch.
    When you use this option, you also configure a Snowpipe Skip File On Error property to specify when the destination should skip the file:
    • First - After discovering the first error record.
    • Number - After discovering the specified number of error records in the batch.
    • Percentage - After discovering the specified percentage of error records in the batch.
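The options above correspond to values of the Snowflake on_error copy option. The following sketch shows one plausible mapping. The option values are those documented for the Snowflake COPY INTO command. How the destination assembles its query is not described here, so the function and its parameters are assumptions for illustration.

```python
def on_error_clause(behavior, skip_mode=None, threshold=None):
    """Return the ON_ERROR copy option for a behavior, or None to leave it unset."""
    if behavior == "Default":
        return None  # Snowflake applies its own default
    if behavior == "Continue":
        return "ON_ERROR = CONTINUE"
    if behavior == "Abort Statement":
        return "ON_ERROR = ABORT_STATEMENT"
    if behavior == "Skip File":
        if skip_mode == "First":
            return "ON_ERROR = SKIP_FILE"
        if skip_mode == "Number":
            return f"ON_ERROR = SKIP_FILE_{int(threshold)}"
        if skip_mode == "Percentage":
            return f"ON_ERROR = 'SKIP_FILE_{int(threshold)}%'"
    raise ValueError(f"unsupported combination: {behavior!r}, {skip_mode!r}")


print(on_error_clause("Skip File", "Number", 10))
print(on_error_clause("Skip File", "Percentage", 25))
```

Note that the Snowpipe Error Behavior property, as listed above, does not offer Abort Statement.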

Define a Role

The Snowflake destination requires a Snowflake role that grants all privileges needed to load data using the configured load method. Each load method requires a different set of privileges.

Before configuring the destination, ensure that you have granted the required privileges to a Snowflake role, as explained in Prerequisites.

If you create a custom role with the required privileges, define the role that the stage uses in one of the following ways:
Assign the custom role as the default role
In Snowflake, assign the custom role as the default role for the Snowflake user account specified in the stage. A Snowflake user account is associated with a single default role.
Override the default role with the custom role
In the stage, select Use Snowflake Role on the connection information tab. Then, specify the name of the custom role. The custom role overrides the default role assigned to the Snowflake user account specified in the stage.
For example, you might define custom roles in Snowflake for specific data sources, and then specify one of the roles when configuring a Snowflake stage.

Performance Optimization

Use the following tips to optimize for performance and cost-effectiveness when using the Snowflake destination:

Increase the batch size
The maximum batch size is determined by the origin in the pipeline and typically has a default value of 1,000 records. To take advantage of Snowflake's bulk loading abilities, increase the maximum batch size in the pipeline origin to 20,000-50,000 records. Be sure to increase the Data Collector Java heap size, as needed. For more information, see Java Heap Size in the Data Collector documentation.
Important: Increasing the batch size is strongly recommended. Using the default batch size can be slow and costly.
Configure pipeline runners to wait indefinitely when idle
With the default configuration, a pipeline runner generates an empty batch after waiting idly for 60 seconds. As a result, the destination continues to execute metadata queries against Snowflake, even though no data needs to be processed. To reduce Snowflake charges when a pipeline runner waits idly, set the Runner Idle Time pipeline property to -1. This configures pipeline runners to wait indefinitely when idle without generating empty batches, which allows Snowflake to pause processing.
Important: Configuring pipeline runners to wait indefinitely when idle is strongly recommended. Using the default pipeline runner idle time can result in unnecessary Snowflake resource consumption and runtime costs.
Use multiple threads
When writing to Snowflake using Snowpipe or the COPY command, you can use multiple threads to improve performance when you include a multithreaded origin in the pipeline. When Data Collector resources allow, using multiple threads enables processing multiple batches of data concurrently.
As with increasing the batch size, when using multiple threads, you should make sure that the Data Collector Java heap size is sized appropriately. For more information, see Java Heap Size in the Data Collector documentation.
Note: Do not use multiple threads to write CDC data to Snowflake with the MERGE command. When using multiple threads to process data, the original order of the data is not retained.
Enable additional connections to Snowflake
When writing to multiple Snowflake tables using the COPY or MERGE commands, increase the number of connections that the Snowflake destination makes to Snowflake. Each additional connection allows the destination to write to an additional table, concurrently.
For example, when writing to 10 tables with only one connection, the destination can only write to one table at a time. With 5 connections, the destination can write to 5 tables at a time. With 10 connections, the destination can write to all 10 tables at the same time.

By default, the destination uses one connection for standard single-threaded pipelines. In multithreaded pipelines, the destination matches the number of threads used by the pipeline. That is, when a multithreaded origin is configured to use up to 3 threads, then by default, the Snowflake destination uses 3 connections to write to Snowflake, one for each thread.

Note that the number of connections is for the entire pipeline, not for each thread. So when using multiple threads to write to multiple tables, you can also improve performance by allocating additional connections. For example, when using 3 threads to write to 3 tables, you might increase the number of connections to 9 for maximum throughput.

Use the Connection Pool Size property to specify the maximum number of connections that the Snowflake destination can use. Use this property when writing to Snowflake with the COPY or MERGE commands. Increasing the number of connections does not improve performance when using Snowpipe.
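The connection arithmetic in the examples above can be sketched as follows. The function names are hypothetical. The idea of write "rounds" is an idealization that assumes every table takes the same time to write, and the pool size formula is only one reading of the 3 threads and 3 tables example.

```python
import math


def concurrent_tables(tables, connections):
    """Number of tables that can be written at the same time."""
    return min(tables, connections)


def write_rounds(tables, connections):
    """Idealized number of passes needed to write to every table once."""
    return math.ceil(tables / connections)


def pool_size_for(threads, tables):
    """One reading of the example above: a connection per thread per table."""
    return threads * tables


for connections in (1, 5, 10):
    print(connections, concurrent_tables(10, connections), write_rounds(10, connections))
print(pool_size_for(3, 3))
```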

Row Generation

When writing a record to a table, the Snowflake destination includes all record fields in the resulting row, by default. The destination uses the root field, /, as the basis for the resulting row.

You can configure the Row Field property to specify a map or list-map field in the record as the basis for the row. The resulting record includes only the data from the specified map or list-map field and excludes all other record data. Use this functionality when the data that you want to write to Snowflake exists in a single map or list-map field within the record.

If you want to use the root field, but do not want to include all fields in the resulting row, you can configure the destination to ignore all specified first-level fields.

The Snowflake destination converts all map or list-map fields within the specified root field to the Snowflake Variant data type. The Snowflake destination fully supports the Variant data type.

By default, records with missing fields or with invalid data types in fields are treated as error records. You can configure the destination to replace missing fields and data of invalid types with user-defined default values. Then, you specify the default values to use for each data type. You can also configure the destination to replace newline characters in string fields with a replacement character.
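The row generation rules above can be modeled with a short sketch. It represents a record as nested dictionaries and is not the destination's implementation. The parameter names are hypothetical, "missing fields" is taken to mean table columns with no matching field in the record, and replacement of invalid data types is not modeled.

```python
def build_row(record, column_types, row_field=None, ignore=(),
              defaults=None, newline_char=None):
    """Build one row (column name -> value) from a record."""
    # Use the whole record (root field) or a single map field as the basis.
    source = record if row_field is None else record[row_field]
    # Drop the first-level fields that should not be written.
    row = {name: value for name, value in source.items() if name not in ignore}
    # Fill missing columns with the default for the column's data type.
    for column, column_type in column_types.items():
        if column not in row:
            if not defaults or column_type not in defaults:
                raise ValueError(f"missing field {column!r}: treated as an error record")
            row[column] = defaults[column_type]
    # Optionally replace newline characters in string values.
    if newline_char is not None:
        row = {
            name: (value.replace("\r\n", newline_char).replace("\n", newline_char)
                   if isinstance(value, str) else value)
            for name, value in row.items()
        }
    return row


record = {"id": 7, "note": "line1\nline2", "meta": {"name": "Ada"}, "debug": "x"}
columns = {"id": "NUMBER", "note": "VARCHAR", "city": "VARCHAR"}
print(build_row(record, columns, ignore=("debug", "meta"),
                defaults={"VARCHAR": ""}, newline_char=" "))
print(build_row(record, {"name": "VARCHAR"}, row_field="meta"))
```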

Specifying Tables

You can use the Snowflake destination to write to one or more tables in a Snowflake schema.

Specify the tables to use based on how many tables you want to write to:
Single table
To write data to a single table, enter the table name as follows:
<table_name>
Multiple tables
To write data to multiple tables, specify a field in the record that defines the database and tables.
For example, say you have tables named after departments in your company, such as Operations, Sales, and Marketing. The records being processed have a dept field with matching values. You configure the destination to write records to the various tables using the following expression: ${record:value('/dept')}.
When using the COPY or MERGE command to load data, you can configure the Snowflake destination to automatically create tables when a new value appears in the specified field. For example, if the dept field suddenly includes an Engineering department, the destination can create a new Engineering table in Snowflake for the new data. For more information, see Enabling Data Drift Handling.
When writing to multiple Snowflake tables, you might also increase the number of connections that the destination uses for the write. For more information, see Performance Optimization.

Use the Table property on the Snowflake tab to specify the tables to write to.
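The effect of an expression such as ${record:value('/dept')} can be pictured as grouping records by the value of the named field. The sketch below is a rough model using plain dictionaries as records. It does not evaluate the expression language itself, and the function name is hypothetical.

```python
from collections import defaultdict


def route_by_field(records, field):
    """Group records by the table name found in a first-level field."""
    tables = defaultdict(list)
    for record in records:
        tables[record[field]].append(record)
    return dict(tables)


records = [
    {"dept": "Sales", "amount": 10},
    {"dept": "Operations", "amount": 5},
    {"dept": "Sales", "amount": 7},
    {"dept": "Engineering", "amount": 3},  # a new value implies a new table
]
for table, rows in route_by_field(records, "dept").items():
    print(table, len(rows))
```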

Enabling Data Drift Handling

The Snowflake destination can automatically compensate for changes in column or table requirements, also known as data drift. You cannot enable data drift handling when the destination is configured to use Snowpipe.

Note: If you change the destination table schema manually rather than enabling data drift handling, you must restart the pipeline to allow the destination to discover schema changes.
The destination can handle data drift in the following ways:
Create new columns

The destination can create new columns in Snowflake tables when new fields appear in records. For example, if a record suddenly includes a new Address2 field, the destination creates a new Address2 column in the target table.

By default, the destination creates new columns based on the data in the new fields, such as creating a Double column for decimal data. You can, however, configure the destination to create all new columns as Varchar.

To enable the automatic creation of new columns, select the Data Drift Enabled property on the Snowflake tab. To create new columns as Varchar, select the Create New Columns as Varchar property.
Create new tables
The destination can create new tables as needed. For example, say the destination writes data to tables based on the region name in the Region field. When a new SW-3 region shows up in a record, the destination creates a new SW-3 table in Snowflake and writes the record to the new table.

You can use this functionality to create all necessary tables in an empty Snowflake database schema.

To enable the creation of new tables, select the Table Auto Create property.
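
The column side of data drift handling can be pictured with a short Python sketch. It compares the field names in a record with the columns already known for a table and builds one ALTER TABLE statement per new field. The helper names, the exact name matching, and the generated SQL text are assumptions for illustration; the statements that the destination actually issues may differ.

```python
def find_new_columns(existing_columns, record):
    """Return record fields that have no matching column, in record order."""
    known = set(existing_columns)
    return [name for name in record if name not in known]


def add_column_statements(table, new_columns, column_type="VARCHAR"):
    """Build illustrative ALTER TABLE statements, one per new column."""
    return [
        f"ALTER TABLE {table} ADD COLUMN {name} {column_type}"
        for name in new_columns
    ]


# Hypothetical table and record: Address2 appears for the first time.
existing = ["Name", "Address1"]
record = {"Name": "Ann", "Address1": "1 Main St", "Address2": "Apt 4"}

new_columns = find_new_columns(existing, record)
for statement in add_column_statements("customers", new_columns):
    print(statement)
```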

Generated Data Types

When creating new tables or creating new columns in existing tables, the Snowflake destination uses field names to generate the new column names.

You can configure the destination to create all new columns as Varchar. However, by default, the Snowflake destination creates columns as follows:
Record Field Data Type        Snowflake Column Data Type
Byte Array                    Binary
Char                          Char
String                        Varchar
Byte, Integer, Long, Short    Number
Decimal, Double               Double
Boolean                       Boolean
Date                          Date
Datetime                      Timestamp_Ntz
Float                         Float
Time                          Time
Zoned Datetime                Timestamp_Tz
Map, List-Map                 Variant

The Snowflake destination fully supports the Variant data type.
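
The default mapping can also be written as a lookup table. The following Python sketch restates the mapping listed in this section and adds a flag that mimics the Create New Columns as Varchar property. The dictionary and function names are hypothetical, and the sketch does not model the Replicate Decimal Columns property.

```python
# Default mapping from record field data type to Snowflake column data type,
# copied from the list in this section.
DEFAULT_COLUMN_TYPES = {
    "Byte Array": "Binary",
    "Char": "Char",
    "String": "Varchar",
    "Byte": "Number",
    "Integer": "Number",
    "Long": "Number",
    "Short": "Number",
    "Decimal": "Double",
    "Double": "Double",
    "Boolean": "Boolean",
    "Date": "Date",
    "Datetime": "Timestamp_Ntz",
    "Float": "Float",
    "Time": "Time",
    "Zoned Datetime": "Timestamp_Tz",
    "Map": "Variant",
    "List-Map": "Variant",
}


def column_type(field_type, all_varchar=False):
    """Return the column type for a field type.

    all_varchar stands in for the Create New Columns as Varchar property.
    """
    if all_varchar:
        return "Varchar"
    return DEFAULT_COLUMN_TYPES[field_type]


print(column_type("Zoned Datetime"))
print(column_type("Zoned Datetime", all_varchar=True))
```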

Creating Tables

If you configure the Snowflake destination to use Snowpipe or to handle data drift, you can also configure the destination to create tables. Once configured, the destination creates tables when the specified or needed tables do not exist in Snowflake.

With the Table Auto Create property enabled on the Snowflake tab, the destination creates tables, columns, and primary key columns as follows:
Table
The destination creates a table if the table specified in the Table property does not exist.
While handling data drift, the destination creates any tables that records reference but that do not yet exist in Snowflake.
The destination names new tables based on the table name or expression specified in the Table property.
Table columns
In the created tables, the destination determines the columns from the first batch of data processed. The destination infers the data types from the data in the batch.
If the Upper Case Schema & Field Names property is enabled, then the destination uses upper case for all column names in the created tables.
The columns in the created tables are nullable, unless they are primary key columns.
Primary key columns
In the created tables, the destination creates primary key columns as follows:
  • If configured to use Snowpipe or the COPY command for new data, the destination uses the primary key information in the jdbc.primaryKeySpecification record header attribute to create primary key columns.

    Several origins, such as the Oracle CDC Client origin and the PostgreSQL CDC Client origin, populate the record header attribute automatically. You can also use a processor, such as the Expression Evaluator, to populate the header attribute within the pipeline.

    When the jdbc.primaryKeySpecification record header attribute does not exist, the destination creates tables without primary key columns.

  • If configured to use the MERGE command for CDC data and handle data drift, the destination uses the primary key information in the Primary Key Location property to create primary key columns. To allow for existing records that do not include the new primary key columns, the new primary key columns are nullable.
  • If configured to use the MERGE command for CDC data but not handle data drift, the destination uses the primary key information in the Primary Key Location property to create primary key columns. The new primary key columns are not nullable.

CRUD Operation Processing

The Snowflake destination can insert, update, upsert, or delete data when you configure the destination to process CDC data. When processing CDC data, the destination uses the MERGE command to write data to Snowflake.

When writing data, the Snowflake destination uses the CRUD operation specified in the sdc.operation.type record header attribute. The destination performs operations based on the following numeric values:
  • 1 for INSERT
  • 2 for DELETE
  • 3 for UPDATE
  • 4 for UPSERT

If your pipeline has a CRUD-enabled origin that processes changed data, the destination simply reads the operation type from the sdc.operation.type header attribute that the origin generates. If your pipeline has a non-CDC origin, you can use the Expression Evaluator processor or a scripting processor to define the record header attribute. For more information about Data Collector changed data processing and a list of CDC-enabled origins, see Processing Changed Data.
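
For a non-CDC origin, defining the header attribute amounts to writing one of the numeric codes listed above. The Python sketch below models a record header as a plain dictionary, which is an assumption of this example, as are the helper names and the choice to store the code as a string. It is not Data Collector scripting API code.

```python
# Numeric codes for sdc.operation.type, as listed above.
OPERATION_CODES = {"INSERT": 1, "DELETE": 2, "UPDATE": 3, "UPSERT": 4}
OPERATION_NAMES = {code: name for name, code in OPERATION_CODES.items()}


def set_operation(header, operation):
    """Write the operation code into a header modeled as a dict."""
    # Storing the code as a string is an assumption of this sketch.
    header["sdc.operation.type"] = str(OPERATION_CODES[operation.upper()])
    return header


def operation_name(header):
    """Read the operation code back and translate it to its name."""
    return OPERATION_NAMES[int(header["sdc.operation.type"])]


header = set_operation({}, "upsert")
print(header["sdc.operation.type"], operation_name(header))
```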

Configuring a Snowflake Destination

Configure a Snowflake destination to write data to Snowflake tables. Before you use the destination in a pipeline, complete the prerequisite tasks.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error
    Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the Snowflake Connection Info tab, configure the following properties:
    Note: Snowflake JDBC driver versions 3.13.25 or higher convert underscores to hyphens, by default. When needed, you can bypass this behavior by setting the allowUnderscoresInHost driver property to true. For more information and alternate solutions, see this Snowflake community article.
    Snowflake Connection Property Description
    Connection Connection that defines the information required to connect to an external system.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    Organization Snowflake organization.
    Account Snowflake account name.
    User Snowflake user name.

    The user account or the custom role that overrides the default role for this user account must have the required Snowflake privileges.

    The required privileges depend on the load method that the destination uses. For details, see Prerequisites.

    Password Snowflake password.
    Include Organization Enables specifying the Snowflake organization.
    Snowflake Region Region where the Snowflake warehouse is located. Select one of the following:
    • An available Snowflake region.
    • Other - Enables specifying a Snowflake region not listed in the property.
    • Custom JDBC URL - Enables specifying a virtual private Snowflake installation.

    Available when Include Organization is disabled.

    Custom Snowflake Region Custom Snowflake region. Available when using Other as the Snowflake region.
    Virtual Private Snowflake URL Custom JDBC URL to use when using a virtual private Snowflake installation.
    Use Snowflake Role Overrides the default role for the specified user account.
    Snowflake Role Name Name of the custom Snowflake role to use.

    The custom role must have the required Snowflake privileges. The required privileges depend on the load method that the destination uses. For details, see Prerequisites.

    Connection Properties Additional Snowflake connection properties to use.

    To add properties, click Add and define the property name and value. Use the property names and values as expected by Snowflake.

    Connection Pool Size Maximum number of connections that the destination uses to write to Snowflake. The default, 0, ensures that the destination uses the same number of connections as threads used by the pipeline.

    When writing to multiple tables using the COPY or MERGE command, increasing this property can improve performance.

    Note: Must be set to 0 when processing CDC data.
  3. On the Snowflake tab, configure the following properties:
    Snowflake Property Description
    Warehouse Snowflake warehouse.
    Database Snowflake database.
    Schema Snowflake schema.
    Table Snowflake tables to write to. To write to a single table, enter the table name.

    To write to multiple tables, enter an expression that evaluates to the field in the record that contains the table name.

    For example: ${record:value('/table')}

    Or, to write to tables based on the table name in the jdbc.tables record header attribute defined by the JDBC Multitable Consumer origin, you can use the following expression: ${record:attribute('jdbc.tables')}

    Use Snowpipe Uses Snowpipe to write to Snowflake. Use only when processing new data. You cannot use Snowpipe to load CDC data.

    Perform the necessary prerequisites before enabling Snowpipe. For more information about Snowpipe and other load methods, see Load Methods. For information about optimizing pipeline performance, see Performance Optimization.

    Data Drift Enabled Enables the destination to create new columns in Snowflake tables when new fields appear in records.

    Not available when using Snowpipe.

    Upper Case Schema & Field Names Converts all schema, table, and field names to all uppercase letters when querying Snowflake.

    When enabled, any new tables or fields created by the destination also use all uppercase letters.

    Table Auto Create Automatically creates tables when needed.

    Available when using Snowpipe or when data drift is enabled.

    Create New Columns as Varchar Creates all new columns as Varchar. By default, the destination creates new columns based on the type of data in the field.

    Available when data drift is enabled.

    Replicate Decimal Columns Creates Decimal columns for decimal data. The destination determines the precision and scale of the columns based on the decimal data in the first record of the first batch.
    Note: The maximum precision and scale for Snowflake Decimal columns is (38, 37). Longer values are truncated.

    When cleared, the destination creates Double columns for decimal data.

    Available when data drift is enabled.

    Error Behavior Action to take when errors occur. Used only with the COPY and MERGE load methods.

    Determines how the Snowflake on_error option is used in a query:

    • Default - Does not set the on_error option. When not explicitly set, the Snowflake default on_error behavior is Abort Statement.
    • Continue - Ignores errors and continues processing.
    • Skip File - When encountering errors, skips writing the batch. Requires configuring the Skip File on Error property.
    • Abort Statement - Skips writing the batch upon the first error.

    For more information about the on_error option, see the Snowflake documentation.

    Skip File on Error Indicates when to skip writing a batch:
    • First - After discovering the first error record.
    • Number - After discovering the specified number of error records in the batch.
    • Percentage - After discovering the specified percentage of error records in the batch.

    Available when using the Skip File error behavior.

    Max Error Records Maximum number of error records allowed in a batch before skipping writing the batch.

    Available when using the Number option for the Skip File on Error property.

    Max Error Record Percentage Maximum percentage of error records allowed in a batch before skipping writing the batch.

    Available when using the Percentage option for the Skip File on Error property.
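
The relationship between the error properties above and the Snowflake on_error option can be sketched as follows. The option values CONTINUE, SKIP_FILE, SKIP_FILE_<num>, 'SKIP_FILE_<num>%', and ABORT_STATEMENT are the values that Snowflake documents for the COPY INTO command. How the destination translates its properties into them is an assumption of this example, as is the function name.

```python
def on_error_option(behavior, skip_mode=None, max_records=None, max_percentage=None):
    """Return an on_error value for the given settings, or None for Default."""
    if behavior == "Default":
        return None  # option not set; Snowflake applies its own default
    if behavior == "Continue":
        return "CONTINUE"
    if behavior == "Abort Statement":
        return "ABORT_STATEMENT"
    if behavior == "Skip File":
        if skip_mode == "First":
            return "SKIP_FILE"
        if skip_mode == "Number":
            return f"SKIP_FILE_{max_records}"
        if skip_mode == "Percentage":
            # Snowflake expects the percentage form as a quoted string.
            return f"'SKIP_FILE_{max_percentage}%'"
    raise ValueError(f"unsupported settings: {behavior!r}, {skip_mode!r}")


print(on_error_option("Skip File", "Number", max_records=10))
print(on_error_option("Skip File", "Percentage", max_percentage=5))
```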

  4. When loading data with Snowpipe, on the Snowpipe tab, configure the following properties:
    Snowpipe Property Description
    Connection Connection that defines the information required to connect to an external system.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    Pipe Pipe to use when using Snowpipe to load data to Snowflake.

    Use Snowpipe only when processing new data. Be sure to complete all of the Snowpipe prerequisite tasks before configuring these Snowpipe properties.

    Private Key Private key PEM. Generated in Snowflake as part of the Snowpipe prerequisite tasks.

    Enter the key either as plain text or as a path to the file containing the private key content.

    Note: To secure sensitive information, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.
    Private Key Password Private key password. Generated in Snowflake as part of the Snowpipe prerequisite tasks.
    Use Custom Snowpipe Endpoint Enables using a custom Snowpipe endpoint.
    Use TLS Enables the use of TLS to connect to the custom Snowpipe endpoint.
    Custom Snowpipe Host Host name for the custom Snowpipe endpoint.
    Custom Snowpipe Port Port number for the custom Snowpipe endpoint.
    Snowpipe Error Behavior Action to take when errors occur. Used only when loading to an autocreated Snowpipe.

    Determines how the Snowflake on_error option is used in a query:

    • Default - Does not set the on_error option. When not explicitly set, the Snowflake default on_error behavior for autocreated Snowpipes is Skip File.
    • Continue - Ignores errors and continues processing.
    • Skip File - When encountering errors, skips writing the batch. Requires configuring the Snowpipe Skip File on Error property.
    • Abort Statement - Skips writing the batch upon the first error.

    For more information about the on_error option, see the Snowflake documentation.

    Snowpipe Skip File on Error Indicates when to skip writing a batch:
    • First - After discovering the first error record.
    • Number - After discovering the specified number of error records in the batch.
    • Percentage - After discovering the specified percentage of error records in the batch.

    Available when using the Skip File error behavior.

    Snowpipe Max Error Records Maximum number of error records allowed in a batch before skipping writing the batch.

    Available when using the Number option for the Snowpipe Skip File on Error property.

    Snowpipe Max Error Record Percentage Maximum percentage of error records allowed in a batch before skipping writing the batch.

    Available when using the Percentage option for the Snowpipe Skip File on Error property.

  5. On the Staging tab, configure the following properties:
    Staging Property Description
    Stage Location Location of the Snowflake stage:
    • Amazon S3
    • Azure Blob Storage
    • Google Cloud Storage
    • Snowflake Internal Stage

    This property configuration determines the properties that display on this tab and the Staging Advanced tab.

    Connection Connection that defines the information required to connect to an external system. Available when staging data on Google Cloud Storage.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    Stage Database Optional database for the Snowflake stage. Configure this property when the stage is located in a different database than the Snowflake table.

    When not defined, the destination uses the database defined for the Snowflake table, on the Snowflake tab.

    Stage Schema Optional schema for the Snowflake stage. Configure this property when the stage is located in a different schema than the Snowflake table.

    When not defined, the destination uses the schema defined for the Snowflake table, on the Snowflake tab.

    Snowflake Stage Name Name of the Snowflake stage used to stage the data.

    Unless using a Snowflake internal user stage, you create this stage as part of the Snowflake prerequisite tasks.

    To use a Snowflake internal user stage, enter a tilde (~).

    Use Instance Profile Enables using an instance profile to write to an external stage on Amazon S3. Use only when Data Collector runs on an Amazon EC2 instance.
    AWS Access Key ID

    AWS access key ID.

    Required when not using an instance profile to write to an external stage on Amazon S3.

    AWS Secret Key ID

    AWS secret access key.

    Required when not using an instance profile to write to an external stage on Amazon S3.

    Purge Stage File After Ingesting Removes a stage file after its data is written to Snowflake. Do not use when writing to Snowflake with Snowpipe.
    S3 Stage File Name Prefix Optional prefix for the external stage name.
    S3 Compressed File Compresses files before writing them to S3. Keep this option enabled for optimum performance.
    Azure Authentication Type of authentication to use to connect to Azure:
    • Account Name and Key
    • SAS Token
    Azure Account Name Azure account name.
    Azure Account Key Azure account key.

    Used only for Account Name and Key authentication.

    To secure sensitive information, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.

    Azure SAS Token Azure SAS Token.

    Used only for SAS Token authentication.

    To secure sensitive information, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.

    Azure Stage File Name Prefix Optional prefix for the external stage name.
    Azure Compressed File Compresses files before writing them to Azure. Keep this option enabled for optimum performance.
    Project ID

    Google Cloud project ID to use.

    Credentials Provider Provider for Google Cloud credentials:
    • Default credentials provider - Uses Google Cloud default credentials.
    • Service account credentials file (JSON) - Uses credentials stored in a JSON service account credentials file.
    • Service account credentials (JSON) - Uses JSON-formatted credentials information from a service account credentials file.
    Credentials File Path (JSON) Path to the Google Cloud service account credentials file used to connect. The credentials file must be a JSON file.

    Enter a path relative to the Data Collector resources directory, $SDC_RESOURCES, or enter an absolute path.

    Credentials File Content (JSON) Contents of a Google Cloud service account credentials JSON file used to connect.

    Enter JSON-formatted credential information in plain text, or use an expression to call the information from runtime resources or a credential store. For more information about credential stores, see Credential Stores in the Data Collector documentation.

    Stage File Prefix Prefix to use for files staged on Google Cloud Storage. Default is sdc.
    Compress File with Gzip Enables compressing files staged on Google Cloud Storage with gzip.
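
The fallback rules for Stage Database, Stage Schema, and the tilde shorthand for a user stage can be summarized in a few lines of Python. The sketch builds a Snowflake stage reference of the form @database.schema.stage, or @~ for the user stage. The function name and sample values are hypothetical, and the sketch does not claim to reproduce how the destination builds the reference internally.

```python
def stage_reference(stage_name, table_database, table_schema,
                    stage_database=None, stage_schema=None):
    """Resolve a stage reference, falling back to the table's database and schema."""
    if stage_name == "~":
        return "@~"  # Snowflake internal user stage
    database = stage_database or table_database
    schema = stage_schema or table_schema
    return f"@{database}.{schema}.{stage_name}"


# Stage in the same database and schema as the table.
print(stage_reference("sdc_stage", "SALES_DB", "PUBLIC"))
# Stage kept in a separate database and schema.
print(stage_reference("sdc_stage", "SALES_DB", "PUBLIC",
                      stage_database="STAGING_DB", stage_schema="LOADS"))
```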
  6. When using a Snowflake external stage, on the Staging Advanced tab, configure the following properties.
    This tab displays different properties depending on the location of the external stage.
    When using an external stage in Amazon S3, you can configure the following properties:
    Amazon S3 Advanced Property Description
    S3 Connection Timeout Seconds to wait for a response before closing the connection.
    S3 Socket Timeout Seconds to wait for a response to a query.
    S3 Max Error Retry Maximum number of times to retry requests.
    S3 Uploading Threads Size of the thread pool for parallel uploads. Used when working with multiple partitions and processing large objects in multiple parts.

    When working with multiple partitions, setting this property up to the number of partitions being used can improve performance.

    For more information about this and the following properties, see the Amazon S3 TransferManager documentation.

    S3 Minimum Upload Part Size (MB) Minimum part size, in MB, for multipart uploads.
    S3 Multipart Upload Threshold (MB) Minimum batch size, in MB, for multipart uploads.
    S3 Proxy Enabled Specifies whether to use a proxy to connect.
    S3 Proxy Host Proxy host.
    S3 Proxy Port Proxy port.
    S3 Proxy Authentication Enabled Indicates that proxy authentication is used.
    S3 Proxy User S3 proxy user.
    S3 Proxy Password S3 proxy password.
    S3 Encryption Option that Amazon S3 uses to manage the encryption keys:
    • None
    • SSE-S3 - Use Amazon S3-managed keys.
    • SSE-KMS - Use Amazon Web Services KMS-managed keys.

    Default is None.

    S3 Encryption KMS ID Amazon resource name (ARN) of the AWS KMS master encryption key. Use the following format:
    <arn>:<aws>:<kms>:<region>:<acct ID>:<key>/<key ID>

    Used for SSE-KMS encryption only.

    S3 Encryption Context Key-value pairs to use for the encryption context. Click Add to add key-value pairs.

    Used for SSE-KMS encryption only.

    S3 Tags List of Amazon S3 tags to add to the staged objects. You can use object tagging to categorize storage and maintain file metadata such as Data Classification.

    Each tag is a key-value pair.

    For more information, see the Amazon S3 documentation.
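
For the S3 Encryption KMS ID property, the placeholder format shown above corresponds to an AWS KMS key ARN in the standard aws partition, for example arn:aws:kms:us-east-1:111122223333:key/<key ID>. The Python sketch below assembles such a value. The function name is hypothetical, and the region, account ID, and key ID are placeholder values, not real resources.

```python
def kms_key_arn(region, account_id, key_id):
    """Assemble a KMS key ARN for the standard aws partition."""
    return f"arn:aws:kms:{region}:{account_id}:key/{key_id}"


# Placeholder values for illustration only.
arn = kms_key_arn(
    region="us-east-1",
    account_id="111122223333",
    key_id="1234abcd-12ab-34cd-56ef-1234567890ab",
)
print(arn)
```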

    When using an external stage in Azure, you can configure the following properties:
    Azure Advanced Property Description
    Use Custom Blob Service URL Enables using a custom Azure Blob Storage URL.
    Custom Blob Service URL Custom Azure Blob Storage URL. Typically uses the following format:
    https://<Azure Account>.blob.core.windows.net
    Azure Encryption Enables using Azure default encryption at this time.
  7. On the Data tab, configure the following properties:
    Data Property Description
    Data Format Format for staging files. Select one of the following options:
    • CSV
    • Parquet
    Row Field Map or list-map field to use as the basis for the generated row. Default is /, which includes all record fields in the resulting row.
    Column Fields to Ignore A list of fields to ignore when writing to the destination. You can enter a comma-separated list of first level fields to ignore.
    Null Value Characters to use to represent null values.

    Default is \N, the null value characters for Snowflake.

    Processing CDC Data (Use MERGE) Enables performing CRUD operations and using the MERGE command to write to Snowflake tables. Select to process CDC data.

    Cannot be used when writing data with Snowpipe.

    Important: To maintain the original order of data, do not use multiple threads or cluster execution mode when processing CDC data.

    For more information about the MERGE command and other load methods, see Load Methods. For information about optimizing pipeline performance, see Performance Optimization.

    Primary Key Location Location of primary key details:
    • Snowflake - Use when primary key columns are defined in the Snowflake tables.
    • JDBC Header - Use when primary key columns are defined in the jdbc.primaryKeySpecification record header attribute.
    • Specify for Each Table - Use to list each target table and the primary key columns for the table.
    Note: When key columns are known, entering them manually is more efficient than the other two options.

    Available when using the MERGE command to process CDC data.

    Table Key Columns Primary key columns to use for each Snowflake table. Click the Add icon to define primary key columns for additional tables.

    Click the Add icon in the Key Column field to add additional key columns for a table.

    Available when Primary Key Location is set to Specify for Each Table.

  8. On the Data Advanced tab, configure the following properties:
    Data Advanced Property Description
    Snowflake File Format Allows the use of custom Snowflake CSV file formats. Should not be used unless recommended by StreamSets customer support.
    Ignore Missing Fields Allows writing records with missing fields to Snowflake tables. Uses the specified default value for the data type of the missing field.

    When not enabled, records with missing fields are treated as error records.

    Ignore Fields with Invalid Types Allows replacing fields that contain data of an invalid type with the specified default value for the data type.

    When not enabled, records with data of invalid types are treated as error records.

    Boolean Default Default value to use when replacing missing Boolean fields or Boolean fields with invalid data.

    Default is \N, which represents a null value in Snowflake.

    Char Default Default value to use when replacing missing Char fields or Char fields with invalid data.

    Default is \N, which represents a null value in Snowflake.

    Number Default Default value to use when replacing missing Number fields or Number fields with invalid data.

    Default is \N, which represents a null value in Snowflake.

    Double Default Default value to use when replacing missing Double fields or Double fields with invalid data.

    Default is \N, which represents a null value in Snowflake.

    Date Default Default value to use when replacing missing Date fields or Date fields with invalid data.

    Default is \N, which represents a null value in Snowflake.

    Timestampntz Default Default value to use when replacing missing Timestampntz fields or Timestampntz fields with invalid data.

    Default is \N, which represents a null value in Snowflake.

    Timestamptz Default Default value to use when replacing missing Timestamptz fields or Timestamptz fields with invalid data.

    Default is \N, which represents a null value in Snowflake.

    Time Default Default value to use when replacing missing Time fields or Time fields with invalid data.

    Default is \N, which represents a null value in Snowflake.

    Varchar Default Default value to use when replacing missing Varchar fields or Varchar fields with invalid data.

    Default is \N, which represents a null value in Snowflake.

    Binary Default Default value to use when replacing missing Binary fields or Binary fields with invalid data.

    Default is \N, which represents a null value in Snowflake.

    Variant Default Default value to use when replacing missing Variant fields or Variant fields with invalid data.

    Default is \N, which represents a null value in Snowflake.

    Replace Newlines Replaces newline characters in string fields with the specified replacement character.
    New Line Replacement Character Character to use to replace newline characters.
    Column Separator Character to use as a column separator.
    Quoting Mode Mode for handling special characters in the data, such as the column separator and newline character:
    • Quoted - Encloses data in each field with the specified quote character.
      The following example uses asterisks to enclose the data in a field:
      *string data, more string data*
    • Escaped - Precedes a special character with the specified escape character.
      The following example uses a backtick to escape the comma column separator in a field:
      string data`, more string data
    Quote Character Character to enclose field data.

    Available when using Quoted mode.

    Escape Character Character to precede special characters in field data.

    Available when using Escaped mode.

    Trim Spaces Trims leading and trailing spaces from field data.
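
The two quoting modes described for the Quoting Mode property can be reproduced with the Python standard library csv module. This is only a sketch of the resulting file content under assumed settings: the function name write_row is hypothetical, and the asterisk and backtick characters are taken from the examples in the property description.

```python
import csv
import io


def write_row(fields, mode, column_separator=",", quote_char='"', escape_char="\\"):
    """Serialize one row in Quoted or Escaped mode and return it as text."""
    buffer = io.StringIO()
    if mode == "Quoted":
        # Enclose every field in the quote character.
        writer = csv.writer(buffer, delimiter=column_separator,
                            quotechar=quote_char, quoting=csv.QUOTE_ALL,
                            lineterminator="\n")
    elif mode == "Escaped":
        # No enclosing quotes; special characters get the escape character.
        writer = csv.writer(buffer, delimiter=column_separator,
                            escapechar=escape_char, quoting=csv.QUOTE_NONE,
                            lineterminator="\n")
    else:
        raise ValueError(f"unknown quoting mode: {mode!r}")
    writer.writerow(fields)
    return buffer.getvalue()


row = ["string data, more string data", "x"]
print(write_row(row, "Quoted", quote_char="*"), end="")
print(write_row(row, "Escaped", escape_char="`"), end="")
```

In Quoted mode the embedded comma is protected by the enclosing asterisks, while in Escaped mode it is preceded by the backtick, which matches the two examples given for the Quoting Mode property.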