Status

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Through the mailing lists and community issues, many users have already expressed their need for an upsert Kafka connector. After looking through the mailing lists, we see three main reasons behind this:

  • Interpret a compacted Kafka topic as a changelog stream, i.e. interpret records with keys as upsert events [1-3];
  • As a part of the real time pipeline, join multiple streams for enrichment and store results into a Kafka topic for further calculation later. However, the result may contain update events.
  • As a part of the real time pipeline, aggregate on data streams and store results into a Kafka topic for further calculation later. However, the result may contain update events.

Public Interface

Upsert-kafka Connector Definition

The upsert-kafka connector produces a changelog stream, where each data record represents an update or delete event. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key, if any (if a corresponding key doesn’t exist yet, the update is considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT, aka INSERT/UPDATE, because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE”.
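For illustration (the topic, column names, and values below are hypothetical), assume a topic keyed by user_id; records arriving in the following order would be interpreted as:

  key = 101, value = (101, 'Alice', 'Shanghai')   =>  INSERT (the key has not been seen before)
  key = 101, value = (101, 'Alice', 'Hangzhou')   =>  UPDATE of the last value for key 101
  key = 101, value = null                         =>  DELETE for key 101 (tombstone)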

Primary-Key Constraints on the upsert-kafka

When the upsert-kafka connector is used as a sink, it works similarly to the existing HBase sink. The upsert-kafka sink doesn’t require the planner to send UPDATE_BEFORE messages (the planner may still send UPDATE_BEFORE messages in some cases); it writes INSERT/UPDATE_AFTER messages as normal Kafka records with key parts, and writes DELETE messages as Kafka records with null values (indicating tombstones for the keys). Flink guarantees message ordering on the primary key by partitioning data on the values of the primary key columns.

The upsert-kafka source is a kind of changelog source. Primary key semantics on a changelog source mean that the materialized changelog (INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) is unique on the primary key constraint. Flink assumes all messages are in order on the primary key.

Creating an upsert-kafka table in Flink requires declaring a primary key on the table. The primary key definition also controls which fields end up in Kafka’s key; therefore, we don’t need the ‘key.fields’ option in the upsert-kafka connector. By default, primary key fields are also stored in Kafka’s value, but users can use the ‘value.fields-include’ option to control this behavior.
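A minimal DDL sketch (the table name, columns, and broker address are illustrative, not part of this proposal) showing how the primary key declaration, including a composite key, determines the Kafka record key, together with the ‘value.fields-include’ option:

CREATE TABLE order_status (
  order_id BIGINT,
  line_id INT,
  status STRING,
  PRIMARY KEY (order_id, line_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'order_status',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',                  -- serializes (order_id, line_id) as the Kafka record key
  'value.format' = 'avro',
  'value.fields-include' = 'EXCEPT_KEY'  -- key fields are not duplicated into the Kafka record value
);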

Upsert-kafka Source

Generally speaking, the underlying topic of the upsert-kafka source must be compacted. Besides, the underlying topic must have all data with the same key in the same partition; otherwise, the result will be wrong.

Currently, the upsert-kafka connector doesn’t provide options for the start reading position; the upsert-kafka source must read from the earliest offset. This is a protection for data integrity: otherwise it’s hard to define the behavior when users specify a start offset in the middle of the topic, e.g. how to process delete events whose keys have never been seen. Therefore, the upsert-kafka connector doesn’t provide options like 'scan.startup.mode', 'scan.startup.specific-offsets', 'scan.startup.timestamp-millis' and 'properties.group.id' (which is only used for the 'group-offsets' startup mode).

Upsert-kafka Sink

In order to guarantee message ordering, the upsert-kafka sink always works in HASH partitioner mode on the primary key fields. Therefore, we don’t need the ‘sink.partitioner’ option in the upsert-kafka connector.

Upsert-kafka Connector Options

The options of the upsert-kafka connector are much like those of the Kafka connector.

Option                       | Required | Default | Type   | Description
connector                    | required | (none)  | String | Specify which connector to use; here it should be 'upsert-kafka'.
properties.bootstrap.servers | required | (none)  | String | Comma-separated list of Kafka brokers.
key.format                   | required | (none)  | String | The format used to deserialize and serialize the key of Kafka records. Only insert-only formats are supported.
value.format                 | required | (none)  | String | The format used to deserialize and serialize the value of Kafka records.
topic                        | optional | (none)  | String | Topic name(s) to read data from when the table is used as a source, or to write data to when the table is used as a sink. Only one of 'topic' and 'topic-pattern' can be specified for sources. Note that a topic list is not supported for sinks.
topic-pattern                | optional | (none)  | String | The regular expression for a pattern of topic names to read from. All topics whose names match the regular expression will be subscribed by the consumer when the job starts running. Only one of 'topic' and 'topic-pattern' can be specified for sources.
value.fields-include         | optional | 'ALL'   | String | Controls which fields should end up in the value as well. Possible values: ALL (all fields of the schema, even if they are part of e.g. the key), EXCEPT_KEY (all fields of the schema except the key fields).

Note: only insert-only formats are supported for 'key.format' and 'value.format'. We will use the ChangelogMode of the format to determine whether the format is insert-only.
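For instance, a sketch of a source table that subscribes to several compacted topics via 'topic-pattern' (the pattern, schema, and broker address are hypothetical, and each matched topic is assumed to satisfy the source requirements described above):

CREATE TABLE users_all_regions (
  user_id BIGINT,
  user_name STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic-pattern' = 'users_.*',   -- subscribe to every topic whose name matches the pattern
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);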

Examples

Convert a Kafka topic in Debezium format into an upsert-kafka table

-- register a kafka source which interprets the debezium topic as a changelog stream
CREATE TABLE dbz_users (
  user_id BIGINT,
  user_name STRING,
  region STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbz_users',
  'properties.bootstrap.servers' = '...',
  'format' = 'debezium-json'
);
-- changelog mode does not materialize results and displays records with an op flag
> SET execution.result-mode=changelog;
> SELECT * FROM dbz_users;
+----+---------+-----------+----------+
| op | user_id | user_name |   region |
+----+---------+-----------+----------+
| +I | 100     |  Bob      | Beijing  |
| +I | 101     |  Alice    | Shanghai |
| +I | 102     |  Greg     | Berlin   |
| +I | 103     |  Richard  | Berlin   |
| -U | 101     |  Alice    | Shanghai |
| +U | 101     |  Alice    | Hangzhou |
| -D | 103     |  Richard  | Berlin   |
+----+---------+-----------+----------+

-- register an upsert-kafka sink which will be used for storing the latest user information
CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

-- convert the debezium topic into a compacted kafka topic
INSERT INTO users SELECT * FROM dbz_users;

-- table mode materializes the changelog and refreshes the final result
> SET execution.result-mode=table;
> SELECT * FROM users;

+---------+-----------+----------+
| user_id | user_name |   region |
+---------+-----------+----------+
| 100     |  Bob      | Beijing  |
| 101     |  Alice    | Hangzhou |
| 102     |  Greg     | Berlin   |
+---------+-----------+----------+

Use an upsert-kafka table as a reference/dimension table

CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '...',
  'format' = 'avro'
);

> SET execution.result-mode=changelog;
> SELECT * FROM pageviews;

+----+---------+-----------+----------------------+----------+
| op | user_id |   page_id |             viewtime | proctime |
+----+---------+-----------+----------------------+----------+
| +I | 100     |  10001    | 2020-10-01 08:01:00  | ........ |
| +I | 102     |  10002    | 2020-10-01 08:02:00  | ........ |
| +I | 101     |  10002    | 2020-10-01 08:04:00  | ........ |
| +I | 102     |  10004    | 2020-10-01 08:06:00  | ........ |
| +I | 102     |  10003    | 2020-10-01 08:07:00  | ........ |
+----+---------+-----------+----------------------+----------+


CREATE TABLE pageviews_enriched (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews_enriched',
  ...
);

-- temporal join of an insert-only stream with a changelog stream (will be supported by FLIP-132)
INSERT INTO pageviews_enriched
SELECT p.user_id, p.page_id, p.viewtime, u.region
FROM pageviews AS p
LEFT JOIN users FOR SYSTEM_TIME AS OF p.proctime AS u
ON p.user_id = u.user_id;

> SET execution.result-mode=changelog;
> SELECT * FROM pageviews_enriched;

+----+---------+-----------+----------------------+-------------+
| op | user_id |   page_id |             viewtime | user_region |
+----+---------+-----------+----------------------+-------------+
| +I | 100     |  10001    | 2020-10-01 08:01:00  |    Beijing  |     
| +I | 102     |  10002    | 2020-10-01 08:02:00  |    Berlin   |    
| +I | 101     |  10002    | 2020-10-01 08:04:00  |    Hangzhou |      
| +I | 102     |  10004    | 2020-10-01 08:06:00  |    Berlin   |    
| +I | 102     |  10003    | 2020-10-01 08:07:00  |    Berlin   |  
+----+---------+-----------+----------------------+-------------+

Write aggregation results into an upsert-kafka table

CREATE TABLE pageviews_per_region_5min (
  window_start STRING,
  region STRING,
  view_count BIGINT,
  PRIMARY KEY (window_start, region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region_5min',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

INSERT INTO pageviews_per_region_5min
SELECT 
  CAST(TUMBLE_START(viewtime, INTERVAL '5' MINUTE) AS STRING),
  region,
  COUNT(*)
FROM pageviews_enriched
GROUP BY region, TUMBLE(viewtime, INTERVAL '5' MINUTE);

> SET execution.result-mode=table;
> SELECT * FROM pageviews_per_region_5min;

+----------------------+-----------+-------------+
|         window_start |    region |  view_count |
+----------------------+-----------+-------------+
| 2020-10-01 08:00:00  |  Beijing  |          1  |
| 2020-10-01 08:00:00  |  Berlin   |          1  |
| 2020-10-01 08:00:00  |  Hangzhou |          1  |
| 2020-10-01 08:05:00  |  Berlin   |          2  |
+----------------------+-----------+-------------+

Aggregate on the upsert-kafka connector

CREATE TABLE pageviews_per_region (
  region STRING,
  view_count BIGINT,
  PRIMARY KEY (region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

INSERT INTO pageviews_per_region
SELECT 
  region,
  SUM(view_count)
FROM pageviews_per_region_5min
GROUP BY region;

> SET execution.result-mode=table;
> SELECT * FROM pageviews_per_region;

+-----------+-------------+
|    region |  view_count |
+-----------+-------------+
|  Beijing  |          1  |
|  Berlin   |          3  |
|  Hangzhou |          1  |
+-----------+-------------+

Implementation Details

The upsert-kafka connector only produces an upsert stream, which doesn’t contain UPDATE_BEFORE messages. However, several operations, e.g. aggregations, require UPDATE_BEFORE messages for correct processing. Therefore, we need a physical node that materializes the upsert stream and generates a changelog stream with full change messages. In this physical operator, we will use state to know whether a key is being seen for the first time. The operator will produce INSERT rows, or additionally generate UPDATE_BEFORE rows for the previous image, or produce DELETE rows with all columns filled with values.
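As a sketch only (assuming the 'users' upsert-kafka table and the Debezium-sourced records from the examples above), reading the table back in changelog mode would show the materialized change messages, including the UPDATE_BEFORE row for key 101 and a DELETE row for key 103 whose non-key columns are filled from state:

> SET execution.result-mode=changelog;
> SELECT * FROM users;

+----+---------+-----------+----------+
| op | user_id | user_name |   region |
+----+---------+-----------+----------+
| +I | 100     |  Bob      | Beijing  |
| +I | 101     |  Alice    | Shanghai |
| +I | 102     |  Greg     | Berlin   |
| +I | 103     |  Richard  | Berlin   |
| -U | 101     |  Alice    | Shanghai |
| +U | 101     |  Alice    | Hangzhou |
| -D | 103     |  Richard  | Berlin   |
+----+---------+-----------+----------+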

How can the planner know to add such a materialization operator? This depends on the ChangelogMode of the source, which is introduced in FLIP-95. Currently, we only support the insert-only and all-kinds (e.g. CDC format) ChangelogModes. In this FLIP, we will support the [UPDATE_AFTER, DELETE] ChangelogMode, which indicates that the source will emit only UPDATE_AFTER and DELETE messages during runtime. The planner will add the above materialization operator when the ChangelogMode of the source is [UPDATE_AFTER, DELETE].

Considering the similarity between the Kafka connector and the upsert-kafka connector, we should reuse most of the code under the hood and just introduce a different connector factory.

Compatibility, Deprecation, and Migration Plan

This change introduces a new feature that does not imply any compatibility concerns.

Rejected Alternatives

Introduce a new property in Kafka connector vs Introduce an upsert-kafka connector

  1. It's hard to define the behavior when users specify a start offset in the middle of the topic (e.g. how to process delete events whose keys have never been seen). It's dangerous if users do that, so we don't provide the offset options in the new connector at the moment.
  2. It's a different perspective/abstraction on the same Kafka topic (append vs. upsert). It would be easier to understand if we separate them instead of mixing them in one connector. The new connector requires a hash sink partitioner, a declared primary key, and a regular (insert-only) format. If we mix them in one connector, it might be confusing how to use the options correctly.
  3. The semantics of the upsert-kafka connector are just the same as a KTable in Kafka Streams, so it's very handy for Kafka Streams and KSQL users. We have seen several questions [1][2] on the mailing list asking how to model a KTable and how to join a KTable in Flink SQL.

Use upsert-kafka as the new connector name vs Use kafka-compacted as the name vs Use ktable as the name

Considering that 'ktable' carries more implicit meaning than intended, and that the 'compacted' in 'kafka-compacted' relates to the topic rather than the table itself, 'upsert-kafka' is the more suitable and straightforward name for the new connector.

Future Work

Support bounded upsert-kafka source

It’s also possible that users want to use the upsert-kafka connector in batch mode. But we need more discussion about this feature.

References

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-materialized-upsert-tables-td18482.html#a18503

[2] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/From-Kafka-Stream-to-Flink-td28879.html

[3] https://issues.apache.org/jira/browse/FLINK-19149