Snowflake Bulk

The Snowflake Bulk origin reads all available data from one or more tables in a Snowflake database, then stops the pipeline. This design enables processing Snowflake data without incurring the costs of continuously connecting to Snowflake with streaming pipelines.

You can use the Snowflake Bulk origin to read from any Snowflake database hosted on Amazon S3, Google Cloud Storage, or Microsoft Azure, as well as from private Snowflake installations. For information about supported versions, see Supported Systems and Versions in the Data Collector documentation.

During the read process, the Snowflake Bulk origin sends a command to Snowflake to stage data as CSV files on either an internal Snowflake stage or a hosted external stage. Then the Snowflake Bulk origin downloads and processes those CSV files. The Snowflake Bulk origin can use multiple threads to process the files in parallel.

When you configure the Snowflake Bulk origin, you specify the Snowflake region, account, and connection information. You define the number of threads to use to connect to Snowflake and process data. You can specify an organization name to use and define additional Snowflake connection properties, as needed. You can also use a connection to configure the origin.

You specify the tables to read and optional WHERE clauses for each table. You configure staging location details and the data format of the stage files.

Before you use the Snowflake Bulk origin, you must complete some prerequisite tasks.
Note: The Snowflake Bulk origin does not maintain an offset. Each time the pipeline runs, the origin processes all available data. So even if the pipeline stops before completing all processing, the origin processes all available data again when you restart the pipeline.

Prerequisites

Before you configure the Snowflake Bulk origin, complete the following prerequisite tasks:
  1. Create an internal or external Snowflake stage.
  2. Assign required privileges.

Create a Snowflake Stage

Before using the Snowflake Bulk origin in a pipeline, you must create a Snowflake internal or external stage.

During the read process, the Snowflake Bulk origin sends a command to Snowflake to stage data as CSV files on either an internal Snowflake stage or a hosted external stage. Then the Snowflake Bulk origin downloads and processes those CSV files.

To use an external stage, create the external stage with the cloud service provider that hosts your Snowflake warehouse.

Create one of the following Snowflake stages, as appropriate:
Snowflake internal stage
You can stage data in Snowflake internal user stages or named stages. Do not use internal table stages.
User stages are created by default for each user. For steps on how to create a named stage, see CREATE STAGE in the Snowflake SQL command reference documentation.
You can use the default Snowflake configuration for both user and named stages.
For more information about Snowflake stages, see the Snowflake documentation.
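For reference, a named internal stage with the default Snowflake configuration can be created with a single statement; the database, schema, and stage names below are placeholders, not requirements:

```sql
-- Sketch only: create a named internal stage with the default configuration.
-- The names (mydb.public.sdc_stage) are example placeholders.
CREATE STAGE mydb.public.sdc_stage;
```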
Amazon S3 external stage
To stage data in an Amazon S3 external stage, create a Snowflake external stage in a bucket in the same S3 region that hosts your Snowflake virtual warehouse. For example, if your Snowflake warehouse is in AWS US West, then create the Snowflake external stage in a bucket in the AWS US West region.
When you create a Snowflake external stage, you specify a URL that defines the name and location for the stage. Include a trailing slash in the URL to ensure that Snowflake loads all staged data. You might also include a prefix in the stage name to indicate that the external stage is for Data Collector.

For example, the following URL creates an external stage named sdc-externalstage in s3://mybucket/ and ensures that all staged data loads to Snowflake:

s3://mybucket/sdc-externalstage/
You can create an S3 stage using the Snowflake web interface or SQL. For more information, see the Snowflake documentation.
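As a sketch of the SQL approach, the stage for the URL above might be created as follows; the credentials shown are placeholders, and you can use a Snowflake storage integration instead of inline credentials:

```sql
-- Sketch only: create an external stage on the S3 URL shown above.
-- Replace the placeholder credentials with your own access key pair,
-- or reference a storage integration instead.
CREATE STAGE sdc_externalstage
  URL = 's3://mybucket/sdc-externalstage/'
  CREDENTIALS = (AWS_KEY_ID = '<access key id>' AWS_SECRET_KEY = '<secret access key>');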
Google Cloud Storage external stage
To stage data in a Google Cloud Storage external stage, create a Snowflake storage integration in Google Cloud Storage. This is a multistep process described in the Snowflake documentation that ends with creating a Snowflake external stage. Be sure to complete all required steps.
Snowflake supports Regional Storage and Multi-Regional Storage accounts only.
When you create a Snowflake external stage, you specify a URL that defines the name and location for the stage. Include a trailing slash in the URL to ensure that Snowflake loads all staged data. You might also include a prefix in the stage name to indicate that the external stage is for Data Collector.

For example, the following URL creates an external stage named sdc-externalstage in gcs://mybucket/ and loads all staged data to Snowflake:

gcs://mybucket/sdc-externalstage/
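As a sketch of the final steps of that process, the storage integration and stage might be created as follows; the integration and stage names are placeholders, and the intermediate steps (granting the integration's service account access to the bucket) are described in the Snowflake documentation:

```sql
-- Sketch only: a GCS stage requires a storage integration,
-- created and authorized in separate steps per the Snowflake documentation.
CREATE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://mybucket/');

CREATE STAGE sdc_externalstage
  URL = 'gcs://mybucket/sdc-externalstage/'
  STORAGE_INTEGRATION = gcs_int;
```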
Microsoft Azure external stage
To stage data in a Microsoft Azure external stage, complete the following tasks:
  1. Configure Snowflake authentication for the Microsoft Azure Blob Storage container that you want to use.

    You can use an SAS token or an Azure account name and key for authentication. For information about configuring SAS token authentication, see the Snowflake documentation.

  2. Create a Snowflake external stage in the container.

    When you create a Snowflake external stage, you specify a URL that defines the name and location for the stage. Include a trailing slash in the URL to ensure that Snowflake loads all staged data. You might also include a prefix in the stage name to indicate that the external stage is for Data Collector.

    For example, the following URL creates an external stage named sdc-externalstage in azure://myaccount.blob.core.windows.net/mycontainer/load/ and loads all staged data to Snowflake:
    azure://myaccount.blob.core.windows.net/mycontainer/load/sdc-externalstage/

    You can create an Azure stage using the Snowflake web interface or SQL. For more information, see Creating an Azure Stage in the Snowflake documentation.
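As a sketch of the SQL approach with SAS token authentication, the stage for the URL above might be created as follows; the token value is a placeholder:

```sql
-- Sketch only: create an external stage in the Azure container shown above,
-- authenticating with an SAS token (replace the placeholder).
CREATE STAGE sdc_externalstage
  URL = 'azure://myaccount.blob.core.windows.net/mycontainer/load/sdc-externalstage/'
  CREDENTIALS = (AZURE_SAS_TOKEN = '<sas token>');
```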

AWS Credentials

When the Snowflake Bulk origin reads data staged on Amazon S3, it must pass credentials to Amazon Web Services.

Use one of the following methods to pass AWS credentials:
Instance profile
When the execution Data Collector runs on an Amazon EC2 instance that has an associated instance profile, Data Collector uses the instance profile credentials to automatically authenticate with AWS.
To use an instance profile, you enable the Use Instance Profile property on the Staging tab.
For more information about associating an instance profile with an EC2 instance, see the Amazon EC2 documentation.
AWS access key pair
When the execution Data Collector does not run on an Amazon EC2 instance or when the EC2 instance doesn’t have an instance profile, you can connect with an AWS access key pair. To connect with an AWS access key pair, you specify the access key ID and secret access key on the Staging tab.
Tip: To secure sensitive information such as access key pairs, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.
Note: To process data staged on Amazon S3, the role or access key pair that you use must have the permissions needed to read from Amazon S3, including s3:GetBucketLocation and s3:PutObject.

Google Cloud Storage Credentials

Before processing data staged on Google Cloud Storage, the Snowflake Bulk origin must pass credentials to Google Cloud.

You can provide credentials using one of the following options:
  • Google Cloud default credentials
  • Credentials in a file
  • Credentials in a stage property

For details on how to configure each option, see Security in Google Cloud Stages.

Assign Privileges

The Snowflake Bulk origin requires a Snowflake role that grants all privileges needed to process data.

The Snowflake role must include the following privileges:
  • External stage or internal Snowflake stage - READ, WRITE
  • Table - SELECT
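These privileges can be granted to a role with statements along the following lines; the role, stage, and table names are placeholders:

```sql
-- Sketch only: example grants for a custom role.
-- The names (sdc_bulk_role, mydb.public.sdc_stage, mydb.public.sales)
-- are placeholders.
GRANT READ, WRITE ON STAGE mydb.public.sdc_stage TO ROLE sdc_bulk_role;
GRANT SELECT ON TABLE mydb.public.sales TO ROLE sdc_bulk_role;
-- Depending on your setup, the role typically also needs USAGE on the
-- warehouse, database, and schema that contain these objects.
```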

Batch Processing

Unlike most Data Collector origins, the Snowflake Bulk origin performs batch processing only. After processing available data, the origin stops the pipeline rather than waiting for additional data, as with streaming pipelines. This design enables processing Snowflake data without incurring the costs of continuously connecting to Snowflake with streaming pipelines.

The Snowflake Bulk origin does not maintain an offset during processing. Each time that you run a pipeline that includes the origin, the origin processes the available data in the specified tables. Then the origin stops the pipeline gracefully, allowing processes to complete.
Tip: If the pipeline stops before processing is complete, to avoid duplicate records, clear the destination system of processed records before starting the pipeline again.

Define a Role

The Snowflake Bulk origin requires a Snowflake role that can read from Snowflake.

Before configuring the origin, ensure that you have granted the required privileges to a Snowflake role, as explained in Prerequisites.

If you create a custom role with the required privileges, define the role that the stage uses in one of the following ways:
Assign the custom role as the default role
In Snowflake, assign the custom role as the default role for the Snowflake user account specified in the stage. A Snowflake user account is associated with a single default role.
Override the default role with the custom role
In the stage, select Use Snowflake Role on the connection information tab. Then, specify the name of the custom role. The custom role overrides the default role assigned to the Snowflake user account specified in the stage.
For example, you might define custom roles in Snowflake for specific data sources, and then specify one of the roles when configuring a Snowflake stage.
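The first option can be sketched in SQL as follows; the user and role names are placeholders:

```sql
-- Sketch only: make a custom role the default role for the user account
-- specified in the stage (names are placeholders).
GRANT ROLE sdc_bulk_role TO USER sdc_user;
ALTER USER sdc_user SET DEFAULT_ROLE = sdc_bulk_role;
-- Alternatively, leave the default role unchanged and select
-- Use Snowflake Role in the stage, specifying sdc_bulk_role.
```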

Multithreaded Processing

The Snowflake Bulk origin performs parallel processing and enables the creation of a multithreaded pipeline.

When you start the pipeline, the Snowflake Bulk origin determines the tables to read based on the specified table configuration. The origin then uses multiple concurrent threads for processing based on the Connection Pool Size property on the Snowflake Connection Info tab.

As the pipeline runs, each thread processes one table: first copying data to the specified staging area, then downloading the data from the staging area. Each thread then creates batches based on the maximum batch size configured for the origin, and passes them to pipeline runners.

A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors, executors, and destinations in the pipeline and handles all pipeline processing after the origin. Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread.

Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But since batches are processed by different pipeline runners, the order that batches are written to destinations is not ensured.

For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.

Table Configurations

When you configure the Snowflake Bulk origin, you specify the tables to read. You can optionally specify WHERE clauses to limit the records that the origin processes.

Use Snowflake SQL to define a WHERE clause, and omit WHERE from the specified clause. For example, to use a WHERE clause to read sales data and omit returns, you might enter the following in the Where Clause property: total > 0.
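Conceptually, the origin appends the clause to the query it issues against the table, so the example above behaves like the following query; the table name is illustrative:

```sql
-- Illustration only: with the Where Clause property set to "total > 0",
-- the data staged for a hypothetical SALES table corresponds to:
SELECT * FROM sales WHERE total > 0;
```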

You define this information differently depending on the number of tables that you want to process:
One table
To process one table, you simply configure the Table property. If you want to limit the rows to process, you can specify a WHERE clause in the Where Clause property.
Multiple tables
To process multiple tables, you select the Enable Table Name List property. This enables you to configure one or more sets of the following properties:
  • Table Name Pattern - A single table name or a table name pattern that uses SQL LIKE syntax and evaluates to a set of tables.
  • Where Clause - An optional WHERE clause that limits the records processed from a specified table or table name pattern. When used with a table name pattern, the WHERE clause is applied to every table associated with the pattern. For information about WHERE clause syntax, see the Snowflake documentation.
When you specify a table name pattern, you can configure the Table Name Exclusion Pattern property to exclude tables that match the specified table name pattern from processing. Use a Java-based regular expression. For more information about using regular expressions, see Regular Expressions Overview.
Note: When you specify a table name exclusion pattern, it is applied to all tables and table name patterns defined for the origin.
When reading from multiple Snowflake tables, you might configure the origin to perform multithreaded processing.

Example

Say you have a set of marketing tables that all use the same naming format except the Marketing-EU table. When you select the Enable Table Name List property, you specify the Marketing-EU table and matching WHERE clause, then you specify the table name pattern and WHERE clause for the other marketing tables, as follows:
  • Table Name Pattern property: Marketing-EU
    Where Clause property: total > 0
  • Table Name Pattern property: %_Marketing
    Where Clause property: total > 0
If you wanted to exclude the AU-Marketing table from processing, you would add the table to the Table Name Exclusion Pattern property.
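You can preview which tables a pattern selects with a query like the following; the table names it returns are hypothetical. Note that in SQL LIKE syntax the underscore matches any single character, which is why the pattern above also matches a name such as AU-Marketing:

```sql
-- Illustration only: preview the tables that the %_Marketing pattern matches.
-- Because _ matches any single character in LIKE syntax, this also matches
-- names such as AU-Marketing.
SELECT table_name
FROM information_schema.tables
WHERE table_name LIKE '%_Marketing';
```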

Error Handling

You can configure Snowflake error handling in the Snowflake Bulk origin. The error handling properties determine how the on_error option is defined in the Snowflake SQL query.

Use the Error Behavior property on the Snowflake tab to define Snowflake error handling. The Error Behavior property provides the following error handling options:
  • Default - Does not set the on_error option. When not explicitly set, the Snowflake default on_error behavior is Abort Statement.
  • Continue - Ignores errors and continues processing.
  • Skip File - When encountering errors, skips reading the batch.
    When you use this option, you also configure a Skip File On Error property to specify when to skip the file:
    • First - After discovering the first error record.
    • Number - After discovering the specified number of error records in the batch.
    • Percentage - After discovering the specified percentage of error records in the batch.
  • Abort Statement - Skips reading the batch upon the first error.
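Per the Snowflake documentation, these choices correspond to values of the on_error copy option; the mapping below is a sketch, with the count and percentage values shown as examples:

```sql
-- Illustration only: on_error values corresponding to the options above.
--   Continue                 -> ON_ERROR = CONTINUE
--   Skip File (First)        -> ON_ERROR = SKIP_FILE
--   Skip File (Number: 10)   -> ON_ERROR = SKIP_FILE_10
--   Skip File (Percent: 10)  -> ON_ERROR = 'SKIP_FILE_10%'
--   Abort Statement          -> ON_ERROR = ABORT_STATEMENT
```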

Record Header Attributes

The Snowflake Bulk origin includes basic Snowflake information in the record header attributes of the generated record.

The origin creates the following record header attributes:
  • database - The database that contains the table that was processed.
  • schema - The schema that contains the table that was processed.
  • table - The table that contains the record that was processed.

Configuring a Snowflake Bulk Origin

Configure a Snowflake Bulk origin to read all available data from Snowflake tables, and then stop the pipeline. Before you use the origin, complete the prerequisite tasks.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Data Format Data format of the staged files.

    At this time, the origin processes only delimited data.

    Max Batch Size (records) Maximum number of records processed at one time. Honors values up to the Data Collector maximum batch size.

    Default is 1000. The Data Collector maximum batch size also defaults to 1000.

    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the Snowflake Connection Info tab, configure the following properties:
    Note: By default, Snowflake JDBC driver versions 3.13.25 and higher convert underscores to hyphens. When needed, you can bypass this behavior by setting the allowUnderscoresInHost driver property to true. For more information and alternate solutions, see this Snowflake community article.
    Snowflake Connection Property Description
    Connection Connection that defines the information required to connect to an external system.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    Organization Snowflake organization.
    Account Snowflake account name.
    User Snowflake user name.

    The user account or the custom role that overrides the default role for this user account must have the required Snowflake privileges.

    For information about required privileges, see Assign Privileges.

    Password Snowflake password.
    Include Organization Enables specifying the Snowflake organization.
    Snowflake Region Region where the Snowflake warehouse is located. Select one of the following:
    • An available Snowflake region.
    • Other - Enables specifying a Snowflake region not listed in the property.
    • Custom JDBC URL - Enables specifying a virtual private Snowflake installation.

    Available when Include Organization is disabled.

    Custom Snowflake Region Custom Snowflake region. Available when using Other as the Snowflake region.
    Virtual Private Snowflake URL Custom JDBC URL to use when using a virtual private Snowflake installation.
    Use Snowflake Role Overrides the default role for the specified user account.
    Snowflake Role Name Name of the custom Snowflake role to use.

    The custom role must have the required Snowflake privileges.

    For information about required privileges, see Assign Privileges.

    Connection Properties Additional Snowflake connection properties to use.

    To add properties, click Add and define the property name and value. Use the property names and values as expected by Snowflake.

    Connection Pool Size Number of threads the origin generates and uses for multithreaded processing.

    You might increase the number of connections when reading from multiple tables.

    For more information, see Multithreaded Processing.

  3. On the Snowflake tab, configure the following properties:
    Snowflake Property Description
    Warehouse Snowflake warehouse.
    Database Snowflake database.
    Schema Snowflake schema.
    Table Snowflake table to read. To read a single table, enter the table name.

    To read multiple tables, select the Enable Table Name List property.

    Where Clause Optional WHERE clause to limit the records to be read. Omit WHERE from the specified clause.
    Enable Table Name List

    Allows specifying a list of tables.

    Table List
    List of tables to read. Provides the following set of properties:
    • Table Name Pattern - A table name or an expression that evaluates to the set of tables to process. Use SQL LIKE syntax.
    • Where Clause - An optional WHERE clause that limits the records to process. Omit WHERE from the specified clause.

      When specified, the WHERE clause is applied to each table defined in the Table Name Pattern property.

    Click Add Another to define additional sets of properties.

    Table Name Exclusion Pattern Java-based regular expression that defines the tables to exclude from the read. This property applies to all tables defined in the table name list.

    For more information about using regular expressions, see Regular Expressions Overview.

    Error Behavior Action to take when errors occur. Determines how the Snowflake on_error option is used in a query:
    • Default - Does not set the on_error option. When not explicitly set, the Snowflake default on_error behavior is Abort Statement.
    • Continue - Ignores errors and continues processing.
    • Skip File - When encountering errors, skips reading the batch. Requires configuring the Skip File on Error property.
    • Abort Statement - Skips reading the batch upon the first error.

    For more information about the on_error option, see the Snowflake documentation.

    Skip File on Error Indicates when to skip reading a batch:
    • First - After discovering the first error record.
    • Number - After discovering the specified number of error records in the batch.
    • Percentage - After discovering the specified percentage of error records in the batch.

    Available when using the Skip File error behavior.

  4. On the Staging tab, configure the following properties:
    Staging Property Description
    Stage Location Location of the Snowflake stage:
    • Amazon S3
    • Azure Blob Storage
    • Google Cloud Storage
    • Snowflake Internal Stage

    This property configuration determines the properties that display on this tab and the Staging Advanced tab.

    Connection Connection that defines the information required to connect to an external system. Available when processing data staged on Google Cloud Storage.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    Snowflake Stage Name Name of the Snowflake stage used to stage the data. You generally create this stage as part of the prerequisite tasks.

    To use a Snowflake internal user stage, enter a tilde (~).

    Stage Database Optional database for the Snowflake stage. Configure this property when the stage is located in a different database than the Snowflake table.

    When not defined, the origin uses the database defined for the Snowflake table, on the Snowflake tab.

    Stage Schema Optional schema for the Snowflake stage. Configure this property when the stage is located in a different schema than the Snowflake table.

    When not defined, the origin uses the schema defined for the Snowflake table, on the Snowflake tab.

    Purge Stage File After Ingesting Removes a stage file after the data is read.
    Local File Prefix Prefix to use for files staged on the internal Snowflake stage. Default is sdc.
    AWS Access Key ID

    AWS access key ID.

    Required when not using an instance profile to read from an external stage on Amazon S3.

    AWS Secret Key ID

    AWS secret access key.

    Required when not using an instance profile to read from an external stage on Amazon S3.

    Use Instance Profile Enables using an instance profile to read from an external stage on Amazon S3. Use only when Data Collector runs on an Amazon EC2 instance.
    S3 Stage File Name Prefix Optional prefix for the external stage name. Default is sdc.
    S3 Compressed File Ensures that files are compressed before being staged on S3. Keep this option enabled for optimum performance.
    Azure Authentication Type of authentication to use to connect to Azure:
    • Account Name and Key
    • SAS Token
    Azure Account Name Azure account name.
    Azure Account Key Azure account key.

    Used only for Account Name and Key authentication.

    To secure sensitive information, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.

    Azure SAS Token Azure SAS Token.

    Used only for SAS Token authentication.

    To secure sensitive information, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.

    Azure Stage File Name Prefix Optional prefix for the external stage name.
    Azure Compressed File Compresses files before writing them to Azure. Keep this option enabled for optimum performance.
    Project ID

    Google Cloud project ID to use.

    Credentials Provider Provider for Google Cloud credentials:
    • Default credentials provider - Uses Google Cloud default credentials.
    • Service account credentials file (JSON) - Uses credentials stored in a JSON service account credentials file.
    • Service account credentials (JSON) - Uses JSON-formatted credentials information from a service account credentials file.
    Credentials File Path (JSON) Path to the Google Cloud service account credentials file used to connect. The credentials file must be a JSON file.

    Enter a path relative to the Data Collector resources directory, $SDC_RESOURCES, or enter an absolute path.

    Credentials File Content (JSON) Contents of a Google Cloud service account credentials JSON file used to connect.

    Enter JSON-formatted credential information in plain text, or use an expression to call the information from runtime resources or a credential store. For more information about credential stores, see Credential Stores in the Data Collector documentation.

    Stage File Prefix Prefix to use for files staged on Google Cloud Storage. Default is sdc.
    Compress File with Gzip Enables compressing files staged on Google Cloud Storage with gzip.
  5. When using a Snowflake external stage, on the Staging Advanced tab, configure the following properties.

    This tab displays different properties depending on the location of the external stage.

    When using an external stage in Amazon S3, you can configure the following properties:
    Amazon S3 Advanced Property Description
    S3 Connection Timeout Seconds to wait for a response before closing the connection.
    S3 Socket Timeout Seconds to wait for a response to a query.
    S3 Max Error Retry Maximum number of times to retry requests.
    S3 Uploading Threads Size of the thread pool for parallel uploads. Used when working with multiple partitions and processing large objects in multiple parts.

    When working with multiple partitions, setting this property to the number of partitions in use can improve performance.

    For more information about this and the following properties, see the Amazon S3 TransferManager documentation.

    S3 Minimum Upload Part Size (MB) Minimum part size in MB for multipart uploads.
    S3 Multipart Upload Threshold (MB) Minimum batch size in MB for multipart uploads.
    S3 Proxy Enabled Specifies whether to use a proxy to connect.
    S3 Proxy Host Proxy host.
    S3 Proxy Port Proxy port.
    S3 Proxy Authentication Enabled Indicates that proxy authentication is used.
    S3 Proxy User S3 proxy user.
    S3 Proxy Password S3 proxy password.
    S3 Encryption Option that Amazon S3 uses to manage the encryption keys:
    • None
    • SSE-S3 - Use Amazon S3-managed keys.
    • SSE-KMS - Use Amazon Web Services KMS-managed keys.

    Default is None.

    S3 Encryption KMS ID Amazon resource name (ARN) of the AWS KMS master encryption key. Use the following format:
    <arn>:<aws>:<kms>:<region>:<acct ID>:<key>/<key ID>

    Used for SSE-KMS encryption only.

    S3 Encryption Context Key-value pairs to use for the encryption context. Click Add to add key-value pairs.

    Used for SSE-KMS encryption only.

    When using an external stage in Azure, you can configure the following properties:
    Azure Advanced Property Description
    Use Custom Blob Service URL Enables using a custom Azure Blob Storage URL.
    Custom Blob Service URL Custom Azure Blob Storage URL. Typically uses the following format:
    https://<Azure Account>.blob.core.windows.net
    Azure Encryption Encryption used for staged files. At this time, only Azure default encryption is available.