Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Currently, we have two different flavours of Watermark Assigners: AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks. Both of them extend from TimestampAssigner. This means that sources that want to support watermark assignment/extraction in the source need to support two separate interfaces, and we need two operator implementations for the different flavours. It also makes features such as generic support for idleness detection more complicated to implement, because again we have to support two types of watermark assigners.
In this FLIP we propose two things:
- Unify the Watermark Assigners into one interface, WatermarkGenerator
- Separate this new interface from the TimestampAssigner
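To make the two points concrete, here is a minimal, self-contained Java sketch. The interface shapes (a TimestampAssigner with `extractTimestamp`, a WatermarkGenerator with `onEvent` and `onPeriodicEmit`, a WatermarkOutput with `emitWatermark` and `markIdle`) are assumptions for illustration, watermarks are simplified to plain `long` values, and the classes BoundedOutOfOrdernessGenerator, MarkerGenerator, CsvTimestampAssigner and UnifiedGeneratorsDemo are hypothetical. It shows a periodic and a punctuated generator implementing the same interface while sharing one timestamp assigner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Extracts an event-time timestamp from a record; knows nothing about watermarks. */
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

/** Receives watermarks from a generator (simplified: a watermark is a plain long). */
interface WatermarkOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

/** One interface for both flavours: punctuated generators emit in onEvent, periodic ones in onPeriodicEmit. */
interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

/** Periodic flavour: remember the highest timestamp, emit (max - bound - 1) on each tick. */
class BoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {
    private final long maxOutOfOrderness;
    private long maxTimestamp;

    BoundedOutOfOrdernessGenerator(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Chosen so that the first watermark is Long.MIN_VALUE instead of underflowing.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(maxTimestamp - maxOutOfOrderness - 1);
    }
}

/** Punctuated flavour: emit a watermark whenever a record is marked with ",WM". */
class MarkerGenerator implements WatermarkGenerator<String> {
    @Override
    public void onEvent(String event, long eventTimestamp, WatermarkOutput output) {
        if (event.endsWith(",WM")) {
            output.emitWatermark(eventTimestamp);
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Nothing to do: this generator only reacts to records.
    }
}

/** One timestamp assigner, reusable with any generator: records look like "timestamp,payload". */
class CsvTimestampAssigner implements TimestampAssigner<String> {
    @Override
    public long extractTimestamp(String element, long recordTimestamp) {
        return Long.parseLong(element.substring(0, element.indexOf(',')));
    }
}

class UnifiedGeneratorsDemo {
    /** Runs records through the shared assigner and the given generator, ticking after each record. */
    static List<Long> run(WatermarkGenerator<String> generator, List<String> events) {
        TimestampAssigner<String> assigner = new CsvTimestampAssigner();
        List<Long> emitted = new ArrayList<>();
        WatermarkOutput output = new WatermarkOutput() {
            @Override public void emitWatermark(long watermark) { emitted.add(watermark); }
            @Override public void markIdle() { }
        };
        for (String event : events) {
            long timestamp = assigner.extractTimestamp(event, Long.MIN_VALUE);
            generator.onEvent(event, timestamp, output);
            generator.onPeriodicEmit(output); // simulated periodic tick
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("10,a", "7,b", "15,WM");
        System.out.println(run(new BoundedOutOfOrdernessGenerator<>(3), events)); // [6, 6, 11]
        System.out.println(run(new MarkerGenerator(), events)); // [15]
    }
}
```

Both generators are driven by exactly the same calls; they differ only in whether they emit from `onEvent` (punctuated) or from `onPeriodicEmit` (periodic), which is what lets a single extraction operator serve both.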
The motivation for the first point is to simplify future implementations and reduce code duplication. The motivation for the second point is again code deduplication: most assigners currently have to extend some base timestamp extractor or duplicate the extraction logic, or users have to override an abstract method of the watermark assigner to provide the timestamp extraction logic.
Additionally, we propose to add a generic wrapping WatermarkGenerator that provides idleness detection, i.e. it can mark a stream/partition as idle if no data arrives within a configured timeout.
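A sketch of such a wrapper, assuming simplified stand-ins for the proposed interfaces (redefined here so the example compiles on its own). IdlenessDetectingGenerator and its injectable `LongSupplier` processing-time clock are hypothetical. It delegates to any wrapped generator and, on a periodic tick, marks the output idle instead once no record has arrived for the timeout. How downstream treats the stream as active again once watermarks resume is left out of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

interface WatermarkOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

/** Wraps any generator; marks the output idle once no record arrived for idleTimeoutMillis. */
class IdlenessDetectingGenerator<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> delegate;
    private final long idleTimeoutMillis;
    private final LongSupplier clock; // processing-time clock, injectable for tests
    private long lastEventTime;

    IdlenessDetectingGenerator(WatermarkGenerator<T> delegate, long idleTimeoutMillis, LongSupplier clock) {
        this.delegate = delegate;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.clock = clock;
        this.lastEventTime = clock.getAsLong();
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        lastEventTime = clock.getAsLong(); // any record resets the idleness timer
        delegate.onEvent(event, eventTimestamp, output);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (clock.getAsLong() - lastEventTime >= idleTimeoutMillis) {
            output.markIdle();
        } else {
            delegate.onPeriodicEmit(output);
        }
    }
}

class IdlenessDemo {
    static List<String> run() {
        long[] now = {0L}; // fake processing time in milliseconds
        List<String> log = new ArrayList<>();
        WatermarkOutput output = new WatermarkOutput() {
            @Override public void emitWatermark(long watermark) { log.add("wm:" + watermark); }
            @Override public void markIdle() { log.add("idle"); }
        };
        // Toy delegate: emits the highest timestamp seen so far.
        WatermarkGenerator<Long> maxSeen = new WatermarkGenerator<Long>() {
            private long max = Long.MIN_VALUE;
            @Override public void onEvent(Long event, long ts, WatermarkOutput out) { max = Math.max(max, ts); }
            @Override public void onPeriodicEmit(WatermarkOutput out) { out.emitWatermark(max); }
        };
        WatermarkGenerator<Long> generator = new IdlenessDetectingGenerator<>(maxSeen, 100, () -> now[0]);

        generator.onEvent(5L, 5L, output);
        now[0] = 50;
        generator.onPeriodicEmit(output); // 50 ms without data: delegate emits
        now[0] = 150;
        generator.onPeriodicEmit(output); // 150 ms >= 100 ms timeout: idle
        generator.onEvent(9L, 9L, output); // data arrives again
        now[0] = 160;
        generator.onPeriodicEmit(output); // delegate emits again
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [wm:5, idle, wm:9]
    }
}
```

Because the wrapper only depends on the unified interface, the same idleness logic applies to periodic and punctuated generators alike, which is exactly what the two-interface design made awkward.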
The "unify and separate" part refers to the fact that we want to unify punctuated and periodic assigners but at the same time split the timestamp assigner from the watermark generator.
Finally, we want to add these new interfaces to the existing org.apache.flink.api.common.eventtime package in flink-core.
We are proposing the new TimestampAssigner and WatermarkGenerator interfaces as the basic new interfaces of this abstraction. On top of this, we're proposing WatermarkStrategy, which combines an assigner and a generator and is used in APIs and the plumbing to carry both of them together. We're also proposing the convenience class WatermarkStrategies for creating common strategies with default timestamp assigners.
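The sketch below illustrates how a WatermarkStrategy could bundle the two parts and how WatermarkStrategies could supply a default timestamp assigner. The method names `createTimestampAssigner`, `createWatermarkGenerator`, `withTimestampAssigner`, `of`, `recordTimestampAssigner` and `forBoundedOutOfOrderness` are assumptions for illustration, not a confirmed API, and the interfaces are simplified stand-ins. The strategy hands out factories rather than shared instances, on the assumption that each parallel instance needs its own generator state; "default timestamp assigner" is taken here to mean keeping the timestamp the record already carries.

```java
import java.util.function.Supplier;

interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

interface WatermarkOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

/** Carries an assigner and a generator together through the API and the runtime plumbing. */
interface WatermarkStrategy<T> {
    // Factory methods, so each parallel instance can get a generator with its own state.
    TimestampAssigner<T> createTimestampAssigner();
    WatermarkGenerator<T> createWatermarkGenerator();

    /** Returns a copy of this strategy that uses a different timestamp assigner. */
    default WatermarkStrategy<T> withTimestampAssigner(TimestampAssigner<T> assigner) {
        WatermarkStrategy<T> base = this;
        return new WatermarkStrategy<T>() {
            @Override public TimestampAssigner<T> createTimestampAssigner() { return assigner; }
            @Override public WatermarkGenerator<T> createWatermarkGenerator() { return base.createWatermarkGenerator(); }
        };
    }
}

/** Hypothetical convenience factory for common strategies. */
final class WatermarkStrategies {
    private WatermarkStrategies() { }

    /** Default assigner: keep the timestamp the record already carries. */
    static <T> TimestampAssigner<T> recordTimestampAssigner() {
        return (element, recordTimestamp) -> recordTimestamp;
    }

    static <T> WatermarkStrategy<T> of(Supplier<WatermarkGenerator<T>> generators, TimestampAssigner<T> assigner) {
        return new WatermarkStrategy<T>() {
            @Override public TimestampAssigner<T> createTimestampAssigner() { return assigner; }
            @Override public WatermarkGenerator<T> createWatermarkGenerator() { return generators.get(); }
        };
    }

    /** Bounded out-of-orderness generator plus the default timestamp assigner. */
    static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(long maxOutOfOrderness) {
        Supplier<WatermarkGenerator<T>> generators = () -> new WatermarkGenerator<T>() {
            private long max = Long.MIN_VALUE + maxOutOfOrderness + 1;
            @Override public void onEvent(T event, long ts, WatermarkOutput out) { max = Math.max(max, ts); }
            @Override public void onPeriodicEmit(WatermarkOutput out) { out.emitWatermark(max - maxOutOfOrderness - 1); }
        };
        return of(generators, WatermarkStrategies.<T>recordTimestampAssigner());
    }
}

class StrategyDemo {
    static long[] run() {
        // Override the default assigner: these records are strings holding their own timestamp.
        WatermarkStrategy<String> strategy = WatermarkStrategies.<String>forBoundedOutOfOrderness(2)
                .withTimestampAssigner((element, recordTimestamp) -> Long.parseLong(element));
        TimestampAssigner<String> assigner = strategy.createTimestampAssigner();
        WatermarkGenerator<String> generator = strategy.createWatermarkGenerator();

        long[] lastWatermark = {Long.MIN_VALUE};
        WatermarkOutput output = new WatermarkOutput() {
            @Override public void emitWatermark(long watermark) { lastWatermark[0] = watermark; }
            @Override public void markIdle() { }
        };
        for (String element : new String[] {"4", "9", "6"}) {
            generator.onEvent(element, assigner.extractTimestamp(element, Long.MIN_VALUE), output);
        }
        generator.onPeriodicEmit(output); // max 9, bound 2: 9 - 2 - 1 = 6

        // Without an override, the default assigner returns the record's existing timestamp.
        long defaultTimestamp = WatermarkStrategies.<String>forBoundedOutOfOrderness(2)
                .createTimestampAssigner().extractTimestamp("ignored", 42L);
        return new long[] {lastWatermark[0], defaultTimestamp};
    }

    public static void main(String[] args) {
        long[] result = run();
        System.out.println(result[0] + " " + result[1]); // 6 42
    }
}
```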
We propose to add the new interfaces, along with wrapping code for the old interfaces to ensure compatibility. We need a new operator implementation for watermark extraction, and the KafkaConsumer needs to be adapted to use the new WatermarkGenerator.
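The compatibility wrappers could look roughly like this. The legacy interfaces are simplified stand-ins (a nullable `Long` instead of a `Watermark` object), and PeriodicAdapter and PunctuatedAdapter are hypothetical names. Because legacy periodic assigners typically update their state inside `extractTimestamp`, the periodic adapter's `onEvent` does nothing and relies on the operator calling the legacy object as the timestamp assigner first.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-ins for the old interfaces; a null Long means "no watermark". */
interface LegacyTimestampAssigner<T> { long extractTimestamp(T element, long previousElementTimestamp); }

interface AssignerWithPeriodicWatermarks<T> extends LegacyTimestampAssigner<T> { Long getCurrentWatermark(); }

interface AssignerWithPunctuatedWatermarks<T> extends LegacyTimestampAssigner<T> {
    Long checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}

interface WatermarkOutput { void emitWatermark(long watermark); void markIdle(); }

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

/** Old periodic assigner seen through the new interface. */
class PeriodicAdapter<T> implements WatermarkGenerator<T> {
    private final AssignerWithPeriodicWatermarks<T> legacy;
    PeriodicAdapter(AssignerWithPeriodicWatermarks<T> legacy) { this.legacy = legacy; }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // No-op: legacy periodic assigners update their state in extractTimestamp.
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        Long watermark = legacy.getCurrentWatermark();
        if (watermark != null) {
            output.emitWatermark(watermark);
        }
    }
}

/** Old punctuated assigner seen through the new interface. */
class PunctuatedAdapter<T> implements WatermarkGenerator<T> {
    private final AssignerWithPunctuatedWatermarks<T> legacy;
    PunctuatedAdapter(AssignerWithPunctuatedWatermarks<T> legacy) { this.legacy = legacy; }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        Long watermark = legacy.checkAndGetNextWatermark(event, eventTimestamp);
        if (watermark != null) {
            output.emitWatermark(watermark);
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // No-op: punctuated assigners only emit in response to records.
    }
}

class LegacyAdapterDemo {
    static List<Long> run() {
        List<Long> emitted = new ArrayList<>();
        WatermarkOutput output = new WatermarkOutput() {
            @Override public void emitWatermark(long watermark) { emitted.add(watermark); }
            @Override public void markIdle() { }
        };
        long[] records = {3, 8, 5};

        // Legacy periodic assigner: watermark is max timestamp - 1, or null before any record.
        AssignerWithPeriodicWatermarks<Long> periodic = new AssignerWithPeriodicWatermarks<Long>() {
            private long max = Long.MIN_VALUE;
            @Override public long extractTimestamp(Long e, long previous) { max = Math.max(max, e); return e; }
            @Override public Long getCurrentWatermark() { return max == Long.MIN_VALUE ? null : max - 1; }
        };
        WatermarkGenerator<Long> periodicGen = new PeriodicAdapter<>(periodic);
        periodicGen.onPeriodicEmit(output); // nothing seen yet: nothing emitted
        for (long r : records) {
            periodicGen.onEvent(r, periodic.extractTimestamp(r, Long.MIN_VALUE), output);
        }
        periodicGen.onPeriodicEmit(output); // emits 8 - 1 = 7

        // Legacy punctuated assigner: even timestamps act as watermarks.
        AssignerWithPunctuatedWatermarks<Long> punctuated = new AssignerWithPunctuatedWatermarks<Long>() {
            @Override public long extractTimestamp(Long e, long previous) { return e; }
            @Override public Long checkAndGetNextWatermark(Long last, long ts) { return ts % 2 == 0 ? ts : null; }
        };
        WatermarkGenerator<Long> punctuatedGen = new PunctuatedAdapter<>(punctuated);
        for (long r : records) {
            punctuatedGen.onEvent(r, punctuated.extractTimestamp(r, Long.MIN_VALUE), output); // emits 8
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [7, 8]
    }
}
```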
We will also need to add API methods to DataStream to allow users to specify the new interfaces for watermark generation.
The old extraction operators can then be removed: we only need the new extraction operator along with the wrapping WatermarkGenerators.
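A single operator loop could drive any assigner/generator pair, with old assigners plugged in through wrappers. ExtractionOperatorSketch, its `downstream` list and the `onProcessingTimer` hook are hypothetical; a real operator would register a processing-time timer (for example at the configured auto-watermark interval) and write to the stream output. Forwarding only watermarks that advance the current watermark is one reasonable choice made in this sketch, not something stated in the proposal.

```java
import java.util.ArrayList;
import java.util.List;

interface TimestampAssigner<T> { long extractTimestamp(T element, long recordTimestamp); }

interface WatermarkOutput { void emitWatermark(long watermark); void markIdle(); }

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

/** One extraction operator for every generator; legacy assigners plug in via wrappers. */
class ExtractionOperatorSketch<T> {
    private final TimestampAssigner<T> assigner;
    private final WatermarkGenerator<T> generator;
    final List<String> downstream = new ArrayList<>(); // stands in for the operator's output
    private long currentWatermark = Long.MIN_VALUE;

    // Output handed to the generator; forwards only watermarks that advance event time.
    private final WatermarkOutput output = new WatermarkOutput() {
        @Override
        public void emitWatermark(long watermark) {
            if (watermark > currentWatermark) {
                currentWatermark = watermark;
                downstream.add("W" + watermark);
            }
        }

        @Override
        public void markIdle() { downstream.add("IDLE"); }
    };

    ExtractionOperatorSketch(TimestampAssigner<T> assigner, WatermarkGenerator<T> generator) {
        this.assigner = assigner;
        this.generator = generator;
    }

    /** Per record: extract the timestamp, forward the record, let the generator react. */
    void processElement(T element, long recordTimestamp) {
        long timestamp = assigner.extractTimestamp(element, recordTimestamp);
        downstream.add(element + "@" + timestamp);
        generator.onEvent(element, timestamp, output);
    }

    /** Per processing-time tick: give periodic generators a chance to emit. */
    void onProcessingTimer() {
        generator.onPeriodicEmit(output);
    }
}

class OperatorDemo {
    static List<String> run() {
        // Toy generator that emits the highest timestamp seen so far on every tick.
        WatermarkGenerator<String> maxSeen = new WatermarkGenerator<String>() {
            private long max = Long.MIN_VALUE;
            @Override public void onEvent(String e, long ts, WatermarkOutput out) { max = Math.max(max, ts); }
            @Override public void onPeriodicEmit(WatermarkOutput out) { out.emitWatermark(max); }
        };
        // Assigner that keeps the timestamp the record already carries.
        ExtractionOperatorSketch<String> operator =
                new ExtractionOperatorSketch<>((element, recordTimestamp) -> recordTimestamp, maxSeen);

        operator.processElement("a", 5);
        operator.onProcessingTimer(); // W5
        operator.processElement("b", 3);
        operator.onProcessingTimer(); // still 5: not forwarded
        operator.processElement("c", 9);
        operator.onProcessingTimer(); // W9
        return operator.downstream;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a@5, W5, b@3, c@9, W9]
    }
}
```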
Compatibility, Deprecation, and Migration Plan
Existing user code will still work because we provide wrappers under the hood. The existing API methods will stay but we deprecate them and recommend using the new interfaces. The old interfaces should be deprecated in Flink 2.0.
Unit tests will be added for the new WatermarkGenerator implementations as well as the wrapping generators. All existing ITCases and end-to-end tests will also exercise the new extraction logic, because we want to remove the old extraction code in favour of the wrapping WatermarkGenerators.
We currently don't see alternatives; unifying the interfaces seems straightforward and will simplify the implementation.