Discussion thread
Vote thread
JIRA

FLINK-15331

Release: 1.11

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Being able to directly ingest and interpret changelogs in the Flink Table API & SQL is a highly demanded feature in the community, see [1] [2] [3] [4] [5]. In production, CDC (Change Data Capture) is a popular pattern used for replicating data, feeding search indexes, updating caches, synchronizing data between microservices, auditing logs, and so on. There are many CDC tools (e.g. MySQL CDC projects) in the open source community, which indicates that CDC is widely used in companies. Supporting changelog interpretation will allow users to manipulate these streams with Flink SQL in real time and broaden the use cases of Flink.

On the other hand, the “Dynamic Table” concept we proposed years ago defines two modes for defining a dynamic table on a stream: Append Mode and Update Mode. Converting a stream to a dynamic table in append mode is already supported in Flink, but update mode is not supported yet. Supporting changelog interpretation fills this missing piece of the puzzle, completing the Dynamic Table concept and giving Flink a solid foundation.

Envisioned Flink user stories based on changelogs:

  • Data synchronization using Flink SQL: synchronize data from one database to different databases, e.g. from MySQL to PostgreSQL and Elasticsearch.
  • Real-time materialized aggregate views over a source database: example
  • Temporal join against a changelog with low latency, even if the database is huge.
  • Event-time temporal join to produce accurate results, e.g. auditing in banks.

The goal of this FLIP is:

  • A user can read and interpret an external system’s CDC (change data capture) in Flink, e.g. Debezium CDC, MySQL binlog, Kafka compacted topics, Hudi incremental outputs.
    • Connecting Debezium changelogs to Flink is the most important, because Debezium supports capturing changes from MySQL, PostgreSQL, SQL Server, Oracle, Cassandra and MongoDB. If Flink supports Debezium, Flink can connect to the changelogs of all the databases above, which is a really big ecosystem.

Public Interfaces

We propose to introduce two formats (FormatFactory) to support interpreting external changelog data. 


CREATE TABLE my_table (
  ...
) WITH (
  'connector'='...', -- e.g. 'kafka'
  'format'='debezium-json',
  'debezium-json.schema-include'='true', -- default false, Debezium can be configured to include or exclude the message schema
  'debezium-json.ignore-parse-errors'='true' -- default false
);

CREATE TABLE my_table (
  ...
) WITH (
  'connector'='...', -- e.g. 'kafka'
  'format'='canal-json',
  'canal-json.ignore-parse-errors'='true' -- default false
);


DebeziumFormatFactory and CanalFormatFactory will produce a ChangelogMode containing all four kinds (INSERT + DELETE + UPDATE_BEFORE + UPDATE_AFTER). See FormatFactory#inferChangelogMode.
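As an illustrative sketch (a hypothetical helper, assuming the ChangelogMode / RowKind API from FLIP-95), the mode declared by these factories could look like the following:

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

// Sketch: the changelog mode a CDC format factory could declare,
// covering all four change kinds produced by Debezium / Canal data.
public class CdcChangelogModeSketch {
    public static ChangelogMode allKinds() {
        return ChangelogMode.newBuilder()
            .addContainedKind(RowKind.INSERT)
            .addContainedKind(RowKind.UPDATE_BEFORE)
            .addContainedKind(RowKind.UPDATE_AFTER)
            .addContainedKind(RowKind.DELETE)
            .build();
    }
}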

Both of them will be located in the flink-json module, under the org.apache.flink.formats.json.debezium and org.apache.flink.formats.json.canal packages.

Note: 'debezium-json' and 'canal-json' only support deserializing Debezium and Canal JSON data. Serialization is not included in this FLIP and should be discussed in the future. The main reason is that Flink currently doesn't support emitting a combined update message to sinks.

Proposed Changes

In order to interpret and emit changelogs, the core problem is how to decode and encode the change operations between external systems and Flink. We propose to introduce new TableSource and TableSink interfaces to support this, because sources and sinks are the components that bridge external systems. The community has been working on this for the past months, and the result is FLIP-95, which has been accepted. In FLIP-95, RowData#getRowKind represents the change operation kind of a row, and ScanTableSource#getChangelogMode represents what kind of changes are produced by a source.
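For illustration only, here is a minimal sketch of how a change operation is attached to a row via the FLIP-95 RowData API (the example class name is made up):

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

public class RowKindSketch {
    public static void main(String[] args) {
        // One logical update is represented as two rows, each tagged
        // with its change operation via RowData#setRowKind.
        GenericRowData before = GenericRowData.of(1004L, StringData.fromString("Anne"));
        before.setRowKind(RowKind.UPDATE_BEFORE);  // old state of the row

        GenericRowData after = GenericRowData.of(1004L, StringData.fromString("Anne Marie"));
        after.setRowKind(RowKind.UPDATE_AFTER);    // new state of the row

        System.out.println(before.getRowKind() + " -> " + after.getRowKind());
    }
}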

In FLIP-105, we will discuss the remaining things: introducing new CDC formats.

In the public CDC survey report, Debezium and Canal are the most popular CDC tools among English-speaking and Chinese-speaking users, so we would like to support them together as the MVP features. Debezium and Canal are change data capture tools which synchronize changelogs to other systems, e.g. message queues. They encode changelogs in their own formats, so the main work is interpreting these formats.

Debezium Format

Debezium is a CDC tool built on top of Kafka Connect that can stream changes in real time from MySQL, PostgreSQL, MongoDB, Oracle, and Microsoft SQL Server into Kafka. Debezium produces a unified changelog format in Kafka; here is a simple example of an update operation (schema excluded):

{
  "before": {
    "id": 1004,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "annek@noanswer.org"
  },
  "after": {
    "id": 1004,
    "first_name": "Anne Marie",
    "last_name": "Kretchmar",
    "email": "annek@noanswer.org"
  },
  "source": { ... },
  "op": "u",
  "ts_ms": 1465581029523
}

The meaning of each field:

  • The ‘op’ field is a mandatory field which describes the type of operation: ‘c’ for create (or insert), ‘u’ for update, ‘d’ for delete.
  • ‘before’ is an optional field that, if present, contains the state of the row before the event occurred. It is ‘null’ for a create operation.
  • ‘after’ is an optional field that, if present, contains the state of the row after the event occurred. It is ‘null’ for a delete operation.
  • ‘source’ is a mandatory field that contains metadata for the event, e.g. offset, binlog file, database, table...
  • ‘ts_ms’ shows the timestamp at which Debezium processed this event.

From Flink's perspective, Debezium is not a storage system, but a changelog format. We can introduce a format “format=debezium-json”. This format is based on the JSON format: the raw message will be converted into a row using the underlying JsonDeserializationSchema, and DebeziumDeserializationSchema will decompose the deserialized row into one or two rows. The decomposed rows only contain the physical data ("before" or "after") along with the interpreted change operation kind; all other meta information is dropped. The returned rows match the schema of the original database table and the Flink DDL.
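A minimal sketch of that decomposition (a hypothetical helper, not the actual DebeziumDeserializationSchema; it assumes the "before"/"after" payloads have already been deserialized into rows by the underlying JSON deserializer):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.types.RowKind;

public class DebeziumDecompositionSketch {
    // Map a Debezium "op" plus its "before"/"after" rows to one or two
    // Flink rows tagged with the corresponding change operation kind.
    public static List<GenericRowData> decompose(String op, GenericRowData before, GenericRowData after) {
        switch (op) {
            case "c": // create / insert: only "after" is present
                after.setRowKind(RowKind.INSERT);
                return Collections.singletonList(after);
            case "d": // delete: only "before" is present
                before.setRowKind(RowKind.DELETE);
                return Collections.singletonList(before);
            case "u": // update: emit the old and the new state of the row
                before.setRowKind(RowKind.UPDATE_BEFORE);
                after.setRowKind(RowKind.UPDATE_AFTER);
                return Arrays.asList(before, after);
            default:
                throw new IllegalArgumentException("Unknown Debezium op: " + op);
        }
    }
}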

For example, in order to consume the above Debezium JSON data from Kafka, users should define a DDL like this:

CREATE TABLE my_table (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  email STRING
) WITH (
  'connector'='kafka',
  'topic'='user_topic',
  'properties.bootstrap.servers'='localhost:9092',
  'scan.startup.mode'='earliest-offset',
  'format'='debezium-json'
);


Any source (like Kafka) which integrates with the format interface gains the ability to parse the Debezium format and produce change events.

We may introduce “format=debezium-avro” in the future, because Avro is a more efficient format and Debezium supports it.

Canal Format

Canal is a popular CDC tool in China which is used to capture changes from MySQL into other systems. It supports streaming changes to Kafka and RocketMQ in JSON format and protobuf format. Here is a simple example of an update operation:


{
  "data": [
    {
      "id": "13",
      "username": "13",
      "password": "6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9",
      "name": "Canal Manager V2"
    }
  ],
  "old": [
    {
      "id": "13",
      "username": "13",
      "password": "6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9",
      "name": "Canal Manager"
    }
  ],
  "database": "canal_manager",
  "es": 1568972368000,
  "id": 11,
  "isDdl": false,
  "mysqlType": {...},
  "pkNames": [
    "id"
  ],
  "sql": "",
  "sqlType": {...},
  "table": "canal_user",
  "ts": 1568972369005,
  "type": "UPDATE"
}

The meaning of some important fields:

  • The ‘type’ field describes the type of operation, including ‘UPDATE’, 'INSERT', 'DELETE'.
  • ‘data’ represents the row data: the content of the row for 'INSERT', the after state of the row for 'UPDATE', and the before state of the row for 'DELETE'.
  • ‘old’ is an optional field that, if present, contains the state of the row before the update event occurred. It is ‘null’ if the operation is not an update.

We will introduce a format “format=canal-json”. This format is based on the JSON format, and the deserialization logic is similar to the Debezium format.
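As an illustrative sketch (a hypothetical helper, not the actual implementation; it assumes the "data" and "old" arrays have already been deserialized into rows, and that "old" carries the full before state of each row, as in the example above):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.types.RowKind;

public class CanalDecompositionSketch {
    // Canal batches several rows into one message ("data" is an array),
    // so each element is decomposed into one or two Flink rows.
    public static List<GenericRowData> decompose(String type, List<GenericRowData> data, List<GenericRowData> old) {
        List<GenericRowData> result = new ArrayList<>();
        for (int i = 0; i < data.size(); i++) {
            GenericRowData row = data.get(i);
            switch (type) {
                case "INSERT":
                    row.setRowKind(RowKind.INSERT);
                    result.add(row);
                    break;
                case "DELETE":
                    row.setRowKind(RowKind.DELETE);
                    result.add(row);
                    break;
                case "UPDATE": {
                    GenericRowData beforeRow = old.get(i); // pre-update state of the row
                    beforeRow.setRowKind(RowKind.UPDATE_BEFORE);
                    row.setRowKind(RowKind.UPDATE_AFTER);
                    result.add(beforeRow);
                    result.add(row);
                    break;
                }
                default:
                    throw new IllegalArgumentException("Unsupported Canal type: " + type);
            }
        }
        return result;
    }
}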

Any source (like Kafka) which integrates with the format interface gains the ability to parse the Canal format and produce change events.

We may introduce “format=canal-protobuf” in the future, because protobuf is a more efficient format and Canal supports encoding in protobuf.

Future Work

Some future work was discussed in the original design document but is not included in the final FLIP, including the EMIT CHANGELOG syntax, supporting a "mysql-binlog" source connector, connecting to Kafka compacted topics, supporting changelog in batch execution, and so on. We will discuss them in the future if needed. Original document: https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#