The JDBC Multitable Consumer origin uses an offset column and initial offset value to
determine where to start reading data within tables and partitions.
By default, the origin uses the
primary key of the tables as the offset column and uses no initial offset value. When
you use multithreaded table processing and the table has a composite primary key, the
origin uses each primary key as an offset column. You cannot use composite keys with
multithreaded partition processing.
By default, the origin reads all available data from each table when you start the
pipeline. The origin generates SQL queries using the following syntax when you start the
pipeline:
SELECT * FROM <table> ORDER BY <offset column_1>, <offset column_2>, ...
Where <offset column_n>
represents each primary key of the
table, such as when the table has a composite primary key. When you restart the pipeline
or when the origin switches back to a previously read table, the origin adds a WHERE
clause to the SQL query to continue reading from the last saved offset.
To use this default behavior, you do not need to configure any of the offset
properties.
You can make the following changes to how the origin handles offset columns and initial
offset values:
- Override the primary key as the offset column
- You can override the primary key and define another offset column or
columns. Or if the table doesn’t have a primary key, you can define the
offset column or columns to use.
-
Important: As a best practice, a user-defined offset column
should be an incremental and unique column that does not contain null
values. If the column is not unique - that is, multiple rows can have
the same value for this column - there is a potential for data loss upon
pipeline restart. For details, see
Multiple Offset Value Handling.
- Having an index on this column is strongly encouraged since the underlying
query uses an ORDER BY and inequality operators on this column.
- Define an initial offset value
- The initial offset value is a value within the offset column where you want
the JDBC Multitable Consumer origin to start reading. When you define an
initial offset value, you must first enter the offset column name and then
the value. If you are using the default primary key as the offset column,
enter the name of the primary key.
- If you define an initial offset value for a single offset column, the origin
generates SQL queries using the following
syntax:
SELECT * FROM <table> ORDER BY <offset column> WHERE <offset column> > ${offset}
- If you defined multiple offset columns, you must define an initial offset
value for each column, in the same order that the columns are defined. The
origin uses the initial offset values of all columns to determine where to
start reading data. For example, you override the primary key with the
following offset columns:
p1
, p2
,
p3
and define an initial offset value for each column.
The origin generates SQL queries using the following syntax:SELECT * FROM <table> ORDER BY p1, p2, p3 WHERE (p1 > ${offset1}) OR (p1 = ${offset1} AND p2 > ${offset2}) OR (p1 = ${offset1} AND p2 = ${offset2} AND p3 > ${offset3})
Note: Data Collector stores offsets for Datetime columns as Long values. For offset
columns with a Datetime data type, enter the initial value as a Long
value. You can use the time functions to transform a Datetime value
to a Long value. For example, the following expression converts a
date entered as a String to a Date object, and then to a Long value:
${time:dateTimeToMilliseconds(time:extractDateFromString('2017-05-01 20:15:30.915','yyyy-MM-dd HH:mm:ss.SSS'))}
- Define additional offset column conditions
- You can use the expression language to define additional conditions that the
origin uses to determine where to start reading data. The origin adds the
defined condition to the WHERE clause of the SQL query.
- You can use the
offset:column
function in the condition to
access an offset column by position. For example, if you have a table with
offset columns p1
and p2
, then
offset:column(0)
returns the value of
p1
while offset:column(1)
returns the
value of p2
.
- Let's say that you defined a
transaction_time
column as the
offset column. While the origin reads the table, multiple active
transactions are being written to the table with the current timestamp for
the transaction_time
column. When the origin finishes
reading the first record with the current timestamp, the origin continues
reading with the next offset and skips some rows with the current timestamp.
You can enter the following offset column condition to ensure that the
origin reads from all offset columns with a timestamp less than the current
time:${offset:column(0)} < ${time:now()}
- If your database requires the datetime in a specific format, you can use the
time:extractStringFromDate
function to specify the
format. For example:
${offset:column(0)} < '${time:extractStringFromDate(time:now(), "yyyy-MM-dd HH:mm:ss")}'