MapR DB CDC

The MapR DB CDC origin reads changed data from MapR DB that has been written to MapR Streams. The origin can use multiple threads to enable parallel processing of data.

MapR is now HPE Ezmeral Data Fabric. At times, this documentation uses "MapR" to refer to both MapR and HPE Ezmeral Data Fabric. For information about supported versions, see Supported Systems and Versions in the Data Collector documentation.

You might use this origin to perform database replication. You can use a separate pipeline with the MapR DB JSON origin to read existing data. Then start a pipeline with the MapR DB CDC origin to process subsequent changes.

When you configure a MapR DB CDC origin, you configure the MapR Streams consumer group name and topics to process, and the number of threads to use. You can specify additional MapR Streams and supported Kafka configuration properties as needed.

The MapR DB CDC origin includes the CRUD operation type in a record header attribute so generated records can be easily processed by CRUD-enabled destinations. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.

Tip: Data Collector provides several MapR origins to address different needs. For a quick comparison chart to help you choose the right one, see Comparing MapR Origins.

Before you use any MapR stage in a pipeline, you must perform additional steps to enable Data Collector to process MapR data. For more information, see MapR Prerequisites in the Data Collector documentation.

Multithreaded Processing

The MapR DB CDC origin performs parallel processing and enables the creation of a multithreaded pipeline.

The origin uses multiple concurrent threads based on the Number of Threads property. MapR Streams distributes partitions equally among all the consumers in a group.

When performing multithreaded processing, the MapR DB CDC origin checks the list of topics to process and creates the specified number of threads. Each thread connects to MapR Streams and creates a batch of data from a partition assigned by MapR Streams. Then, it passes the batch to an available pipeline runner.

A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors, executors, and destinations in the pipeline and handles all pipeline processing after the origin. Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time pipeline property to specify the interval or to opt out of empty batch generation.

Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But since batches are processed by different pipeline runners, the order that batches are written to destinations is not ensured.

For example, say you set the Number of Threads property to 5. When you start the pipeline, the origin creates five threads, and Data Collector creates a matching number of pipeline runners. The threads are assigned to different partitions as defined by MapR Streams. Upon receiving data, the origin passes a batch to each of the pipeline runners for processing.

At any given moment, the five pipeline runners can each process a batch, so this multithreaded pipeline processes up to five batches at a time. When incoming data slows, the pipeline runners sit idle, available for use as soon as the data flow increases.

For more information about multithreaded pipelines, see Multithreaded Pipeline Overview. For more information about MapR Streams, see the MapR Streams documentation.

Handling the _id Field

All MapR DB changed data includes an _id field. The MapR DB CDC origin includes the _id field as a field in the generated record. If needed, you can use the Field Remover processor in the pipeline to remove the _id field.

The _id field can contain string or binary data. The MapR DB CDC origin can process _id fields that contain either string or binary data. The origin cannot read _id fields that contain a combination of string and binary data.

When incoming data includes a string _id field, the origin creates the _id field in the record as a String. When incoming data includes a binary _id field, the origin converts the data to String and then includes the field in the record.

Note: Binary _id fields must contain numeric data to be processed correctly.
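
If you prefer a scripting processor over the Field Remover processor, you can drop the _id field in a Jython Evaluator instead. The following is a minimal sketch, assuming the default Jython Evaluator bindings (records, output); adapt it to your pipeline before use:

for record in records:
    # The origin places the MapR DB _id value at the root of the record.
    if record.value is not None and '_id' in record.value:
        # Remove the _id field before passing the record downstream.
        del record.value['_id']
    output.write(record)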

CRUD Operation and CDC Header Attributes

The MapR DB CDC origin includes the CRUD operation type in the sdc.operation.type record header attribute.

If you use a CRUD-enabled destination in the pipeline such as JDBC Producer or Elasticsearch, the destination can use the operation type when writing to destination systems. When necessary, you can use an Expression Evaluator processor or any scripting processor to manipulate the value in the header attribute. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.
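
For example, a Jython Evaluator can read the attribute and act on its value. The following is a minimal sketch, assuming the default Jython Evaluator bindings (records, output); the op_label field it adds is illustrative only:

# Map sdc.operation.type values to readable labels.
OPS = {'1': 'INSERT', '2': 'DELETE', '3': 'UPDATE'}

for record in records:
    op = record.attributes.get('sdc.operation.type')
    # Copy a readable label into the record body.
    record.value['op_label'] = OPS.get(op, 'UNKNOWN')
    output.write(record)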

The MapR DB CDC origin includes additional CDC information in other record header attributes. The following table describes the record header attributes that the MapR DB CDC origin generates:
Record Header Attribute - Description
mapr.op.timestamp - The timestamp associated with the change.
mapr.server.timestamp - The server timestamp associated with the change.
partition - Partition where the data originated.
offset - Offset for the data.
sdc.operation.type - The CRUD operation type associated with the record. The attribute can include the following values:
  • 1 for INSERT
  • 2 for DELETE
  • 3 for UPDATE
topic - Topic where the data originated.
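
In an Expression Evaluator, you can read these attributes with the record:attribute function. For example, the expression ${record:attribute('sdc.operation.type')} returns the CRUD operation type of the current record.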

Additional Properties

You can add custom configuration properties to the MapR DB CDC origin. You can use any MapR or Kafka property supported by MapR Streams. For more information, see the MapR Streams documentation.

When you add a configuration property, enter the exact property name and the value. The MapR DB CDC origin does not validate the property names or values.

Note: The MapR DB CDC origin uses the following MapR Streams configuration properties. The origin ignores user-defined values for these properties:
  • auto.commit.interval.ms
  • enable.auto.commit
  • group.id
  • max.poll.records
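
For example, to tune how much data each poll returns, you might add standard Kafka consumer properties such as the following. Whether a given property applies depends on your MapR Streams version, so verify each name against the MapR Streams documentation before relying on it:

max.partition.fetch.bytes = 2097152
fetch.min.bytes = 65536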

Configuring a MapR DB CDC Origin

Configure a MapR DB CDC origin to process MapR DB changed data that has been written to MapR Streams.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property - Description
    Name - Stage name.
    Description - Optional description.
    On Record Error - Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the MapR DB CDC tab, configure the following properties:
    MapR DB CDC Property - Description
    Consumer Group - Consumer group that Data Collector belongs to.
    Topic List - The topics to read.

    On the left side, enter the stream name and topic, as follows:

    /<stream_name>:<topic>

    For example: /data/sales:changelog.

    On the right side, enter the name of the table as follows:
    /<table>

    For example: /west.

    Click Add to add additional topics.

    Number of Threads - Number of threads the origin generates and uses for multithreaded processing.
    Max Batch Size (records) - Maximum number of records processed at one time. Honors values up to the Data Collector maximum batch size. Default is 1000. The Data Collector default is 1000.
    Max Batch Wait Time (ms) - Number of milliseconds to wait before sending a partial or empty batch.
    MapR Streams Configuration - Additional configuration properties to use. Using simple or bulk edit mode, click the Add icon to add properties. Use the property names and values as expected by MapR Streams. You can use MapR Streams properties and the set of Kafka properties supported by MapR Streams.