| Discussion thread | https://lists.apache.org/thread/6m47ggxn9sl0qgfgyq20ytyzlw42crxj |
|---|---|
| Vote thread | - |
| JIRA | - |
| Release | - |
...
[This FLIP proposal is a joint work between Xuannan Su and Dong Lin]
Motivation
Suppose the user needs to perform a processing-time temporal join where the Probe Side records are obtained from a Kafka Source and the Build Side records are obtained from a MySQL CDC Source, which consists of a snapshot reading phase followed by a binlog reading phase. Notably, none of the input records carry event-time information. The user's requirement is that each record on the Probe Side must be joined with at least the records from the Build Side's snapshot phase. In other words, the Join operator needs to wait for the completion of the Build Side's snapshot phase before processing the Probe Side's data.
...
This document proposes the introduction of APIs that allow source operators (e.g., HybridSource, MySQL CDC Source) to send watermarks to downstream operators, indicating that the watermark should start increasing according to the system time. In addition to supporting processing-time temporal joins, this FLIP provides the foundation to simplify DataStream APIs such as KeyedStream#window(...), such that users would no longer need to explicitly differentiate between TumblingEventTimeWindows and TumblingProcessingTimeWindows, leading to a more intuitive experience.
Terminology and Background
The FLIP proposes changes to Flink's watermark and timestamp concepts. To better understand the underlying design, let's recap the relevant concepts in this section.
...
- When creating a source, the user provides a `WatermarkStrategy` to `StreamExecutionEnvironment#fromSource`.
- If the source natively supports event time (e.g., KafkaSource) or the user provides a custom `TimestampAssigner` in the `WatermarkStrategy` to extract the timestamp from the record, Flink will add the timestamp to the record. Otherwise, the timestamp on the record will be set to `Long.MIN_VALUE`.
- If the user employs `NoWatermarkGenerator` in the `WatermarkStrategy`, the job will not generate watermarks. Otherwise, the job will periodically emit watermarks, and the watermark value depends on event time. The frequency of watermark emission is determined by `pipeline.auto-watermark-interval`, with a default value of 200 ms.
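The periodic watermark generation recapped above can be sketched in plain Java. This is a hedged illustration with no Flink dependency; the class and method names below are ours, modeled on the behavior of `WatermarkGenerator#onEvent` and `WatermarkGenerator#onPeriodicEmit` for a bounded-out-of-orderness strategy, not Flink's actual types.

```java
// Hedged sketch (illustrative names, no Flink dependency): a periodic watermark
// generator that tracks the maximum seen event timestamp and, on each periodic
// tick (every pipeline.auto-watermark-interval, 200 ms by default), emits a
// watermark lagging that maximum by the allowed out-of-orderness.
class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
    }

    // Called for every record; late records never move the max backwards.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Called periodically by the framework; returns the watermark to emit.
    long onPeriodicEmit() {
        // Guard against underflow before any event has been seen.
        return maxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestamp - outOfOrdernessMillis - 1;
    }
}
```

A watermark of `t` asserts that no further records with timestamp <= `t` are expected, which is why the emitted value trails the maximum seen timestamp by the out-of-orderness bound.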
Public Interfaces
1) Add a useProcessingTime field to org.apache.flink.api.common.eventtime.Watermark and org.apache.flink.streaming.api.watermark.Watermark.
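As a minimal sketch of the proposed field, the standalone class below mirrors the intended shape of the change; the constructor overloads and accessor names are illustrative assumptions, not the final signatures of the two Flink Watermark classes.

```java
// Hedged sketch (names are assumptions, not the final API): a Watermark gains a
// useProcessingTime flag. The one-argument constructor defaults it to false so
// that existing event-time semantics are preserved.
final class Watermark {
    private final long timestamp;
    private final boolean useProcessingTime;

    Watermark(long timestamp) {
        this(timestamp, false); // default keeps current event-time behavior
    }

    Watermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }

    long getTimestamp() {
        return timestamp;
    }

    /** True iff downstream operators should advance this watermark with system time. */
    boolean useProcessingTime() {
        return useProcessingTime;
    }
}
```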
...
```java
@Public
public interface SourceReader<T, SplitT extends SourceSplit>
        extends AutoCloseable, CheckpointListener {
    ...

    /**
     * Provide the SourceReader with a runnable that can be used to emit a watermark.
     *
     * <p>If the SourceReader wants to own the responsibility of invoking
     * WatermarkGenerator#onPeriodicEmit, it should override this method and return true.
     * The SourceReader should respect pipeline.auto-watermark-interval if it decides to
     * emit watermarks periodically.
     *
     * @return true iff the caller should avoid triggering WatermarkGenerator#onPeriodicEmit.
     */
    default boolean delegateWatermarkPeriodicEmit(Runnable onWatermarkEmit) {
        return false;
    }
}
```
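To illustrate how a source might use this hook, here is a hedged plain-Java sketch; the class, its scheduling method, and the snapshot-phase callback are hypothetical illustrations (not part of the proposal or of any Flink connector), showing a reader that takes over periodic emission and holds back watermarks until its snapshot phase completes.

```java
// Hedged sketch (hypothetical class and method names): a CDC-style reader that
// returns true from delegateWatermarkPeriodicEmit to own periodic emission, and
// suppresses watermarks until the snapshot phase has finished, so a downstream
// temporal join waits for the full Build Side snapshot.
class SnapshotAwareReaderSketch {
    private boolean snapshotFinished = false;
    private Runnable emitWatermark = () -> {};

    // Mirrors the proposed SourceReader#delegateWatermarkPeriodicEmit contract:
    // returning true tells the caller not to trigger onPeriodicEmit itself.
    boolean delegateWatermarkPeriodicEmit(Runnable onWatermarkEmit) {
        this.emitWatermark = onWatermarkEmit;
        return true;
    }

    // Invoked by the reader once the CDC snapshot phase is done.
    void finishSnapshotPhase() {
        snapshotFinished = true;
    }

    // Invoked on the reader's own schedule, which should respect
    // pipeline.auto-watermark-interval.
    void onPeriodicTimer() {
        if (snapshotFinished) {
            emitWatermark.run(); // hold back watermarks until the snapshot completes
        }
    }
}
```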
Proposed Changes
1) Update SourceOperator behavior
...
By incorporating these updates, we ensure that the IndexedCombinedWatermarkStatus maintains the desired behavior of having the minimum watermark timestamp among all inputs while preventing any decrease in the watermark.
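The combining behavior described above can be sketched in plain Java; the class and method names below are illustrative and do not reproduce Flink's actual IndexedCombinedWatermarkStatus API.

```java
// Hedged sketch (illustrative names): the combined watermark is the minimum
// across all inputs, and once advanced it never decreases, even if an input's
// watermark regresses.
final class CombinedWatermarkSketch {
    private final long[] inputWatermarks;
    private long combined = Long.MIN_VALUE;

    CombinedWatermarkSketch(int numInputs) {
        inputWatermarks = new long[numInputs];
        java.util.Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Updates one input's watermark and returns the (monotonic) combined watermark. */
    long onWatermark(int inputIndex, long watermark) {
        // Per-input watermarks are monotonic; ignore regressions.
        inputWatermarks[inputIndex] = Math.max(inputWatermarks[inputIndex], watermark);
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        // The combined watermark only ever moves forward.
        combined = Math.max(combined, min);
        return combined;
    }
}
```

Until every input has advanced past `Long.MIN_VALUE`, the combined watermark stays at `Long.MIN_VALUE`, which matches the "minimum among all inputs" behavior.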
Example Usage
Here is a Flink SQL example that demonstrates how to perform a processing-time temporal join after this FLIP.
...
```sql
-- Create mysql cdc source table (dimension table)
CREATE TEMPORARY TABLE user_info (
    user_id INTEGER PRIMARY KEY NOT ENFORCED,
    gender STRING
) WITH (
    'connector' = 'mysql-cdc',
    'database-name' = 'example_database',
    'hostname' = 'localhost',
    'username' = 'root',
    'password' = 'root',
    'table-name' = 'user_info'
);

-- Create datagen source table (fact table)
CREATE TEMPORARY TABLE click_event (
    user_id INTEGER,
    item_id INTEGER,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.user_id.min' = '0',
    'fields.user_id.max' = '9'
);

-- Create a print sink table
CREATE TEMPORARY TABLE print_sink (
    user_id INTEGER,
    item_id INTEGER,
    gender STRING
) WITH (
    'connector' = 'print'
);

-- Processing time temporal join
INSERT INTO print_sink
SELECT click_event.user_id AS user_id, item_id, gender
FROM click_event
LEFT JOIN user_info FOR SYSTEM_TIME AS OF click_event.proctime
ON click_event.user_id = user_info.user_id;
```
Compatibility, Deprecation, and Migration Plan
The proposed change might negatively impact the user experience in the following scenario:
...
- We only introduce new APIs, which do not cause existing code handling Watermarks to fail. Moreover, the default value of the `useProcessingTime` field in Watermark instances is false, preserving the existing semantics.
- With the updates to `AbstractStreamOperator`/`AbstractStreamOperatorV2` based on this FLIP, all operators can now support Watermarks with the `useProcessingTime` field and correctly trigger the operator's timers based on event time or system time. For sources that have not been updated, the Watermarks they send always have `useProcessingTime` set to false. In this case, the behavior of the operators remains unchanged, ensuring compatibility with existing jobs.
Test Plan
The change will be covered with unit and integration tests.
Future Work
After this FLIP, we can unify the APIs for processing time and event time. Currently, in the DataStream API, users need to explicitly differentiate between processing time and event time in several places when expressing job logic. The following are some examples of API pairs that distinguish between event time and processing time.
...