JDBC Table

The JDBC Table origin reads data from a database table. Use the JDBC Table origin to process data from a database that is not natively supported.

Transformer provides database-specific origins, such as Google BigQuery, Oracle JDBC Table, and Snowflake. When possible, StreamSets recommends using available database-specific origins. To read from one or more tables using a custom query, use the JDBC Query origin.

The origin can read all of the columns in a table or only specified columns. In each batch, the origin reads a specified number of rows, distributing those rows uniformly across the specified partitions. When reading the last row, the origin saves the value from a specified offset column. In the subsequent batch, the origin uses the offset to locate the last row read and starts reading from the next row.

When you configure the JDBC Table origin, you specify the database connection information and any additional JDBC configuration properties you want to use. You configure the table to read and optionally specify the columns to read from the table. You can specify an additional predicate to include any conditions that you would place in a WHERE clause.

You can also use a connection to configure the origin.

You define the offset column, the maximum number of rows to include in each batch, and the number of partitions used to read from the database table. You can optionally configure advanced properties related to the JDBC driver.

You can configure the origin to load data only once and cache the data for reuse throughout the pipeline run. Or, you can configure the origin to cache each batch of data so the data can be passed to multiple downstream batches efficiently. You can also configure the origin to skip tracking offsets.

Before using the JDBC Table origin, verify if you need to install a JDBC driver.

Database Vendors and Drivers

The JDBC Table origin can read database data from multiple database vendors.

StreamSets has tested this origin with the following database vendors, versions, and JDBC drivers:
  • Microsoft SQL Server - SQL Server 2017 with the SQL Server JDBC 8.4.0 JRE8 driver
  • MySQL - MySQL 5.7 with the MySQL Connector/J 8.0.12 driver
  • Oracle - Oracle 11g with the Oracle 19.3.0 JDBC driver
  • PostgreSQL - PostgreSQL 9.6.2 with the PostgreSQL 42.2.5 driver

Installing the JDBC Driver

The JDBC Table origin includes drivers for the following databases:
  • Microsoft SQL Server
  • PostgreSQL

When using the stage to connect to any other database, you must install the JDBC driver for the database. Install the driver as an external library for the JDBC stage library.

By default, Transformer bundles a JDBC driver into the launched Spark application so that the driver is available on each node in the cluster. If you prefer to manually install an appropriate JDBC driver on each Spark node, you can configure the stage to skip bundling the driver on the Advanced tab of the stage properties.

If you install a JDBC driver provided by one of the following vendors, the stage automatically detects the JDBC driver class name from the configured JDBC connection string:
  • Apache Derby
  • IBM DB2
  • Microsoft SQL Server
  • MySQL
  • Oracle
  • PostgreSQL
  • Teradata

If you install a custom JDBC driver or a driver provided by another vendor, you must specify the JDBC driver class name on the Advanced tab of the stage.
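
For example, a custom driver from a hypothetical vendor might use a fully qualified class name such as com.example.jdbc.Driver. The exact class name to enter is listed in the driver vendor's documentation; the name shown here is illustrative only.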

Partitioning

Spark runs a Transformer pipeline just as it runs any other application, splitting the data into partitions and performing operations on the partitions in parallel. When the pipeline starts processing a new batch, Spark determines how to split pipeline data into initial partitions based on the origins in the pipeline.

For the JDBC Table origin, Spark determines the partitioning based on the number of partitions that you configure for the origin. Spark creates one connection to the database for each partition.

Spark uses these partitions while the pipeline processes the batch unless a processor causes Spark to shuffle the data. To change the partitioning in the pipeline, use the Repartition processor.

Consider the following when you configure the properties that determine the number of partitions for the origin:
  • The size and configuration of the cluster.
  • The amount of data being processed.
  • The number of concurrent connections that can be made to the database.

If the pipeline fails because the origin encounters an out of memory error, you likely need to increase the number of partitions for the origin.
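
Behind the scenes, Transformer runs the read as a Spark job. The following sketch, written in Scala for a standalone Spark application, only illustrates how a partitioned JDBC read works in Spark; it is not Transformer configuration, and the connection string, table, column, bounds, and partition count shown are hypothetical:

  // Illustrative partitioned JDBC read in a standalone Spark job.
  // Transformer derives the equivalent settings from the stage properties
  // and the offset column; all values below are hypothetical.
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("jdbc-partition-sketch").getOrCreate()

  val df = spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://dbhost:5432/sales")  // connection string
    .option("dbtable", "public.orders")                    // table to read
    .option("partitionColumn", "order_id")                 // column used to split the read
    .option("lowerBound", "1")                             // minimum value of the column
    .option("upperBound", "1000000")                       // maximum value of the column
    .option("numPartitions", "10")                         // one database connection per partition
    .load()

  df.rdd.getNumPartitions  // returns 10; Spark reads the ranges in parallel

Spark opens one connection per partition and assigns each partition a contiguous range of column values, which is why the partition count directly affects both parallelism and database load.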

Offset Column Requirement

The JDBC Table origin uses a single offset column. The offset column should contain unique, incremental values and should not contain null values. The offset column must also be a supported data type.

When a table includes a single primary key column, the origin uses that column as the offset column by default.

You can configure the origin to use a different offset column. You might specify an alternate offset column when the table uses a composite key or when the data type of the primary key column is not supported.

The JDBC Table origin uses the offset column to perform two tasks:
Track processing
The origin tracks processing using values in the offset column. When reading the last row for a batch, the origin saves the value from the offset column. In the subsequent batch, the origin starts reading from the following row.
For example, if the first batch in a pipeline includes offsets 1-250 based on an integer offset column, the origin creates the next batch starting from offset 251. Similarly, say a batch pipeline stops after processing offset 250. The next time the pipeline runs, the origin begins processing with offset 251, unless you reset the offset.
Create partitions
When creating partitions, the origin determines the data to be processed and then divides the data into partitions based on ranges of offset values.

For example, say you have rows with integer offsets from 1 to 1000 and you configure the origin to create two partitions. The first partition might include records with offsets from 1-500, and the second partition, the offsets from 501-1000.
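
A minimal Scala sketch of this range arithmetic, using a hypothetical helper rather than the origin's actual implementation, looks like this:

  // Hypothetical helper that splits an offset range into contiguous partitions.
  // It only illustrates the idea of dividing offsets into ranges of roughly equal size.
  def partitionRanges(min: Long, max: Long, numPartitions: Int): Seq[(Long, Long)] = {
    val size = (max - min + numPartitions) / numPartitions  // ceiling of (max - min + 1) / numPartitions
    (0 until numPartitions).map { i =>
      val lower = min + i * size
      val upper = math.min(lower + size - 1, max)
      (lower, upper)
    }
  }

  partitionRanges(1L, 1000L, 2)  // Seq((1,500), (501,1000)), matching the example above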

Supported Offset Data Types

The offset column for the JDBC Table origin must be one of the following data types:
  • Bigint
  • Decimal or Numeric with a scale of 0
  • Integer
  • Smallint

Configuring a JDBC Table Origin

Configure a JDBC Table origin to read data from a database table. Before using the origin, verify if you need to install a JDBC driver.

  1. On the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Load Data Only Once Reads data while processing the first batch of a pipeline run and caches the results for reuse throughout the pipeline run.

    Select this property for lookup origins. When configuring lookup origins, do not limit the batch size. All lookup data should be read in a single batch.

    Cache Data Caches data processed for a batch so the data can be reused for multiple downstream stages. Use to improve performance when the stage passes data to multiple stages.

    Caching can limit pushdown optimization when the pipeline runs in ludicrous mode.

    Available when Load Data Only Once is not enabled. When the origin loads data once, the origin caches data for the entire pipeline run.

    Skip Offset Tracking Skips tracking offsets.

    The origin reads all available data for each batch that the pipeline processes, limited by any batch-size configuration for the origin.

  2. On the Connection tab, configure the following properties:
    Connection Property Description
    Connection Connection that defines the information required to connect to an external system.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    To create a new connection, click the Add New Connection icon. To view and edit the details of the selected connection, click the Edit Connection icon.

    JDBC Connection String Connection string used to connect to the database. Use the connection string format required by the database vendor.

    For example, use the following formats for these database vendors:

    • MySQL - jdbc:mysql://<host>:<port>/<database_name>
    • Oracle - jdbc:oracle:<driver_type>:@<host>:<port>:<service_name>
    • PostgreSQL - jdbc:postgresql://<host>:<port>/<database_name>
    • SQL Server - jdbc:sqlserver://<host>:<port>;databaseName=<database_name>
    • SQL Server 2019 Big Data Cluster accessed from a Transformer pipeline - jdbc:sqlserver://<ip>:<port>;databaseName=<database_name>

      Where <ip> is the IP address of the SQL Server master instance, and <port> is the port number of the SQL Server master instance. Use the SQL Server user name and password with this connection string. For more information, see SQL Server 2019 JDBC Connection Information.

      You can optionally include the user name and password in the connection string.

    Use Credentials Enables entering credentials. Select when you do not include credentials in the JDBC connection string.
    Username User name for the JDBC connection.

    The user account must have the correct permissions or privileges in the database.

    Password Password for the JDBC user name.
    Tip: To secure sensitive information, you can use credential stores or runtime resources.
    Additional JDBC Configuration Properties Additional JDBC configuration properties to use.

    To add properties, click Add and define the JDBC property name and value. You can use simple or bulk edit mode to configure the properties.

    Use the property names and values as expected by JDBC.
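
    For example, when connecting to a MySQL database, you might add the Connector/J property useSSL with a value of false for a test environment that does not use SSL. Property names and values vary by driver, so refer to the driver documentation; this property is shown only as an illustration.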

  3. On the Table tab, configure the following properties:
    Table Property Description
    Schema Name of the schema in which the table is located. Define when the database requires a schema.
    Table Database table to read.
    Offset Column Table column used to create partitions and track processed rows. The offset column must be of a supported type.

    The offset column should contain unique, incremental values and should not contain null values. The origin does not process records with null offset values.

    By default, the origin uses the primary key column as the offset column. Specify another column as needed.

    Max Rows per Batch Maximum number of table rows included in a batch.

    Use -1 to read all rows in a single batch.

    Number of Partitions Number of partitions used when reading a batch.

    Default value is 10.

    Columns to Read Columns to read from the table. If you specify no columns, the origin reads all the columns in the table.

    Specified columns must exist in the table.

    Click the Add icon to specify an additional column. You can use simple or bulk edit mode to configure the columns.

    Additional Predicate Optional predicate to limit the query that the origin performs. You can specify any condition valid for the database that you would place into a WHERE clause of a Spark SQL query.

    Do not include WHERE in the predicate.

    For example, to query records with IDs greater than 1000, enter ID > 1000.

  4. On the Advanced tab, optionally configure advanced properties.
    The defaults for these properties should work in most cases:
    Advanced Property Description
    Min Partition Size Minimum number of records to include in a partition. Use to limit the creation of small partitions.

    -1 allows partitions of any size.

    Default is 10,000 records.

    Specify Fetch Size Enables specifying a fetch size.
    Fetch Size Suggests the number of rows that the JDBC driver should fetch with each database round-trip.

    Use 0 to skip defining a fetch size.

    For more information about configuring a fetch size, see the database documentation.

    Use Custom Min/Max Query Use a custom minimum and maximum SQL query instead of the query generated by the origin.
    Custom Min/Max Query Custom minimum and maximum SQL query to run. The query must return exactly one row of two Long columns.
    By default, the origin generates and runs the following query to retrieve the smallest and largest values from the offset column:
    SELECT MIN(<offset column>), MAX(<offset column>) FROM <table>

    If your database contains separate tables or indexes that store values such as the daily or weekly minimum and maximum values for the offset column, you can define a custom minimum and maximum query that retrieves those values instead.
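
    For example, if a hypothetical summary table named offset_summary stores the current minimum and maximum offset values in min_id and max_id columns, the custom query might look like the following. The table and column names are illustrative only:
    SELECT MIN(min_id), MAX(max_id) FROM offset_summary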

    JDBC Driver JDBC driver to include with the pipeline:
    • Bundle driver from connection string - Transformer bundles a JDBC driver with the pipeline.

      Select this option to use a driver installed on Transformer. The stage detects the driver class name from the configured JDBC connection string. Bundling a driver ensures that the pipeline can run on all Spark cluster nodes.

    • Bundle custom driver - Transformer bundles the specified driver with the pipeline.

      Select this option to use a third-party driver that you installed on Transformer as an external library. Bundling a driver ensures that the pipeline can run on all Spark cluster nodes.

    • Do not bundle driver - Transformer does not include a driver with the pipeline.

      Select this option in the rare case that each node of the Spark cluster includes a compatible JDBC driver for the pipeline to use.

    Default is to bundle the driver from the JDBC connection string.

    Driver Class Name Class name of the custom JDBC driver to use.