Google BigQuery
The Google BigQuery destination loads new data or change data capture (CDC) data to Google BigQuery. The destination can compensate for data drift to support loading to new or existing datasets, tables, and columns. For information about supported versions, see Supported Systems and Versions.
To load data, the destination first stages the pipeline data in CSV or Avro files in a staging area in Google Cloud Storage. Then, the destination creates a BigQuery batch load job that copies the staged files into BigQuery. For information about the Google BigQuery quota policy for batch loading data, see the Google Cloud BigQuery documentation.
When you configure the destination, you specify authentication information for Google BigQuery and for your Google Cloud Storage staging area. You can also use a connection to define the information required to connect to BigQuery and to the staging area. You can optionally configure the destination to connect to Google BigQuery through a proxy server.
You specify the name of the dataset and tables to load the data to. The destination loads data from record fields to table columns based on matching names.
You can configure the destination to compensate for data drift by creating new columns in existing tables when new fields appear in records or by creating new tables or datasets as needed. When creating new tables, you can configure how the destination partitions the tables.
You can configure the root field for the row, and any first-level fields that you want to exclude from the record. You can specify characters to represent null values.
When processing CDC data, you can specify the key columns in the BigQuery table that the destination uses to evaluate the merge condition.
You can configure the destination to replace missing fields or fields with invalid data types with user-defined default values. You can also configure the destination to replace newline characters and trim leading and trailing spaces.
Before you use the Google BigQuery destination, you must complete some prerequisite tasks.
Prerequisites
Prepare the Google BigQuery Data Warehouse
Before configuring the Google BigQuery destination, prepare your BigQuery data warehouse.
- If necessary, create a Google Cloud project.
- If necessary, create a Google BigQuery dataset and tables within the dataset.
  If you enable data drift handling for the destination, the destination can create new datasets and tables.
- Set up a Google service account that can connect to the project.
- Grant the service account the following permissions required to load data into BigQuery:
  - bigquery.tables.create
  - bigquery.tables.update
  - bigquery.tables.updateData
  - bigquery.tables.get
  - bigquery.tables.getData
  - bigquery.jobs.create
  If you want the destination to load CDC data, grant the account the additional permission: bigquery.tables.delete
  If you want the destination to create datasets and tables automatically while handling data drift, grant the account the additional permission: bigquery.datasets.create
For more information about the required permissions to load data to BigQuery, see the Google Cloud BigQuery documentation.
Use this service account when you configure the credentials that the destination uses to connect to Google BigQuery.
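As a rough pre-flight check, the permission lists above can be compared against the permissions a service account has been granted. The sketch below is illustrative only: `missing_permissions` and its parameters are hypothetical helper names, and how you obtain the granted set (for example, from an IAM export) is left as an assumption.

```python
# Permission names come from the lists above; the helper itself is hypothetical.
BASE_PERMISSIONS = {
    "bigquery.tables.create",
    "bigquery.tables.update",
    "bigquery.tables.updateData",
    "bigquery.tables.get",
    "bigquery.tables.getData",
    "bigquery.jobs.create",
}
CDC_PERMISSIONS = {"bigquery.tables.delete"}
DRIFT_PERMISSIONS = {"bigquery.datasets.create"}


def missing_permissions(granted, cdc=False, create_datasets=False):
    """Return the sorted permissions that the account still lacks."""
    required = set(BASE_PERMISSIONS)
    if cdc:
        required |= CDC_PERMISSIONS
    if create_datasets:
        required |= DRIFT_PERMISSIONS
    return sorted(required - set(granted))


# An account with only the base permissions cannot yet process CDC data.
print(missing_permissions(BASE_PERMISSIONS, cdc=True))
```

An empty result means that the granted set covers everything the selected features need.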
Prepare the Google Cloud Storage Staging Area
Before configuring the Google BigQuery destination, prepare a staging area in Google Cloud Storage. The destination stages CSV or Avro files in the staging area before loading them to Google BigQuery.
- Create a bucket in Google Cloud Storage to act as the staging area for the destination.
  As a best practice, create the bucket in the same project that you designated for the BigQuery data warehouse.
  Note: The bucket name must be DNS compliant. For more information about bucket naming conventions, see the Google Cloud Storage documentation.
- Set up a Google service account that can connect to the project.
- Grant the service account the following permissions required to stage data to and then load data from the Cloud Storage bucket:
  - storage.objects.create
  - storage.objects.get
For more information about Cloud Storage permissions, see the Google Cloud BigQuery documentation.
Use this service account when you configure the credentials that the destination uses to connect to Google Cloud Storage.
Credentials
When the Google BigQuery destination stages CSV or Avro files in the Cloud Storage staging area and then loads the files to BigQuery, the destination must pass credentials to Google Cloud Storage and then to Google BigQuery.
You can provide credentials using one of the following options:
- Google Cloud default credentials
- Credentials in a file
- Credentials in a stage property
For details on how to configure each option, see Security in Google Cloud Stages.
Staging File Formats
The Google BigQuery destination stages data in Avro, CSV, JSON, or Parquet files. For more information about staging file formats in BigQuery, see the BigQuery documentation.
Note the following staging file format limits:
- The destination supports nested data types for Avro or JSON staging files only.
- For CSV staging files, the destination does not support nested Data Collector data types. The destination treats these fields as having an invalid data type.
For more information, see Row Generation.
You specify the file format using the Staging File Format property on the Data tab.
Specifying Datasets and Tables
You can use the Google BigQuery destination to load data to one or more tables in one or more datasets.
- Single dataset or table
- To write data to a single dataset, simply enter the dataset name as follows: <dataset_name>
- Multiple datasets or tables
- To write data to multiple datasets and tables, specify a field in the record that defines the datasets and tables.
Use the Dataset and Table properties on the Table Definition tab to specify the datasets and tables to write to.
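To illustrate how a field in the record can select the target, the following sketch groups records by the dataset and table that each one resolves to. It is a simplified model, not the destination's implementation: the `resolve` and `group_by_target` helpers, the convention that a value starting with `/` names a first-level field, and the `sales` dataset are all assumptions made for the example.

```python
from collections import defaultdict


def resolve(spec, record):
    """Return a literal name, or the value of a first-level field for specs like '/Region'."""
    if spec.startswith("/"):
        return record[spec[1:]]
    return spec


def group_by_target(records, dataset_spec, table_spec):
    """Group records by the (dataset, table) pair that each record resolves to."""
    groups = defaultdict(list)
    for record in records:
        target = (resolve(dataset_spec, record), resolve(table_spec, record))
        groups[target].append(record)
    return dict(groups)


records = [
    {"Region": "WestRegion", "amount": 10},
    {"Region": "EastRegion", "amount": 7},
    {"Region": "WestRegion", "amount": 3},
]
# Single (hypothetical) dataset, table chosen per record from the Region field.
groups = group_by_target(records, "sales", "/Region")
```

Here the two `WestRegion` records end up in one group and the `EastRegion` record in another, mirroring a load into two tables of the same dataset.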
Enabling Data Drift Handling
The Google BigQuery destination can automatically compensate for changes in column, table, or dataset requirements, also known as data drift.
- Create new columns
- The destination can create new columns in existing tables when new fields appear in records. For example, if a record suddenly includes a new Address2 field, the destination creates a new Address2 column in the target table. The destination always creates new columns in existing tables as Nullable columns.
  By default, the destination creates new columns based on the data in the new fields, such as creating an Int column for integer data. You can, however, configure the destination to create all new columns as String.
  To enable the automatic creation of new columns, select the Enable Data Drift property on the Table Definition tab. To create all new columns as String, select the Create New Columns as String property.
- Create new tables
- When data drift handling is enabled, you can also configure the destination to create new tables as needed. For example, say the destination writes data to tables based on the region name in the Region field. When a new SW-3 region shows up in a record, the destination creates a new SW-3 table and writes the record to the new table.
- Create new datasets
- When data drift handling and the creation of new tables are enabled, you can also configure the destination to create new datasets as needed. For example, say the destination writes data to datasets based on the department name in the dept field. When a new Engineering department shows up in a record, the destination creates a new Engineering dataset and writes the record to a new table in the new dataset.
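The drift rules described above can be modeled with a small in-memory sketch. It assumes that data drift handling is enabled, represents each table as a dictionary of column name to (type, mode), and infers types from Python values. The `apply_drift` and `infer_type` helpers and their parameters are hypothetical, and the type inference is far simpler than what the destination does.

```python
def infer_type(value):
    """Infer a column type from a Python value (simplified for illustration)."""
    # bool is tested first because bool is a subclass of int in Python.
    if isinstance(value, bool):
        return "Boolean"
    if isinstance(value, int):
        return "Integer"
    if isinstance(value, float):
        return "Float"
    return "String"


def apply_drift(tables, table_name, record, create_table=False,
                columns_as_string=False, new_table_mode="Nullable"):
    """Add columns for new fields; return False when the record would be an error record."""
    is_new_table = table_name not in tables
    if is_new_table and not create_table:
        return False  # missing table and table creation is not enabled
    schema = tables.setdefault(table_name, {})
    # Columns added to existing tables are always Nullable.
    mode = new_table_mode if is_new_table else "Nullable"
    for field, value in record.items():
        if field not in schema:
            column_type = "String" if columns_as_string else infer_type(value)
            schema[field] = (column_type, mode)
    return True


tables = {"customers": {"Name": ("String", "Required")}}
apply_drift(tables, "customers", {"Name": "Jane", "Address2": "Apt 4"})
```

After the call, the `customers` table has an additional Nullable `Address2` column, while the existing `Name` column is unchanged.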
Partitioning New Tables
When creating a new table, the Google BigQuery destination can also partition the table.
To enable partitioning for new tables, first enable data drift and select the Create Table property on the Table Definition tab. Then, select the Partition Table property and define partition configurations.
You can add multiple partition configurations so that the destination can partition new tables in different ways. Because BigQuery can partition a table only once, the destination uses a single partition configuration when it creates a table.
For each partition configuration, you specify whether the configuration should be applied to a specific table or whether the configuration is the default. Then, you specify the partition options.
Partition Configurations
- To a specific table
- To create a partition configuration for a specific table, enter the dataset and table name to apply the partition configuration to.
- As the default
- To create the default partition configuration that the destination uses when it cannot find a matching configuration for a new table, select Default Partition. You can create one default partition configuration.
When you enable the creation and partitioning of new tables, the destination first looks for a matching partition configuration by dataset and table name. If the destination cannot find a match, it uses the default configuration. If a default configuration does not exist, the destination creates the table without partitioning.
For example, say the destination writes data to tables based on the value in the Region field. The Region field can have one of the following values: WestRegion, EastRegion, and NorthRegion. You also enable the creation and partitioning of new tables. You can set up partition configurations for the three new tables in the following ways:
- Different partitioning for each table - Create three partition configurations, one for each table: WestRegion, EastRegion, and NorthRegion.
- Same partitioning for each table - Create a single default partition configuration.
- Different partitioning for one table, same partitioning for the remaining tables - Create a partition configuration for the WestRegion table. Then create a default partition configuration used for the remaining tables.
- Partition one table, do not partition the remaining tables - Create a partition configuration for the WestRegion table. Do not create a default partition configuration.
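The lookup order for partition configurations (a match by dataset and table name first, then the default, otherwise no partitioning) can be sketched as follows. The dictionary layout of a configuration, the `pick_partition_config` helper, and the `sales` dataset and `order_date` column are assumptions made for the example.

```python
def pick_partition_config(configs, dataset, table):
    """Return the config for (dataset, table), else the default config, else None."""
    default = None
    for config in configs:
        if config.get("default"):
            default = config
        elif (config["dataset"], config["table"]) == (dataset, table):
            return config
    return default


configs = [
    {"dataset": "sales", "table": "WestRegion", "type": "Date", "column": "order_date"},
    {"default": True, "type": "Ingestion"},
]

west = pick_partition_config(configs, "sales", "WestRegion")  # specific match
east = pick_partition_config(configs, "sales", "EastRegion")  # falls back to default
```

A result of `None` corresponds to creating the table without partitioning, as in the last scenario of the example above.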
Partition Options
You can partition new tables by one of the following partition types:
- Date
- Datetime
- Timestamp
- Integer
- Ingestion
For the Date, Datetime, Timestamp, and Integer partition types, you must also specify the column to partition by. The data type of the column must match the selected partition type. For example, to partition by the Integer type, the data type of the column must be Integer.
You configure additional options for each partition type as required by BigQuery. For details about the available options for each partition type, see the Google Cloud BigQuery documentation.
Row Generation
When writing a record to a table, the Google BigQuery destination includes all record fields in the resulting row, by default. The destination uses the root field, /, as the basis for the resulting row.
You can configure the Row Field property on the Data tab to specify a map or list-map field in the record as the basis for the row. The resulting record includes only the data from the specified map or list-map field and excludes all other record data. Use this functionality when the data that you want to write to BigQuery exists in a single map or list-map field within the record.
When you want to use the root field, but do not want to include all fields in the resulting row, configure the destination to ignore all specified first-level fields.
The destination supports nested data types for Avro or JSON staging files only.
For example, say the destination stages data in CSV files and uses the root field, /, as the basis for the resulting row. A record contains the following fields:
{
  "name": "Jane Smith",
  "id": "557",
  "address": {
    "street": "101 3rd St",
    "city": "Huntsville",
    "state": "NC",
    "zipcode": "27023"
  }
}
The destination treats the address map field as a field with an invalid data type, processing the record as an error record by default. You can configure the destination to ignore the field and process the remaining record data, as described in Missing Fields and Fields with Invalid Types.
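The row generation rules can be approximated with the sketch below, which builds a row from the root field or from a first-level map field, skips ignored fields, and reports nested values that a flat staging format such as CSV cannot hold. The `generate_row` helper and its parameters are hypothetical names, and the sketch handles only first-level fields.

```python
def generate_row(record, row_field="/", ignore=(), nested_supported=False):
    """Build a row from the root field or from a first-level map field.

    Returns (row, invalid), where invalid lists fields holding nested values
    that the staging format cannot represent.
    """
    source = record if row_field == "/" else record[row_field.lstrip("/")]
    row, invalid = {}, []
    for name, value in source.items():
        if name in ignore:
            continue  # field excluded from the row
        if isinstance(value, (dict, list)) and not nested_supported:
            invalid.append(name)  # nested value in a flat staging format
        else:
            row[name] = value
    return row, invalid


record = {
    "name": "Jane Smith",
    "id": "557",
    "address": {"street": "101 3rd St", "city": "Huntsville",
                "state": "NC", "zipcode": "27023"},
}
row, invalid = generate_row(record)                  # address is reported as invalid
address_row, _ = generate_row(record, row_field="/address")
```

With the default root field, `address` is flagged as invalid. With `/address` as the row field, the row contains only the four address values.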
Missing Fields and Fields with Invalid Types
By default, the destination treats records with missing fields or with invalid data types in fields as error records.
You can configure the destination to ignore missing fields or fields with invalid data types, replacing the data in the field with an empty value.
The default for each data type is an empty string, which is the default null value in BigQuery. You can specify a different default value to use for each data type on the Data Advanced tab. For example, you might define the default value for a missing String field or a String field with an invalid data type as none or not_applicable.
To configure the destination to ignore missing fields and fields with invalid data types, select the Ignore Missing Fields and the Ignore Fields with Invalid Types properties on the Data Advanced tab.
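As an illustration of how the two Ignore properties and the per-type defaults interact, the sketch below completes a row against a set of expected columns. It is a toy model: `fill_row`, the `VALIDATORS` table, and the three column types covered are assumptions, and real type validation in the destination is more involved.

```python
# Hypothetical validators for a few column types.
VALIDATORS = {
    "Integer": lambda v: isinstance(v, int) and not isinstance(v, bool),
    "Boolean": lambda v: isinstance(v, bool),
    "String": lambda v: isinstance(v, str),
}


def fill_row(row, columns, defaults=None, ignore_missing=False, ignore_invalid=False):
    """Return the completed row, or None when the record would be an error record."""
    defaults = defaults or {}
    result = {}
    for name, column_type in columns.items():
        default = defaults.get(column_type, "")  # empty string unless overridden
        if name not in row:
            if not ignore_missing:
                return None
            result[name] = default
        elif not VALIDATORS[column_type](row[name]):
            if not ignore_invalid:
                return None
            result[name] = default
        else:
            result[name] = row[name]
    return result


columns = {"id": "Integer", "name": "String", "active": "Boolean"}
filled = fill_row({"id": 1, "active": True}, columns,
                  defaults={"String": "not_applicable"}, ignore_missing=True)
```

In this example, the missing `name` field is filled with the configured String default instead of the record being rejected.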
Performance Optimization
Use the following tips to optimize for performance and cost-effectiveness when using the Google BigQuery destination:
- Increase the batch size
- The maximum batch size is determined by the origin in the pipeline and typically has a default value of 1,000 records. To take advantage of the loading abilities that Google BigQuery provides, increase the maximum batch size in the pipeline origin to 20,000-50,000 records. Be sure to increase the Data Collector java heap size, as needed.
- Use multiple threads
- You can use multiple threads to improve performance when you include a multithreaded origin in the pipeline. When Data Collector resources allow, using multiple threads enables processing multiple batches of data concurrently.
- Enable additional connections to Google BigQuery
- When loading data into multiple tables or datasets, increase the number of connections that the destination makes to Google BigQuery. Each additional connection allows the destination to load data into an additional table, concurrently.
CRUD Operation Processing
The Google BigQuery destination can insert, update, upsert, or delete data when you configure the destination to process CDC data. When processing CDC data, the destination merges data into BigQuery tables.
The destination uses the CRUD operation specified in the sdc.operation.type record header attribute. The destination performs operations based on the following numeric values:
- 1 for INSERT
- 2 for DELETE
- 3 for UPDATE
- 4 for UPSERT
If your pipeline has a CRUD-enabled origin that processes changed data, the destination simply reads the operation type from the sdc.operation.type header attribute that the origin generates. If your pipeline has a non-CDC origin, you can use the Expression Evaluator processor or a scripting processor to define the record header attribute. For more information about Data Collector changed data processing and a list of CDC-enabled origins, see Processing Changed Data.
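The mapping from operation codes to merge behavior can be sketched against an in-memory table keyed by the key columns. This is a simplified model rather than the merge statement that the destination issues: the record layout, the `merge_record` and `make_record` helpers, and the choice to let an INSERT overwrite an existing key are assumptions made for the example.

```python
OPERATIONS = {1: "INSERT", 2: "DELETE", 3: "UPDATE", 4: "UPSERT"}


def make_record(operation, **value):
    """Build a toy record with the operation code stored as a header attribute."""
    return {"header": {"sdc.operation.type": str(operation)}, "value": value}


def merge_record(table, record, key_columns):
    """Apply one CDC record to `table`, a dict keyed by the key-column values."""
    operation = OPERATIONS[int(record["header"]["sdc.operation.type"])]
    row = record["value"]
    key = tuple(row[column] for column in key_columns)
    if operation == "DELETE":
        table.pop(key, None)
    elif operation == "UPDATE":
        if key in table:  # an update for an unknown key changes nothing
            table[key] = {**table[key], **row}
    elif operation == "INSERT":
        table[key] = dict(row)  # assumption: overwrite if the key already exists
    else:  # UPSERT
        table[key] = {**table.get(key, {}), **row}


table = {}
merge_record(table, make_record(1, id=1, name="Jane"), ["id"])
merge_record(table, make_record(3, id=1, name="Janet"), ["id"])
```

After the two calls, the row with key `id=1` holds the updated name, which shows how the key columns decide which row an operation touches.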
BigQuery Data Types
The Google BigQuery destination converts Data Collector data types into BigQuery data types before writing data to BigQuery tables.
When you configure the destination to compensate for data drift, you can also configure the destination to create all new columns as String. However, by default, the destination converts record data to the appropriate data type.
The destination converts the following Data Collector data types into these BigQuery data types:
Data Collector Data Type | BigQuery Data Type |
---|---|
Boolean | Boolean |
Byte | Bytes |
Byte Array | Bytes |
Character | String |
Date | Date |
Datetime | Datetime or Timestamp |
Decimal | Decimal |
Double | Float |
Float | Float |
Integer | Integer |
List | Repeated mode column of the specified datatype. Supported for Avro staging files only. For more information, see Staging File Formats. |
List-Map | Struct or Record Repeated mode column with subfields created from the key-value pair. Supported for Avro staging files only. For more information, see Staging File Formats. |
Long | Integer |
Map | Struct or Record Repeated mode column with subfields created from the key-value pair. Supported for Avro staging files only. For more information, see Staging File Formats. |
Short | Integer |
String | String |
Time | Time |
Configuring a Google BigQuery Destination
Configure a Google BigQuery destination to load data to BigQuery tables. Before you use the destination in a pipeline, complete the prerequisite tasks.
- In the Properties panel, on the General tab, configure the following properties:
General Property | Description |
---|---|
Name | Stage name. |
Description | Optional description. |
Required Fields | Fields that must include data for the record to be passed into the stage. Tip: You might include fields that the stage uses. Records that do not include all required fields are processed based on the error handling configured for the pipeline. |
Preconditions | Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions. Records that do not meet all preconditions are processed based on the error handling configured for the stage. |
On Record Error | Error record handling for the stage: Discard - Discards the record. Send to Error - Sends the record to the pipeline for error handling. Stop Pipeline - Stops the pipeline. |
- On the Google BigQuery tab, configure the following properties:
Google BigQuery Property | Description |
---|---|
Connection | Connection that defines the information required to connect to an external system. To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline. To create a new connection, click the Add New Connection icon. To view and edit the details of the selected connection, click the Edit Connection icon. |
Project ID | Google Cloud project ID to use. |
Credentials Provider | Provider for Google Cloud credentials: Default credentials provider - Uses Google Cloud default credentials. Service account credentials file (JSON) - Uses credentials stored in a JSON service account credentials file. Service account credentials (JSON) - Uses JSON-formatted credentials information from a service account credentials file. |
Credentials File Path (JSON) | Path to the Google Cloud service account credentials file used to connect. The credentials file must be a JSON file. Enter a path relative to the Data Collector resources directory, $SDC_RESOURCES, or enter an absolute path. |
Credentials File Content (JSON) | Contents of a Google Cloud service account credentials JSON file used to connect. Enter JSON-formatted credential information in plain text, or use an expression to call the information from runtime resources or a credential store. |
Connection Pool Size | Maximum number of connections that the destination uses to load data. Default is 0, which ensures that the destination uses the same number of connections as threads used by the pipeline. When loading to multiple tables, increasing this property can improve performance. |
Enable Proxy | Connects to the Google BigQuery API through an HTTP proxy server. |
Proxy URL | URL of the HTTP proxy server in the following format: http://<host>:<port> |
Proxy Username | Optional user name to authenticate with the proxy server. |
Proxy Password | Optional password to authenticate with the proxy server. |
- On the Staging tab, configure the following properties:
Staging Property | Description |
---|---|
Connection | Connection that defines the information required to connect to an external system. To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline. To create a new connection, click the Add New Connection icon. To view and edit the details of the selected connection, click the Edit Connection icon. |
Staging Location | Location to stage the pipeline data in CSV or Avro files. At this time, only Google Cloud Storage is supported as the staging location. |
Purge Stage File After Ingesting | Purges a staged file after its data is successfully loaded into Google BigQuery. |
Stage File Prefix | Prefix to use for the name of the staged files. To create a unique name for each staged file, the destination appends a unique number to the prefix. For example, if you define the prefix as streamsets, the destination might name one of the staged files streamsets-69021a22-1474-4926-856f-eb4589d14cca. |
Bucket | Bucket to use when staging files. Enter a bucket name or define an expression that evaluates to bucket names. |
Compress File with gzip | Compresses files with gzip before writing to the staging area. |
- On the Table Definition tab, configure the following properties:
Table Definition Property | Description |
---|---|
Dataset | BigQuery datasets to load data into. To load data into a single dataset, enter the dataset name. To load data into multiple datasets, enter an expression that evaluates to the field in the record that contains the dataset name. For example: ${record:value('/dataset')} |
Table | BigQuery tables to load data into. To load data into a single table, enter the table name. To load data into multiple tables, enter an expression that evaluates to the field in the record that contains the table name. For example: ${record:value('/table')} Or, to load data into tables based on the table name in the jdbc.tables record header attribute generated by an origin that reads from a relational database, you can use the following expression: ${record:attribute('jdbc.tables')} The destination writes data from record fields to table columns based on matching names. |
Enable Data Drift | Creates new columns in existing tables when new fields appear in records. When not enabled, the destination treats records that have new fields as error records. |
Create New Columns as String | Creates all new columns as String. By default, the destination infers the data type based on the type of data in the field. For example, if a new field contains an integer, the destination creates a column with the Integer data type. Available when data drift handling is enabled. |
Create Table | Automatically creates tables when needed. When not enabled, the destination treats a record attempting to load to a table that doesn't exist as an error record. Available when data drift handling is enabled. |
New Columns Mode | Specifies the mode for new columns created in new tables: Required - Null values are not allowed. Nullable - Column allows null values. When the destination adds new fields to existing tables, columns are always added as Nullable. Available when data drift handling and the creation of new tables are enabled. |
Create Dataset | Automatically creates datasets when needed. When not enabled, the destination treats a record attempting to load to a dataset that doesn't exist as an error record. Available when data drift handling and the creation of new tables are enabled. |
Partition Table | Creates partitions in new tables based on the partition configurations. Available when data drift handling and the creation of new tables are enabled. |
Partition Configuration | One or more partition configurations that define how to partition new tables. For each partition configuration, you specify whether the configuration is the default or whether it should be applied to a specific dataset or table. Then you specify the partition type and the required options for that type. Click Add Another to add an additional partition configuration. Available when table partitioning is enabled. |
- On the Data tab, configure the following properties:
Data Property | Description |
---|---|
Schema Generator | Schema generator to use: Data Collector - The destination generates the schema. In most cases, you can use this default option. BigQuery - BigQuery generates the schema using its native schema auto-detection. For more information about schema auto-detection, see the BigQuery documentation. Note: Enable Data Drift, Create Table, and Merge CDC Data can only be set when the schema is generated by Data Collector. BigQuery automatically creates tables when generating the schema. |
Staging File Format | Format for staging files. Select one of the following options: CSV, JSON, Avro, or Parquet. |
Avro Schema Location | Location of the Avro schema. Select one of the following options: Infer from records - The destination infers the schema from the records. This is the default option. In record header - The schema is in a record header attribute. In pipeline configuration - The schema is entered in the Avro Schema property. Available when using the Avro staging file format. |
Avro Schema | Enter a valid Avro schema. Tip: You can use the runtime:loadResource() function to use a schema stored in a file. Available when entering the Avro schema during pipeline configuration. |
Nullable Fields | Allows fields to include null values by creating a union of the field type and null type. By default, fields cannot include null values. Available when inferring the Avro schema from records. |
Default to Nullable | When allowing null values in schema fields, uses null as a default value for all fields. Available when inferring the Avro schema from records with Nullable Fields selected. |
Default Values for Types | Optionally specify default values for Avro data types. Click Add to configure a default value. The default value applies to all fields of the specified data type. You can specify default values for the following Avro types: Boolean, Integer, Long, Float, Double, and String. Available when inferring the Avro schema from records with Nullable Fields not selected. |
Expand Types | Allows using a larger Data Collector data type for an Avro data type when an exact equivalent is not available. Available when inferring the Avro schema from records. |
Precision Field Attribute | Name of the schema attribute that stores the precision for a decimal field. Available when inferring the Avro schema from records. |
Scale Field Attribute | Name of the schema attribute that stores the scale for a decimal field. Available when inferring the Avro schema from records. |
Default Precision | Default precision to use for decimal fields when the precision is not specified or is invalid. Use -1 to opt out of this option. Note: When decimal fields do not have a valid precision and scale, the stage sends the record to error. Available when inferring the Avro schema from records. |
Default Scale | Default scale to use for decimal fields when the scale is not specified or is invalid. Use -1 to opt out of this option. Note: When decimal fields do not have a valid precision and scale, the stage sends the record to error. Available when inferring the Avro schema from records. |
Row Field | Map or list-map field to use as the basis for the generated row. Default is /, which includes all record fields in the resulting row. |
Column Fields to Ignore | List of fields to ignore when writing to BigQuery. You can enter a comma-separated list of first-level fields to ignore. Field names are case sensitive. |
Null Value | Characters to use to represent null values. Default is an empty string, which is the default null value in BigQuery. Available when using the Delimited staging file format. |
Merge CDC Data | Enables performing CRUD operations and merging data into BigQuery tables. Select to process CDC data. Important: To maintain the original order of data, do not use multiple threads when processing CDC data. |
Key Columns | Key columns used to evaluate the merge condition for each BigQuery table. Click the Add icon in the Key Column field to add additional key columns for a table. Available when processing CDC data. |
- On the Data Advanced tab, configure the following properties:
Data Advanced Property | Description |
---|---|
Ignore Missing Fields | Allows writing records with missing fields to BigQuery tables. Uses the specified default value for the data type of the missing field. When not enabled, records with missing fields are treated as error records. |
Ignore Fields with Invalid Types | Allows replacing fields that contain data of an invalid type with the specified default value for the data type. When not enabled, records with data of invalid types are treated as error records. |
Boolean Default | Default value to use when replacing missing Boolean fields or Boolean fields with invalid data. Default is an empty string, which is the default null value in BigQuery. |
Numeric Types Default | Default value to use when replacing missing Numeric fields or Numeric fields with invalid data. Default is an empty string, which is the default null value in BigQuery. |
Float Default | Default value to use when replacing missing Float fields or Float fields with invalid data. Default is an empty string, which is the default null value in BigQuery. |
Decimal Default | Default value to use when replacing missing Decimal fields or Decimal fields with invalid data. Default is an empty string, which is the default null value in BigQuery. |
Date Default | Default value to use when replacing missing Date fields or Date fields with invalid data. Default is an empty string, which is the default null value in BigQuery. |
Timestamp Default | Default value to use when replacing missing Timestamp fields or Timestamp fields with invalid data. Default is an empty string, which is the default null value in BigQuery. |
String Default | Default value to use when replacing missing String fields or String fields with invalid data. Default is an empty string, which is the default null value in BigQuery. |
Replace Newlines | Replaces newline characters in string fields with the specified replacement character. Available when using the Delimited staging file format. |
New Line Replacement Character | Character to use to replace newline characters. Available when using the Delimited staging file format. |
Trim Spaces | Trims leading and trailing spaces from field data. |
Column Separator | Character to use as a column separator. |
Quote Character | Character to enclose field data. |