Proposers

- @rmahindra
- @vbalaji

Approvers

- @vinothchandar

Status

JIRA: https://issues.apache.org/jira/browse/HUDI-1290

Abstract

We intend to implement a source for ingesting Debezium Change Data Capture (CDC) logs into Deltastreamer/Hudi. With this capability, we can continuously capture row-level changes (inserts, updates, and deletes) committed to a database. While Debezium supports multiple databases, this RFC focuses on PostgreSQL. At the end, we explain how the design can be extended to support MySQL.

Background

The architecture of Debezium is shown in the figure below. Debezium is implemented as a Kafka Connect source that reads change logs from databases (logical decoding in PostgreSQL, the binlog in MySQL) and ingests them into Kafka, using a single Kafka topic per table in the source database.

...

There are other ways to deploy Debezium, such as Debezium Server, which can write events to other streaming systems such as Pulsar, Kinesis, and Google Pub/Sub. However, this RFC focuses on a Debezium source in Deltastreamer that assumes Kafka as the source of the change log events.

[Figure: end-to-end architecture of the Debezium source in Deltastreamer]

Implementation

As shown in the figure above, to ingest rows from the database into Hudi and preserve the change operations applied to the database, we need to perform two steps: (i) bootstrap the initial data from the database, and (ii) incrementally consume the change logs to insert, update, or delete records in Hudi.

...

To ensure proper de-duplication, merging, and hard deletes of records, we implement a custom Avro payload class for Debezium: DebeziumAvroPayload.java. During writes, if the op field of the record is d, we return an empty payload to ensure the record is deleted from storage. In preCombine and combineAndGetUpdateValue (merge handling of records), we return the existing stored record if its LSN (in the case of PostgreSQL) is higher than that of the incoming record; otherwise, the new record is written. In the case of MySQL, we will use either the GTID field or a combination of the binlog file id and binlog position to determine the relative order in which the events actually occurred in the database.
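To make this concrete, the sketch below shows one way such a payload could be structured, assuming the class extends Hudi's OverwriteWithLatestAvroPayload and that the op and flattened _source_lsn fields are available on each record; the field names and exact placement of the checks are illustrative, not the final implementation:

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

// Sketch: merge/delete semantics for Debezium change events.
public class DebeziumAvroPayload extends OverwriteWithLatestAvroPayload {

  private static final String OP_FIELD = "op";            // Debezium operation: c/u/d
  private static final String LSN_FIELD = "_source_lsn";  // flattened Postgres LSN (assumed name)
  private static final String DELETE_OP = "d";

  public DebeziumAvroPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  @Override
  public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
    Option<IndexedRecord> record = super.getInsertValue(schema);
    // A delete event yields an empty payload, which hard-deletes the record in storage.
    if (record.isPresent()
        && DELETE_OP.equals(String.valueOf(((GenericRecord) record.get()).get(OP_FIELD)))) {
      return Option.empty();
    }
    return record;
  }

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
      throws IOException {
    Option<IndexedRecord> incoming = getInsertValue(schema);
    if (!incoming.isPresent()) {
      return Option.empty(); // incoming event is a delete
    }
    // Keep whichever version corresponds to the later database event (higher LSN).
    long currentLsn = ((Number) ((GenericRecord) currentValue).get(LSN_FIELD)).longValue();
    long incomingLsn = ((Number) ((GenericRecord) incoming.get()).get(LSN_FIELD)).longValue();
    return currentLsn > incomingLsn ? Option.of(currentValue) : incoming;
  }
}

Ordering within a single batch (preCombine) is inherited from the base class, since Deltastreamer passes the value of the source ordering field (_source_lsn) as the payload's ordering value.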

Handling merges with Postgres TOAST columns

TOAST (The Oversized-Attribute Storage Technique) is a mechanism in Postgres which stores large column values in multiple physical rows, circumventing the page size limit of 8 KB.

...

During merging, we check for TOAST columns in the incoming records and, if any are present, update their values using the values from the current record on disk.
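One possible way to implement this check is sketched below. It relies on the fact that Debezium substitutes a fixed placeholder string for unchanged TOAST columns (the default placeholder shown here is configurable on the connector); the helper class and method names are hypothetical:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Sketch: copy unchanged TOAST columns from the stored record during a merge.
public class ToastColumnMerger {

  // Debezium's default marker for TOAST columns it could not read from the WAL.
  private static final String TOASTED_VALUE_PLACEHOLDER = "__debezium_unavailable_value";

  public static void mergeToastedValues(GenericRecord incoming, GenericRecord stored) {
    for (Schema.Field field : incoming.getSchema().getFields()) {
      Object value = incoming.get(field.name());
      // A placeholder value means the column was TOASTed and unchanged;
      // take its value from the record already on disk.
      if (TOASTED_VALUE_PLACEHOLDER.equals(String.valueOf(value))) {
        incoming.put(field.name(), stored.get(field.name()));
      }
    }
  }
}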

Deltastreamer configuration

To run the Deltastreamer, we need to configure the following (a sample invocation follows the list):

  1. The source ordering field should be set to _source_lsn.
  2. Configure the schema registry server that is used by the Debezium connector.
  3. Record key(s) should be the primary key(s) of the database and can be obtained from the schema registry, since Debezium uses the primary key(s) as the key for the Kafka topic.
  4. Configure the Deltastreamer to use the DebeziumSource and DebeziumAvroPayload classes as the source and payload classes, respectively.
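For illustration, a spark-submit invocation wiring these pieces together might look like the sketch below. The source and payload class names follow this RFC, but their package locations, along with the topic name, schema registry URL, paths, and record key field, are placeholders:

spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer hudi-utilities-bundle.jar \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.DebeziumSource \
  --payload-class org.apache.hudi.common.model.DebeziumAvroPayload \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-ordering-field _source_lsn \
  --target-base-path s3://bucket/path/to/table \
  --target-table my_table \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://schema-registry:8081/subjects/dbserver.public.my_table-value/versions/latest \
  --hoodie-conf hoodie.deltastreamer.source.kafka.topic=dbserver.public.my_table \
  --hoodie-conf bootstrap.servers=kafka:9092 \
  --hoodie-conf hoodie.datasource.write.recordkey.field=id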

Limitations

Given current constraints within Hudi, we discuss a few limitations of the current implementation for CDC. Consider a case where we receive the following change log events for a single row from Kafka, in this order: Insert (LSN=1), Delete (LSN=3), Update (LSN=2). If all three events are ingested in the same batch, de-duplication keeps only the delete, since it has the highest LSN, and the outcome is correct. However, if the delete and the update arrive in different batches, the delete removes the Hudi record from storage first; when the update arrives, it is applied as a fresh insert because no stored record remains, incorrectly resurrecting the row. This limitation holds for both CoW and MoR tables. To resolve it, we would have to retain the ordering value of each record even after deletion, so that an insert is applied only if its ordering value is higher than that of the delete event. Note that this issue arises only for out-of-order delete events; out-of-order insert and update events are applied correctly.

Rollout/Adoption Plan

This is a new feature specific to the Debezium CDC use case and should not impact existing jobs or tables.
Our plan is to first implement a Debezium source for PostgreSQL, and subsequently a Debezium source for MySQL.

Test Plan

We plan to test the Debezium source by setting up an AWS RDS instance of PostgreSQL, a Debezium connector deployed via the Strimzi operator on Kubernetes, and an AWS MSK Kafka cluster. We will test for correctness by performing SQL DML operations (inserts, updates, and deletes) on multiple rows in the Postgres DB, and querying the Hudi table to validate that the operations took effect on the corresponding records.