JDBC Producer

Supported pipeline types:
  • Data Collector

The JDBC Producer destination uses a JDBC connection to write data to a database table. You can also use the JDBC Producer destination to write change capture data from a Microsoft SQL Server change log. For information about supported versions, see Supported Systems and Versions.
Data Collector provides database-specific destinations, such as SQL Server 2019 BDC Bulk Loader. When available, StreamSets recommends using a database-specific destination.
Important: This stage does not support connecting to non-RDBMS systems, including Hive, Impala, Kudu, or Snowflake. Support for untested systems is not guaranteed. For a list of tested systems, see "Database Vendors and Drivers".

When you configure JDBC Producer, you specify connection information, table name, and optionally define field mappings.

By default, JDBC Producer writes data to the table based on the matching field names. You can override the default field mappings by defining specific mappings. To determine which table rows to update or delete, the destination detects the list of primary key columns for the table, and then uses the fields mapped to those columns to match rows.
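As an illustration of default and overridden mappings, the following minimal Python sketch builds a parameterized INSERT statement from a list of field-to-column mappings. The `build_insert` helper, the `orders` table, and the field names are hypothetical, and SQLite stands in for the target database. It is not how the stage is implemented, only a way to see where a parameterized value such as CAST(? AS INTEGER) ends up in the statement.

```python
import sqlite3

def build_insert(table, mappings):
    """Build an INSERT from (field, column, parameterized_value) mappings.

    Hypothetical helper: "?" writes the field value unchanged, while an
    expression such as "CAST(? AS INTEGER)" applies a SQL function first.
    """
    columns = ", ".join(column for _, column, _ in mappings)
    values = ", ".join(param for _, _, param in mappings)
    fields = [field for field, _, _ in mappings]
    return f"INSERT INTO {table} ({columns}) VALUES ({values})", fields

# Assumed mappings: "id" maps to a column of the same name (the default),
# while "qty" is remapped to the "quantity" column and cast on write.
mappings = [
    ("id", "id", "?"),
    ("qty", "quantity", "CAST(? AS INTEGER)"),
]
sql, fields = build_insert("orders", mappings)

record = {"id": 7, "qty": "42"}  # qty arrives as text
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, quantity)")
conn.execute(sql, [record[f] for f in fields])
stored = conn.execute("SELECT quantity, typeof(quantity) FROM orders").fetchone()
print(sql)
print(stored)
```

The question mark in each parameterized value is bound to the corresponding field value when the statement runs, so the cast happens in the database rather than in the pipeline.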

You can configure the stage to roll back an entire batch if an error occurs while writing part of the batch. You can also configure custom properties that your driver requires.
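To make the effect of batch rollback concrete, here is a small Python sketch that uses SQLite as a stand-in database. The `write_batch` function and its `rollback_on_error` flag are hypothetical names chosen for illustration. The sketch only shows the general transactional idea, not the stage's actual implementation.

```python
import sqlite3

def write_batch(conn, rows, rollback_on_error):
    """Insert rows one by one and return the rows that failed.

    When rollback_on_error is True, any failure undoes the whole batch;
    otherwise the rows that succeeded are committed.
    """
    errors = []
    for row in rows:
        try:
            conn.execute("INSERT INTO customer (id, first) VALUES (?, ?)", row)
        except sqlite3.IntegrityError:
            errors.append(row)
    if errors and rollback_on_error:
        conn.rollback()
    else:
        conn.commit()
    return errors

def demo(rollback_on_error):
    """Run the same batch against a fresh table and report the outcome."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE customer (id INTEGER PRIMARY KEY, first TEXT)")
    conn.commit()
    # The third row reuses primary key 1, so its insert fails.
    batch = [(1, "john"), (2, "mary"), (1, "duplicate id")]
    errors = write_batch(conn, batch, rollback_on_error)
    stored = conn.execute("SELECT COUNT(*) FROM customer").fetchone()[0]
    conn.close()
    return errors, stored

with_rollback = demo(True)
without_rollback = demo(False)
print(with_rollback, without_rollback)
```

With rollback enabled, the one failing row causes the two good rows to be discarded as well. Without it, the good rows remain in the table.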

The JDBC Producer can use CRUD operations defined in the sdc.operation.type record header attribute to write data. You can define a default operation for records without the header attribute or value. You can also configure whether to use multi-row operations for inserts and deletes, and how to handle records with unsupported operations.

You can specify the format of the change data capture log used to process data from a CDC-enabled origin. For information about Data Collector change data processing and a list of CDC-enabled origins, see Processing Changed Data.

To use a JDBC version older than 4.0, you can specify the driver class name and define a health check query.

You can use the JDBC Producer as part of the Drift Synchronization Solution for PostgreSQL.

Database Vendors and Drivers

The JDBC Producer destination can write data to multiple database vendors.

The following table lists the supported and tested database versions for this stage. You can use the stage with other JDBC-compliant databases, but full support is not guaranteed. For a full list of supported versions, see Supported Systems and Versions.
Database Vendor: MySQL
  Supported Versions:
    • MySQL 5.7 and later
  Tested Versions:
    • MySQL 5.7 with the MySQL Connector/J 8.0.12 driver
    • MySQL 8.0 with the MySQL Connector/J 8.0.12 driver

Database Vendor: Oracle
  Supported Versions:
    • Oracle 11g Release 2, 12c, 18c, 19c, 21c
    • Oracle Real Application Clusters (RAC) 11g Release 2, 12c, 18c, 19c, 21c
    Also supported:
    • Hosted systems, such as Amazon RDS, that run supported versions of Oracle RAC
    • Derived systems, such as Oracle Exadata, that run supported versions of Oracle RAC
  Tested Versions:
    • Oracle 11g Release 2, 19c with the Oracle 21.8.0.0 JDBC driver version

Database Vendor: PostgreSQL
  Supported Versions:
    • PostgreSQL 9.x and later
  Tested Versions:
    • PostgreSQL 9.6.9
    • PostgreSQL 10.4
    • PostgreSQL 11.7
    • PostgreSQL 12.2
    • PostgreSQL 13.0
    • PostgreSQL 14.0
    • PostgreSQL 15.0

Database Vendor: Microsoft SQL Server
  Supported Versions:
    • SQL Server 2017
    • SQL Server 2019
  Tested Versions:
    • SQL Server 2017
    • SQL Server 2019

Installing the JDBC Driver

Before you use the JDBC Producer destination, install the JDBC driver for the database. You cannot access the database until you install the required driver.
Note: When connecting to a PostgreSQL or Microsoft SQL Server database, you do not need to install a JDBC driver. Data Collector includes the JDBC driver required for those databases.

You install the driver into the JDBC stage library, streamsets-datacollector-jdbc-lib, which includes the destination.

To use the JDBC driver with multiple stage libraries, install the driver into each stage library associated with the stages. For example, if you want to use a MySQL JDBC driver with the JDBC Lookup processor and with the MySQL Binary Log origin, you install the driver as an external library for the JDBC stage library, streamsets-datacollector-jdbc-lib, and for the MySQL Binary Log stage library, streamsets-datacollector-mysql-binlog-lib.

For information about installing additional drivers, see Install External Libraries.

CRUD Operation Processing

The JDBC Producer destination can insert, update, or delete data. The destination writes the records based on the CRUD operation defined in a CRUD operation header attribute or in operation-related stage properties.

The destination uses the header attribute and stage properties as follows:
CRUD operation header attribute
The destination looks for the CRUD operation in the sdc.operation.type record header attribute.
The attribute can contain one of the following numeric values:
  • 1 for INSERT
  • 2 for DELETE
  • 3 for UPDATE
If your pipeline has a CDC-enabled origin that processes changed data, the destination reads the operation type from the sdc.operation.type header attribute that the origin generates. If your pipeline has a non-CDC origin, you can use the Expression Evaluator processor or a scripting processor to define the record header attribute. For more information about Data Collector changed data processing and a list of CDC-enabled origins, see Processing Changed Data.
Operation stage properties
If there is no CRUD operation in the sdc.operation.type record header attribute, the destination uses the operation configured in the Default Operation property.
If the sdc.operation.type record header attribute contains an unsupported value, the destination takes the action configured in the Unsupported Operation Handling property. The destination can discard the record, send the record for error handling, or write the record using the default operation.
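The decision logic described above can be sketched in a few lines of Python. The `resolve_operation` function and the action names `DISCARD`, `SEND_TO_ERROR`, and `USE_DEFAULT` are hypothetical labels for the stage properties. Only the numeric codes 1, 2, and 3 come from this page.

```python
# Numeric codes for sdc.operation.type as listed in this section.
OPERATIONS = {1: "INSERT", 2: "DELETE", 3: "UPDATE"}

def resolve_operation(headers, default_operation="INSERT",
                      unsupported="SEND_TO_ERROR"):
    """Return the operation to perform, or the handling action to take.

    headers     -- record header attributes as a dict
    unsupported -- assumed labels: "DISCARD", "SEND_TO_ERROR", "USE_DEFAULT"
    """
    raw = headers.get("sdc.operation.type")
    if raw is None or raw == "":
        # No operation in the header: fall back to Default Operation.
        return default_operation
    try:
        code = int(raw)
    except (TypeError, ValueError):
        code = None
    if code in OPERATIONS:
        return OPERATIONS[code]
    # Unsupported value: apply Unsupported Operation Handling.
    if unsupported == "USE_DEFAULT":
        return default_operation
    return unsupported

print(resolve_operation({"sdc.operation.type": "2"}))
print(resolve_operation({}))
print(resolve_operation({"sdc.operation.type": "9"}))
print(resolve_operation({"sdc.operation.type": "9"}, "UPDATE", "USE_DEFAULT"))
```

In this sketch a missing attribute and an unsupported value follow different paths, which mirrors the split between the Default Operation and Unsupported Operation Handling properties.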

Update and Delete Operations

For update and delete operations, the JDBC Producer destination automatically detects the primary key of the table and uses that key in the WHERE clause that updates or deletes rows. The destination supports compound primary keys, that is, keys that consist of more than one column.

For example, in the following database table, named customer, the id column is the primary key:
id  first  middle  last
1   john   f       smith
2   john   m       doe
3   mary   jane    smith
Suppose the sdc.operation.type record header attribute for the following record is set to 2, to delete the record from the table:
{
 "id": 1,
 "first": "john",
 "middle": "m",
 "last": "doe"
}
Then, the destination matches the row with the same primary key and creates the following query:
DELETE FROM customer WHERE id = 1

Note that the destination matches the row based on the primary key and not the other fields in the record.
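The example can be reproduced with a short Python sketch that uses SQLite in place of the target database. The `build_delete` helper is hypothetical, and the primary key column list is hard-coded here, whereas the destination detects it automatically. The sketch also binds the key as a parameter instead of writing the literal value into the query.

```python
import sqlite3

def build_delete(table, primary_key_columns):
    """Build a DELETE whose WHERE clause uses only the primary key columns."""
    where = " AND ".join(f"{col} = ?" for col in primary_key_columns)
    return f"DELETE FROM {table} WHERE {where}"

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customer (id INTEGER PRIMARY KEY, first TEXT, middle TEXT, last TEXT)"
)
conn.executemany(
    "INSERT INTO customer VALUES (?, ?, ?, ?)",
    [(1, "john", "f", "smith"), (2, "john", "m", "doe"), (3, "mary", "jane", "smith")],
)

# The record from the example: id 1, but with the other fields of row 2.
record = {"id": 1, "first": "john", "middle": "m", "last": "doe"}
primary_key = ["id"]  # assumed to be known; the stage detects this itself

sql = build_delete("customer", primary_key)
conn.execute(sql, [record[col] for col in primary_key])
remaining = [row[0] for row in conn.execute("SELECT id FROM customer ORDER BY id")]
print(sql)
print(remaining)
```

Row 1 is deleted even though the record's middle and last fields match row 2, because only the primary key takes part in the match.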

Single and Multi-row Operations

JDBC Producer performs single-row operations by default. That is, it executes a SQL statement for each record. When supported by the destination database, you can configure JDBC Producer to perform multi-row operations. Depending on the sequence of the data, multi-row operations can improve pipeline performance.

When performing multi-row operations, JDBC Producer creates a single SQL statement for sequential insert rows and for sequential delete rows. JDBC Producer does not perform multi-row update operations. You can configure the Statement Parameter Limit property to limit the number of parameters in an insert operation, which in turn limits the number of records included in an insert statement.

For example, say the pipeline generates three insert records, followed by two update records and four delete records. If you enable multi-row operations and do not set a statement parameter limit, JDBC Producer generates a single insert statement for the three insert records, two update statements (one for each update record), and a single delete statement for the four delete records. If you instead set the statement parameter limit to two, JDBC Producer generates two insert statements (one for the first two insert records and one for the third), the same two update statements, and a single delete statement for the four delete records.
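The grouping in this example can be sketched in Python as follows. The `plan_statements` function is hypothetical, and its `max_insert_rows` argument stands in for the Statement Parameter Limit under the simplifying assumption that the limit translates to a whole number of records per insert statement, as the example above implies.

```python
from itertools import groupby

def plan_statements(operations, max_insert_rows=None):
    """Group a sequence of operations into (operation, record_count) statements.

    Sequential inserts and sequential deletes are combined; updates are
    always issued one statement per record.
    """
    plan = []
    for op, run in groupby(operations):
        count = len(list(run))
        if op == "update":
            plan.extend(("update", 1) for _ in range(count))
        elif op == "insert" and max_insert_rows:
            # Split a run of inserts into chunks of at most max_insert_rows.
            while count > 0:
                size = min(count, max_insert_rows)
                plan.append(("insert", size))
                count -= size
        else:
            plan.append((op, count))
    return plan

batch = ["insert"] * 3 + ["update"] * 2 + ["delete"] * 4
unlimited = plan_statements(batch)
limited = plan_statements(batch, max_insert_rows=2)
print(unlimited)
print(limited)
```

Because grouping applies only to sequential records of the same type, a batch that alternates between inserts and deletes gains little from multi-row operations, which is why performance depends on the sequence of the data.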

Important: Before enabling multi-row operations, verify that the database supports the SQL statements that JDBC Producer uses.

Error handling for multi-row operations depends on the database. If the database reports the individual record that causes an error in a multi-row statement, the stage sends that record to the error stream. If the database does not report which record causes an error, the stage sends all the records from the statement to the error stream.

For multi-row inserts, JDBC Producer uses the following SQL statement:
INSERT INTO <table name> (<col1>, <col2>, <col3>) 
     VALUES (<record1 field1>,<record1 field2>,<record1 field3>), 
     (<r2 f1>,<r2 f2>,<r2 f3>), (<r3 f1>,<r3 f2>,<r3 f3>),...;
For multi-row deletes, JDBC Producer uses the following SQL statement for tables with a single-column primary key:
DELETE FROM <table name> WHERE <primary key> IN (<key1>, <key2>, <key3>,...);
For multi-row deletes, JDBC Producer uses the following SQL statement for tables with a compound primary key:
DELETE FROM <table name> WHERE (<pkey1>, <pkey2>, <pkey3>)
      IN ((<key1-1>, <key1-2>, <key1-3>),(<key2-1>, <key2-2>, <key2-3>),...);
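To check whether a database accepts these statement shapes, it can help to generate them for a concrete table. The following Python sketch produces the three forms with ? placeholders. The function names and the sample table and column names are hypothetical, and the exact text that the stage generates may differ.

```python
def multi_row_insert(table, columns, row_count):
    """INSERT with one parenthesized placeholder group per record."""
    row = "(" + ", ".join("?" for _ in columns) + ")"
    rows = ", ".join([row] * row_count)
    return f"INSERT INTO {table} ({', '.join(columns)}) VALUES {rows}"

def multi_row_delete(table, key_columns, row_count):
    """DELETE using IN, with row-value tuples for compound primary keys."""
    if len(key_columns) == 1:
        keys = ", ".join(["?"] * row_count)
        return f"DELETE FROM {table} WHERE {key_columns[0]} IN ({keys})"
    row = "(" + ", ".join("?" for _ in key_columns) + ")"
    rows = ", ".join([row] * row_count)
    return f"DELETE FROM {table} WHERE ({', '.join(key_columns)}) IN ({rows})"

insert_sql = multi_row_insert("customer", ["id", "first"], 2)
delete_sql = multi_row_delete("customer", ["id"], 3)
compound_sql = multi_row_delete("orders", ["region", "order_id"], 2)
print(insert_sql)
print(delete_sql)
print(compound_sql)
```

The compound-key form relies on row-value comparison in the IN list, which not every database accepts, so it is the statement most worth testing before enabling multi-row operations.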

Configuring a JDBC Producer

Configure the JDBC Producer to use JDBC to write data to a database table.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline. Not valid for cluster pipelines.
  2. On the JDBC tab, configure the following properties:
    JDBC Property Description
    JDBC Connection String Connection string used to connect to the database. Use the connection string format required by the database vendor.

    For example, use the following formats for these database vendors:

    • MySQL - jdbc:mysql://<host>:<port>/<database_name>
    • Oracle - jdbc:oracle:<driver_type>:@<host>:<port>:<service_name>
    • PostgreSQL - jdbc:postgresql://<host>:<port>/<database_name>
    • SQL Server - jdbc:sqlserver://<host>:<port>;databaseName=<database_name>

    You can optionally include the user name and password in the connection string.

    Schema Name Optional database or schema name to use.
    Use when the database requires a fully-qualified table name.
    Tip: Oracle uses all caps for schema, table, and column names by default. Names can be lower- or mixed-case only if the schema, table, or column was created with quotation marks around the name.

    To use a lower- or mixed-case schema name, enter the name and enable the Enclose Object Names property.

    Table Name Database table name to use. Use the table name format required by the database.
    Tip: Oracle uses all caps for schema, table, and column names by default. Names can be lower- or mixed-case only if the schema, table, or column was created with quotation marks around the name.

    To use a lower- or mixed-case table name, enter the name and enable the Enclose Object Names property.

    Use Credentials Enables entering credentials on the Credentials tab. Select when you do not include credentials in the JDBC connection string.
    Field to Column Mapping Use to override the default field to column mappings. By default, fields are written to columns of the same name.
    When you override the mappings, you can define parameterized values to apply SQL functions to the field values before writing them to columns. For example, to convert a field value to an integer, enter the following for the parameterized value:
    CAST(? AS INTEGER)

    The question mark (?) is substituted with the value of the field. Leave the default value of ? if you do not need to apply a SQL function.

    Using simple or bulk edit mode, click the Add icon to create additional field to column mappings.

    Enclose Object Names Encloses the database or schema name, table name, and column names in quotation marks when writing to the database.

    Enables using case-sensitive names or names with special characters. When not enabled, the JDBC driver that the destination uses determines how the names are submitted.

    Oracle JDBC drivers submit names as all caps by default. Also, Oracle uses all caps for schema, table, and column names by default. Names can be lower- or mixed-case only if the schema, table, or column was created with quotation marks around the name.

    Change Log Format Format of change capture data log produced by the CDC-enabled origin. Used to process change capture data.
    Default Operation Default CRUD operation to perform if the sdc.operation.type record header attribute is not set.
    Unsupported Operation Handling Action to take when the CRUD operation type defined in the sdc.operation.type record header attribute is not supported:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Use Default Operation - Writes the record to the destination system using the default operation.
    Use Multi-Row Operation Combines sequential insert operations and sequential delete operations into single statements. Select to insert and delete multiple records at the same time. Before enabling this option, verify that the database supports the multi-row SQL statements used by the stage.

    By default, the stage performs single-row operations.

    Statement Parameter Limit Number of parameters allowed in the prepared statement for multi-row inserts.

    Use -1 to disable the parameter limit. Default is -1.

    Rollback Batch On Error Rolls back the entire batch when an error occurs within the batch.
    Additional JDBC Configuration Properties Additional JDBC configuration properties to use. To add properties, click Add and define the JDBC property name and value.

    Use the property names and values as expected by JDBC.

  3. If you configured the destination to enter JDBC credentials separately from the JDBC connection string on the JDBC tab, then configure the following properties on the Credentials tab:
    Credentials Property Description
    Username User name for the JDBC connection.

    The user account must have the correct permissions or privileges in the database.

    Password Password for the JDBC user name.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
  4. When using JDBC versions older than 4.0, on the Legacy Drivers tab, optionally configure the following properties:
    Legacy Drivers Property Description
    JDBC Class Driver Name Class name for the JDBC driver. Required for JDBC versions older than version 4.0.
    Connection Health Test Query Optional query to test the health of a connection. Recommended only when the JDBC version is older than 4.0.
  5. On the Advanced tab, optionally configure advanced properties.
    The defaults for these properties should work in most cases:
    Advanced Property Description
    Maximum Pool Size Maximum number of connections to create.

    Default is 1. The recommended value is 1.

    Minimum Idle Connections Minimum number of connections to create and maintain. To define a fixed connection pool, set to the same value as Maximum Pool Size.

    Default is 1.

    Connection Timeout (seconds) Maximum time to wait for a connection. Use a time constant in an expression to define the time increment.
    Default is 30 seconds, defined as follows:
    ${30 * SECONDS}
    Idle Timeout (seconds) Maximum time to allow a connection to idle. Use a time constant in an expression to define the time increment.

    Use 0 to avoid removing any idle connections.

    When the entered value is close to or more than the maximum lifetime for a connection, Data Collector ignores the idle timeout.

    Default is 10 minutes, defined as follows:
    ${10 * MINUTES}
    Max Connection Lifetime (seconds) Maximum lifetime for a connection. Use a time constant in an expression to define the time increment.

    Use 0 to set no maximum lifetime.

    When a maximum lifetime is set, the minimum valid value is 30 minutes.

    Default is 30 minutes, defined as follows:
    ${30 * MINUTES}
    Transaction Isolation Transaction isolation level used to connect to the database.

    Default is the default transaction isolation level set for the database. You can override the database default by setting the level to any of the following:

    • Read committed
    • Read uncommitted
    • Repeatable read
    • Serializable
    Init Query SQL query to perform immediately after the stage connects to the database. Use to set up the database session as needed.

    The query is performed after each connection to the database. If the stage disconnects from the database during the pipeline run, for example if a network timeout occurs, the stage performs the query again when it reconnects to the database.

    For example, for Oracle, the following query returns 1 to verify that the stage is connected to the database: SELECT 1 FROM dual;

    Data SQLSTATE Codes List of SQLSTATE codes to treat as data errors. The destination applies error record handling to records that trigger a listed code.

    When a record triggers a SQLSTATE code not listed, the destination generates a stage error that stops the pipeline.

    To add a code, click Add and enter the code.
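The rule for Data SQLSTATE Codes can be summarized with a small Python sketch. The `classify_sql_error` function and the sample codes are illustrative assumptions. The sketch follows only the rule stated above and ignores any other error classification that the destination might apply.

```python
def classify_sql_error(sqlstate, data_sqlstate_codes):
    """Return how an error is treated, given the configured code list.

    "record_error" -- the record goes through error record handling
    "stage_error"  -- the error stops the pipeline
    """
    if sqlstate in data_sqlstate_codes:
        return "record_error"
    return "stage_error"

# Hypothetical configuration: treat string truncation (22001) and a
# PostgreSQL unique violation (23505) as data errors.
configured = {"22001", "23505"}

truncation = classify_sql_error("22001", configured)
connection_failure = classify_sql_error("08006", configured)
print(truncation, connection_failure)
```

Codes that describe a problem with an individual record are good candidates for the list. Codes that describe a problem with the connection or the database itself are usually better left out so that the pipeline stops.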