PostgreSQL JDBC Table
The PostgreSQL JDBC Table origin reads data from one or more PostgreSQL tables. The origin can read all of the columns from the tables or only specified columns. A pipeline can contain only one origin configured to read from multiple tables. To read from one or more tables using a custom query, use the JDBC Query origin.
When you configure the PostgreSQL JDBC Table origin, you specify database connection information and any additional JDBC configuration properties you want to use.
You configure the tables to read, and optionally define the columns to read from the tables. You specify the offset column and the maximum number of partitions used to read from a database table. The data type of the offset column can limit the number of partitions that the origin can use. You can also configure an additional predicate for the query.
With the origin configured to read from multiple tables, the pipeline processes multiple batches. In batch mode, the pipeline sequentially processes one batch for each table that the origin is configured to read, and then stops. In streaming mode, the pipeline also sequentially processes one batch for each table, but then waits a specified amount of time before repeating the process, starting from the first table once again.
When you configure the origin to read from exactly one table, you can configure the origin to load data only once. With this configuration, the origin only reads data during the first batch of a pipeline run and caches that data for reuse throughout the pipeline run. When you do not configure the origin to load data only once, you can configure the origin to cache data. With this configuration, the origin caches the data from each batch to pass that data efficiently to multiple downstream stages as the pipeline processes the batch. You can also configure the origin to skip tracking offsets.
You can optionally configure advanced properties such as specifying the fetch size, custom offset queries, and the JDBC driver to bundle with the pipeline.
Pipelines with the PostgreSQL JDBC Table origin contain batch headers. In the header, the origin sets the jdbc.table attribute, which stores the name of the table that the origin reads for the batch. When the origin reads from multiple tables, you can use the attribute to determine the origin's data source for a batch.
StreamSets has tested this origin on PostgreSQL 9.6.2 with the PostgreSQL 42.2.5 driver.
PostgreSQL JDBC Driver
The PostgreSQL JDBC Table origin includes the PostgreSQL JDBC driver, so you do not need to install a driver before using the origin.
If you want to use a custom driver, you can install it as an external library for the JDBC stage library.
By default, Transformer bundles a JDBC driver into the launched Spark application so that the driver is available on each node in the cluster. If you prefer to manually install an appropriate JDBC driver on each Spark node, you can configure the stage to skip bundling the driver on the Advanced tab of the stage properties.
Offset Column
Unless you configure the origin to skip offset tracking, the PostgreSQL JDBC Table origin tracks an offset for each table that the origin reads.
By default, the origin uses the primary key column for each table as the offset column. However, if any table has a composite key or the data type in a primary key column is not a supported offset data type, you must specify the offset column.
As an alternative to the default, you can configure one offset column for the origin. You must specify a column that exists in each table that the origin reads. The offset column should contain unique, incremental values and should not contain null values. The offset column must also be a supported offset data type.
The origin uses the offset column to perform the following tasks:
- Track processing
- The origin tracks processing using values in the offset column. When reading the last row for a batch, the origin saves the value from the offset column. In the subsequent batch, the origin starts reading from the following row.
- Create partitions
- When tracking offsets and creating partitions, the origin determines the data to be processed and then divides the data into partitions based on ranges of offset values.
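For illustration, assuming a hypothetical table named orders with an offset column named order_id, reading the next batch after a saved offset conceptually resembles the following query. The table, column, and offset values are invented for this sketch, and the SQL that the origin actually generates may differ.

```sql
-- Conceptual sketch only: read rows added after the last saved offset,
-- up to the current maximum offset for the table.
SELECT *
FROM orders
WHERE order_id > 1000    -- offset saved at the end of the previous batch
  AND order_id <= 2500;  -- current maximum offset value
```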
Supported Offset Data Types
The supported data types for an offset column differ based on the number of partitions that you want the origin to use when reading the data.
Partitions | Supported Offset Data Type |
---|---|
One partition | |
One or more partitions | |
Null Offset Value Handling
By default, the PostgreSQL JDBC Table origin does not process records with null offset values. You can configure the origin to process those records by enabling the Partition for NULL property on the Advanced tab.
When you enable the Partition for NULL property, the origin queries the table for rows with null offset values, then groups the resulting records into a single partition. As a result, when the table includes null offset values, each batch of data contains a partition of records with null offset values.
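As a rough sketch, assuming a hypothetical table named orders with order_id as the offset column, the additional query behind this behavior conceptually looks like the following. The exact query that the origin generates may differ.

```sql
-- Conceptual sketch only: rows with a null offset value are grouped
-- into a single extra partition for the batch.
SELECT *
FROM orders
WHERE order_id IS NULL;
```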
Default Offset Queries
The PostgreSQL JDBC Table origin uses two offset queries to determine the offset values to use when querying the database. The default queries work for most cases. On the rare occasion when you want to produce different results, you can configure custom offset queries to override the default queries.
- Min/max offset query
- This query returns the minimum and maximum values in the offset column. The origin uses these values to read all existing data in a table.
- Max offset query
- This query returns the maximum offset in the offset column. The origin uses this value along with the last-saved offset to read new data that arrived since processing the last batch for a table.
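For reference, the two default queries conceptually resemble the following statements, assuming a hypothetical table named orders with order_id as the offset column. The exact SQL that the origin generates may differ.

```sql
-- Min/max offset query: establishes the full range of existing data in the table.
SELECT MIN(order_id), MAX(order_id) FROM orders;

-- Max offset query: finds the upper bound for data that arrived
-- after the last saved offset.
SELECT MAX(order_id) FROM orders;
```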
Custom Offset Queries
Configure custom offset queries to override the default offset queries for the PostgreSQL JDBC Table origin.
- Custom Min/Max Query
-
Returns the minimum and maximum value to use as offsets when querying the database. Configure this query to override the default min/max query that determines the first set of records that the origin reads for each table.
- Custom Max Query
- Returns a maximum value to use as an offset when querying the database. Configure this query to override the default max query that the origin uses to read subsequent sets of records from each table.
Specify a custom max query along with the custom min/max query to define a range of data for the pipeline to process, such as data generated in 2019.
For example, say you want to process only the data with offsets 1000 to 8000, inclusive, and you want the first batch to process one thousand records. To do this, you configure the custom min/max query to return 1000 and 2000. This sets the lower boundary of the data that the origin reads and defines the number of records included in the first batch. To set the upper boundary of the data that the origin reads, you set the custom max query to return 8000.
In the first batch, the origin reads records with offsets between 1000 and 2000, inclusive. In the second batch, the origin reads any new records with offsets between 2001 and 8000, inclusive. Now, say the last record in the second batch has an offset value of 2500. Then, in the third batch, the origin reads any new records with offsets between 2501 and 8000, and so on.
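As a minimal sketch, the custom queries for this example could simply return the literal boundary values described above. The queries that you write depend on your data; these statements only illustrate the expected result shape.

```sql
-- Custom min/max query: lower boundary 1000, first-batch upper boundary 2000.
SELECT 1000, 2000;

-- Custom max query: overall upper boundary of the data to process.
SELECT 8000;
```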
Partitioning
Spark runs a Transformer pipeline just as it runs any other application, splitting the data into partitions and performing operations on the partitions in parallel.
For the PostgreSQL JDBC Table origin, Spark determines the number of partitions from the maximum number of partitions and the table's partition column. Spark splits the data from the table into the partitions and creates one connection to the database for each partition.
Spark uses these partitions while the pipeline processes the batch unless a processor causes Spark to shuffle the data. To change the partitioning in the pipeline, use the Repartition processor.
When you configure the maximum number of partitions, consider the following factors:
- The size and configuration of the cluster.
- The amount of data being processed.
- The number of concurrent connections that can be made to the database.
If the pipeline fails because the origin encounters an out of memory error, you likely need to increase the number of partitions for the origin.
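Conceptually, each partition corresponds to a range of partition-column values and is read over its own database connection. As an illustrative sketch only, with a hypothetical partition column order_id, offset values from 0 through 1000, and four partitions, the partitioned reads resemble the following range queries.

```sql
-- Conceptual sketch only: each partition reads one range of the
-- partition column, in parallel over its own connection.
SELECT * FROM orders WHERE order_id >= 0   AND order_id < 250;
SELECT * FROM orders WHERE order_id >= 250 AND order_id < 500;
SELECT * FROM orders WHERE order_id >= 500 AND order_id < 750;
SELECT * FROM orders WHERE order_id >= 750 AND order_id <= 1000;
```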
Partition Column Selection
By default, the PostgreSQL JDBC Table origin uses the offset column for the table as the partition column to improve read performance. The origin selects the partition column for each table as follows:
- When an offset column is configured, the origin uses the column specified in the Offset Column property to create partitions.
- When no offset column is specified and the table includes a single-column primary key, the origin uses the primary key column for partitioning.
- When no offset column is specified and the table has a compound primary key instead of a single-column primary key, the origin uses the first key column for partitioning.
- When no offset column is specified and the table has no primary key columns, the origin uses the first datetime or numeric column that is indexed for partitioning.
When the origin cannot locate a partition column, it reads the entire table in a single partition.
PostgreSQL Data Types
The following table lists the PostgreSQL data types that the PostgreSQL JDBC Table origin supports and the Transformer data types they are converted to.
PostgreSQL data types not listed in the table are not supported.
PostgreSQL Data Type | Transformer Data Type |
---|---|
Bigint | Long |
Bit, Bit Varying, Bytea | Binary |
Boolean | Boolean |
Box, Circle, Interval, Line, Lseg, Path, Point, Polygon | String |
Char, Text, Varchar | String |
Cidr, Inet, Macaddr | String |
Date | Date |
Daterange, Int4range, Int8range, Numrange, Tsrange, Tstzrange | String |
Decimal, Numeric | Decimal |
Double Precision, Money | Double |
Integer | Integer |
Json, Jsonb, Uuid, Xml | String |
Real | Float |
Smallint | Short |
Time, Time with Time Zone, Timestamp, Timestamp with Time Zone | Timestamp |
Configuring a PostgreSQL JDBC Table Origin
-
On the Properties panel, on the General tab, configure the
following properties:
General Property Description Name Stage name. Description Optional description. Load Data Only Once Reads data while processing the first batch of a pipeline run and caches the results for reuse throughout the pipeline run. Select this property for lookup origins. When configuring lookup origins, do not limit the batch size. All lookup data should be read in a single batch.
Do not select this property when you configure the origin to read from more than one table.
Cache Data Caches data processed for a batch so the data can be reused for multiple downstream stages. Use to improve performance when the stage passes data to multiple stages. Caching can limit pushdown optimization when the pipeline runs in ludicrous mode.
Available when Load Data Only Once is not enabled. When the origin loads data once, the origin caches data for the entire pipeline run.
Skip Offset Tracking Skips tracking offsets. The origin reads all available data for each batch that the pipeline processes, limited by any batch-size configuration for the origin.
-
On the Connection tab, configure the following
properties:
Connection Property Description JDBC Connection String Connection string used to connect to the database.
Use the connection string format required by the database:
- PostgreSQL -
jdbc:postgresql://<host>:<port>/<dbname>
For example:
jdbc:postgresql://myhost:5432/mydb
- Amazon Aurora PostgreSQL -
jdbc:postgresql:aws://<instance_name>.<cluster_id>.<aws_region>.rds.amazonaws.com:<port>/
For example:
jdbc:postgresql:aws://instance.1234abcd.us-west-1.rds.amazonaws.com:1234/
Note: If you include the JDBC credentials in the connection string, use a user account created for the origin. The user must have the required privileges for the database.
Use Credentials Enables entering credentials. Use when you do not include credentials in the JDBC connection string.
Username PostgreSQL username. The specified user must have the required role for the database.
Password PostgreSQL password. Additional JDBC Configuration Properties Additional JDBC configuration properties to use. To add properties, click Add and define the JDBC property name and value. You can use simple or bulk edit mode to configure the properties.
Use the property names and values as expected by JDBC.
-
On the Table tab, configure the following
properties:
Table Property Description Schema Name of the schema in which the tables are located. Define when the database requires a schema.
Tables List of database tables to read. The list can contain exact table names and table-name patterns. Patterns can include the wildcards % or _. The % wildcard matches any substring of 0 or more characters. The _ wildcard matches any one character.
For example, suppose a database has tables named MyTable, Table, Table1, and Table10, and you enter: Table, Table_, %Table
Then the origin reads the tables named MyTable, Table, and Table1, but not Table10.
The list must match at least one table.
Offset Column Table column used to track processed rows and to create partitions. The specified column has the following requirements:- Must be in all the tables that the origin reads
- Must be a supported offset data type
- Should contain unique, incremental values and should not contain null values
By default, the origin uses the primary key column as the offset column. Specify another column as needed.
Max Number of Partitions Maximum number of partitions to use to read a batch. Default value is 10. For information about partition columns when skipping offset tracking, see Partition Column Selection.
Columns to Read Columns to read from each table. If you specify no columns, the origin reads all the columns in each table. Specified columns must exist in all the tables that the origin reads.
Click the Add icon to specify an additional column. You can use simple or bulk edit mode to configure the columns.
Additional Predicate Additional predicate to include in the WHERE clause of the query. Use Spark SQL syntax.
-
On the Advanced tab, configure the following
properties:
Advanced Property Description Specify Fetch Size Enables specifying a fetch size. Fetch Size Suggests the number of rows that the JDBC driver should fetch with each database round-trip. Use 0 to skip defining a fetch size.
For more information about configuring a fetch size, see the database documentation.
Partition for NULL Enables processing records with null offset values. Generates a partition in each batch that includes all records with null offset values. Note: Enabling this property is likely to result in the processing of duplicate records.
Use Custom Min/Max Query Enables overriding the default min/max offset query that the origin uses to determine the first set of records to read from each table.
Custom Min/Max Query Custom Spark SQL query that returns the minimum and maximum offset values to read, in this order.
Use Custom Max Query Enables overriding the default max query that the origin uses to determine each subsequent set of records to read.
Custom Max Query Custom Spark SQL query that returns the maximum offset value to read.
JDBC Driver JDBC driver to include with the pipeline: - Bundle driver from connection string
- Transformer bundles a JDBC driver with the pipeline.
Select this option to use the PostgreSQL JDBC driver included with the origin. Bundling a driver ensures that the pipeline can run on all Spark cluster nodes.
- Bundle custom driver - Transformer bundles the specified driver with the pipeline.
Select this option to use a third-party driver that you installed on Transformer as an external library. Bundling a driver ensures that the pipeline can run on all Spark cluster nodes.
- Do not bundle driver - Transformer does not include a driver with the pipeline.
Select this option in the rare case that each node of the Spark cluster includes a compatible JDBC driver for the pipeline to use.
Driver Class Name Class name of the custom JDBC driver to use.