Status
Current state: Accepted
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The stream module of Kafka has a window state store that stores the aggregated values for a key in a given time frame. The window store is implemented as an interface, this interface has a strange method named put(key, value), this method has does not have a timestamp as a parameter which is important to determine that to which window frame does the key belongs. In this method, the current record timestamp is used for determining the window frame(as specified in the description of the method), this constraint makes WindowStore error prone. It is also specified in the method description that method with a timestamp parameter should be used which already present in the interface which expects key, value, and start timestamp as well of the window to which the key belongs. Therefore by deprecating (and finally removing) the method put(key, value), we can prevent inconsistency.
Public Interfaces
Following is the method to be deprecated in the WindowStore interface.
public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V> { /** * Use the current record timestamp as the {@code windowStartTimestamp} and * delegate to {@link WindowStore#put(Object, Object, long)}. * * It's highly recommended to use {@link WindowStore#put(Object, Object, long)} instead, as the record timestamp * is unlikely to be the correct windowStartTimestamp in general. * * @param key The key to associate the value to * @param value The value to update, it can be null; * if the serialized bytes are also null it is interpreted as deletes * @throws NullPointerException if the given key is {@code null} */ void put(K key, V value); }
Proposed Changes
We propose to deprecate the method WindowStore#put(key, value), as it has no timestamp specified in it, the timestamp is identified based on the current record and therefore does not satisfy the behavior of the window store. The timestamp is required as it is used to identify the window frame to which the key belongs.
Compatibility, Deprecation, and Migration Plan
Following are the list of the classes that implement this interface so also are needed to be updated.
Also, there are tests which are needed to be updated after deprecation of the specified method. Following are the list of test classes which are needed to update.
- WindowStoreFacadeTest.java
- WindowBytesStoreTest.java
- SimpleBenchmark.java (benchmarking)
- RocksDBWindowStoreTest.java
- ProcessorContextImplTest.java
- MeteredWindowStoreTest.java
- InMemoryWindowStoreTest.java
- ChangeLoggingWindowBytesStoreTest.java
- CachingWindowStoreTest.java
- ChangeLoggingTimestampedWindowBytesStoreTest.java
Rejected Alternatives
As here we are proposing to deprecate a method because the method does not satisfy the behaviour of the interface, the alternative would be to:-
- Update the method, so that the correct timestamp associated with the key can be accessed
- Since this API can be called by the user, updating the method can break the code. By this reason, this approach is not feasible.