Processing Changed Data

Certain stages enable you to easily process data changes, such as change data capture (CDC) data or transactional data, in a pipeline.

CDC-enabled origins can read change capture data. Some read change capture data exclusively; others can be configured to read it. When reading changed data, these origins determine the CRUD operation associated with the data - such as insert, update, upsert, or delete - and include it in the sdc.operation.type record header attribute.

CRUD-enabled processors and destinations can use the CRUD operation type in the sdc.operation.type header attribute when writing records, enabling the external system to perform the appropriate operation.

Using a CDC-enabled origin and CRUD-enabled stages in a pipeline allows you to easily write changed data from one system into another. You can also use a CDC-enabled origin to write to non-CRUD destinations, and non-CDC origins to write to CRUD-enabled stages. For information on how that works, see Use Cases.

CRUD Operation Header Attribute

CDC-enabled origins include the sdc.operation.type record header attribute in all records when reading changed data.

CRUD-enabled processors and destinations can use the CRUD operation type in the sdc.operation.type header attribute when writing records, enabling the external system to perform the appropriate operation.

The sdc.operation.type record header attribute uses the following integers to represent CRUD operations:
  • 1 for INSERT records
  • 2 for DELETE records
  • 3 for UPDATE records
  • 4 for UPSERT records
  • 5 for unsupported operations or codes
  • 6 for UNDELETE records
  • 7 for REPLACE records
  • 8 for MERGE records
Note: Some origins use only a subset of the operations, based on the operations supported by the origin system. Similarly, destinations recognize only the subset of the operations that the destination systems support. See the origin and destination documentation for details about supported operations.
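As a quick reference, the code-to-operation mapping above can be sketched in Python. The dictionary and helper function below are illustrative only, not part of Data Collector; the attribute value arrives as a string in record headers, so the helper accepts either form:

```python
# Illustrative mapping of sdc.operation.type codes, as listed above.
SDC_OPERATION_TYPES = {
    1: "INSERT",
    2: "DELETE",
    3: "UPDATE",
    4: "UPSERT",
    5: "UNSUPPORTED",
    6: "UNDELETE",
    7: "REPLACE",
    8: "MERGE",
}

def operation_name(header_value):
    """Translate an sdc.operation.type value (int or string) to its name."""
    try:
        code = int(header_value)
    except (TypeError, ValueError):
        return "UNSUPPORTED"
    return SDC_OPERATION_TYPES.get(code, "UNSUPPORTED")

print(operation_name("1"))  # INSERT
print(operation_name(4))    # UPSERT
```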

Earlier Implementations

Some origins were enabled for CDC using different record header attributes in earlier releases, but they all now include the sdc.operation.type record header attribute. All earlier CRUD header attributes are retained for backward compatibility.

Similarly, CRUD-enabled destinations that were enabled to look for the CRUD operation type in other header attributes can now look for the sdc.operation.type record header attribute first and check the alternate attribute afterwards. The alternate header attribute functionality is retained for backward compatibility.
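The backward-compatible lookup order can be sketched as follows. This is an illustrative Python helper, not stage code; the sample uses the oracle.cdc.operation attribute named later in this topic as an example of a legacy attribute:

```python
def crud_operation(attributes, legacy_attribute):
    """Return the CRUD operation code from a record header attribute map.

    Mirrors the lookup described above: check sdc.operation.type first,
    then fall back to the stage's older CRUD header attribute.
    """
    value = attributes.get("sdc.operation.type")
    if value is not None:
        return value
    return attributes.get(legacy_attribute)

# Example: headers written by an older pipeline that only sets the
# legacy Oracle attribute.
headers = {"oracle.cdc.operation": "INSERT"}
print(crud_operation(headers, "oracle.cdc.operation"))  # INSERT
```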

CDC-Enabled Stages

CDC-enabled stages provide the CRUD operation type in the sdc.operation.type record header attribute. Some origins provide alternate and additional header attributes.

The following stages provide CRUD record header attributes:
Aurora PostgreSQL CDC Client
Either includes the CRUD operation type in the record or includes the operation type in two record header attributes:
  • sdc.operation.type
  • postgres.cdc.operation

Includes additional CDC information in record header attributes with the postgres.cdc prefix, such as postgres.cdc.lsn.

For more information, see Record Contents and Generated Records.

MapR DB CDC
Includes the CRUD operation type in the sdc.operation.type record header attribute.

Includes additional CDC information in record header attributes.

For more information, see CRUD Operation and CDC Header Attributes.

MongoDB Atlas CDC
Includes the CRUD operation type in the sdc.operation.type record header attribute.

Can include additional CDC information in record header attributes, such as the op and ns attributes.

For more information, see Generated Records.

MongoDB Oplog
Includes the CRUD operation type in the sdc.operation.type record header attribute.

Can include additional CDC information in record header attributes, such as the op and ns attributes.

For more information, see Generated Records.

MySQL Binary Log
Includes the CRUD operation type in the sdc.operation.type record header attribute.

Includes additional CDC information in record fields.

For more information, see Generated Records.

Oracle CDC
Includes the CRUD operation type in both of the following headers:
  • sdc.operation.type
  • oracle.cdc.operation

For more information, see CRUD Operation Header Attributes.

Includes additional CDC information in record header attributes with the oracle.cdc prefix, such as oracle.cdc.sequence.

Oracle CDC Client
Includes the CRUD operation type in both of the following headers:
  • sdc.operation.type
  • oracle.cdc.operation

For more information, see CRUD Operation Header Attributes.

Includes additional CDC information in record header attributes with the oracle.cdc prefix, such as oracle.cdc.table.

Oracle Multitable Consumer
Includes the CRUD operation type in both of the following headers:
  • sdc.operation.type
  • oracle.cdc.operation

For more information, see CRUD Operation Header Attributes.

PostgreSQL CDC Client
Either includes the CRUD operation type in the record or includes the operation type in two record header attributes:
  • sdc.operation.type
  • postgres.cdc.operation

Includes additional CDC information in record header attributes with the postgres.cdc prefix, such as postgres.cdc.lsn.

For more information, see Record Contents and Generated Records.

Salesforce
Includes the CRUD operation type in the sdc.operation.type record header attribute.

For more information, see CRUD Operation Header Attribute.

SQL Parser
Includes the CRUD operation type in both of the following headers:
  • sdc.operation.type
  • oracle.cdc.operation

For more information, see Generated Records.

SQL Server CDC Client
Includes the CRUD operation type in the sdc.operation.type record header attribute.

Includes CDC information in header attributes named jdbc.<CDC column name>. For more information, see Record Header Attributes.

SQL Server Change Tracking
Includes the CRUD operation type in the sdc.operation.type record header attribute.

Includes additional information from change tables in the jdbc.SYS_CHANGE header attributes. For more information, see Record Header Attributes.

Data Changes in SQL Server or Azure SQL Database

SQL Server and Azure SQL Database offer multiple methods to track data changes. In Data Collector, the appropriate origin depends on the method that the database uses to track changes:
  • CDC tables - use the SQL Server CDC Client origin. For more information about CDC tables, see the Microsoft documentation.
  • Change tracking tables - use the SQL Server Change Tracking origin. For more information about change tracking tables, see the Microsoft documentation.
  • Temporal tables - use the JDBC Multitable Consumer or JDBC Query Consumer origin. For more information about temporal tables, see the Microsoft documentation.

CRUD-Enabled Stages

The following stages recognize CRUD operations stored in record header attributes and can perform writes based on those values. Some stages also provide CRUD-related properties.
Each entry below lists the stage, the operations it supports, and how it determines the operation to use.
JDBC Tee processor
  • INSERT
  • UPDATE
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Default Operation and Unsupported Operation Handling properties in the stage

A Change Log property enables processing records based on the CDC-enabled origin in the pipeline.

For more information, see CRUD Operation Processing.

Azure Synapse SQL destination
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute

For more information, see CRUD Operation Processing.

Couchbase destination
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Default Write Operation and Unsupported Operation Handling properties in the stage

For more information, see CRUD Operation Processing.

Databricks Delta Lake destination
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute

For more information, see CRUD Operation Processing.

Elasticsearch destination
  • CREATE (INSERT)
  • UPDATE
  • INDEX (UPSERT)
  • DELETE
  • UPDATE with doc_as_upsert (MERGE)
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Default Operation and Unsupported Operation Handling properties in the stage

For more information, see CRUD Operation Processing.

Google BigQuery destination
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute

For more information, see CRUD Operation Processing.

GPSS Producer destination
  • INSERT
  • UPDATE
  • MERGE
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Default Operation and Unsupported Operation Handling properties in the stage

For more information, see CRUD Operation Processing.

JDBC Producer destination
  • INSERT
  • UPDATE
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Default Operation and Unsupported Operation Handling properties in the stage

A Change Log Format property enables processing records based on the CDC-enabled origin in the pipeline.

For more information, see CRUD Operation Processing.

Kudu destination
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Default Operation and Unsupported Operation Handling properties in the stage

For more information, see CRUD Operation Processing.

MapR DB JSON destination
  • INSERT
  • UPDATE
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Insert API and Set API properties in the stage

For more information, see Writing to MapR DB JSON.

MongoDB destination
  • INSERT
  • UPDATE
  • REPLACE
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Upsert property in the stage

For more information, see Define the CRUD Operation.

MongoDB Atlas destination
  • INSERT
  • UPDATE
  • REPLACE
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Upsert property in the stage

For more information, see Define the CRUD Operation.

Oracle destination
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute

For more information, see CRUD Operation Processing.

Redis destination
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute

For more information, see Define the CRUD Operation.

Salesforce destination
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
  • UNDELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Default Operation and Unsupported Operation Handling properties in the stage

For more information, see CRUD Operation Processing.

Salesforce Bulk API 2.0 destination
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Default Operation and Unsupported Operation Handling properties in the stage

For more information, see CRUD Operation Processing.

SingleStore destination
  • INSERT
  • UPDATE
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute
  • Default Operation and Unsupported Operation Handling properties in the stage

A Change Log Format property enables processing records based on the CDC-enabled origin in the pipeline.

For more information, see CRUD Operation Processing.

Snowflake destination
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute

For more information, see CRUD Operation Processing.

Teradata destination
  • INSERT
  • UPDATE
  • UPSERT
  • DELETE
Determines the operation to use based on:
  • sdc.operation.type record header attribute

For more information, see CRUD Operation Processing.

Processing the Record

Change logs can provide record data in different formats. The JDBC Tee processor and JDBC Producer destination can decode most change log formats to generate record data based on the origin change log. When using other CRUD-enabled destinations, you might need to add additional processing to the pipeline to alter the format of the record.

For example, Microsoft SQL Server CDC records created by the JDBC Query Consumer origin contain CDC fields in the record, in addition to record data. You might use a Field Remover processor to drop any unnecessary fields from the record.

In contrast, the MySQL binary logs read by the MySQL Binary Log origin provide new or updated data in a New Data map field and changed or deleted data in a Changed Data map field. You might use a Field Flattener processor to flatten the map field with the data that you need, and a Field Remover processor to remove any unnecessary fields.

For details on the format of generated records, see the documentation for the CDC-enabled origin.
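As a rough illustration of that reshaping, here is a Python sketch. The field names (NewData, ChangedData) are hypothetical stand-ins for the map fields described above, and the two functions are simplified analogues of the Field Flattener and Field Remover processors, not their actual implementations:

```python
def flatten(value, prefix=""):
    """Flatten nested map fields into dotted top-level field paths."""
    flat = {}
    for key, item in value.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(item, dict):
            flat.update(flatten(item, path))
        else:
            flat[path] = item
    return flat

def remove_fields(record, prefixes):
    """Drop fields whose path starts with any of the given prefixes."""
    return {k: v for k, v in record.items()
            if not any(k.startswith(p) for p in prefixes)}

# Hypothetical record shaped like the binary log example above.
record = {"Table": "orders",
          "NewData": {"id": 7, "total": 19.5},
          "ChangedData": {"id": 7, "total": 12.0}}
flat = flatten(record)                     # e.g. {'NewData.id': 7, ...}
slim = remove_fields(flat, ["ChangedData"])
```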

Use Cases

You can use CDC-enabled origins and CRUD-enabled destinations in pipelines together or individually. Here are some typical use cases:
CDC-enabled origin with CRUD-enabled destinations
You can use a CDC-enabled origin and a CRUD-enabled destination to easily process changed records and write them to a destination system.

For example, say you want to write CDC data from Microsoft SQL Server to Kudu. To do this, you use the CDC-enabled SQL Server CDC Client origin to read data from a Microsoft SQL Server change capture table. The origin places the CRUD operation type in the sdc.operation.type header attribute, in this case: 1 for INSERT, 2 for DELETE, 3 for UPDATE.

You configure the pipeline to write to the CRUD-enabled Kudu destination. In the Kudu destination, you can specify a default operation for any record with no value set in the sdc.operation.type attribute, and you can configure error handling for invalid values. You set the default to INSERT and you configure the destination to use this default for invalid values. In the sdc.operation.type attribute, the Kudu destination supports 1 for INSERT, 2 for DELETE, 3 for UPDATE, and 4 for UPSERT.

When you run the pipeline, the SQL Server CDC Client origin determines the CRUD operation type for each record and writes it to the sdc.operation.type record header attribute. The Kudu destination then uses the operation in the sdc.operation.type attribute to inform the Kudu destination system how to process each record. Any record with no value in the sdc.operation.type attribute, such as a record created by the pipeline, is treated as an INSERT record, and any record with an invalid value falls back to the same default behavior.
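The resolution logic in this example can be sketched in Python. The constants mirror the stage properties named above, but the function itself is illustrative, not destination code:

```python
# Kudu-supported codes from the example above.
SUPPORTED = {1: "INSERT", 2: "DELETE", 3: "UPDATE", 4: "UPSERT"}
DEFAULT_OPERATION = "INSERT"      # Default Operation property
USE_DEFAULT_FOR_INVALID = True    # Unsupported Operation Handling property

def resolve_operation(attributes):
    """Pick the write operation for one record's header attributes."""
    raw = attributes.get("sdc.operation.type")
    if raw is None:
        return DEFAULT_OPERATION          # no value set: use the default
    try:
        op = SUPPORTED.get(int(raw))
    except ValueError:
        op = None
    if op is not None:
        return op
    if USE_DEFAULT_FOR_INVALID:
        return DEFAULT_OPERATION          # invalid value: same default
    raise ValueError(f"unsupported sdc.operation.type: {raw}")

print(resolve_operation({"sdc.operation.type": "2"}))  # DELETE
print(resolve_operation({}))                           # INSERT
```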

CDC-enabled origin to non-CRUD destinations

If you need to write changed data to a destination system without a CRUD-enabled destination, you can use an Expression Evaluator processor or any scripting processor to move the CRUD operation information from the sdc.operation.type header attribute to a field, so the information is retained in the record.

For example, say you want to read from Oracle LogMiner redo logs and write the records to Hive tables with all of the CDC information in record fields. To do this, you'd use the Oracle CDC Client origin to read the redo logs, then add an Expression Evaluator to pull the CRUD information from the sdc.operation.type header attribute into the record. Oracle CDC Client writes additional CDC information, such as the table name and SCN, into oracle.cdc header attributes, so you can use expressions to pull that information into the record as well. Then you can use the Hadoop FS destination to write the enhanced records to Hive.
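The enrichment step above can be sketched as a plain Python function. It stands in for Expression Evaluator expressions such as ${record:attribute('sdc.operation.type')}; the output field names are hypothetical:

```python
ORACLE_PREFIX = "oracle.cdc."

def headers_to_fields(record_value, attributes):
    """Copy the CRUD operation and oracle.cdc.* headers into record fields.

    Illustrative analogue of the Expression Evaluator step described
    above, not Data Collector code.
    """
    enriched = dict(record_value)
    enriched["operation"] = attributes.get("sdc.operation.type")
    for name, value in attributes.items():
        if name.startswith(ORACLE_PREFIX):
            # e.g. oracle.cdc.table -> "table" field in the record
            enriched[name[len(ORACLE_PREFIX):]] = value
    return enriched

headers = {"sdc.operation.type": "3", "oracle.cdc.table": "ORDERS"}
row = headers_to_fields({"id": 7}, headers)
print(row)  # {'id': 7, 'operation': '3', 'table': 'ORDERS'}
```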

Non-CDC origin to CRUD destinations
When reading data from a non-CDC origin, you can use an Expression Evaluator processor or any scripting processor to define the sdc.operation.type header attribute.
For example, say you want to read from a transactional database table and keep a dimension table in sync with the changes. You'd use the JDBC Query Consumer origin to read the source table and a JDBC Lookup processor to check the dimension table for the primary key value of each record. The output of the lookup processor then tells you whether the table contains a matching record. Using an Expression Evaluator, you set the sdc.operation.type record header attribute - 3 to update records that had a match, and 1 to insert new records.
When you pass the records to the JDBC Producer destination, the destination uses the operation in the sdc.operation.type header attribute to determine how to write the records to the dimension table.
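The decision made by the Expression Evaluator in this use case can be sketched as follows; the function and the sample lookup results are illustrative, not processor code:

```python
INSERT, UPDATE = "1", "3"  # sdc.operation.type codes listed earlier

def assign_operation(primary_key, dimension_keys):
    """Choose UPDATE when the lookup found a match, INSERT otherwise."""
    return UPDATE if primary_key in dimension_keys else INSERT

# Hypothetical lookup results: keys already present in the dimension table.
dimension_keys = {101, 102}
ops = {pk: assign_operation(pk, dimension_keys) for pk in (101, 200)}
print(ops)  # {101: '3', 200: '1'}
```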