Google BigQuery (Enterprise)

The Google BigQuery (Enterprise) destination loads new data or change data capture (CDC) data to Google BigQuery. The destination can compensate for data drift to support loading to new or existing datasets, tables, and columns. For information about supported versions, see Supported Systems and Versions.

To load data, the destination first stages the pipeline data in CSV files in a staging area in Google Cloud Storage. Then, the destination uses the BigQuery API to run a batch load job that copies the staged files into BigQuery.
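
As a rough illustration of the staging step, the following Python sketch serializes a batch of records to CSV, optionally compressed with gzip, similar to what a staged file might contain. The helper name `stage_batch_as_csv`, its parameters, and the record layout are hypothetical and are not part of the destination. The upload to Cloud Storage and the BigQuery load job are omitted.

```python
import csv
import gzip
import io


def stage_batch_as_csv(records, columns, null_value="", compress=False):
    """Serialize a batch of dict records to CSV bytes (hypothetical helper)."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for record in records:
        # Missing or None values are written using the null representation.
        writer.writerow(
            [null_value if record.get(col) is None else record[col] for col in columns]
        )
    data = buffer.getvalue().encode("utf-8")
    return gzip.compress(data) if compress else data


# Example batch with one null value in the second record.
batch = [{"id": 1, "name": "Jane"}, {"id": 2, "name": None}]
staged = stage_batch_as_csv(batch, ["id", "name"])
```

The sketch writes one CSV line per record and leaves the null representation configurable, which mirrors the idea of specifying characters to represent null values.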

When you configure the destination, you specify authentication information for Google BigQuery and for your Google Cloud Storage staging area. You can also use a connection to define the information required to connect to BigQuery and to the staging area. You can optionally configure the destination to connect to Google BigQuery through a proxy server.

You specify the name of the dataset and tables to load the data to. The destination loads data from record fields to table columns based on matching names.

You can configure the destination to compensate for data drift by creating new columns in existing tables when new fields appear in records or by creating new tables or datasets as needed. When creating new tables, you can configure how the destination partitions the tables.

You can configure the root field for the row, and any first-level fields that you want to exclude from the record. You can specify characters to represent null values.

When processing CDC data, you can specify the key columns in the BigQuery table that the destination uses to evaluate the merge condition.

You can configure the destination to replace missing fields or fields with invalid data types with user-defined default values. You can also configure the destination to replace newline characters and trim leading and trailing spaces.

Before you use the Google BigQuery (Enterprise) destination, you must complete some prerequisite tasks. The destination is available in the Google Enterprise stage library.

Prerequisites

Before you configure the Google BigQuery (Enterprise) destination, complete the following prerequisite tasks:
  1. Prepare the Google BigQuery data warehouse.
  2. Prepare the Google Cloud Storage staging area.

Prepare the Google BigQuery Data Warehouse

Before configuring the destination, prepare your BigQuery data warehouse.

  1. If necessary, create a Google Cloud project.
  2. If necessary, create a Google BigQuery dataset and tables within the dataset.

    If you enable data drift handling for the destination, the destination can create new datasets and tables.

  3. Set up a Google service account that can connect to the project.
  4. Grant the service account the following permissions required to load data into BigQuery:
    • bigquery.tables.create
    • bigquery.tables.update
    • bigquery.tables.updateData
    • bigquery.tables.get
    • bigquery.tables.getData
    • bigquery.jobs.create
    If you want the destination to load CDC data, grant the account the additional permission:
    • bigquery.tables.delete
    If you want the destination to create datasets and tables automatically while handling data drift, grant the account the additional permission:
    • bigquery.datasets.create

    For more information about the required permissions to load data to BigQuery, see the Google Cloud BigQuery documentation.

    Use this service account when you configure the credentials that the destination uses to connect to Google BigQuery.
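
The permission lists above can be summarized in a small Python sketch that assembles the set of permissions for a given use case. The function name `required_bigquery_permissions` and its flags are hypothetical. The permission names are the ones listed in this section.

```python
BASE_PERMISSIONS = {
    "bigquery.tables.create",
    "bigquery.tables.update",
    "bigquery.tables.updateData",
    "bigquery.tables.get",
    "bigquery.tables.getData",
    "bigquery.jobs.create",
}


def required_bigquery_permissions(load_cdc=False, auto_create=False):
    """Return the sorted permissions needed for the chosen options."""
    permissions = set(BASE_PERMISSIONS)
    if load_cdc:
        # Loading CDC data needs one more permission.
        permissions.add("bigquery.tables.delete")
    if auto_create:
        # Creating datasets and tables during data drift handling.
        permissions.add("bigquery.datasets.create")
    return sorted(permissions)
```

You might use a list like this as a checklist when defining a custom role for the service account.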

Prepare the Google Cloud Storage Staging Area

Before configuring the destination, prepare a staging area in Google Cloud Storage. The destination stages CSV files in the staging area before loading them to Google BigQuery.

  1. Create a bucket in Google Cloud Storage to act as the staging area for the destination.
    As a best practice, create the bucket in the same project that you designated for the BigQuery data warehouse.
    Note: The bucket name must be DNS compliant. For more information about bucket naming conventions, see the Google Cloud Storage documentation.
  2. Set up a Google service account that can connect to the project.
  3. Grant the service account the following permissions required to stage data to and then load data from the Cloud Storage bucket:
    • storage.objects.create
    • storage.objects.get

    For more information about Cloud Storage permissions, see the Google Cloud Storage documentation.

    Use this service account when you configure the credentials that the destination uses to connect to Google Cloud Storage.

Credentials

When the Google BigQuery (Enterprise) destination stages CSV files in the Cloud Storage staging area and then loads the files to BigQuery, the destination must pass credentials to Google Cloud Storage and then to Google BigQuery.

You can provide credentials using one of the following options:
  • Google Cloud default credentials
  • Credentials in a file
  • Credentials in a stage property

For details on how to configure each option, see Security in Google Cloud Stages.

Specifying Datasets and Tables

You can use the Google BigQuery (Enterprise) destination to load data to one or more tables in one or more datasets.

Specify the datasets and tables to use based on how many you want to write to:
Single dataset or table
To write data to a single dataset, simply enter the dataset name as follows:

<dataset_name>

Similarly, to write data to a single table, enter the table name as follows:

<table_name>

Multiple datasets or tables
To write data to multiple datasets and tables, specify a field in the record that defines the datasets and tables.
For example, say you have datasets named after departments in your company, such as Operations, Sales, and Marketing. The records being processed have a dept field with matching values. You configure the destination to write records to the various datasets using the following expression: ${record:value('/dept')}.
Or, say that you want to replicate data from multiple tables in a SQL Server database. You use a JDBC Multitable Consumer origin which captures the name of each source table in the jdbc.tables record header attribute. To load data into tables based on the source table names, you use the following expression: ${record:attribute('jdbc.tables')}.
You can configure the destination to automatically create datasets or tables when a new value appears in the specified field. For example, if the dept field suddenly includes an Engineering department, the destination can create a new Engineering dataset for the new data. For more information, see Enabling Data Drift Handling.
When writing to multiple datasets and tables, you might also increase the number of connections that the destination uses for the write. For more information, see Performance Optimization.

Use the Dataset and Table properties on the Table Definition tab to specify the datasets and tables to write to.
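
To make the routing idea concrete, the following Python sketch models how a dataset and table could be resolved per record. It only imitates the expressions shown above and is not the Data Collector expression language. The function `resolve_target` and the record layout with `fields` and `header` keys are assumptions made for illustration.

```python
def resolve_target(record, dataset_setting, table_setting):
    """Return (dataset, table) for one record.

    Each setting is either a literal name or a callable that reads the
    record, standing in for an expression such as ${record:value('/dept')}.
    """
    def evaluate(setting):
        return setting(record) if callable(setting) else setting

    return evaluate(dataset_setting), evaluate(table_setting)


record = {
    "fields": {"dept": "Sales", "amount": 120},
    "header": {"jdbc.tables": "orders"},
}
target = resolve_target(
    record,
    lambda r: r["fields"]["dept"],         # like ${record:value('/dept')}
    lambda r: r["header"]["jdbc.tables"],  # like ${record:attribute('jdbc.tables')}
)
```

With literal names, every record goes to the same dataset and table. With field-based or attribute-based settings, each record can land in a different target.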

Enabling Data Drift Handling

The Google BigQuery (Enterprise) destination can automatically compensate for changes in column, table, or dataset requirements, also known as data drift.

The destination can handle data drift in the following ways:
Create new columns

The destination can create new columns in existing tables when new fields appear in records. For example, if a record suddenly includes a new Address2 field, the destination creates a new Address2 column in the target table. The destination always creates new columns in existing tables as Nullable columns.

By default, the destination creates new columns based on the data in the new fields, such as creating an Integer column for integer data. You can, however, configure the destination to create all new columns as String.

To enable the automatic creation of new columns, select the Enable Data Drift property on the Table Definition tab. To create all new columns as String, select the Create New Columns as String property.

Create new tables
When data drift handling is enabled, you can also configure the destination to create new tables as needed. For example, say the destination writes data to tables based on the region name in the Region field. When a new SW-3 region shows up in a record, the destination creates a new SW-3 table and writes the record to the new table.
When creating a table, you can configure whether the destination creates new columns in the table using the Required or Nullable mode. You can also configure how the destination partitions new tables. For details, see Partitioning New Tables.
To enable the creation of new tables, first enable data drift on the Table Definition tab, and then select the Create Table property. To specify the mode for new columns created in new tables, configure the New Columns Mode property. To partition new tables, select the Partition Table property and then define partition configurations.
Create new datasets
When data drift handling and the creation of new tables are enabled, you can also configure the destination to create new datasets as needed. For example, say the destination writes data to datasets based on the department name in the dept field. When a new Engineering department shows up in a record, the destination creates a new Engineering dataset and writes the record to a new table in the new dataset.
To enable the creation of new datasets, first enable data drift and the creation of new tables on the Table Definition tab, and then select the Create Dataset property.
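
The dependencies between the three data drift options can be sketched as a small decision function. This is a simplified model of the behavior described above, not the destination's implementation. The function name `drift_action` and the returned action strings are hypothetical.

```python
def drift_action(dataset_exists, table_exists, new_fields,
                 enable_drift=False, create_table=False, create_dataset=False):
    """Return the action taken for a record, or "error" (hypothetical model)."""
    if not dataset_exists:
        # Creating a dataset requires all three options.
        if enable_drift and create_table and create_dataset:
            return "create dataset and table"
        return "error"
    if not table_exists:
        # Creating a table requires data drift handling and Create Table.
        if enable_drift and create_table:
            return "create table"
        return "error"
    if new_fields:
        # New columns in existing tables are always added as Nullable.
        return "add nullable columns" if enable_drift else "error"
    return "write"
```

Each option builds on the previous one: table creation needs data drift handling, and dataset creation needs both.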

Partitioning New Tables

When creating a new table, the destination can also partition the table.

To enable partitioning for new tables, first enable data drift and select the Create Table property on the Table Definition tab. Then, select the Partition Table property and define partition configurations.

You can add multiple partition configurations so that the destination can partition new tables in different ways. Because BigQuery can partition a table only once, the destination uses a single partition configuration when it creates a table.

For each partition configuration, you specify whether the configuration should be applied to a specific table or whether the configuration is the default. Then, you specify the partition options.

Partition Configurations

When you add a partition configuration, you specify one of the following ways to apply the configuration:
To a specific table
To create a partition configuration for a specific table, enter the dataset and table name to apply the partition configuration to.
As the default
To create the default partition configuration that the destination uses when it cannot find a matching configuration for a new table, select Default Partition. You can create one default partition configuration.

When you enable the creation and partitioning of new tables, the destination first looks for a matching partition configuration by dataset and table name. If the destination cannot find a match, it uses the default configuration. If a default configuration does not exist, the destination creates the table without partitioning.

You can set up partition configurations in multiple ways, based on whether you want to partition all new tables the same way or differently. For example, let's say that you configure the destination to write data to new tables based on the region name in the Region field. The Region field can have one of the following values: WestRegion, EastRegion, or NorthRegion. You also enable the creation and partitioning of new tables. You can set up partition configurations for the three new tables in the following ways:
  • Different partitioning for each table - Create three partition configurations, one for each table: WestRegion, EastRegion, and NorthRegion.
  • Same partitioning for each table - Create a single default partition configuration.
  • Different partitioning for one table, same partitioning for the remaining tables - Create a partition configuration for the WestRegion table. Then create a default partition configuration used for the remaining tables.
  • Partition one table, do not partition the remaining tables - Create a partition configuration for the WestRegion table. Do not create a default partition configuration.
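
The lookup order described above (specific match first, then the default, then no partitioning) can be sketched in Python as follows. The function `find_partition_config`, the dictionary keys, the `sales` dataset, and the `order_date` column are hypothetical names used only for this example.

```python
def find_partition_config(configs, dataset, table):
    """Pick the partition configuration for a new table, or None."""
    default = None
    for config in configs:
        if config.get("default"):
            default = config
        elif config.get("dataset") == dataset and config.get("table") == table:
            return config  # a specific match always wins
    return default  # None means the table is created without partitioning


configs = [
    {"dataset": "sales", "table": "WestRegion", "type": "Date", "column": "order_date"},
    {"default": True, "type": "Ingestion"},
]
```

In this example, the WestRegion table gets its own configuration, and any other new table falls back to the default. Removing the default entry would leave the other tables unpartitioned.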

Partition Options

For each partition configuration that you add, you specify one of the following partition types:
  • Date
  • Datetime
  • Timestamp
  • Integer
  • Ingestion

For the Date, Datetime, Timestamp, and Integer partition types, you must also specify the column to partition by. The data type of the column must match the selected partition type. For example, to partition by the Integer type, the data type of the column must be Integer.

You configure additional options for each partition type as required by BigQuery. For details about the available options for each partition type, see the Google Cloud BigQuery documentation.
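
The column rules for partition types can be expressed as a simple check. The following Python sketch is a hypothetical validation helper, not something the destination exposes. The function name `validate_partition` and its parameters are assumptions.

```python
COLUMN_PARTITION_TYPES = {"Date", "Datetime", "Timestamp", "Integer"}


def validate_partition(partition_type, column=None, column_type=None):
    """Return a list of problems with a partition configuration."""
    if partition_type == "Ingestion":
        return []  # the column requirement applies only to the other types
    if partition_type not in COLUMN_PARTITION_TYPES:
        return [f"unknown partition type: {partition_type}"]
    if column is None:
        return [f"{partition_type} partitioning requires a column"]
    if column_type != partition_type:
        # The column data type must match the partition type.
        return [f"column {column} is {column_type}, expected {partition_type}"]
    return []
```

An empty list means the configuration passes these two checks. Any additional options that BigQuery requires for a partition type are outside the scope of this sketch.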

Row Generation

When writing a record to a table, the Google BigQuery (Enterprise) destination includes all record fields in the resulting row, by default. The destination uses the root field, /, as the basis for the resulting row.

You can configure the Row Field property on the Data tab to specify a map or list-map field in the record as the basis for the row. The resulting row includes only the data from the specified map or list-map field and excludes all other record data. Use this functionality when the data that you want to write to BigQuery exists in a single map or list-map field within the record.

When you want to use the root field, but do not want to include all fields in the resulting row, use the Column Fields to Ignore property on the Data tab to list the first-level fields to exclude.

The Google BigQuery (Enterprise) destination cannot convert map or list-map fields nested within the specified root field. The destination treats these fields as having an invalid data type. For example, say that you configure the destination to use the root field, /, as the basis for the resulting row. A record contains the following fields:
{
    "name": "Jane Smith",
    "id": "557",
    "address": {
        "street": "101 3rd St",
        "city": "Huntsville",
        "state": "NC",
        "zipcode": "27023"
     }
}

The destination treats the address map field as a field with an invalid data type and, by default, processes the record as an error record. You can configure the destination to ignore the field and process the remaining record data, as described in Missing Fields and Fields with Invalid Types.

Tip: To write all fields in a record including nested map or list-map fields, use a Field Flattener processor in the pipeline to flatten the entire record to produce a record with no nested fields.
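
The row generation rules can be modeled with a short Python sketch that uses the example record above. The helpers `build_row` and `flatten` are hypothetical. The `.` separator in flattened names is an assumption, and the sketch handles only a first-level row field such as `/address`.

```python
def build_row(record, row_field="/", ignore=()):
    """Build a flat row from a record, reporting nested fields as invalid."""
    source = record if row_field == "/" else record[row_field.strip("/")]
    row, invalid = {}, []
    for name, value in source.items():
        if name in ignore:
            continue  # first-level field excluded from the row
        if isinstance(value, (dict, list)):
            invalid.append(name)  # nested map or list cannot be converted
        else:
            row[name] = value
    return row, invalid


def flatten(record, prefix="", separator="."):
    """Flatten nested maps, roughly what a flattening step would produce."""
    flat = {}
    for name, value in record.items():
        key = f"{prefix}{separator}{name}" if prefix else name
        if isinstance(value, dict):
            flat.update(flatten(value, key, separator))
        else:
            flat[key] = value
    return flat


record = {
    "name": "Jane Smith",
    "id": "557",
    "address": {"street": "101 3rd St", "city": "Huntsville",
                "state": "NC", "zipcode": "27023"},
}
```

Calling `build_row(record)` reports `address` as an invalid field, while `build_row(record, "/address")` uses only the address data as the row. Flattening the record first removes the nested field entirely.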

Missing Fields and Fields with Invalid Types

By default, the destination treats records with missing fields or with invalid data types in fields as error records.

You can configure the destination to ignore missing fields or fields with invalid data types, replacing the missing or invalid data with a default value for the data type.

The default for each data type is an empty string, which is the default null value in BigQuery. You can specify a different default value to use for each data type on the Data Advanced tab. For example, you might define the default value for a missing String field or a String field with an invalid data type as none or not_applicable.

To configure the destination to ignore missing fields and fields with invalid data types, select the Ignore Missing Fields and the Ignore Fields with Invalid Types properties on the Data Advanced tab.
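
As a sketch of how these two properties interact with the default values, consider the following Python model. The function `apply_defaults`, the schema representation that maps column names to Python types, and the `defaults` dictionary are all hypothetical. A return value of `None` stands in for an error record.

```python
def apply_defaults(row, schema, defaults, ignore_missing=False, ignore_invalid=False):
    """Return the row with defaults applied, or None for an error record.

    schema maps column name -> Python type; defaults maps type -> default.
    Types without a configured default fall back to an empty string.
    """
    result = {}
    for column, expected in schema.items():
        if column not in row:
            if not ignore_missing:
                return None  # missing field: error record by default
            result[column] = defaults.get(expected, "")
        elif not isinstance(row[column], expected):
            if not ignore_invalid:
                return None  # invalid type: error record by default
            result[column] = defaults.get(expected, "")
        else:
            result[column] = row[column]
    return result


schema = {"name": str, "age": int}
defaults = {str: "none"}
```

With both options disabled, any missing or invalid field turns the record into an error record. With the options enabled, the record is written with the configured default or an empty string.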

Performance Optimization

Use the following tips to optimize for performance and cost-effectiveness when using the Google BigQuery (Enterprise) destination:

Increase the batch size
The maximum batch size is determined by the origin in the pipeline and typically has a default value of 1,000 records. To take advantage of the loading abilities that Google BigQuery provides, increase the maximum batch size in the pipeline origin to 20,000-50,000 records. Be sure to increase the Data Collector Java heap size, as needed.
Important: Increasing the batch size is strongly recommended. Using the default batch size can be slow and costly.
Use multiple threads
You can use multiple threads to improve performance when you include a multithreaded origin in the pipeline. When Data Collector resources allow, using multiple threads enables processing multiple batches of data concurrently.
As with increasing the batch size, when using multiple threads, you should make sure that the Data Collector Java heap size is sized appropriately.
Note: Do not use multiple threads when processing CDC data. When using multiple threads to process data, the original order of the data is not retained.
Enable additional connections to Google BigQuery
When loading data into multiple tables or datasets, increase the number of connections that the destination makes to Google BigQuery. Each additional connection allows the destination to load data into an additional table, concurrently.
For example, when loading data into 10 tables with only one connection, the destination can load data into only one table at a time. With 5 connections, the destination can load data into 5 tables at a time. With 10 connections, the destination can load data into all 10 tables at the same time.

By default, the destination uses one connection for standard single-threaded pipelines. In multithreaded pipelines, the destination matches the number of threads used by the pipeline. That is, when a multithreaded origin is configured to use up to 3 threads, then by default, the Google BigQuery (Enterprise) destination uses 3 connections to load data into BigQuery, one for each thread.

Note that the number of connections is for the entire pipeline, not for each thread. So when using multiple threads to load data into multiple tables, you can also improve performance by allocating additional connections. For example, when using 3 threads to load data into 3 tables, you might increase the number of connections to 9 for maximum throughput.

Use the Connection Pool Size property on the Google BigQuery tab to specify the maximum number of connections that the destination can use.
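
The arithmetic behind these examples can be sketched in Python. This is a simplified model that assumes each connection loads one table at a time and that all loads take equally long. The function names `effective_connections` and `load_rounds` are hypothetical.

```python
import math


def effective_connections(pool_size, pipeline_threads=1):
    """A Connection Pool Size of 0 means one connection per pipeline thread."""
    return pipeline_threads if pool_size == 0 else pool_size


def load_rounds(tables, connections):
    """Rounds needed if each connection loads one table at a time."""
    return math.ceil(tables / connections)
```

Under this model, 10 tables take 10 rounds with 1 connection, 2 rounds with 5 connections, and 1 round with 10 connections. Real throughput also depends on Data Collector resources and BigQuery quotas.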

Define the CRUD Operation

The Google BigQuery (Enterprise) destination can insert, update, upsert, or delete data when you configure the destination to process CDC data. When processing CDC data, the destination merges data into BigQuery tables.

To process CDC data, the destination uses the CRUD operation specified in the sdc.operation.type record header attribute. The destination performs operations based on the following numeric values:
  • 1 for INSERT
  • 2 for DELETE
  • 3 for UPDATE
  • 4 for UPSERT

If your pipeline includes a CRUD-enabled origin that processes changed data, the destination simply reads the operation type from the sdc.operation.type header attribute that the origin generates. If your pipeline uses a non-CDC origin, you can use the Expression Evaluator or a scripting processor to define the record header attribute. For more information about Data Collector changed data processing and a list of CDC-enabled origins, see Processing Changed Data.
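
For a non-CDC origin, the header attribute has to be set somewhere upstream of the destination. The following Python sketch shows the mapping between operation names and the numeric codes listed above, using a plain dictionary as a stand-in for the record header. The helpers `tag_operation` and `read_operation` are hypothetical, and the code is not tied to any particular scripting processor API.

```python
OPERATION_TYPES = {1: "INSERT", 2: "DELETE", 3: "UPDATE", 4: "UPSERT"}
OPERATION_CODES = {name: code for code, name in OPERATION_TYPES.items()}


def tag_operation(header, operation):
    """Return a copy of the header with sdc.operation.type set."""
    tagged = dict(header)
    # Header attribute values are stored as strings in this sketch.
    tagged["sdc.operation.type"] = str(OPERATION_CODES[operation.upper()])
    return tagged


def read_operation(header):
    """Translate the numeric code in the header back to an operation name."""
    return OPERATION_TYPES[int(header["sdc.operation.type"])]
```

For example, tagging a record as a delete sets the attribute to 2, which the destination then interprets as a DELETE when merging the data.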

BigQuery Data Types

The Google BigQuery (Enterprise) destination converts Data Collector data types into BigQuery data types before writing data to BigQuery tables.

When you configure the destination to compensate for data drift, you can also configure the destination to create all new columns as String. However, by default, the destination converts record data to the appropriate data type.

The destination does not support nested Data Collector data types: List, List-Map, and Map. By default, the destination treats records that contain fields with invalid data types as error records. You can configure the destination to ignore fields with invalid data types, as described in Missing Fields and Fields with Invalid Types.

Tip: To process all fields in a record, including nested list, list-map, or map fields, use a Field Flattener processor to flatten the entire record to produce a record with no nested fields.

The destination converts the following Data Collector data types into these BigQuery data types:

Data Collector Data Type    BigQuery Data Type
Boolean                     Boolean
Byte                        Bytes
Byte Array                  Bytes
Character                   String
Date                        Date
Datetime                    Datetime or Timestamp
Decimal                     Decimal
Double                      Float
Float                       Float
Integer                     Integer
Long                        Integer
Short                       Integer
String                      String
Time                        Time

Configuring a Google BigQuery (Enterprise) Destination

Configure a Google BigQuery (Enterprise) destination to load data to BigQuery tables. Before you use the destination in a pipeline, complete the prerequisite tasks.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property - Description
    Name - Stage name.
    Description - Optional description.
    Required Fields - Fields that must include data for the record to be passed into the stage.

    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions - Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error - Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the Google BigQuery tab, configure the following properties:
    Google BigQuery Property - Description
    Connection - Connection that defines the information required to connect to an external system.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    To create a new connection, click the Add New Connection icon. To view and edit the details of the selected connection, click the Edit Connection icon.

    Project ID - Project ID to connect to.
    Credentials Provider - Credentials to use:
    • Default credentials provider - Uses Google Cloud default credentials.
    • Service account credentials file (JSON) - Uses credentials stored in a JSON service account credentials file.
    • Service account credentials (JSON) - Uses JSON-formatted credentials information from a service account credentials file.
    Credentials File Path (JSON) - Path to the Google Cloud service account credentials file used to connect. The credentials file must be a JSON file.

    Enter a path relative to the Data Collector resources directory, $SDC_RESOURCES, or enter an absolute path.

    Credentials File Content (JSON) - Contents of a Google Cloud service account credentials JSON file used to connect.

    Enter JSON-formatted credential information in plain text, or use an expression to call the information from runtime resources or a credential store.

    Connection Pool Size - Maximum number of connections that the destination uses to load data. Default is 0, which ensures that the destination uses the same number of connections as threads used by the pipeline.

    When loading to multiple tables, increasing this property can improve performance.

    Enable Proxy - Connects to the Google BigQuery API through an HTTP proxy server.
    Proxy URL - URL of the HTTP proxy server in the following format:

    http://<host>:<port>

    Proxy Username - Optional user name to authenticate with the proxy server.
    Proxy Password - Optional password to authenticate with the proxy server.
  3. On the Staging tab, configure the following properties:
    Staging Property - Description
    Connection - Connection that defines the information required to connect to an external system.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    To create a new connection, click the Add New Connection icon. To view and edit the details of the selected connection, click the Edit Connection icon.

    Staging Location - Location to stage the pipeline data in CSV files.

    At this time, only Google Cloud Storage is supported as the staging location.

    Purge Stage File After Ingesting - Purges a staged file after its data is successfully loaded into Google BigQuery.
    Stage File Prefix - Prefix to use for the name of the staged files.

    To create a unique name for each staged file, the destination appends a unique number to the prefix. For example, if you define the prefix as streamsets, the destination might name one of the staged files streamsets-69021a22-1474-4926-856f-eb4589d14cca.

    Bucket - Bucket to use when staging files.

    Enter a bucket name or define an expression that evaluates to bucket names.

    Compress File with gzip - Compresses files with gzip before writing to the staging area.
  4. On the Table Definition tab, configure the following properties:
    Table Definition Property - Description
    Dataset - BigQuery datasets to load data into. To load data into a single dataset, enter the dataset name.

    To load data into multiple datasets, enter an expression that evaluates to the field in the record that contains the dataset name. For example: ${record:value('/dataset')}

    Table - BigQuery tables to load data into. To load data into a single table, enter the table name.

    To load data into multiple tables, enter an expression that evaluates to the field in the record that contains the table name. For example: ${record:value('/table')}

    Or, to load data into tables based on the table name in the jdbc.tables record header attribute generated by an origin that reads from a relational database, you can use the following expression: ${record:attribute('jdbc.tables')}

    The destination writes data from record fields to table columns based on matching names.

    Enable Data Drift - Creates new columns in existing tables when new fields appear in records.

    When not enabled, the destination treats records that have new fields as error records.

    Create New Columns as String - Creates all new columns as String.

    By default, the destination infers the data type based on the type of data in the field. For example, if a new field contains an integer, the destination creates a column with the Integer data type.

    Available when data drift handling is enabled.

    Create Table - Automatically creates tables when needed.

    When not enabled, the destination treats a record attempting to load to a table that doesn't exist as an error record.

    Available when data drift handling is enabled.

    New Columns Mode - Specifies the mode for new columns created in new tables:
    • Required - Null values are not allowed.
    • Nullable - Column allows null values.

    When the destination adds new fields to existing tables, columns are always added as Nullable.

    Available when data drift handling and the creation of new tables are enabled.

    Create Dataset - Automatically creates datasets when needed.

    When not enabled, the destination treats a record attempting to load to a dataset that doesn't exist as an error record.

    Available when data drift handling and the creation of new tables are enabled.

    Partition Table - Creates partitions in new tables based on the partition configurations.

    Available when data drift handling and the creation of new tables are enabled.

    Partition Configuration - One or more partition configurations that define how to partition new tables.

    For each partition configuration, you specify whether the configuration is the default or whether it should be applied to a specific dataset or table. Then you specify the partition type and the required options for that type.

    Click Add Another to add an additional partition configuration.

    Available when table partitioning is enabled.

  5. On the Data tab, configure the following properties:
    Data Property - Description
    Row Field - Map or list-map field to use as the basis for the generated row. Default is /, which includes all record fields in the resulting row.
    Column Fields to Ignore - List of fields to ignore when writing to BigQuery.

    You can enter a comma-separated list of first level fields to ignore. Field names are case sensitive.

    Null Value - Characters to use to represent null values.

    Default is an empty string, which is the default null value in BigQuery.

    Merge CDC Data - Enables performing CRUD operations and merging data into BigQuery tables. Select to process CDC data.

    Important: To maintain the original order of data, do not use multiple threads when processing CDC data.

    Key Columns - Key columns used to evaluate the merge condition for each BigQuery table.

    Click the Add icon in the Key Column field to add additional key columns for a table.

    Available when processing CDC data.

  6. On the Data Advanced tab, configure the following properties:
    Data Advanced Property - Description
    Ignore Missing Fields - Allows writing records with missing fields to BigQuery tables. Uses the specified default value for the data type of the missing field.

    When not enabled, records with missing fields are treated as error records.

    Ignore Fields with Invalid Types - Allows replacing fields that contain data of an invalid type with the specified default value for the data type.

    When not enabled, records with data of invalid types are treated as error records.

    Boolean Default - Default value to use when replacing missing Boolean fields or Boolean fields with invalid data.

    Default is an empty string, which is the default null value in BigQuery.

    Numeric Types Default - Default value to use when replacing missing Numeric fields or Numeric fields with invalid data.

    Default is an empty string, which is the default null value in BigQuery.

    Float Default - Default value to use when replacing missing Float fields or Float fields with invalid data.

    Default is an empty string, which is the default null value in BigQuery.

    Decimal Default - Default value to use when replacing missing Decimal fields or Decimal fields with invalid data.

    Default is an empty string, which is the default null value in BigQuery.

    Date Default - Default value to use when replacing missing Date fields or Date fields with invalid data.

    Default is an empty string, which is the default null value in BigQuery.

    Timestamp Default - Default value to use when replacing missing Timestamp fields or Timestamp fields with invalid data.

    Default is an empty string, which is the default null value in BigQuery.

    String Default - Default value to use when replacing missing String fields or String fields with invalid data.

    Default is an empty string, which is the default null value in BigQuery.

    Replace Newlines - Replaces newline characters in string fields with the specified replacement character.
    New Line Replacement Character - Character to use to replace newline characters.
    Trim Spaces - Trims leading and trailing spaces from field data.
    Column Separator - Character to use as a column separator.
    Quote Character - Character to enclose field data.