CONNX CDC

The CONNX CDC origin reads mainframe change data provided by DataSync, the CONNX data synchronization tool. To read mainframe data from a CONNX server using a SQL query, use the CONNX origin instead. Connecting to CONNX requires a Mainframe Collector license, as described in Prerequisites. For information about supported versions, see Supported Systems and Versions.

The CONNX CDC origin includes the CRUD operation type in a record header attribute so generated records can be easily processed by CRUD-enabled destinations. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.

When you configure the CONNX CDC origin, you specify connection information and any custom JDBC configuration properties to determine how the origin connects to CONNX. You can also use a connection to configure the origin.

You specify the DataSync transformation that defines the change data to process. When the source database has high-precision timestamps, such as IBM Db2 TIMESTAMP(9) fields, you can configure the origin to write strings rather than datetime values to maintain the precision.

You can specify custom properties that your driver requires and configure connection properties. You can also specify what the origin does when it encounters an unsupported data type. By default, the origin generates JDBC record header and field attributes that provide additional information about each record and field.

The origin can generate events for an event stream. For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Prerequisites

Connecting to CONNX requires a Mainframe Collector license. The license allows reading mainframe data from CONNX and writing the data to cloud destinations. Contact your StreamSets account manager for more information.

Before you build a CONNX pipeline, you must install the CONNX SQL engine local to the mainframe machine. For information about installing a CONNX SQL engine, see the CONNX documentation.

DataSync Transformation

The CONNX CDC origin processes change data provided by CONNX DataSync. To process change data, you must create a DataSync transformation for change data, and configure the transformation to manage changes for the data that you want to process.

For more information about creating a DataSync change data capture synchronization transformation, see the CONNX documentation.

Supported Destinations

The destinations that you can use in a CONNX pipeline depend on your Mainframe Collector license. For more information, see Prerequisites.

The following destinations can be included in CONNX pipelines:
  • Amazon S3
  • Azure Data Lake Storage Gen2
  • Azure Synapse
  • Databricks Delta Lake
  • JDBC Producer to write to Amazon Redshift
  • Google BigQuery
  • Google Cloud Storage
  • Snowflake

Processing CONNX CDC Data

The CONNX CDC origin works with a CONNX DataSync transformation to process CONNX change data. At a high level, DataSync uses snapshots, finalize instructions, and savepoints to perform change data capture synchronization.

DataSync stores snapshots of the state of the specified tables. The finalize instruction indicates the last snapshot that was processed, and the savepoint indicates the snapshot up to which you want data. When a savepoint is created, DataSync compares the finalize and savepoint snapshots and provides the data needed to move from the finalize snapshot to the savepoint snapshot, populating insert, update, and delete tables with the required changes.

When you start a pipeline, the CONNX CDC origin generates a DataSync savepoint. DataSync then populates the insert, update, and delete tables with the data required to get from the last finalize point to the savepoint. The CONNX CDC origin processes the data and passes it to downstream pipeline stages. After all data in the insert, update, and delete tables is processed, the CONNX CDC origin generates finalize instructions for DataSync, which creates a finalize snapshot that indicates the data that was processed.
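Conceptually, the delta between the finalize snapshot and the savepoint snapshot is what populates the insert, update, and delete tables. The following Python sketch illustrates the idea only; it is not the DataSync implementation, and table states are modeled as simple key-value maps keyed by primary key:

```python
def snapshot_delta(finalize_state, savepoint_state):
    """Compute the changes needed to move a table from the state captured
    at the last finalize snapshot to the state at the new savepoint."""
    inserts = {k: v for k, v in savepoint_state.items() if k not in finalize_state}
    deletes = {k: finalize_state[k] for k in finalize_state if k not in savepoint_state}
    updates = {k: v for k, v in savepoint_state.items()
               if k in finalize_state and finalize_state[k] != v}
    return inserts, updates, deletes

# Between the two snapshots: row 2 changed, row 3 was deleted, row 4 was added.
finalize_state = {1: "a", 2: "b", 3: "c"}
savepoint_state = {1: "a", 2: "B", 4: "d"}
ins, upd, dele = snapshot_delta(finalize_state, savepoint_state)
```

The three result maps correspond to the insert, update, and delete tables that the origin then reads.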

Recovery Handling

When a CONNX CDC pipeline comes to an unexpected stop, you might need to review and remove some of the generated data from the pipeline destinations to avoid duplicate or inaccurate data.

Under normal conditions, when a CONNX CDC pipeline stops, the origin generates finalize instructions to indicate where processing ended. If the pipeline stops unexpectedly, such as a sudden shutdown of the Data Collector machine, the origin is unable to complete processing as expected or to generate the appropriate finalize instructions.

When the pipeline restarts, the origin creates a new savepoint, and DataSync reviews the finalize and savepoint snapshots, as usual, and provides the required data to get from the finalize snapshot to the savepoint snapshot. However, some of that data was already processed before the pipeline stopped unexpectedly.

As a result, after an unexpected pipeline stop, best practice is to check Data Collector logs for the timestamp of the last generated finalize instructions. Then review the data written to destination systems after that timestamp, and remove it if needed, before starting the pipeline again.
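As a sketch of that review step, the rows worth inspecting are those written to the destination after the last finalize timestamp. This is a hypothetical helper that assumes you can pair each destination row with its write timestamp:

```python
def rows_to_review(written_rows, last_finalize_ts):
    """Return destination rows written after the last finalize timestamp;
    these may be duplicated when the restarted pipeline reprocesses data."""
    return [row for ts, row in written_rows if ts > last_finalize_ts]

# Rows written at epoch timestamps 210 and 230 postdate the finalize at 200,
# so they should be reviewed before restarting the pipeline.
written = [(100, "r1"), (150, "r2"), (210, "r3"), (230, "r4")]
suspect = rows_to_review(written, last_finalize_ts=200)
```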

Record Attributes

The CONNX CDC origin includes the sdc.operation.type record header attribute in generated records that specifies the CRUD operation associated with each record. The origin can also add JDBC record header attributes and field attributes to generated records.

You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.

CRUD Operation Header Attributes

When generating records, the CONNX CDC origin specifies the operation type in the sdc.operation.type record header attribute. The origin uses the following values in the attribute to represent the operation type:
  • 1 for Insert
  • 2 for Delete
  • 3 for Update
  • 4 for Upsert
  • 5 for unsupported operations

If you use a CRUD-enabled destination in the pipeline, such as JDBC Producer or Elasticsearch, the destination can use the operation type when writing to destination systems. When necessary, you can use an Expression Evaluator processor or any scripting processor to manipulate the value in the header attribute. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.
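For example, a scripting processor could branch on the numeric code with a lookup like the following. This is a sketch using the codes listed above; the header attributes are modeled as a plain dictionary, not the actual scripting API:

```python
# Numeric codes used in the sdc.operation.type record header attribute.
SDC_OPERATION_TYPES = {
    1: "INSERT",
    2: "DELETE",
    3: "UPDATE",
    4: "UPSERT",
    5: "UNSUPPORTED",
}

def operation_name(header_attributes):
    """Map the sdc.operation.type header attribute to a readable name.
    Missing or unrecognized codes fall back to UNSUPPORTED."""
    code = int(header_attributes.get("sdc.operation.type", 5))
    return SDC_OPERATION_TYPES.get(code, "UNSUPPORTED")

name = operation_name({"sdc.operation.type": "3"})
```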

JDBC Header Attributes

By default, the CONNX CDC origin generates JDBC record header attributes that provide additional information about each record, such as the original data type of a field or the source tables for the record. The origin receives these details from the JDBC driver.

You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.

JDBC header attributes include a user-defined prefix to differentiate the JDBC header attributes from other record header attributes. By default, the prefix is jdbc.

You can change the prefix that the origin uses, and you can configure the origin not to create JDBC header attributes, with the JDBC Header Prefix and Create JDBC Header Attributes properties on the Advanced tab.

The origin can provide the following JDBC header attributes:
JDBC Header Attribute Description
<JDBC prefix>.tables
Provides a comma-separated list of source tables for the fields in the record.
Note: Not all JDBC drivers provide this information.
<JDBC prefix>.<column name>.jdbcType Provides the numeric value of the original SQL data type for each field in the record. See the Java documentation for a list of the data types that correspond to numeric values.
<JDBC prefix>.<column name>.precision Provides the original precision for all numeric and decimal fields.
<JDBC prefix>.<column name>.scale Provides the original scale for all numeric and decimal fields.
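The numeric jdbcType values correspond to the java.sql.Types constants. The following sketch resolves a column's jdbcType header attribute to a name using a partial lookup; only a few common constants are shown, and the column name used is an example:

```python
# A few java.sql.Types constant values; see the Java documentation for the
# full list of data types that correspond to numeric values.
JDBC_TYPE_NAMES = {
    -5: "BIGINT",
    3: "DECIMAL",
    4: "INTEGER",
    12: "VARCHAR",
    91: "DATE",
    92: "TIME",
    93: "TIMESTAMP",
    1111: "OTHER",  # reported for types the driver cannot classify
}

def jdbc_type_name(header_attributes, prefix, column):
    """Resolve the <prefix>.<column>.jdbcType header attribute to a name."""
    code = int(header_attributes[f"{prefix}.{column}.jdbcType"])
    return JDBC_TYPE_NAMES.get(code, f"UNKNOWN({code})")

# Assuming the default "jdbc" prefix and a hypothetical ORDER_TS column:
name = jdbc_type_name({"jdbc.ORDER_TS.jdbcType": "93"}, "jdbc", "ORDER_TS")
```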

JDBC Field Attributes

The CONNX CDC origin generates field attributes for columns converted to the Decimal or Datetime data types in Data Collector. The attributes provide additional information about each field.

The following data type conversions do not include all information in the corresponding Data Collector data type:
  • Decimal and Numeric data types are converted to the Data Collector Decimal data type, which does not store scale and precision.
  • The Timestamp data type is converted to the Data Collector Datetime data type, which does not store nanoseconds.
To preserve this information during data type conversion, the origin generates the following field attributes for these Data Collector data types:
Data Collector Data Type Generated Field Attribute Description
Decimal precision Provides the original precision for every decimal or numeric column.
Decimal scale Provides the original scale for every decimal or numeric column.
Datetime nanoSeconds Provides the original nanoseconds for every timestamp column.

You can use the record:fieldAttribute or record:fieldAttributeOrDefault functions to access the information in the attributes. For more information about working with field attributes, see Field Attributes.
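For example, a timestamp's original precision can be reconstructed by combining the Datetime field with its nanoSeconds field attribute. This sketch assumes the attribute holds the complete fractional-second value in nanoseconds:

```python
from datetime import datetime

def full_precision_timestamp(dt, nanoseconds_attr):
    """Combine a Datetime field value with its nanoSeconds field attribute
    to render the original high-precision timestamp as a string."""
    return dt.strftime("%Y-%m-%d %H:%M:%S") + f".{int(nanoseconds_attr):09d}"

# A Db2 TIMESTAMP(9) value rebuilt from the field and its attribute.
ts = full_precision_timestamp(datetime(2024, 1, 2, 3, 4, 5), "123456789")
```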

Header Attributes with the Drift Synchronization Solution

When you use the CONNX CDC origin with the Drift Synchronization Solution, ensure that the origin creates JDBC header attributes. JDBC header attributes allow the Hive Metadata processor to use the precision and scale information in the attributes to define decimal fields. The origin creates JDBC header attributes, by default.

To enable the Hive Metadata processor to define decimal fields as needed, perform the following steps:
  1. In the origin, on the Advanced tab, make sure that the Create JDBC Header Attributes property is selected.
  2. On the same tab, you can optionally change the default for the JDBC Header Prefix property.
  3. If you changed the default value for the JDBC Header Prefix property, then on the Hive tab of the Hive Metadata processor, configure the Decimal Precision Expression and Decimal Scale Expression properties. Update the jdbc. string in each property to match the specified JDBC header prefix.

    If you did not change the JDBC Header Prefix default value, then use the default expressions for the properties.

Event Generation

The CONNX CDC origin can generate events that you can use in an event stream. When you enable event generation, the origin generates events when it completes processing queries, such as insert, delete, or update queries, and when a query fails to complete. The origin also generates events when creating savepoints and finalize instructions. For more information about savepoints and finalize instructions, see Processing CONNX CDC Data.

CONNX CDC events can be used in any logical way. For example:
  • With the Pipeline Finisher executor to stop the pipeline and transition the pipeline to a Finished state when the origin completes processing available data.

    When you restart a pipeline stopped by the Pipeline Finisher executor, the origin processes data based on how you configured the origin. For example, if you configure the origin to run in incremental mode, the origin saves the offset when the executor stops the pipeline. When it restarts, the origin continues processing from the last-saved offset. In contrast, if you configure the origin to run in full mode, when you restart the pipeline, the origin uses the initial offset, if specified.

    For an example, see Stopping a Pipeline After Processing All Available Data.

  • With the Email executor to send a custom email after receiving an event.

    For an example, see Sending Email During Pipeline Processing.

For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Event Records

Event records generated by the CONNX CDC origin have the following event-related record header attributes:
Record Header Attribute Description
sdc.event.type Event type. Uses one of the following types:
  • connx-delete-success - Generated when the origin successfully completes a query for delete data.
  • connx-insert-success - Generated when the origin successfully completes a query for insert data.
  • connx-update-success - Generated when the origin successfully completes a query for update data.
  • connx-finalize-success - Generated when the origin successfully generates DataSync finalize instructions to indicate that it has processed the data provided for the most recent savepoint.
  • connx-savepoint-success - Generated when the origin successfully generates a DataSync savepoint to indicate the snapshot to be replicated.
  • jdbc-query-failure - Generated when the origin fails to complete a query.
sdc.event.version Integer that indicates the version of the event record type.
sdc.event.creation_timestamp Epoch timestamp when the stage created the event.
The origin can generate the following types of event records:
connx-delete-success
Generated when the origin completes processing the data returned from a delete query.
These event records have the sdc.event.type record header attribute set to connx-delete-success and include the following fields:
Field Description
CONNXCDCDeleteEvent.QUERY Delete query that completed successfully.
CONNXCDCDeleteEvent.TIMESTAMP Timestamp when the query completed.
CONNXCDCDeleteEvent.ROW_COUNT Number of processed rows.
connx-insert-success
Generated when the origin completes processing the data returned from an insert query.
These event records have the sdc.event.type record header attribute set to connx-insert-success and include the following fields:
Field Description
CONNXCDCInsertEvent.QUERY Insert query that completed successfully.
CONNXCDCInsertEvent.TIMESTAMP Timestamp when the query completed.
CONNXCDCInsertEvent.ROW_COUNT Number of processed rows.
connx-update-success
Generated when the origin completes processing the data returned from an update query.
These event records have the sdc.event.type record header attribute set to connx-update-success and include the following fields:
Field Description
CONNXCDCUpdateEvent.QUERY Update query that completed successfully.
CONNXCDCUpdateEvent.TIMESTAMP Timestamp when the query completed.
CONNXCDCUpdateEvent.ROW_COUNT Number of processed rows.
connx-finalize-success
Generated when the origin creates finalize instructions to indicate that it processed all of the data generated by DataSync to reach the latest savepoint.
These event records have the sdc.event.type record header attribute set to connx-finalize-success and include the following fields:
Field Description
CONNXFinalizeEvent.QUERY Query that completed successfully.
CONNXFinalizeEvent.TIMESTAMP Timestamp when the query completed.
connx-savepoint-success
Generated after the origin successfully creates a savepoint.
These records have the sdc.event.type record header attribute set to connx-savepoint-success and include the following fields:
Field Description
CONNXSavePointEvent.QUERY Savepoint query that completed successfully.
CONNXSavePointEvent.TIMESTAMP Timestamp when the savepoint was created.
jdbc-query-failure
Generated when the origin fails to complete processing the data returned from a query.
These event records have the sdc.event.type record header attribute set to jdbc-query-failure and include the following fields:
Field Description
query Query that failed to complete.
timestamp Timestamp when the query failed to complete.
row-count Number of records from the query that were processed.
source-offset Origin offset after query failure. Not used by this origin.
error First error message.
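As an illustration of consuming these events, the sketch below extracts the row count from the success events that carry one, using the event types and field names listed above. This is a hypothetical handler; events are modeled as plain dictionaries:

```python
# Event types whose records carry a ROW_COUNT field, mapped to the field
# name prefix used in each record.
EVENT_FIELD_PREFIXES = {
    "connx-insert-success": "CONNXCDCInsertEvent",
    "connx-update-success": "CONNXCDCUpdateEvent",
    "connx-delete-success": "CONNXCDCDeleteEvent",
}

def event_row_count(event_type, fields):
    """Return the processed-row count for insert/update/delete success events,
    or None for event types that carry no row count."""
    prefix = EVENT_FIELD_PREFIXES.get(event_type)
    if prefix is None:
        return None
    return fields.get(f"{prefix}.ROW_COUNT")

count = event_row_count(
    "connx-insert-success", {"CONNXCDCInsertEvent.ROW_COUNT": 42}
)
```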

CONNX SQL Data Types

The CONNX CDC origin converts CONNX SQL data types into Data Collector data types. The origin supports the following data types:
CONNX SQL Data Type Data Collector Data Type
BigInt Long
Binary Byte Array
Bit Boolean
Char, Nchar String
Date Date
Decimal Decimal
Double Double
Float, QFloat Float
Integer Int
Numeric Decimal
Real Float
Smallint, Tinyint Short
Time Time
Timestamp Datetime
Varbinary, Longvarbinary Byte Array
Varchar, Longvarchar String
Varnchar, Longnvarchar String

Unsupported Data Types

The stage handles unsupported data types in one of the following ways:
Stops the pipeline
If the stage encounters an unsupported data type, the stage stops the pipeline after completing the processing of the previous records and displays the following error:
JDBC_37 - Unsupported type 1111 for column.
By default, the stage stops the pipeline.
Converts to string
If the stage encounters an unsupported data type, the stage converts the data to string when possible, and then continues processing. Not all unsupported data types can successfully be converted to string. When using this option, verify that the data is converted to string as expected.
To configure the stage to attempt to convert unsupported data types to string, on the Advanced tab, set the On Unknown Type property to Convert to String.

Configuring a CONNX CDC Origin

Configure a CONNX CDC origin to process mainframe change data provided by a CONNX DataSync transformation. Before using this origin in a pipeline, perform the prerequisite tasks.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Produce Events Generates event records when events occur. Use for event handling.
    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the JDBC tab, configure the following properties:
    JDBC Property Description
    Connection Connection that defines the information required to connect to an external system.

    To connect to an external system, you can select a connection that contains the details, or you can directly enter the details in the pipeline. When you select a connection, Control Hub hides other properties so that you cannot directly enter connection details in the pipeline.

    To create a new connection, click the Add New Connection icon. To view and edit the details of the selected connection, click the Edit Connection icon.

    Use Connection String Enables specifying a connection string instead of defining connection details.
    CONNX JDBC Connection String JDBC connection string. Use the following format:

    jdbc:connx:DD=<dsn>;Gateway=<gateway>;Port=<port>

    The CONNX JDBC driver uses port 7500 by default.

    Gateway CONNX JDBC server gateway.
    Port Port number to use.

    Default is 7500.

    Database Name Name of the database to read from.
    Use SSL Uses SSL/TLS to connect to CONNX.

    Be sure to perform the required tasks to enable SSL. For more information, see SSL/TLS in CONNX Stages.

    DataSync Transform Name of the CONNX DataSync transformation.
    Use Credentials Enables entering credentials on the Credentials tab. Select when you do not include credentials in the JDBC connection string.
    Root Field Type Root field type to use for generated records. Use the default List-Map option unless using the origin in a pipeline built with Data Collector version 1.1.0 or earlier.
    Max Batch Size (records) Maximum number of records to include in a batch.
    Query Interval Amount of time to wait between queries. Enter an expression based on a unit of time. You can use SECONDS, MINUTES, or HOURS.

    Default is 10 seconds: ${10 * SECONDS}.

    Max Clob Size (characters) Maximum number of characters to be read in a Clob field. Larger data is truncated.
    Max Blob Size (bytes)

    Maximum number of bytes to be read in a Blob field.

    Number of Retries on SQL Error Maximum number of times the origin tries to execute the query after encountering a SQL error. After retrying this number of times, the origin handles the error based on the error handling configured for the origin.

    Use to handle transient network or connection issues that prevent the origin from submitting a query.

    Default is 0.

    Convert Timestamp To String Enables the origin to write timestamps as string values rather than datetime values. Strings maintain the precision stored in the source system. For example, strings can maintain the precision of a high-precision IBM Db2 TIMESTAMP(9) field.

    When writing timestamps to Data Collector date or time data types that do not store nanoseconds, the origin stores any nanoseconds from the timestamp in a field attribute.

    Additional JDBC Configuration Properties Additional JDBC configuration properties to use. To add properties, click Add and define the JDBC property name and value.

    Use the property names and values as expected by JDBC.

  3. If you configured the origin to enter JDBC credentials separately from the JDBC connection string on the JDBC tab, then configure the following properties on the Credentials tab:
    Credentials Property Description
    Username User name for the JDBC connection.

    The user account must have the correct permissions or privileges in the database.

    Password Password for the JDBC user name.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
  4. On the Advanced tab, optionally configure advanced properties.
    The defaults for these properties should work in most cases:
    Advanced Property Description
    Maximum Pool Size Maximum number of connections to create.

    Default is 1. The recommended value is 1.

    Minimum Idle Connections Minimum number of connections to create and maintain. To define a fixed connection pool, set to the same value as Maximum Pool Size.

    Default is 1.

    Connection Timeout (seconds) Maximum time to wait for a connection. Use a time constant in an expression to define the time increment.
    Default is 30 seconds, defined as follows:
    ${30 * SECONDS}
    Idle Timeout (seconds) Maximum time to allow a connection to idle. Use a time constant in an expression to define the time increment.

    Use 0 to avoid removing any idle connections.

    When the entered value is close to or more than the maximum lifetime for a connection, Data Collector ignores the idle timeout.

    Default is 10 minutes, defined as follows:
    ${10 * MINUTES}
    Max Connection Lifetime (seconds) Maximum lifetime for a connection. Use a time constant in an expression to define the time increment.

    Use 0 to set no maximum lifetime.

    When a maximum lifetime is set, the minimum valid value is 30 minutes.

    Default is 30 minutes, defined as follows:
    ${30 * MINUTES}
    Auto Commit Determines if auto-commit mode is enabled. In auto-commit mode, the database commits the data for each record.

    Default is disabled.

    Enforce Read-only Connection Creates read-only connections to avoid any type of write.

    Default is enabled. Disabling this property is not recommended.

    Transaction Isolation Transaction isolation level used to connect to the database.

    Default is the default transaction isolation level set for the database. You can override the database default by setting the level to any of the following:

    • Read committed
    • Read uncommitted
    • Repeatable read
    • Serializable
    Init Query SQL query to perform immediately after the stage connects to the database. Use to set up the database session as needed.

    The query is performed after each connection to the database. If the stage disconnects from the database during the pipeline run, for example if a network timeout occurs, the stage performs the query again when it reconnects to the database.

    Create JDBC Header Attributes Adds JDBC header attributes to records. The origin creates JDBC header attributes by default.
    Note: When using the origin with a Drift Synchronization Solution, make sure this property is selected.
    JDBC Header Prefix Prefix for JDBC header attributes.
    On Unknown Type Action to take when encountering an unsupported data type:
    • Stop Pipeline - Stops the pipeline after completing the processing of the previous records.
    • Convert to String - When possible, converts the data to string and continues processing.
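The connection string format from step 2 can also be assembled programmatically. A sketch using a hypothetical helper; the DSN and gateway names are examples:

```python
def connx_jdbc_url(dsn, gateway, port=7500):
    """Build a CONNX JDBC connection string in the documented format:
    jdbc:connx:DD=<dsn>;Gateway=<gateway>;Port=<port>
    Port 7500 is the CONNX JDBC driver default."""
    return f"jdbc:connx:DD={dsn};Gateway={gateway};Port={port}"

url = connx_jdbc_url("MAINFRAME_DSN", "connx-gw.example.com")
```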