JDBC Lookup

Supported pipeline types:
  • Data Collector

The JDBC Lookup processor uses a JDBC connection to perform lookups in a database table and pass the lookup values to fields. For information about supported versions, see Supported Systems and Versions.

Use the JDBC Lookup to enrich records with additional data. For example, you can configure the processor to use a department_ID field as the column to look up department name values in a database table, and pass the values to a new department_name output field.

Important: This stage does not support connecting to non-RDBMS systems, including Hive, Impala, Kudu, or Snowflake. Support for untested systems is not guaranteed. For a list of tested systems, see "Database Vendors and Drivers". Use the Kudu Lookup processor to look up Kudu data.

When a lookup results in multiple matches, the JDBC Lookup processor can return the first matching value, all matching values in a list in a single record, or all matching values in separate records.

When you configure JDBC Lookup, you specify connection information and custom JDBC configuration properties to determine how the processor connects to the database. You configure the SQL query to define the data to look up in the database, specify the output fields to write the lookup values to, and choose the multiple match behavior.

You can specify the behavior for when the lookup returns no values and optionally configure a default value for the same situation. You can also specify the behavior for when the processor encounters data of an unexpected type.

You can configure the processor to locally cache the lookup values to improve performance. When caching lookup values, you can also enable using additional threads to prepopulate the lookup cache and further increase performance.

By default, the JDBC Lookup processor requires all table columns defined in the column mappings to exist when the pipeline starts. You can configure the processor to skip this validation.

The processor generates JDBC field attributes that provide additional information about each field.

To use a JDBC version older than 4.0, you can specify the driver class name and define a health check query.

When you monitor a pipeline that includes the JDBC Lookup processor, you can view stage statistics about the number of queries the processor makes and the average time of the queries.

Database Vendors and Drivers

The JDBC Lookup processor can perform lookups of database data from multiple database vendors.

The following table lists the supported and tested database versions for this stage. You can use the stage with other JDBC-compliant databases, but full support is not guaranteed. For a full list of supported versions, see Supported Systems and Versions.
Database Vendor Supported Versions Tested Versions
MySQL MySQL 5.7 and later
  • MySQL 5.7 with the MySQL Connector/J 8.0.12 driver
  • MySQL 8.0 with the MySQL Connector/J 8.0.12 driver
Oracle
  • Oracle 11g Release 2, 12c, 18c, 19c, 21c
  • Oracle Real Application Clusters (RAC) 11g Release 2, 12c, 18c, 19c, 21c
Also supported:
  • Hosted systems, such as Amazon RDS, that run supported versions of Oracle RAC
  • Derived systems, such as Oracle Exadata, that run supported versions of Oracle RAC
  • Oracle 11g Release 2, 19c with the Oracle 21.8.0.0 JDBC driver version
PostgreSQL PostgreSQL 9.x and later
  • PostgreSQL 9.6.9
  • PostgreSQL 10.4
  • PostgreSQL 11.7
  • PostgreSQL 12.2
  • PostgreSQL 13.0
  • PostgreSQL 14.0
  • PostgreSQL 15.0
Microsoft SQL Server
  • SQL Server 2017
  • SQL Server 2019
  • SQL Server 2017
  • SQL Server 2019

MySQL Data Types

The JDBC Lookup processor converts MySQL data types into Data Collector data types.

The processor supports the following MySQL data types:
MySQL Data Type Data Collector Data Type
Bigint Long
Bigint Unsigned Decimal
Binary Byte Array
Blob Byte Array
Char String
Date Date
Datetime Datetime
Decimal Decimal
Double Double
Enum String
Float Float
Int Integer
Int Unsigned Long
Json String
Linestring Byte Array
Medium Int Integer
Medium Int Unsigned Long
Numeric Decimal
Point Byte Array
Polygon Byte Array
Set String
Smallint Short
Smallint Unsigned Integer
Text String
Time Time
Timestamp Datetime
Tinyint, Tinyint Unsigned Short
Varbinary Byte Array
Varchar String
Year Date

Oracle Data Types

The JDBC Lookup processor converts Oracle data types into Data Collector data types.

The processor supports the following Oracle data types:
Oracle Data Type Data Collector Data Type
Number Decimal
Char String
Varchar, Varchar2 String
Nchar, NvarChar2 String
Binary_float Float
Binary_double Double
Date Datetime
Timestamp Datetime
Timestamp with time zone Zoned_datetime
Timestamp with local time zone Zoned_datetime
Long String
Blob Byte_array
Clob String
Nclob String
XMLType String

PostgreSQL Data Types

The JDBC Lookup processor converts PostgreSQL data types into Data Collector data types.

The processor supports the following PostgreSQL data types:
PostgreSQL Data Type Data Collector Data Type
Bigint Long
Boolean Boolean
Bytea Byte Array
Char String
Date Date
Decimal Decimal
Double Precision Double
Enum String
Integer Integer
Money Double
Numeric Decimal
Real Float
Smallint Short
Text String
Time, Time with Time Zone Time
Timestamp, Timestamp with Time Zone Time
Varchar String

SQL Server Data Types

The JDBC Lookup processor converts SQL Server data types into Data Collector data types.

The processor supports the following SQL Server data types:
SQL Server Data Type Data Collector Data Type
Bigint Long
Binary Byte_Array
Bit Boolean
Char String
Date Date
Datetime, Datetime2 Datetime
Datetimeoffset Zoned_datetime
Decimal Decimal
Float Double
Image Byte_Array
Int Integer
Money Decimal
Nchar String
Ntext String
Numeric Decimal
Nvarchar String
Real Float
Smalldatetime Datetime
Smallint Short
Smallmoney Decimal
Text String
Time Time
Tinyint Short
Varbinary Byte_Array
Varchar String
XML String

Unsupported Data Types

The stage handles unsupported data types in one of the following ways:
Stops the pipeline
If the stage encounters an unsupported data type, the stage stops the pipeline after completing the processing of the previous records and displays the following error:
JDBC_37 - Unsupported type 1111 for column.
By default, the stage stops the pipeline.
Converts to string
If the stage encounters an unsupported data type, the stage converts the data to string when possible, and then continues processing. Not all unsupported data types can successfully be converted to string. When using this option, verify that the data is converted to string as expected.
To configure the stage to attempt to convert unsupported data types to string, on the Advanced tab, set the On Unknown Type property to Convert to String.

Installing the JDBC Driver

Before you use the JDBC Lookup processor, install the JDBC driver for the database. You cannot access the database until you install the required driver.
Note: When connecting to a PostgreSQL, Microsoft SQL Server, or MariaDB database, you do not need to install a JDBC driver. Data Collector includes the JDBC driver required for those databases.

You install the driver into the JDBC stage library, streamsets-datacollector-jdbc-lib, which includes the processor.

To use the JDBC driver with multiple stage libraries, install the driver into each stage library associated with the stages. For example, if you want to use a MySQL JDBC driver with the JDBC Lookup processor and with the MySQL Binary Log origin, you install the driver as an external library for the JDBC stage library, streamsets-datacollector-jdbc-lib, and for the MySQL Binary Log stage library, streamsets-datacollector-mysql-binlog-lib.

For information about installing additional drivers, see Install External Libraries.

Lookup Cache

To improve pipeline performance, you can configure the JDBC Lookup processor to locally cache the values returned from a database table.

The processor caches values until the cache reaches the maximum size or the expiration time. When the first limit is reached, the processor evicts values from the cache.

You can configure the following ways to evict values from the cache:
Size-based eviction
Configure the maximum number of values that the processor caches. When the maximum number is reached, the processor evicts the oldest values from the cache.
Time-based eviction
Configure the amount of time that a value can remain in the cache without being written to or accessed. When the expiration time is reached, the processor evicts the value from the cache. The eviction policy determines whether the processor measures the expiration time since the last write of the value or since the last access of the value.
For example, you set the eviction policy to expire after the last access and set the expiration time to 60 seconds. After the processor does not access a value for 60 seconds, the processor evicts the value from the cache.

When you stop the pipeline, the processor clears the cache.

Using Additional Threads

When using local caching, you can increase the number of threads that the JDBC Lookup processor uses to prepopulate the lookup cache. After the cache is populated, the additional threads are released. This can substantially increase the performance of the processor.

By default, the Minimum Idle Connections property, on the Advanced tab, determines the minimum number of connections to the database that Data Collector creates and maintains.

When you enable using a local lookup cache, the Minimum Idle Connections property can also determine the number of available cores on the Data Collector machine that the processor uses for threads to prepopulate the cache.

When caching is enabled, the JDBC Lookup processor uses additional threads based on the smaller of the following numbers:
  • The configured Minimum Idle Connections property.
  • The number of available cores on the Data Collector machine, minus one. The processor never uses all available cores.

By increasing the setting for the Minimum Idle Connections property, you can enable the processor to use almost all available cores on the Data Collector machine for additional threads to prepopulate the lookup cache.

For example, say you have 8 available cores on the Data Collector machine when you start a pipeline, and the JDBC Lookup has local caching enabled and the Minimum Idle Connections property set to 8. Then, the JDBC Lookup can use 7 of the available cores for threads to prepopulate the lookup cache. This might be ideal if you have complex lookup processing that you want to accomplish as quickly as possible and do not need to reserve resources for other processing.

To reserve machine resources for other processing, you can limit the cores that are used by the processor by setting the Minimum Idle Connections property to a lower number. For example, if you set Minimum Idle Connections to 5, then the processor can use up to 4 available cores for threads.

Retry Lookups for Missing Values

When you enable local caching, the processor also caches the configured default value when a lookup for a given column fails and a default value is defined for that column in the Column Mappings. The processor then always returns the default value for the column to avoid unnecessary lookups.

You can configure the processor to retry lookups for known missing values by enabling the Retry on Missing Value property. Configure the processor to retry lookups when the lookup table might be updated as the pipeline runs.

For example, if you expect that new values will be inserted in the table as the pipeline runs, you’d want to configure the processor to retry the request rather than returning the cached default value.

Note: If a lookup for a given column fails and a default value is not configured for that column, then the processor handles the record based on the Missing Values Behavior property.

JDBC Field Attributes

The JDBC Lookup processor generates field attributes for columns converted to the Decimal or Datetime data types in Data Collector. The attributes provide additional information about each field.

The following data type conversions do not include all information in the corresponding Data Collector data type:
  • Decimal and Numeric data types are converted to the Data Collector Decimal data type, which does not store scale and precision.
  • The Timestamp data type is converted to the Data Collector Datetime data type, which does not store nanoseconds.
To preserve this information during data type conversion, the origin generates the following field attributes for these Data Collector data types:
Data Collector Data Type Generated Field Attribute Description
Decimal precision Provides the original precision for every decimal or numeric column.
Decimal scale Provides the original scale for every decimal or numeric column.
Datetime nanoSeconds Provides the original nanoseconds for every timestamp column.

You can use the record:fieldAttribute or record:fieldAttributeOrDefault functions to access the information in the attributes. For more information about working with field attributes, see Field Attributes.

Monitoring a JDBC Lookup

When you monitor a pipeline that includes the JDBC Lookup processor, the Summary tab displays statistics about the queries that the JDBC Lookup processor performs. Use the statistics to help identify any performance bottlenecks encountered by the pipeline.

When you select the JDBC Lookup processor in the canvas while monitoring a running pipeline, the Summary tab displays the following stage statistics:

The Select Queries Meter displays the number of queries that the processor makes per second. The Select Queries Timer displays the average amount of time that the queries take to complete.

For more information about monitoring pipelines, see Pipeline Monitoring Overview.

Configuring a JDBC Lookup Processor

Configure a JDBC Lookup processor to perform lookups in a database table.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline. Not valid for cluster pipelines.
  2. On the JDBC tab, configure the following properties:
    JDBC Property Description
    JDBC Connection String Connection string used to connect to the database. Use the connection string format required by the database vendor.

    For example, use the following formats for these database vendors:

    • MySQL - jdbc:mysql://<host>:<port>/<database_name>
    • Oracle - jdbc:oracle:<driver_type>:@<host>:<port>:<service_name>
    • PostgreSQL - jdbc:postgresql://<host>:<port>/<database_name>
    • SQL Server - jdbc:sqlserver://<host>:<port>;databaseName=<database_name>

    You can optionally include the user name and password in the connection string.

    For Azure Managed Identity, use the JDBC connection string provided in your Azure database connection string settings.

    Note: If you are using Java 8, you must disable Java Security Manager for Data Collector to use Azure Managed Identity for the stage.
    SQL Query SQL query to use to look up data in the database. Use the following syntax for the query:
    SELECT <column1 name>, <column2 name> FROM <table name> WHERE <column3 name> =
        '${record:value(<field path>)}'
    For example, to use the department ID field to look up the department name column, use the following query:
    SELECT DeptName FROM Departments WHERE DeptID = '${record:value('/dept_ID')}'
    Column Mappings Use to override the default column to field mappings. By default, columns are written to fields of the same name.
    Enter the following:
    • Column Name - Name of the database column that contains the lookup value. Enter a column name or enter an expression that defines the column name.
    • SDC Field - Name of the field in the record that receives the lookup value. You can specify an existing field or a new field. If the field does not exist, JDBC Lookup creates the field.
    • Default Value - Optional default value to use when the query does not return a value for the field. If the query returns no value and this property is not defined, the processor handles the record based on the Missing Values Behavior property.

      To enter a default value for the Date data type, use the following format: yyyy/MM/dd. To enter a default value for the Datetime data type, use the following format: yyyy/MM/dd HH:mm:ss.

    • Data Type - Data type to use for the SDC Field. Required when you specify a default value. The processor uses the database column data type by default.

    Using simple or bulk edit mode, click the Add icon to create additional column mappings.

    Multiple Values Behavior Action to take upon finding multiple matching values:
    • First value only - Returns the first value.
    • All values as a list - Returns every matching value in a list in a single record.
    • Split into multiple records - Returns every matching value in a separate record.
    Missing Values Behavior Action to take upon finding no return values in fields with no default value defined:
    • Send to error - Sends the record to error.
    • Pass the record along the pipeline unchanged - Passes the record without a lookup return value.
    Max Clob Size (characters) Maximum number of characters to be read in a Clob field. Larger data is truncated.
    Max Blob Size (bytes) Maximum number of bytes to be read in a Blob field.
    Retry on Missing Value Specifies whether to retry lookups for known missing values. By default, the processor caches and then always returns the default value for known missing values to avoid unnecessary lookups.
    Use Credentials Enables entering credentials on the Credentials tab. Select when you do not include credentials in the JDBC connection string.
    Validate Column Mappings Requires that all column names defined in the column mappings exist in the database when the pipeline starts.

    When enabled, if a column in the mapping does not exist, the pipeline fails to start. When disabled, if a column in the mapping does not exist, the mapping is not used during processing.

    Note: When the query includes multiple tables and this property is enabled, the processor validates the column names used in column mappings for each table to be processed before processing data. This can slow performance.

    By default, this property is enabled.

    Validate Query Requires that all queries not using stored procedures contain SELECT, WHERE, and FROM clauses.

    When enabled, the pipeline generates an error when it encounters a query that does not contain all the required clauses.

    By default, this propery is enabled.

    Enable Local Caching Specifies whether to locally cache the returned values.
    Maximum Entries to Cache Maximum number of values to cache. When the maximum number is reached, the processor evicts the oldest values from the cache.

    Default is -1, which means unlimited.

    Eviction Policy Type Policy used to evict values from the local cache when the expiration time has passed:
    • Expire After Last Access - Measures the expiration time since the value was last accessed by a read or a write.
    • Expire After Last Write - Measures the expiration time since the value was created, or since the value was last replaced.
    Expiration Time Amount of time that a value can remain in the local cache without being accessed or written to.

    Default is 1 second.

    Time Unit Unit of time for the expiration time.

    Default is seconds.

    Additional JDBC Configuration Properties Additional JDBC configuration properties to use. To add properties, click Add and define the JDBC property name and value.

    Use the property names and values as expected by JDBC.

  3. If you configured the origin to enter JDBC credentials separately from the JDBC connection string on the JDBC tab, then configure the following properties on the Credentials tab:
    Credentials Property Description
    Username User name for the JDBC connection.

    The user account must have the correct permissions or privileges in the database.

    Password Password for the JDBC user name.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
  4. When using JDBC versions older than 4.0, on the Legacy Drivers tab, optionally configure the following properties:
    Legacy Drivers Property Description
    JDBC Class Driver Name Class name for the JDBC driver. Required for JDBC versions older than version 4.0.
    Connection Health Test Query Optional query to test the health of a connection. Recommended only when the JDBC version is older than 4.0.
  5. On the Advanced tab, optionally configure advanced properties.
    The defaults for these properties should work in most cases:
    Advanced Property Description
    Maximum Pool Size Maximum number of connections to create.

    Default is 1. The recommended value is 1.

    Minimum Idle Connections When local caching is not enabled, determines the minimum number of connections to the database to create and maintain.

    When local caching is enabled, also determines the number of available cores that can be used for additional threads for processing.

    Note: When local caching is enabled, configure this property carefully to avoid monopolizing Data Collector resources. For more information, see Using Additional Threads.
    Connection Timeout (seconds) Maximum time to wait for a connection. Use a time constant in an expression to define the time increment.
    Default is 30 seconds, defined as follows:
    ${30 * SECONDS}
    Idle Timeout (seconds) Maximum time to allow a connection to idle. Use a time constant in an expression to define the time increment.

    Use 0 to avoid removing any idle connections.

    When the entered value is close to or more than the maximum lifetime for a connection, Data Collector ignores the idle timeout.

    Default is 10 minutes, defined as follows:
    ${10 * MINUTES}
    Max Connection Lifetime (seconds) Maximum lifetime for a connection. Use a time constant in an expression to define the time increment.

    Use 0 to set no maximum lifetime.

    When a maximum lifetime is set, the minimum valid value is 30 minutes.

    Default is 30 minutes, defined as follows:
    ${30 * MINUTES}
    Auto Commit Determines if auto-commit mode is enabled. In auto-commit mode, the database commits the data for each record.

    Default is disabled.

    Enforce Read-only Connection Creates read-only connections to avoid any type of write.

    Default is enabled. Disabling this property is not recommended.

    Transaction Isolation Transaction isolation level used to connect to the database.

    Default is the default transaction isolation level set for the database. You can override the database default by setting the level to any of the following:

    • Read committed
    • Read uncommitted
    • Repeatable read
    • Serializable
    Init Query SQL query to perform immediately after the stage connects to the database. Use to set up the database session as needed.

    The query is performed after each connection to the database. If the stage disconnects from the database during the pipeline run, for example if a network timeout occurrs, the stage performs the query again when it reconnects to the database.

    For example, in case of Oracle, the following query returns 1 to verify that the stage is connected to the database: Select 1 from dual;

    On Unknown Type Action to take when encountering an unsupported data type:
    • Stop Pipeline - Stops the pipeline after completing the processing of the previous records.
    • Convert to String - When possible, converts the data to string and continues processing.