Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

POC of the proposed changes:
POC for split-based alignment:


When using event time and watermarks, a slight imbalance or data skew at the source level can push Flink into a degenerate state in which some source operator instances are far ahead of others in event time when ingesting records. For example, due to backpressure combined with data skew, one source instance can be many hours behind another. This is not a problem per se. However, it becomes a problem for downstream operators that rely on watermarks to emit data. Such a downstream operator (for example a windowed join or aggregation) might need to buffer an excessive amount of data, because the minimal watermark across all of its inputs is held back by the lagging source instance. All records emitted by the non-backpressured sources then have to be buffered in that operator's state, which can lead to uncontrollable state growth.

FLIP-182 proposes to solve this problem by introducing event time alignment on top of the existing FLIP-27 source interface. This can work within a single source, but also across multiple source instances, for example if a single job is reading data from two independent sources such as Kafka and a file source.

Part I

Public Interfaces

The only proposed change in the first part is to add a way to configure the watermarkGroup and the maxAllowedWatermarkDrift:

public interface WatermarkStrategy<T> {
    // Sources configured with the same watermarkGroup are aligned with each other.
    WatermarkStrategy<T> withWatermarkAlignment(String watermarkGroup, Duration maxAllowedWatermarkDrift);
}

Proposed Changes

The first step will focus on blocking individual `SourceOperator` instances, ignoring the existence of splits. The general idea is to cap the watermarks that sources emit. If one `SourceOperator` instance is too far ahead of the others, we want to block it from emitting new records until the other `SourceOperator`s manage to catch up. Achieving that requires a couple of changes:

  1. The user first defines which `Source` instances belong to the same watermarkGroup.
  2. Every `SourceOperator` needs to periodically report (using `OperatorEvent`) its last emitted watermark to its `SourceCoordinator`.
  3. The `SourceCoordinator` needs to combine those reported watermarks:
    1. First it needs to calculate the minimal value among the `SourceOperator`s that belong to that `SourceCoordinator` instance.
    2. Next it can synchronize with the other `SourceCoordinator` instances that belong to the same watermarkGroup, to calculate the global minimal watermark across all the different sources.
    3. The `SourceCoordinator` can then announce (again using `OperatorEvent`) the calculated minimal watermark to all of its `SourceOperator`s.
  4. Each `SourceOperator` can now decide, by adding the preconfigured `maxAllowedWatermarkDrift` to the minimal watermark, whether its own watermark exceeds the minimum by too much. If it does, it can make itself unavailable, effectively blocking consumption of its records.
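
The arithmetic behind steps 3 and 4 can be sketched in plain Java without any Flink dependencies. This is only an illustration under stated assumptions, not the proposed implementation: the class `AlignmentSketch` and its methods `localMin`, `globalMin` and `shouldBlock` are hypothetical names, the `OperatorEvent` exchange is replaced by direct method calls, watermarks are assumed to be epoch milliseconds, and the strict comparison in `shouldBlock` is an assumption as well.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.List;

/** Hypothetical, Flink-free sketch of the alignment arithmetic (not the real implementation). */
final class AlignmentSketch {

    /** Step 3.1: minimal watermark reported by the operators of one coordinator. */
    static long localMin(Collection<Long> reportedWatermarks) {
        long min = Long.MAX_VALUE; // an empty report leaves the minimum unconstrained
        for (long watermark : reportedWatermarks) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    /** Step 3.2: combine the local minima of all coordinators in one watermarkGroup. */
    static long globalMin(Collection<Long> localMinima) {
        return localMin(localMinima);
    }

    /** Step 4: block when the operator is ahead of the minimum by more than the allowed drift. */
    static boolean shouldBlock(long lastEmitted, long globalMin, Duration maxAllowedWatermarkDrift) {
        long drift = maxAllowedWatermarkDrift.toMillis(); // assumed non-negative
        // Saturating addition so that a minimum near Long.MAX_VALUE does not overflow.
        long maxAllowed = globalMin > Long.MAX_VALUE - drift ? Long.MAX_VALUE : globalMin + drift;
        return lastEmitted > maxAllowed; // assumption: strictly greater means "too far ahead"
    }

    public static void main(String[] args) {
        long kafkaMin = localMin(List.of(1_000L, 5_000L)); // two operators of one source
        long fileMin = localMin(List.of(3_000L));          // one operator of another source
        long min = globalMin(List.of(kafkaMin, fileMin));
        System.out.println(shouldBlock(5_000L, min, Duration.ofSeconds(2)));
        System.out.println(shouldBlock(3_000L, min, Duration.ofSeconds(2)));
    }
}
```

In this example the global minimum is 1000 and the allowed drift is 2 seconds, so an operator may emit watermarks up to 3000. The operator at 5000 would make itself unavailable, while the one at 3000 keeps reading.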

This would fully solve the watermark alignment problem when the number of splits equals the source parallelism, and would somewhat limit the impact of the problem in other cases.

Part II

The purpose of the second part is to add support for watermark alignment per split. The difference here is that the blocking logic has to be implemented at the split level, and `SourceOperator` has no knowledge of individual splits. This part would therefore have to move the blocking logic into the individual `SourceReader` classes. It is proposed as a follow-up step that would most likely require another FLIP document.
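
To make the difference from Part I concrete, the following sketch shows the kind of per-split decision a reader would have to make. It is purely illustrative: `SplitAlignmentSketch` and `splitsToPause` are hypothetical names, split ids are assumed to be strings, and how a `SourceReader` would actually pause or resume a split is left open here, as it is in this document.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of split-level blocking; not part of the proposed API. */
final class SplitAlignmentSketch {

    /**
     * Returns the ids of the splits whose current watermark exceeds the maximum allowed
     * watermark (global minimum plus allowed drift), ordered by split id.
     */
    static List<String> splitsToPause(Map<String, Long> splitWatermarks, long maxAllowedWatermark) {
        List<String> toPause = new ArrayList<>();
        // TreeMap gives a deterministic iteration order over the split ids.
        for (Map.Entry<String, Long> entry : new TreeMap<>(splitWatermarks).entrySet()) {
            if (entry.getValue() > maxAllowedWatermark) {
                toPause.add(entry.getKey());
            }
        }
        return toPause;
    }

    public static void main(String[] args) {
        Map<String, Long> watermarks = Map.of("split-0", 1_000L, "split-1", 9_000L, "split-2", 2_500L);
        System.out.println(splitsToPause(watermarks, 3_000L));
    }
}
```

With the values above only `split-1` is ahead of the limit, so the other two splits of the same reader could keep being consumed. Operator-level blocking as in Part I would instead stop all three.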

Compatibility, Deprecation, and Migration Plan

There should be no need for any migration plan, and this change shouldn't break any compatibility.

Test Plan

The following job:

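        DataStream<Long> eventStream =
                env.fromSource( // assumed: env is the job's StreamExecutionEnvironment
                                new NumberSequenceSource(0, Long.MAX_VALUE),
                                WatermarkStrategy.<Long>forMonotonousTimestamps() // assumed base strategy
                                        .withTimestampAssigner(new LongTimestampAssigner())
                                        .withWatermarkAlignment("group-1", Duration.ofSeconds(30)),
                                "NumberSequenceSource") // assumed source name
                        .map(
                                new RichMapFunction<Long, Long>() {
                                    public Long map(Long value) throws Exception {
                                        if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                                            Thread.sleep(1); // assumed: slow down subtask 0 to create backpressure
                                        }
                                        return 1L;
                                    }
                                });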

Should result in all but the first source subtask being blocked by the watermark alignment.

Rejected Alternatives