Status
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In FLIP-182, watermark alignment has been proposed to solve the problem of data skew in applications with event time and watermarks. This skew is caused by imbalanced sources, i.e., sources that emit data and watermarks at different paces. The imbalance may force a downstream operator that emits data based on watermarks to buffer excessive amounts of data from a faster source while it waits for data from a slower source.
The watermark alignment approach described in FLIP-182 introduces the notion of alignment groups: source operators that belong to the same user-defined alignment group coordinate their watermark emission and adapt it to the pace of the slowest source operator in the group. However, a single source operator may read data from multiple splits/partitions, e.g., multiple Kafka partitions. Even with watermark alignment, such a source operator may therefore need to buffer an excessive amount of data if one split emits data faster than another.
The current solution for imbalanced splits is simply to avoid multiple splits and to configure a single split per source operator. This, however, is a limitation. Multiple splits per source operator provide the flexibility to balance the number of allocated source tasks against the number of data sources, which leads to more efficient resource utilization. Moreover, FLIP-182 already describes extending watermark alignment towards split alignment as a follow-up step.
This FLIP proposes to extend watermark alignment as defined in FLIP-182 so that, if watermark alignment is configured, data emission is also aligned across the splits within a source operator, taking watermark boundaries into account.
Public Interfaces
The public interface `SourceReader` is extended with an additional method to support split alignment.
```java
@Public
public interface SourceReader<T, SplitT extends SourceSplit> {
    ...

    /**
     * Pauses or resumes reading of individual source splits.
     *
     * <p>Note that no other methods can be called in parallel, so it's fine to non-atomically
     * update subscriptions. This method simply gives connectors with more expressive APIs
     * the opportunity to update all subscriptions at once.
     *
     * <p>This is currently used to align the watermarks of splits, if watermark alignment is
     * used and the source reads from more than one split.
     *
     * <p>The default implementation throws an {@link UnsupportedOperationException}; this
     * default implementation will be removed in future releases. To be compatible with future
     * releases, it is recommended to implement this method and override the default implementation.
     *
     * @param splitsToPause the IDs of the splits to pause
     * @param splitsToResume the IDs of the splits to resume
     */
    @PublicEvolving
    default void pauseOrResumeSplits(
            Collection<String> splitsToPause, Collection<String> splitsToResume) {
        throw new UnsupportedOperationException(
                "This source reader does not support pause or resume splits.");
    }
}
```
The public evolving interface `SplitReader` is extended with an additional method to support split alignment.
```java
@PublicEvolving
public interface SplitReader<E, SplitT extends SourceSplit> {
    ...

    /**
     * Pauses or resumes reading of individual splits.
     *
     * <p>Note that no other methods can be called in parallel, so it's fine to non-atomically
     * update subscriptions. This method simply gives connectors with more expressive APIs
     * the opportunity to update all subscriptions at once.
     *
     * <p>This is currently used to align the watermarks of splits, if watermark alignment is used
     * and the source reads from more than one split.
     *
     * <p>The default implementation throws an {@link UnsupportedOperationException}; this
     * default implementation will be removed in future releases. To be compatible with future
     * releases, it is recommended to implement this method and override the default implementation.
     *
     * @param splitsToPause the splits to pause
     * @param splitsToResume the splits to resume
     */
    default void pauseOrResumeSplits(
            Collection<SplitT> splitsToPause, Collection<SplitT> splitsToResume) {
        throw new UnsupportedOperationException(
                "This split reader does not support pause or resume.");
    }
}
```
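To illustrate the intended contract of `pauseOrResumeSplits`, here is a minimal, Flink-free sketch. It assumes that a paused split stays assigned to the reader but is skipped when records are fetched, and that a split listed in both collections ends up resumed. The class `PausableReaderSketch` and its `addSplit`/`fetch` methods are hypothetical and are not part of any Flink or connector API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Hypothetical reader: paused splits remain assigned but are skipped while fetching. */
final class PausableReaderSketch {
    // Pending records per split ID, kept in assignment order.
    private final Map<String, Queue<String>> pending = new LinkedHashMap<>();
    // IDs of the splits that are currently paused.
    private final Set<String> paused = new HashSet<>();

    void addSplit(String splitId, List<String> records) {
        pending.put(splitId, new ArrayDeque<>(records));
    }

    /** Same shape as SourceReader#pauseOrResumeSplits: split IDs as strings. */
    void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
        paused.addAll(splitsToPause);
        // Applied second, so a split listed in both collections ends up resumed (an assumption).
        paused.removeAll(splitsToResume);
    }

    /** Returns at most one record from every split that is not paused. */
    List<String> fetch() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Queue<String>> entry : pending.entrySet()) {
            if (!paused.contains(entry.getKey()) && !entry.getValue().isEmpty()) {
                out.add(entry.getValue().poll());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        PausableReaderSketch reader = new PausableReaderSketch();
        reader.addSplit("partition-0", List.of("a1", "a2"));
        reader.addSplit("partition-1", List.of("b1", "b2"));
        reader.pauseOrResumeSplits(List.of("partition-1"), List.of());
        System.out.println(reader.fetch()); // prints [a1]
        reader.pauseOrResumeSplits(List.of(), List.of("partition-1"));
        System.out.println(reader.fetch()); // prints [a2, b1]
    }
}
```

Pausing only stops consumption. It does not drop or reassign the split, so resuming continues exactly where reading stopped.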
Proposed Changes
In the current implementation of watermark alignment, according to FLIP-182, the `SourceOperator` takes the central role and performs, in summary, the following tasks:
- observes watermarks emitted from its `SourceReader` to the `ReaderOutput`
- reports current watermarks to the `SourceCoordinator`
- receives, in return, `WatermarkAlignmentEvent`s from the `SourceCoordinator`, which carry the maximum allowed watermark
- and finally controls the operating mode to pause or resume the emission of data from the `SourceReader` to the `ReaderOutput`
The alignment of splits requires the `SourceOperator` to take a similar central role. However, its concrete implementation differs significantly because `SourceOperator`s must perform the following tasks:
- observe watermarks from individual `SplitReader`s
- coordinate whether and which `SplitReader`s should be paused or resumed
- initiate pausing and resuming of the data emission of individual `SplitReader`s.
Note: The `SourceOperator` pauses or resumes splits only if the source has more than one split assigned and only if watermark alignment is used.
To implement the mentioned tasks, we propose the following changes:
- To obtain watermarks from individual splits, the `SourceOperator` implements the `WatermarkUpdateListener` interface and propagates an instance to the `SourceOutput` of each `SplitReader`. (This requires no public interface change because it is done internally: each `PartialWatermark` of the `WatermarkOutputMultiplexer` that is attached to a `SplitReader`'s individual `SourceOutput` reports watermark updates through the `WatermarkUpdateListener` to the `SourceOperator`.)
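The per-split listener wiring described above can be sketched as follows. This is a simplified stand-in, not Flink's internal code. The nested `WatermarkUpdateListener` interface and its `onWatermarkUpdate(long)` method are assumed shapes chosen for illustration, and `SplitWatermarkTracker` is a hypothetical class that plays the role of the `SourceOperator`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical tracker standing in for the SourceOperator's per-split watermark bookkeeping. */
final class SplitWatermarkTracker {
    /** Assumed callback shape; Flink's internal interface may differ. */
    interface WatermarkUpdateListener {
        void onWatermarkUpdate(long watermark);
    }

    // Latest watermark reported per split ID.
    private final Map<String, Long> splitWatermarks = new HashMap<>();

    /** Creates the listener that would be attached to the output of one split. */
    WatermarkUpdateListener listenerFor(String splitId) {
        splitWatermarks.put(splitId, Long.MIN_VALUE);
        // Keep the maximum so that a split's tracked watermark never moves backwards.
        return watermark -> splitWatermarks.merge(splitId, watermark, Long::max);
    }

    /** Snapshot of the current per-split watermarks. */
    Map<String, Long> currentWatermarks() {
        return new HashMap<>(splitWatermarks);
    }
}
```

Because every split gets its own listener instance, the tracker knows which split a watermark belongs to without the split having to pass its ID with each update.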
On receiving a `WatermarkAlignmentEvent`, which carries a `maxDesiredWatermark`, the `SourceOperator` determines which splits should be paused or resumed. The decision is based on the current watermarks of the individual splits and the following policy:
```
if numSplits <= 1:
    return  // If there is only a single split, we do not pause the split but the source.
for each SplitReader x:
    if x's watermark > maxDesiredWatermark:
        pause x
    else if x is paused:
        resume x
```
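The policy above translates directly into Java. This is a sketch under stated assumptions. Split watermarks are passed in as a map from split ID to watermark, and the currently paused splits as a set of IDs. `SplitAlignmentPolicy` and its nested `Decision` class are hypothetical names used only for illustration. As in the pseudocode, a split above the bound is listed for pausing even if it is already paused, so pausing is assumed to be idempotent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical implementation of the split alignment policy. */
final class SplitAlignmentPolicy {
    /** Outcome of one alignment round: split IDs to pause and to resume. */
    static final class Decision {
        final List<String> toPause = new ArrayList<>();
        final List<String> toResume = new ArrayList<>();
    }

    static Decision decide(
            Map<String, Long> splitWatermarks, Set<String> currentlyPaused, long maxDesiredWatermark) {
        Decision decision = new Decision();
        // With a single split, the source as a whole is paused instead (FLIP-182 behavior).
        if (splitWatermarks.size() <= 1) {
            return decision;
        }
        for (Map.Entry<String, Long> entry : splitWatermarks.entrySet()) {
            String splitId = entry.getKey();
            if (entry.getValue() > maxDesiredWatermark) {
                // Split is ahead of the group: stop reading from it.
                decision.toPause.add(splitId);
            } else if (currentlyPaused.contains(splitId)) {
                // Split is within the bound again: continue reading.
                decision.toResume.add(splitId);
            }
        }
        return decision;
    }
}
```

For example, with split watermarks `p0=100`, `p1=300`, `p2=150`, where `p2` is currently paused, and `maxDesiredWatermark=200`, the decision pauses `p1` and resumes `p2`, while `p0` is left untouched.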
To initiate pausing and resuming of individual `SplitReader`s, the `SourceOperator` forwards the identifiers of the splits to be paused and to be resumed to the `SourceReader`, but only if the `SourceReader` has more than one split assigned. For that purpose, `SourceReader` must implement its `pauseOrResumeSplits` method, and `SplitReader` must likewise implement its `pauseOrResumeSplits` method, as described above. Note that both `SourceReader` and `SplitReader` have default implementations of the respective methods that throw `UnsupportedOperationException`. These default implementations may be removed in future releases according to the migration/compatibility strategy described below.
Compatibility, Deprecation, and Migration Plan
A `SourceReader` (or `SplitReader`, respectively) that does not implement the new method `pauseOrResumeSplits` can still be used as before in the following cases:
- The source does not apply watermark alignment, i.e., without `withWatermarkAlignment`.
- The source applies watermark alignment, i.e., with `withWatermarkAlignment`, but has only a single split assigned.
- The source applies watermark alignment and has multiple splits assigned and `pipeline.watermark-alignment.allow-unaligned-source-splits` (default: false) is set to true.
Conversely, if a source applies watermark alignment, has multiple splits assigned, and `pipeline.watermark-alignment.allow-unaligned-source-splits` is not set to true, the default implementations of `pauseOrResumeSplits` throw `UnsupportedOperationException`.
The configuration parameter `pipeline.watermark-alignment.allow-unaligned-source-splits` (default: false) is added to ease the migration; it simply suppresses the `UnsupportedOperationException`.
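The interplay between the single-split check, the default implementation, and the configuration parameter can be sketched on the operator side as follows. This is an illustrative assumption, not the actual `SourceOperator` code. `SplitAlignmentForwarder`, its `forward` method, and the nested `AlignableReader` interface (a stand-in for `SourceReader` whose default method mirrors the proposed default) are hypothetical. The constructor flag models `pipeline.watermark-alignment.allow-unaligned-source-splits`.

```java
import java.util.Collection;

/** Hypothetical operator-side forwarding of split pause/resume requests. */
final class SplitAlignmentForwarder {
    /** Stand-in for SourceReader; the default mirrors the proposed default implementation. */
    interface AlignableReader {
        default void pauseOrResumeSplits(
                Collection<String> splitsToPause, Collection<String> splitsToResume) {
            throw new UnsupportedOperationException(
                    "This source reader does not support pause or resume splits.");
        }
    }

    // Models pipeline.watermark-alignment.allow-unaligned-source-splits.
    private final boolean allowUnalignedSourceSplits;

    SplitAlignmentForwarder(boolean allowUnalignedSourceSplits) {
        this.allowUnalignedSourceSplits = allowUnalignedSourceSplits;
    }

    /**
     * Invoked only when watermark alignment is active. Returns true if the reader applied
     * the split-level pause/resume, false if it was skipped or the exception was suppressed.
     */
    boolean forward(
            AlignableReader reader,
            int numAssignedSplits,
            Collection<String> toPause,
            Collection<String> toResume) {
        if (numAssignedSplits <= 1) {
            return false; // single split: the source as a whole is paused instead
        }
        try {
            reader.pauseOrResumeSplits(toPause, toResume);
            return true;
        } catch (UnsupportedOperationException e) {
            if (allowUnalignedSourceSplits) {
                return false; // flag set: keep running with unaligned splits
            }
            throw e; // default: surface the missing implementation
        }
    }
}
```

A reader that keeps the default implementation therefore only fails when alignment is active, more than one split is assigned, and the flag is left at its default value of false. This matches the three compatible cases listed above.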
Example: If `pipeline.watermark-alignment.allow-unaligned-source-splits` is set to true, the following behavior can be observed:
- If a `SourceReader` does not implement `pauseOrResumeSplits` (regardless of whether its `SplitReader`s implement `pauseOrResumeSplits`), it ignores any attempt at split alignment.
- If a `SplitReader` does not implement `pauseOrResumeSplits`, it ignores any attempt at split alignment, while the other `SplitReader`s of the same `SourceReader` still pause or resume according to the alignment attempt.
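The second case, where only one `SplitReader` lacks support, can be sketched as a per-reader dispatch inside the `SourceReader`. This sketch assumes that each split reader owns a fixed set of split IDs and that an `UnsupportedOperationException` is handled separately for each split reader. All names here (`SplitReaderDispatchSketch`, `OwnedSplitReader`, `LegacySplitReader`, `dispatch`) are hypothetical and do not describe Flink's actual fetcher classes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical per-split-reader dispatch of pause/resume requests. */
final class SplitReaderDispatchSketch {
    /** Split reader that owns some splits and supports pausing them. */
    static class OwnedSplitReader {
        final Set<String> ownedSplits;
        final Set<String> paused = new HashSet<>();

        OwnedSplitReader(Set<String> ownedSplits) {
            this.ownedSplits = ownedSplits;
        }

        void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
            paused.addAll(toPause);
            paused.removeAll(toResume);
        }
    }

    /** Split reader that keeps the proposed default, i.e., does not support pausing. */
    static class LegacySplitReader extends OwnedSplitReader {
        LegacySplitReader(Set<String> ownedSplits) {
            super(ownedSplits);
        }

        @Override
        void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
            throw new UnsupportedOperationException("This split reader does not support pause or resume.");
        }
    }

    static void dispatch(
            List<OwnedSplitReader> readers,
            Collection<String> toPause,
            Collection<String> toResume,
            boolean allowUnalignedSourceSplits) {
        for (OwnedSplitReader reader : readers) {
            // Hand each split reader only the splits it owns.
            List<String> pause = new ArrayList<>(toPause);
            pause.retainAll(reader.ownedSplits);
            List<String> resume = new ArrayList<>(toResume);
            resume.retainAll(reader.ownedSplits);
            if (pause.isEmpty() && resume.isEmpty()) {
                continue;
            }
            try {
                reader.pauseOrResumeSplits(pause, resume);
            } catch (UnsupportedOperationException e) {
                if (!allowUnalignedSourceSplits) {
                    throw e;
                }
                // Flag set: this split reader stays unaligned; the loop continues with the others.
            }
        }
    }
}
```

Because the exception is caught per split reader, one unsupported split reader does not prevent its siblings from pausing or resuming their splits.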
Note: In future releases, the default implementations of `pauseOrResumeSplits` in `SourceReader` and `SplitReader`, as well as the configuration parameter `pipeline.watermark-alignment.allow-unaligned-source-splits`, may be removed, which makes implementing the `pauseOrResumeSplits` methods mandatory. Therefore, it is recommended to override the default implementations of `pauseOrResumeSplits` in both `SourceReader` and `SplitReader`.
The following source implementations have been adapted and implement `pauseOrResumeSplits` methods:
- Kafka
- Pulsar
Test Plan
The proposed changes will be unit and integration tested.
Rejected Alternatives
None.