Snowflake

The Snowflake executor runs one or more user-defined SQL queries each time it receives an event record. For information about supported versions, see Supported Systems and Versions.

Use the executor to run queries as part of an event stream. The executor is designed to load whole files staged by the Snowflake File Uploader destination to Snowflake tables each time that the executor receives an event record. However, you can use the executor to execute any JDBC query supported by Snowflake.

Before you use the Snowflake executor, you must complete a prerequisite task.

When you configure the executor, you specify connection information, one or more SQL queries to run, and how to submit the queries.

You can also configure the executor to generate events for another event stream, and whether you want record count information included in generated event records. For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Prerequisite

Writing data to Snowflake requires that the user specified in the stage have the appropriate Snowflake privileges. The privileges differ depending on the method that you use to write to Snowflake. Though you can use any command in a query, the privileges required for the most common commands are listed below.

When using other commands, see the Snowflake documentation to ensure that the user in the stage has the required privileges.

COPY command
When you use the COPY command to load data to Snowflake, the user must have a Snowflake role with the following access privileges:
  • Internal Snowflake stage: READ, WRITE
  • Table: SELECT, INSERT
MERGE command
When you use the MERGE command to load data to Snowflake, the user must have a Snowflake role with the following access privileges:
  • Internal Snowflake stage: READ, WRITE
  • Table: SELECT, INSERT, UPDATE, DELETE
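
For example, you might grant the privileges for the COPY command to a custom role as follows. Note that Snowflake also requires USAGE on the enclosing databases and schemas. The object and role names below are illustrative, matching the sample queries later in this topic:
GRANT USAGE ON DATABASE myDB TO ROLE sdc_loader;
GRANT USAGE ON SCHEMA myDB.mystaging TO ROLE sdc_loader;
GRANT READ, WRITE ON STAGE myDB.mystaging.sdcstaging TO ROLE sdc_loader;
GRANT USAGE ON DATABASE sharedDB TO ROLE sdc_loader;
GRANT USAGE ON SCHEMA sharedDB.sales TO ROLE sdc_loader;
GRANT SELECT, INSERT ON TABLE sharedDB.sales.infotable TO ROLE sdc_loader;
-- For the MERGE command, additionally grant:
GRANT UPDATE, DELETE ON TABLE sharedDB.sales.infotable TO ROLE sdc_loader;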

Implementation Notes

Note the following implementation requirements when working with the Snowflake executor:

  • The executor is designed to load whole files staged by the Snowflake File Uploader destination to Snowflake tables each time that the executor receives an event record. However, you can use the executor to execute any JDBC query supported by Snowflake.
  • StreamSets recommends configuring the Snowflake warehouse to auto-resume upon receiving new queries.
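
For example, you can enable auto-resume for a warehouse with the following statement, where the warehouse name is illustrative:
ALTER WAREHOUSE my_warehouse SET AUTO_RESUME = TRUE;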

Snowflake File Uploader and Executor Pipelines

Use the Snowflake executor to load data to Snowflake after it is placed by the Snowflake File Uploader destination into an internal stage.

The Snowflake File Uploader destination processes only whole file data, which typically requires a simple origin-to-destination pipeline. As a result, the typical Snowflake executor pipeline is also simple.

The pipeline consists of the following stages:
Whole file origin
Since the Snowflake File Uploader destination processes only whole file data, you must use a whole file origin to read data for the pipeline. For a list of origins that process whole file data, see Data Formats by Stage.
When you configure the origin, select Whole File for the data format.
Snowflake File Uploader destination
The Snowflake File Uploader destination processes whole files efficiently and stages output files to an internal Snowflake stage.
You configure the destination to generate events so the Snowflake executor can load each file to Snowflake tables as soon as each file is written.
When you configure the destination, select the Produce Events property on the General tab of the destination.
Snowflake executor
The Snowflake executor submits one or more user-defined SQL queries to Snowflake upon receiving an event record. You configure the executor to load each staged file to the appropriate Snowflake table.
You configure the SQL queries to use on the Queries tab of the stage properties, and specify whether to submit the queries consecutively or simultaneously. In the queries, you define the files to load and the Snowflake tables to write them to. You can use the following expression to reference the file name specified in the Snowflake File Uploader event record: ${record:value('/filename')}.
For example, to load all files written by the Snowflake File Uploader destination to a single Snowflake table, you might use the following query:
COPY into <target_database>.<target_schema>.<target_table> 
from @<staging_database>.<staging_schema>.<internal_stage>/${record:value('/filename')};

When the pipeline runs, the Snowflake File Uploader destination generates an event record each time that it writes a file. The Snowflake executor then submits the queries to Snowflake, loading the staged files to Snowflake tables as specified in the queries.

Define a Role

The Snowflake executor requires a Snowflake role that grants all privileges needed to load data using the specified query. Each load method requires a different set of privileges.

Before configuring the executor, ensure that you have granted the required privileges to a Snowflake role, as explained in Prerequisite.

If you create a custom role with the required privileges, define the role that the stage uses in one of the following ways:
Assign the custom role as the default role
In Snowflake, assign the custom role as the default role for the Snowflake user account specified in the stage. A Snowflake user account is associated with a single default role.
Override the default role with the custom role
In the stage, use the Role property to specify the name of the custom role. The custom role overrides the role assigned to the Snowflake user account specified in the stage.
For example, you might define custom roles in Snowflake for specific data sources, and then specify one of the roles when configuring a Snowflake stage.
Use a Snowflake connection
When working with Control Hub, you can configure a Snowflake connection to provide connection details for Snowflake stages.
When a Snowflake stage is configured to use a Snowflake connection, the stage uses the role associated with the connection. You cannot define the role to use in stage properties. For more information, see the Control Hub documentation.
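
For example, to create a custom role and assign it as the default role for the user specified in the stage, you might run the following statements in Snowflake. The role and user names are illustrative:
CREATE ROLE sdc_loader;
GRANT ROLE sdc_loader TO USER sdc_user;
ALTER USER sdc_user SET DEFAULT_ROLE = sdc_loader;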

SQL Queries

You can specify one or more queries to perform each time that the Snowflake executor receives an event record. Use the following details to configure the SQL queries:
Query submission
Configure the Query Submission property to define how the executor submits your queries:
  • Consecutively - The executor submits one query at a time, and waits until the previous query completes before submitting the next query. Queries are submitted in the order that they appear in the executor.
  • Simultaneously - The executor submits all queries at the same time, without waiting for earlier queries to complete. Use this option to reduce overall processing time when the queries do not depend on each other.
Referencing tables
Each query can write to one or more Snowflake tables. When specifying a table, use the full path to the table, as follows:
<database>.<schema>.<table>

For example: sharedDB.sales.infotable. Note that the warehouse is not part of the table path; the executor runs queries using the warehouse specified in the stage or connection properties.

Expressions in queries
You can use Data Collector expressions in the SQL query. These expressions are evaluated before the executor passes the query to Snowflake.
For example, you can use ${record:value('/filename')} to reference the file name in the event record generated by the Snowflake File Uploader destination.
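
For example, suppose a query references the staged file as @myDB.mystaging.sdcstaging/${record:value('/filename')} and the executor receives an event record whose filename field is sales_001.csv. The executor evaluates the expression first, so Snowflake receives the following query. The object and file names are illustrative:
COPY into sharedDB.sales.infotable
from @myDB.mystaging.sdcstaging/sales_001.csv;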

Sample Queries

To load each file written by the Snowflake File Uploader destination to the same Snowflake table, you might use the following query:
COPY into <target_database>.<target_schema>.<target_table> 
from @<staging_database>.<staging_schema>.<internal_stage>/${record:value('/filename')};
For example:
COPY into sharedDB.sales.infotable
from @myDB.mystaging.sdcstaging/${record:value('/filename')};
To load each file written by the Snowflake File Uploader destination to a table of the same name, you might use the following query, which uses the filename field in the event record:
COPY into <target_database>.<target_schema>.${record:value('/filename')} 
from @<staging_database>.<staging_schema>.<internal_stage>/${record:value('/filename')};
For example:
COPY into sharedDB.sales.${record:value('/filename')} 
from @myDB.mystaging.sdcstaging/${record:value('/filename')};
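The prerequisite also lists the privileges for the MERGE command. As a minimal sketch, assuming the staged files are CSV files whose first two columns are a key and a value, and assuming an existing named file format, a MERGE query might look like the following. All object names are illustrative:
MERGE into sharedDB.sales.infotable t
using (
    select $1 as id, $2 as amount
    from @myDB.mystaging.sdcstaging/${record:value('/filename')} (file_format => 'myDB.mystaging.csv_format')
) s
on t.id = s.id
when matched then update set t.amount = s.amount
when not matched then insert (id, amount) values (s.id, s.amount);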

Event Generation

The Snowflake executor can generate events that you can use in an event stream. When you enable event generation, the executor generates events for each successful or failed query.

Snowflake executor events can be used in any logical way. For example, you might use the Email executor to send a custom email each time a query fails.
For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Event Record

Event records generated by the Snowflake executor have the following event-related record header attributes. Record header attributes are stored as String values.
  • sdc.event.type - Event type. Uses the following event types:
      • successful-query - Generated after a query successfully completes.
      • failed-query - Generated after a query fails.
  • sdc.event.version - Integer that indicates the version of the event record type.
  • sdc.event.creation_timestamp - Epoch timestamp when the stage created the event.
The Snowflake executor can generate the following types of event records:
Successful query

The executor generates a successful-query event record after successfully completing a query.

Successful-query event records have the sdc.event.type record header attribute set to successful-query and include the following fields:
  • query - Query completed.
  • query-result - Number of rows affected by the query. Included when the Include Query Result Count in Events property is selected.
Failed query

The executor generates a failed-query event record after failing to complete a query.

Failed-query event records have the sdc.event.type record header attribute set to failed-query and include the following field:
  • query - Query attempted.
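
For example, with the Include Query Result Count in Events property selected, a successful-query event record for one of the sample COPY queries might contain the following values. The query text and row count are illustrative:
sdc.event.type: successful-query
query: COPY into sharedDB.sales.infotable from @myDB.mystaging.sdcstaging/sales_001.csv;
query-result: 250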

Configuring a Snowflake Executor

Configure a Snowflake executor to write staged whole files to Snowflake tables. Before you use the executor in a pipeline, complete the prerequisite task.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Produce Events Generates event records when events occur. Use for event handling.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

  2. On the Snowflake Connection Info tab, configure the following properties:
    Note: Snowflake JDBC driver versions 3.13.25 and later convert underscores in account names to hyphens by default. When needed, you can bypass this behavior by setting the allowUnderscoresInHost driver property to true. For more information and alternate solutions, see this Snowflake community article.
    Snowflake Connection Property Description
    Connection Connection that defines the information required to connect to an external system.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    To create a new connection, click the Add New Connection icon. To view and edit the details of the selected connection, click the Edit Connection icon.

    Snowflake Region Region where the Snowflake warehouse is located. Select one of the following:
    • An available Snowflake region.
    • Other - Enables specifying a Snowflake region not listed in the property.
    • Custom JDBC URL - Enables specifying a virtual private Snowflake installation.

    Available when Include Organization is disabled.

    Custom Snowflake Region Custom Snowflake region. Available when using Other as the Snowflake region.
    Organization Snowflake organization.
    Account Snowflake account name.
    Authentication Method
    Authentication method to connect to Snowflake:
    • User Credentials
    • Key Pair Path - Use to specify the location of a private key file.
    • Key Pair Content - Use to provide the contents of a private key file.
    • OAuth
    • None
    Available when not using a Control Hub connection for connection details.
    User Snowflake user name.

    The user account or the custom role that overrides the default role for this user account must have the required Snowflake privileges.

    For information about required privileges, see Prerequisite.

    Required when using authentication.

    Password Snowflake password.

    Required when using User Credentials authentication.

    Private Key Path Path to the private key file.

    Required when using Key Pair Path authentication.

    Private Key Content Contents of the private key file.

    Required when using Key Pair Content authentication.

    Private Key Password Optional password for the specified private key file or content. Use for encrypted private keys.

    Available when using Key Pair Path or Key Pair Content authentication.

    OAuth Token OAuth token to use. You can use a Snowflake security integration access token, refresh token, or code grant.

    Required when using OAuth authentication.

    OAuth Client ID Client ID for Snowflake security integration access tokens.

    Available when using OAuth authentication.

    OAuth Client Secret Client secret for Snowflake security integration access tokens.

    Available when using OAuth authentication.

    OAuth Redirect URI Redirect URI for Snowflake security integration access tokens.

    Available when using OAuth authentication.

    Role Overrides the default role for the specified user account.

    The custom role must have the required Snowflake privileges. The required privileges depend on the queries that the executor uses. For details, see Prerequisite.

    Available when not using a Control Hub connection for connection details.

    Warehouse Snowflake warehouse.

    Available when not using a Control Hub connection for connection details.

    Database Snowflake database.

    Available when not using a Control Hub connection for connection details.

    Schema Snowflake schema.

    Available when not using a Control Hub connection for connection details.

    Use a Custom JDBC URL Custom JDBC URL to use, such as for a virtual private Snowflake installation. For a sample URL, see the example after this table.
    Include Organization Enables specifying the Snowflake organization.
    Connection Pool Size Maximum number of connections that the executor uses. The default, 0, ensures that the executor uses the same number of connections as threads used by the pipeline.

    When writing to multiple tables, increasing this property can improve performance.

    Use Private Link Snowflake URL Enables using a private link URL.

    You can specify the URL in the Custom JDBC URL property above. Or, you can define the appropriate values for the Account property and either the Snowflake Region or Organization properties.

    Connection Properties Additional Snowflake connection properties to use.

    To add properties, click Add and define the property name and value. Use the property names and values as expected by Snowflake.
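
    For example, when connecting to a virtual private Snowflake installation, a custom JDBC URL typically takes the following form, where the account identifier and region are illustrative:
    jdbc:snowflake://xy12345.us-east-1.privatelink.snowflakecomputing.com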

  3. On the Queries tab, configure the following properties:
    Queries Property Description
    Override Warehouse Snowflake warehouse.

    If the selected Snowflake connection specifies a warehouse to use, this property overrides it.

    Available only when using a connection.

    SQL Queries One or more SQL queries to execute upon receiving an event record. By default, queries are executed in the specified order.

    To add additional queries, click Add.

    Query Submission Determines how the executor submits multiple queries:
    • Consecutively - Submits one query at a time, waiting for each query to complete before submitting the next.
    • Simultaneously - Submits all queries at once.

    By default, the executor submits queries consecutively.

    Include Query Result Count in Events Includes the number of rows affected by each query in generated event records. Available when the executor is configured to generate events.
  4. On the Resilience tab, configure the following properties:
    Resilience Property Description
    Max Connection Attempts Maximum number of attempts to establish a connection to the data source.

    Default is 3.

    Wait Duration Time in seconds to wait between attempts to establish a connection to the data source.

    Default is 10 seconds.