JDBC Tee
The JDBC Tee processor uses a JDBC connection to write data to a MySQL or PostgreSQL database table, and then pass generated database column values to fields. For information about supported versions, see Supported Systems and Versions.
Use the JDBC Tee processor to write some or all record fields to a database table and then enrich records with additional data.
To use the processor, the database table must be configured to generate column values as data is inserted.
When you configure the JDBC Tee processor, you specify connection information to the MySQL or PostgreSQL database, table name, and optionally define field mappings. By default, the processor writes data to the table based on the matching field names. You can override the default field mappings by defining specific mappings.
You define generated column mappings to specify the output fields to pass the generated database column values to.
You can configure the stage to rollback an entire batch if an error occurs while writing part of the batch. You can also configure custom properties that your driver requires.
The JDBC Tee processor can use CRUD operations defined in the
sdc.operation.type
record header attribute to write
data. You can define a default operation for records without the header
attribute or value. You can also configure whether to use multi-row
operations for inserts and deletes, and how to handle records with
unsupported operations.
You can specify the format of the change data capture log used to process data from a CDC-enabled origin. For information about Data Collector change data processing and a list of CDC-enabled origins, see Processing Changed Data.
To use a JDBC version older than 4.0, you can specify the driver class name and define a health check query.
Example
Let's assume that you are processing customer orders. You have a customer database
table with an ID
column as the primary key. The customer table is
configured to generate a sequential number for the ID
column as
each row is inserted into the table. For example, the first customer row is assigned
an ID
of 001, and the second is assigned an ID
of
002.
When you process a new customer’s order, the JDBC Tee processor inserts the customer
data to the customer table and the database returns the generated ID for that
customer. The JDBC Tee processor passes the generated ID value to a new
cust_ID
field in the record. The processor passes all record
fields to the next stage in the pipeline for additional processing.
The following image displays a high-level overview of how the stage processes our customer order example:
Database Vendors and Drivers
The JDBC Tee processor can write data to a MySQL or PostgreSQL database.
Database Vendor | Supported Version | Tested Version |
---|---|---|
MySQL | MySQL 5.7 and later |
|
PostgreSQL | PostgreSQL 9.x and later |
|
MySQL Data Types
The JDBC Tee processor converts MySQL data types into Data Collector data types.
MySQL Data Type | Data Collector Data Type |
---|---|
Bigint | Long |
Bigint Unsigned | Decimal |
Binary | Byte Array |
Blob | Byte Array |
Char | String |
Date | Date |
Datetime | Datetime |
Decimal | Decimal |
Double | Double |
Enum | String |
Float | Float |
Int | Integer |
Int Unsigned | Long |
Json | String |
Linestring | Byte Array |
Medium Int | Integer |
Medium Int Unsigned | Long |
Numeric | Decimal |
Point | Byte Array |
Polygon | Byte Array |
Set | String |
Smallint | Short |
Smallint Unsigned | Integer |
Text | String |
Time | Time |
Timestamp | Datetime |
Tinyint, Tinyint Unsigned | Short |
Varbinary | Byte Array |
Varchar | String |
Year | Date |
PostgreSQL Data Types
The JDBC Tee processor converts PostgreSQL data types into Data Collector data types.
PostgreSQL Data Type | Data Collector Data Type |
---|---|
Bigint | Long |
Boolean | Boolean |
Bytea | Byte Array |
Char | String |
Date | Date |
Decimal | Decimal |
Double Precision | Double |
Enum | String |
Integer | Integer |
Money | Double |
Numeric | Decimal |
Real | Float |
Smallint | Short |
Text | String |
Time, Time with Time Zone | Time |
Timestamp, Timestamp with Time Zone | Time |
Varchar | String |
Installing the JDBC Driver
You install the driver into the JDBC stage library, streamsets-datacollector-jdbc-lib
, which includes the processor.
To use the JDBC driver
with multiple stage libraries, install the driver into each stage library associated
with the stages.
For example, if you want to use a MySQL JDBC driver with the JDBC Lookup processor
and with the MySQL Binary Log origin, you install the driver as an external library
for the JDBC stage library, streamsets-datacollector-jdbc-lib
, and for the MySQL Binary Log stage library, streamsets-datacollector-mysql-binlog-lib
.
For information about installing additional drivers, see Install External Libraries.
CRUD Operation Processing
The JDBC Tee processor can insert, update, or delete data. The processor writes the records based on the CRUD operation defined in a CRUD operation header attribute or in operation-related stage properties.
- CRUD operation header attribute
- The processor looks for the CRUD operation to use in
the
sdc.operation.type
record header attribute. - Operation stage properties
- If there is no CRUD operation in the
sdc.operation.type
record header attribute, the processor uses the operation configured in the Default Operation property.
Single and Multi-row Operations
The JDBC Tee processor performs single-row operations by default. That is, it executes a SQL statement for each record. When supported by the destination database, you can configure the JDBC Tee processor to perform multi-row operations. Depending on the sequence of the data, multi-row operations can improve pipeline performance.
When performing multi-row operations, the JDBC Tee processor creates a single SQL statement for sequential insert rows and for sequential delete rows. The processor does not perform multi-row update operations. You can configure the Statement Parameter Limit property to limit the number of parameters in an insert operation - that is, you can limit the number of records included in an insert statement.
For example, say the pipeline generates three insert records, followed by two update records, and four delete records. If you enable multi-row operations and do not set a statement parameter limit, the JDBC Tee processor generates a single insert SQL statement for the three insert records, two update statements - one for each of the update records, and a single delete statement for the four delete records. On the other hand, if you enable multi-row operations and set the statement parameter limit to two, the JDBC Tee processor generates two insert SQL statements - one for two insert records and one for the third insert record, two update statements - one for each of the update records, and a single delete statement for the four delete records.
Error handling for multi-row operations depends on the database. If the database reports the individual record that causes an error in a multi-row statement, the stage sends that record to the error stream. If the database does not report which record causes an error, the stage sends all the records from the statement to the error stream.
INSERT INTO <table name> (<col1>, <col2>, <col3>)
VALUES (<record1 field1>,<record1 field2>,<record1 field3>),
(<r2 f1>,<r2 f2>,<r2 f3>), (<r3 f1>,<r3 f2>,<r3 f3>),...;
DELETE FROM <table name> WHERE <primary key> IN (<key1>, <key2>, <key3>,...);
DELETE FROM <table name> WHERE (<pkey1>, <pkey2>, <pkey3>)
IN ((<key1-1>, <key1-2>, <key1-3>),(<key2-1>, <key2-2>, <key2-2>),...);
Configuring a JDBC Tee Processor
Configure a JDBC Tee processor to write data to a MySQL or PostgreSQL database table and enrich records with data from generated database columns.
-
In the Properties panel, on the General tab, configure the
following properties:
General Property Description Name Stage name. Description Optional description. Required Fields Fields that must include data for the record to be passed into the stage. Tip: You might include fields that the stage uses.Records that do not include all required fields are processed based on the error handling configured for the pipeline.
Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions. Records that do not meet all preconditions are processed based on the error handling configured for the stage.
On Record Error Error record handling for the stage: - Discard - Discards the record.
- Send to Error - Sends the record to the pipeline for error handling.
- Stop Pipeline - Stops the pipeline.
-
On the JDBC tab, configure the following properties:
JDBC Property Description JDBC Connection String Connection string used to connect to the database. Use the connection string format required by the database vendor. For example, use the following formats for these database vendors:
- MySQL -
jdbc:mysql://<host>:<port>/<database_name>
- PostgreSQL -
jdbc:postgresql://<host>:<port>/<database_name>
You can optionally include the user name and password in the connection string.
For Azure Managed Identity, use the JDBC connection string provided in your Azure database connection string settings.
Table Name Database table to write to. Some databases require a fully-qualified table name, such as <schema | database>:<tablename>
. Use the table name format required by the database.Enter one of the following:- Name of an existing database table.
- Expression that evaluates to the name of an
existing database table. For example, if the table
name is stored in the "tableName" record
attribute, enter the following
expression:
${record:attribute('tableName')}
Field to Column Mapping Use to override the default field to column mappings. By default, fields are written to columns of the same name. When you override the mappings, you can define parameterized values to apply SQL functions to the field values before writing them to columns. For example, to convert a field value to an integer, enter the following for the parameterized value:CAST(? AS INTEGER)
The question mark (?) is substituted with the value of the field. Leave the default value of ? if you do not need to apply a SQL function.
Using simple or bulk edit mode, click the Add icon to create additional field to column mappings.
Generated Column Mappings Map the generated database columns to fields in the record. Enter the following: - Column Name. Name of the database column that contains the generated value. Enter a column name or enter an expression that defines the column.
- SDC Field. Name of the field in the record that receives the generated column value. You can specify an existing field or a new field. If the field does not exist, JDBC Tee creates the field.
Using simple or bulk edit mode, click the Add icon to create additional generated column mappings.
Change Log Format Format of change capture data log produced by the CDC-enabled origin. Used to process change capture data. Default Operation Default CRUD operation to perform if the sdc.operation.type
record header attribute is not set.Unsupported Operation Handling Action to take when the CRUD operation type defined in the sdc.operation.type
record header attribute is not supported:- Discard - Discards the record.
- Send to Error - Sends the record to the pipeline for error handling.
- Use Default Operation - Writes the record to the destination system using the default operation.
Use Multi-Row Operation Combines sequential insert operations and sequential delete operations into single statements. Select to insert and delete multiple records at the same time. Before enabling this option, verify that the database supports the multi-row SQL statements used by the stage. By default, the stage performs single-row operations.
Statement Parameter Limit Number of parameters allowed in the prepared statement for multi-row inserts. Use -1 to disable the parameter limit. Default is -1.
Rollback Batch On Error Rolls back the entire batch when an error occurs within the batch. Use Credentials Enables entering credentials on the Credentials tab. Select when you do not include credentials in the JDBC connection string. Additional JDBC Configuration Properties Additional JDBC configuration properties to use. To add properties, click Add and define the JDBC property name and value. Use the property names and values as expected by JDBC.
- MySQL -
-
If you configured the origin to enter JDBC credentials
separately from the JDBC connection string on the JDBC tab,
then configure the following properties on the Credentials
tab:
Credentials Property Description Username User name for the JDBC connection. The user account must have the correct permissions or privileges in the database.
Password Password for the JDBC user name. Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores. -
When using JDBC versions older than 4.0, on the Legacy
Drivers tab, optionally configure the following
properties:
Legacy Drivers Property Description JDBC Class Driver Name Class name for the JDBC driver. Required for JDBC versions older than version 4.0. Connection Health Test Query Optional query to test the health of a connection. Recommended only when the JDBC version is older than 4.0. -
On the Advanced tab, optionally configure advanced
properties.
The defaults for these properties should work in most cases:
Advanced Property Description Maximum Pool Size Maximum number of connections to create. Default is 1. The recommended value is 1.
Minimum Idle Connections Minimum number of connections to create and maintain. To define a fixed connection pool, set to the same value as Maximum Pool Size. Default is 1.
Connection Timeout (seconds) Maximum time to wait for a connection. Use a time constant in an expression to define the time increment. Default is 30 seconds, defined as follows:${30 * SECONDS}
Idle Timeout (seconds) Maximum time to allow a connection to idle. Use a time constant in an expression to define the time increment. Use 0 to avoid removing any idle connections.
When the entered value is close to or more than the maximum lifetime for a connection, Data Collector ignores the idle timeout.
Default is 10 minutes, defined as follows:${10 * MINUTES}
Max Connection Lifetime (seconds) Maximum lifetime for a connection. Use a time constant in an expression to define the time increment. Use 0 to set no maximum lifetime.
When a maximum lifetime is set, the minimum valid value is 30 minutes.
Default is 30 minutes, defined as follows:${30 * MINUTES}
Transaction Isolation Transaction isolation level used to connect to the database. Default is the default transaction isolation level set for the database. You can override the database default by setting the level to any of the following:
- Read committed
- Read uncommitted
- Repeatable read
- Serializable
Init Query SQL query to perform immediately after the stage connects to the database. Use to set up the database session as needed. The query is performed after each connection to the database. If the stage disconnects from the database during the pipeline run, for example if a network timeout occurrs, the stage performs the query again when it reconnects to the database.
For example, in case of Oracle, the following query returns 1 to verify that the stage is connected to the database:
Select 1 from dual;