Current state: Accepted
Discussion thread: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
To provide stronger consistency guarantees across replicated state stores (e.g., read-your-writes consistency), state stores need to collect metadata (e.g., offset information) for all records that have been written to the store.
State stores access the processor context via the StateStoreContext interface. Today, the AbstractProcessorContext class, which implements StateStoreContext, already makes record metadata available via recordMetadata(). Unfortunately, that method is not part of the StateStoreContext interface, so state stores cannot access it.
In this KIP, we propose to expose recordMetadata() via the StateStoreContext interface. The following interface will change:
- org.apache.kafka.streams.processor.StateStoreContext: adding the method recordMetadata()
The code segment below shows the actual code change together with the corresponding JavaDoc:
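A minimal, self-contained sketch of the shape of the change follows. It assumes the signature Optional<RecordMetadata> recordMetadata() and a read-only RecordMetadata exposing topic(), partition(), and offset(). Both interfaces are simplified local stand-ins rather than the real Kafka types, so the snippet compiles on its own, and FakeStateStoreContext is a hypothetical test double.

```java
import java.util.Optional;

// Simplified stand-in for Kafka's read-only RecordMetadata (assumed shape):
// getters only, so callers cannot modify the underlying state through it.
interface RecordMetadata {
    String topic();
    int partition();
    long offset();
}

// Simplified stand-in for StateStoreContext, showing only the proposed method.
interface StateStoreContext {
    /**
     * Returns the metadata of the record currently being processed, if available.
     *
     * @return the current record's metadata, or an empty Optional if no record
     *         metadata is available (e.g., no record has been processed yet)
     */
    Optional<RecordMetadata> recordMetadata();
}

// Hypothetical test double that returns whatever metadata it was last given.
final class FakeStateStoreContext implements StateStoreContext {
    private RecordMetadata current; // stays null until setCurrent() is called

    void setCurrent(final String topic, final int partition, final long offset) {
        current = new RecordMetadata() {
            @Override public String topic() { return topic; }
            @Override public int partition() { return partition; }
            @Override public long offset() { return offset; }
        };
    }

    @Override
    public Optional<RecordMetadata> recordMetadata() {
        return Optional.ofNullable(current);
    }
}
```

A store holding such a context would call context.recordMetadata() inside its write methods and handle the empty case explicitly instead of assuming metadata is always present.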
Note that recordMetadata() returns an Optional, which accounts for the fact that no record metadata might be available if no records have been processed yet.
The following code snippet illustrates how the above interface can be used within the state store to keep track of the latest offset a store has processed:
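Below is a minimal sketch of the idea, assuming an in-memory key-value store. OffsetTrackingStore, its put/get/latestOffset methods, SimpleRecordMetadata, and the per-topic-partition offset map are hypothetical names for illustration. RecordMetadata and StateStoreContext are simplified local stand-ins for the Kafka types. On every put, the store reads the current record's metadata from the context, if present, and records the offset in the same call that writes the data.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Simplified stand-ins for the Kafka types (assumed shape, not the real API).
interface RecordMetadata {
    String topic();
    int partition();
    long offset();
}

interface StateStoreContext {
    Optional<RecordMetadata> recordMetadata();
}

// Hypothetical immutable metadata value, used only to drive the example.
final class SimpleRecordMetadata implements RecordMetadata {
    private final String topic;
    private final int partition;
    private final long offset;

    SimpleRecordMetadata(final String topic, final int partition, final long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
    @Override public long offset() { return offset; }
}

// Hypothetical in-memory store that tracks, per input topic-partition,
// the highest offset whose record has been written to the store.
final class OffsetTrackingStore {
    private final Map<String, String> data = new HashMap<>();
    private final Map<String, Long> latestOffsets = new HashMap<>(); // "topic-partition" -> offset
    private final StateStoreContext context;

    OffsetTrackingStore(final StateStoreContext context) {
        this.context = context;
    }

    void put(final String key, final String value) {
        data.put(key, value);
        // Data and offset are updated in the same call; if no metadata is
        // available (e.g., nothing processed yet), only the data is written.
        context.recordMetadata().ifPresent(meta ->
            latestOffsets.merge(meta.topic() + "-" + meta.partition(), meta.offset(), Long::max));
    }

    String get(final String key) {
        return data.get(key);
    }

    Optional<Long> latestOffset(final String topic, final int partition) {
        return Optional.ofNullable(latestOffsets.get(topic + "-" + partition));
    }
}
```

Because the offset is read from the context inside put() itself, callers never pass or update metadata separately. The rejected alternatives in this KIP try to achieve the same goal with additional store methods.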
Compatibility, Deprecation, and Migration Plan
The change to the StateStoreContext interface does not require changes to any of its implementations. It merely exposes a method (recordMetadata()) that is already implemented in AbstractProcessorContext. Therefore, we do not expect any compatibility issues. Moreover, recordMetadata() returns an object of type RecordMetadata, which is read-only and thus protects Kafka internals from being tampered with by applications.
Rejected Alternatives
Add a new put method to the KeyValueStore interface
The idea would be to couple data and metadata in a single put() call.
We rejected this approach because of the extra code complexity it causes. Each processor would have to explicitly decide which call to invoke, put(key, value) or put(key, value, metadata), depending on whether it is running in process() or in punctuate(). The caching layer would also need to select which put call to forward to the underlying stores, depending on whether cached metadata is present.
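To make the extra complexity concrete, here is a hypothetical sketch of this rejected design. MetadataAwareStore, the three-argument put, Meta, CachingStore, and CountingStore are illustrative names, not Kafka APIs. The caching layer must remember whether each buffered write came with metadata and pick the matching overload when it flushes to the store beneath it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical metadata holder for this sketch (not Kafka's RecordMetadata).
final class Meta {
    final String topic;
    final int partition;
    final long offset;

    Meta(final String topic, final int partition, final long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical store interface of the rejected design: two put variants.
interface MetadataAwareStore {
    void put(String key, String value);            // no metadata, e.g. from punctuate()
    void put(String key, String value, Meta meta); // with metadata, e.g. from process()
}

// Hypothetical lower store that only counts which variant was called.
final class CountingStore implements MetadataAwareStore {
    int plainPuts = 0;
    int metadataPuts = 0;

    @Override public void put(final String key, final String value) { plainPuts++; }
    @Override public void put(final String key, final String value, final Meta meta) { metadataPuts++; }
}

// Hypothetical caching layer: buffers writes, then flushes them downstream.
final class CachingStore implements MetadataAwareStore {
    private static final class CachedValue {
        final String value;
        final Meta meta; // null if the write arrived without metadata

        CachedValue(final String value, final Meta meta) {
            this.value = value;
            this.meta = meta;
        }
    }

    private final Map<String, CachedValue> cache = new LinkedHashMap<>();
    private final MetadataAwareStore inner;

    CachingStore(final MetadataAwareStore inner) {
        this.inner = inner;
    }

    @Override public void put(final String key, final String value) {
        cache.put(key, new CachedValue(value, null));
    }

    @Override public void put(final String key, final String value, final Meta meta) {
        cache.put(key, new CachedValue(value, meta));
    }

    void flush() {
        for (final Map.Entry<String, CachedValue> e : cache.entrySet()) {
            final CachedValue cached = e.getValue();
            // The extra branching this design forces: every layer must pick
            // the matching overload for every buffered entry.
            if (cached.meta != null) {
                inner.put(e.getKey(), cached.value, cached.meta);
            } else {
                inner.put(e.getKey(), cached.value);
            }
        }
        cache.clear();
    }
}
```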
Add a new updatePosition method to the KeyValueStore interface
We rejected this approach because of an increased risk of bugs. First, if a processor forgets to call updatePosition after a put, the state store cannot reason about consistency. Second, decoupling the data update from the metadata update creates a window of inconsistency in which the data is already reflected in the store but the metadata is not.
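The window of inconsistency can be illustrated with a hypothetical sketch. DecoupledStore, updatePosition, and hasApplied are illustrative names, not Kafka APIs. Between put and updatePosition, a reader can already see the new value while the store's position still reports that the corresponding offset has not been applied.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical store for the rejected alternative: data and position are
// changed by two independent calls that the caller must pair correctly.
final class DecoupledStore {
    private final Map<String, String> data = new HashMap<>();
    private long position = -1L; // highest offset reflected in the store; -1 means none

    void put(final String key, final String value) {
        data.put(key, value);
    }

    void updatePosition(final long offset) {
        position = Math.max(position, offset);
    }

    String get(final String key) {
        return data.get(key);
    }

    // Consistency check a query could use: has the store applied `offset`?
    boolean hasApplied(final long offset) {
        return position >= offset;
    }
}
```

Suppose put("k", "v") writes the record at offset 10. Until updatePosition(10) runs, get("k") already returns "v" while hasApplied(10) is still false. If the processor never calls updatePosition, that mismatch persists.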