Current state: Accepted
Discussion thread: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
The state stores used by Kafka Streams today maintain only the latest value associated with each key. This prevents Kafka Streams from providing proper temporal join semantics for stream-table joins.
Consider the following example of A join B, where A is a stream and B is a table, and records (for a particular key) arrive in the following order, where the number in each record's name is its timestamp: b0, a1, b3, a4, a2.
Note that the last record (a2) is out-of-order by timestamp (relative to the other records in stream A). The first two join results are as expected: the a1 record is joined against b0, and the a4 record is joined against b3. However, when the out-of-order a2 record arrives, it should be joined against the b0 record, since that is the latest record on the B side as of its record timestamp (time=2), but this is not what happens. The state store for table B has already been updated to contain b3, and the b0 record was replaced in the process. As a result, the only record available in the store for a2 to join against is b3, which is why Kafka Streams outputs the join result (a2, b3) today.
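To make the difference concrete, here is a small, hypothetical Java simulation of the example above for a single key. It does not use Kafka Streams, and it assumes, as the record names suggest, that each record's number is its timestamp and that records arrive in the order b0, a1, b3, a4, a2. `joinWithLatest` models a store that keeps only the latest B value; `joinAsOf` models a timestamped lookup against every B version seen so far.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of the A-join-B example for one key; not Kafka Streams code.
final class TemporalJoinDemo {

    // One input record: which side it came from ('A' or 'B'), its value, and its timestamp.
    record Event(char side, String value, long timestamp) {}

    // Joins each A record against the latest B value seen so far,
    // which is all a latest-value-only store can offer.
    static List<String> joinWithLatest(List<Event> events) {
        List<String> out = new ArrayList<>();
        String latestB = null;
        for (Event e : events) {
            if (e.side() == 'B') {
                latestB = e.value();  // overwrite: older B versions are lost
            } else if (latestB != null) {
                out.add("(" + e.value() + ", " + latestB + ")");
            }
        }
        return out;
    }

    // Joins each A record against the B version valid at the A record's timestamp,
    // i.e., the B version with the largest timestamp <= the A timestamp.
    static List<String> joinAsOf(List<Event> events) {
        List<String> out = new ArrayList<>();
        TreeMap<Long, String> versionsB = new TreeMap<>();  // timestamp -> B value
        for (Event e : events) {
            if (e.side() == 'B') {
                versionsB.put(e.timestamp(), e.value());
            } else {
                Map.Entry<Long, String> b = versionsB.floorEntry(e.timestamp());
                if (b != null) {
                    out.add("(" + e.value() + ", " + b.getValue() + ")");
                }
            }
        }
        return out;
    }

    // Arrival order assumed from the example: a2 arrives last, out of order by timestamp.
    static List<Event> example() {
        return List.of(
                new Event('B', "b0", 0), new Event('A', "a1", 1),
                new Event('B', "b3", 3), new Event('A', "a4", 4),
                new Event('A', "a2", 2));
    }

    public static void main(String[] args) {
        System.out.println(joinWithLatest(example()));  // [(a1, b0), (a4, b3), (a2, b3)]
        System.out.println(joinAsOf(example()));        // [(a1, b0), (a4, b3), (a2, b0)]
    }
}
```

Only the last join result differs: the as-of lookup pairs a2 with b0, which is the temporal semantics the motivation asks for.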
For an example stream-table join application where proper temporal join semantics is critical, imagine that B is a table of real-time currency conversion rates and A is a stream of transactions. It's important that each transaction in A is joined with the conversion rate at the time the transaction was issued, not the latest conversion rate seen so far.
A similar temporal semantic gap exists with table-table foreign key joins today as well. The subscription store may not reflect an accurate set of foreign key records at the time of the join record's timestamp, resulting in incorrect join results.
To address this class of gaps in Kafka Streams stateful processing semantics, this KIP proposes to introduce versioned state stores. A versioned state store will track multiple record versions for the same key, rather than the single latest record per key as is the case for existing stores today.
Introducing versioned stores covers a large surface area: the store interfaces, store implementations, updating DSL processors to use them, interactive queries, metrics/monitoring, etc. The scope of this first KIP is limited to:
- defining the basic interface for versioned stores
- introducing a RocksDB-based implementation for the interface
- additional methods necessary for allowing users to pass versioned stores to the DSL (via StreamsBuilder and Materialized) and PAPI.
Everything else will be deferred to follow-up KIPs.
The interface changes proposed in this KIP are:
- a new interface for versioned stores, and a helper class:
  - VersionedKeyValueStore<K, V> extends StateStore, with helper class VersionedRecord<V>
- a new interface for versioned store suppliers, and a helper interface:
  - VersionedBytesStoreSupplier extends KeyValueBytesStoreSupplier, with helper interface VersionedBytesStore
- three new methods in Stores.java:
  - two for creating a persistent, versioned store supplier: Stores#persistentVersionedKeyValueStore(...) plus an overload
  - another for creating a StoreBuilder from a versioned supplier: Stores#versionedKeyValueStoreBuilder(...)
- a new method in TopologyTestDriver.java for getting a versioned store from a topology
- a new static method in ValueAndTimestamp.java for creating ValueAndTimestamp instances where the value may be null
Versioned Store Interface
The new interface for versioned stores is as follows:
Note that this proposal intentionally omits most methods from the existing KeyValueStore interface in order to keep the new interface simple. Additional methods could be added in the future, such as rangeKey() methods to enable the foreign-key join subscription store use case, but these are deferred to a future KIP so that we can align on the basic interfaces first.
The VersionedRecord return type of the get() methods is essentially the same as the existing ValueAndTimestamp class, but it is a separate class so that it can evolve independently in the future. For example, we may want to add a second timestamp to the VersionedRecord class representing the expiry time of the record version (i.e., the timestamp of the next record version for this key), in addition to the existing timestamp.
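The following is a rough, hypothetical sketch of the interface shape described in the prose: a timestamped put (where a null value is treated as a tombstone), a latest-value get(key), and a timestamp-bounded get(key, asOfTimestamp), each returning a value paired with its validFrom timestamp. All names end in "Sketch" because they are illustrative stand-ins, not the KIP's actual definitions, and the in-memory implementation ignores history retention entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical sketch; not the Kafka Streams classes. Shapes are inferred from the prose.
final class VersionedStoreSketch {

    // Analogous to the proposed VersionedRecord<V>: a value plus its validFrom timestamp.
    record VersionedRecordSketch<V>(V value, long timestamp) {
        VersionedRecordSketch {
            Objects.requireNonNull(value, "a returned version always has a non-null value");
        }
    }

    interface VersionedKeyValueStoreSketch<K, V> {
        void put(K key, V value, long timestamp);                 // null value = tombstone
        VersionedRecordSketch<V> get(K key);                       // latest version, or null
        VersionedRecordSketch<V> get(K key, long asOfTimestamp);   // version valid at the bound, or null
    }

    // Unbounded in-memory implementation (no history retention), for illustration only.
    static final class InMemory<K, V> implements VersionedKeyValueStoreSketch<K, V> {
        private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

        @Override
        public void put(K key, V value, long timestamp) {
            // TreeMap permits null values, so a tombstone is stored as a version too.
            versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        }

        @Override
        public VersionedRecordSketch<V> get(K key) {
            return get(key, Long.MAX_VALUE);
        }

        @Override
        public VersionedRecordSketch<V> get(K key, long asOfTimestamp) {
            TreeMap<Long, V> history = versions.get(key);
            if (history == null) {
                return null;
            }
            // Latest version whose validFrom is <= the timestamp bound.
            Map.Entry<Long, V> entry = history.floorEntry(asOfTimestamp);
            if (entry == null || entry.getValue() == null) {
                return null;  // no version yet, or the version found is a tombstone
            }
            return new VersionedRecordSketch<>(entry.getValue(), entry.getKey());
        }
    }

    public static void main(String[] args) {
        InMemory<String, String> store = new InMemory<>();
        store.put("k", "v1", 10);
        store.put("k", "v2", 20);
        VersionedRecordSketch<String> latest = store.get("k");
        VersionedRecordSketch<String> asOf15 = store.get("k", 15);
        System.out.println(latest.value() + "@" + latest.timestamp());  // v2@20
        System.out.println(asOf15.value() + "@" + asOf15.timestamp());  // v1@10
    }
}
```

Keeping the returned type separate from any existing value/timestamp pair is what would let a later version add, for example, a validTo field without touching the store interface.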
Store Supplier/Builder Interfaces
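The new Stores.java methods are Stores#persistentVersionedKeyValueStore(...) (plus an overload), for creating a persistent, versioned store supplier, and Stores#versionedKeyValueStoreBuilder(...), for creating a StoreBuilder from a versioned supplier.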
Understanding the history retention and segment interval parameters of the persistentVersionedKeyValueStore() methods requires a brief discussion of the planned RocksDB implementation for versioned stores.
RocksDB Implementation Overview
Here's a high-level overview of the RocksDB versioned store implementation (details are outside the scope of this KIP).
Each store has an associated, fixed-duration history retention, which specifies how long old record versions are kept. In particular, a versioned store guarantees accurate results for calls to get(key, asOfTimestamp) where the provided timestamp bound is within history retention of the current observed stream time. (If the timestamp bound is outside the specified history retention, a record is still returned if the latest record version for the key satisfies the timestamp bound. Otherwise, a warning is logged and null is returned.)
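The retention rule above can be sketched as follows. This is a hypothetical in-memory model, not the RocksDB implementation: it keeps every version forever (a real store expires old versions), tracks observed stream time as the largest timestamp put so far across all keys, treats a bound exactly equal to stream time minus history retention as inside retention (the boundary's inclusivity is an assumption), and ignores tombstones. The class and method names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Logger;

// Hypothetical in-memory model of the history-retention rule; not the RocksDB store.
final class RetentionRuleSketch {
    private static final Logger LOG = Logger.getLogger(RetentionRuleSketch.class.getName());

    private final long historyRetentionMs;
    // key -> (validFrom timestamp -> value); every version is kept in this sketch
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();
    // Observed stream time: the largest timestamp put so far, across all keys.
    private long observedStreamTime = Long.MIN_VALUE;

    RetentionRuleSketch(long historyRetentionMs) {
        this.historyRetentionMs = historyRetentionMs;
    }

    void put(String key, String value, long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        observedStreamTime = Math.max(observedStreamTime, timestamp);
    }

    // Returns the value valid at asOfTimestamp, or null.
    String get(String key, long asOfTimestamp) {
        TreeMap<Long, String> history = versions.get(key);
        if (history == null) {
            return null;
        }
        if (asOfTimestamp >= observedStreamTime - historyRetentionMs) {
            // Inside history retention: the answer is guaranteed to be accurate.
            Map.Entry<Long, String> entry = history.floorEntry(asOfTimestamp);
            return entry == null ? null : entry.getValue();
        }
        // Outside history retention: only the latest version may still be returned,
        // and only if it satisfies the timestamp bound.
        Map.Entry<Long, String> latest = history.lastEntry();
        if (latest.getKey() <= asOfTimestamp) {
            return latest.getValue();
        }
        LOG.warning("asOfTimestamp " + asOfTimestamp + " is outside history retention");
        return null;
    }

    public static void main(String[] args) {
        RetentionRuleSketch store = new RetentionRuleSketch(100);
        store.put("a", "a0", 0);
        store.put("a", "a150", 150);
        store.put("b", "b300", 300);              // advances stream time to 300
        System.out.println(store.get("a", 250));  // a150 (inside retention)
        System.out.println(store.get("a", 180));  // a150 (outside retention, latest version qualifies)
        System.out.println(store.get("a", 100));  // null, with a logged warning
    }
}
```

Here stream time 300 and 100 ms of retention put the cutoff at 200, so the query at 180 is answered only because key a's latest version (from 150) already satisfies the bound.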
To achieve this, the store will consist of a "latest value store" and "segment stores." The latest record version for each key will be stored in the latest value store, while all older versions will be stored in the segment stores.
Each record version has two associated timestamps:
- validFrom timestamp. This timestamp is explicitly associated with the record as part of the put() call to the store; i.e., this is the record's timestamp.
- validTo timestamp. This is the timestamp of the next record (or deletion) associated with the same key, and is implicitly associated with the record. This timestamp can change as new records are inserted into the store.
The validity interval of a record is from validFrom (inclusive) to validTo (exclusive), and can change as new record versions are inserted into the store (and validTo changes as a result).
Old record versions are stored in segment stores according to their validTo timestamps. The use of segments here is analogous to that in the existing RocksDB implementation for windowed stores. Because records are stored in segments based on their validTo timestamps, an entire segment can be expired at once when the records it contains are no longer relevant based on the store's history retention. (One difference from windowed stores today is that, for versioned stores, all segments will share the same physical RocksDB instance, whereas each windowed-store segment is its own RocksDB instance. Sharing one instance allows for many more segments than windowed stores use today.)
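As a hypothetical illustration of the validFrom/validTo bookkeeping and segment assignment described above, the sketch below derives each version's validTo from the next version's validFrom, buckets older versions by validTo, and checks whether a whole segment can be dropped. The bucketing formula (validTo divided by the segment interval, assuming non-negative timestamps) and the expiry test are assumptions chosen to match the description, not the actual RocksDB implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of segment assignment by validTo; not the RocksDB implementation.
final class SegmentSketch {

    // One record version with its validity interval [validFrom, validTo).
    record Version(String value, long validFrom, long validTo) {}

    // An older version is valid until the next version's validFrom; the latest
    // version is open-ended (Long.MAX_VALUE here) and lives in the latest value store.
    static List<Version> versionsOf(TreeMap<Long, String> byValidFrom) {
        List<Version> out = new ArrayList<>();
        for (Map.Entry<Long, String> e : byValidFrom.entrySet()) {
            Long next = byValidFrom.higherKey(e.getKey());
            out.add(new Version(e.getValue(), e.getKey(), next == null ? Long.MAX_VALUE : next));
        }
        return out;
    }

    // Older versions are bucketed by validTo (timestamps assumed non-negative).
    static long segmentId(long validTo, long segmentIntervalMs) {
        return validTo / segmentIntervalMs;
    }

    // A segment can be dropped as a whole once every validTo it can hold lies strictly
    // before the oldest timestamp still within history retention: no query inside
    // retention can then fall within any version stored in it.
    static boolean isExpired(long segmentId, long segmentIntervalMs,
                             long observedStreamTime, long historyRetentionMs) {
        return (segmentId + 1) * segmentIntervalMs <= observedStreamTime - historyRetentionMs;
    }

    static TreeMap<Long, String> example() {
        TreeMap<Long, String> m = new TreeMap<>();
        m.put(0L, "v0");
        m.put(250L, "v250");
        m.put(700L, "v700");
        m.put(1200L, "v1200");
        return m;
    }

    public static void main(String[] args) {
        long interval = 500, streamTime = 1200, retention = 300;
        for (Version v : versionsOf(example())) {
            if (v.validTo() == Long.MAX_VALUE) {
                System.out.println(v.value() + " -> latest value store");
            } else {
                long id = segmentId(v.validTo(), interval);
                System.out.println(v.value() + " -> segment " + id
                        + (isExpired(id, interval, streamTime, retention) ? " (expired)" : ""));
            }
        }
    }
}
```

With these numbers, v0 (validTo 250) lands in segment 0, which is expired because the retention cutoff is 900; v250 and v700 land in segments 1 and 2, which must be kept because they may hold validTo values at or after 900; v1200 stays in the latest value store.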
The segment interval parameter, which controls segment size, is (optionally) exposed to users in the persistentVersionedKeyValueStore() methods because benchmarking a prototype implementation showed that this parameter has a significant effect on store performance, depending on workload characteristics.
Here's the VersionedBytesStoreSupplier interface used by the Stores.java methods above:
As mentioned in the Javadoc, the reason that this supplier extends KeyValueBytesStoreSupplier, and therefore returns a store of type KeyValueStore<Bytes, byte[]> rather than VersionedKeyValueStore<Bytes, byte[]> (as its name suggests), is to be compatible with existing DSL methods for passing key-value stores, e.g., StreamsBuilder#table() and KTable methods, which are explicitly typed to accept Materialized<K, V, KeyValueStore<Bytes, byte[]>>. The alternative to fitting versioned stores into the KeyValueStore interface in this way is to introduce new versions of all relevant StreamsBuilder and KTable methods that relax the Materialized type they accept. While this is possible, and we could even deprecate the existing methods in favor of the new ones, it would mean a large surface area of public interface changes, which is best avoided if possible.
The cost of fitting versioned stores into KeyValueStore as proposed is two additional layers of translation, for both DSL and PAPI users, whenever put() or get() is called on a versioned store: the record being written or read must be converted from (keyBytes, valueBytes, timestamp) to (keyBytes, valueBytes + serializedTimestamp) and back. It also means that users who wish to create their own VersionedKeyValueStore implementation (specifically, PAPI users who want to use the provided Stores#versionedKeyValueStoreBuilder method, and DSL users) need to replicate this translation layer to and from KeyValueStore.
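A minimal sketch of what such a translation layer might look like at the byte level follows. The text only says that the value bytes and a serialized timestamp are combined; this hypothetical helper assumes one possible layout (an 8-byte big-endian timestamp followed by the raw value bytes) and does not handle null values (tombstones). The class and method names are illustrative, not Kafka Streams APIs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper, not a Kafka Streams API. Assumed layout: an 8-byte big-endian
// timestamp followed by the raw value bytes. Null values (tombstones) are not handled.
final class TimestampedBytesSketch {

    // (valueBytes, timestamp) -> a single byte[] that fits a KeyValueStore<Bytes, byte[]>.
    static byte[] pack(byte[] valueBytes, long timestamp) {
        return ByteBuffer.allocate(Long.BYTES + valueBytes.length)
                .putLong(timestamp)
                .put(valueBytes)
                .array();
    }

    // Reads the timestamp back from the first 8 bytes.
    static long timestampOf(byte[] packed) {
        return ByteBuffer.wrap(packed).getLong();
    }

    // Returns a copy of the value bytes that follow the timestamp.
    static byte[] valueOf(byte[] packed) {
        return Arrays.copyOfRange(packed, Long.BYTES, packed.length);
    }

    public static void main(String[] args) {
        byte[] packed = pack("v".getBytes(StandardCharsets.UTF_8), 1_000L);
        System.out.println(packed.length);                                        // 9
        System.out.println(timestampOf(packed));                                  // 1000
        System.out.println(new String(valueOf(packed), StandardCharsets.UTF_8));  // v
    }
}
```

Each put() pays for one pack and each get() for one unpack, which is the per-call overhead the paragraph above describes.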
To alleviate this pain, we could expose an additional helper method for the conversion and/or add an additional method to VersionedBytesStoreSupplier that directly returns a VersionedKeyValueStore<Bytes, byte[]> if implemented. The latter would save the two additional layers of translation, at the expense of complicating one of the interfaces. Unless reviewers feel strongly about either goal (avoiding the extra translation, or making it easier for users to create their own VersionedKeyValueStore implementations), I propose to leave these options out for now; we can always revisit them later.
For completeness, here's the new VersionedBytesStore interface, which VersionedBytesStoreSupplier instances will return. Unless users choose to implement their own VersionedBytesStoreSupplier (i.e., in order to implement a custom versioned store to pass to the DSL or to the new Stores#versionedKeyValueStoreBuilder() method), they will not need to interact with this interface.
Internally, this interface will be used to help represent VersionedKeyValueStore<Bytes, byte[]> as KeyValueStore<Bytes, byte[]>.
Additional Interface Changes
TopologyTestDriver users should be able to get (and interact with) versioned stores from their topology, similar to the existing methods for other store types:
Another interface change needed as part of this proposal is a new static constructor on ValueAndTimestamp that allows the value to be null.
The reason this addition is needed is an implementation detail. The existing DSL processor implementation represents all source table state stores as timestamped key-value stores, which means that, unless we want to lift this restriction and change a significant amount of code internally, versioned key-value stores will have to fit the TimestampedKeyValueStore interface internally.
TimestampedKeyValueStore represents inserting a new record into the store as a call to put(K key, ValueAndTimestamp<V> v). In order to allow inserting tombstones into versioned stores, ValueAndTimestamp therefore needs to allow null values.
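The hypothetical stand-in below (not the Kafka class) illustrates why null values matter here. Its `make` method mirrors the behavior of the existing ValueAndTimestamp.make(), which returns null when the value is null; `makeAllowingNull` is a made-up name standing in for the proposed new constructor. With `make`, a tombstone passed through put(K key, ValueAndTimestamp<V> v) arrives as a bare null and its timestamp is lost, so a versioned store could not tell when the deletion happened.

```java
// Hypothetical stand-in for ValueAndTimestamp; not the Kafka class.
final class NullableValueAndTimestampSketch {

    record ValueAndTs<V>(V value, long timestamp) {
        // Mirrors the existing ValueAndTimestamp.make(): a null value yields no object at all.
        static <V> ValueAndTs<V> make(V value, long timestamp) {
            return value == null ? null : new ValueAndTs<>(value, timestamp);
        }

        // Made-up name for the proposed constructor: keeps the timestamp even when the value is null.
        static <V> ValueAndTs<V> makeAllowingNull(V value, long timestamp) {
            return new ValueAndTs<>(value, timestamp);
        }
    }

    // What a versioned store receiving put(key, v) could learn about a deletion:
    // the tombstone's timestamp, or null if v carries no usable tombstone.
    static Long tombstoneTimestamp(ValueAndTs<?> v) {
        if (v == null) {
            return null;  // bare null: the deletion's timestamp has been lost
        }
        return v.value() == null ? v.timestamp() : null;
    }

    public static void main(String[] args) {
        System.out.println(tombstoneTimestamp(ValueAndTs.make(null, 42L)));              // null
        System.out.println(tombstoneTimestamp(ValueAndTs.makeAllowingNull(null, 42L)));  // 42
    }
}
```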
Even though this is a public interface change (by virtue of ValueAndTimestamp being public), ValueAndTimestamp instances with null values will only be used internally. In other words, this change is not strictly needed as a public interface change and could also be achieved through refactoring. That said, the cost of introducing this new method seems low, so I'd like to propose it.
Compatibility, Deprecation, and Migration Plan
This KIP introduces a new type of store without deprecating any existing interfaces. Unless a user explicitly updates their application code to use the new store, this KIP will have no effect on their applications (versioned stores are not used anywhere by default).
The RocksDB format used for versioned stores is not compatible with the existing format for non-versioned stores.
However, RocksDB-based versioned and non-versioned stores will use the same changelog topic format, though their changelog topic configurations will differ. Specifically, the changelog bytes format is the same for both, but changelog topics for versioned stores need min.compaction.lag.ms set to a value suitable for the desired history retention of the versioned store. The RocksDB versioned store implementation will set min.compaction.lag.ms equal to history retention plus 24 hours, where the additional buffer accounts for the broker's use of wall-clock time in topic compaction (analogous to the extra 24 hours of changelog retention for windowed stores today). Changelog topic configs will be set only on changelog topic creation, and will not be verified if the changelog topic already exists.
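The changelog setting above is simple arithmetic. A hypothetical helper (not part of Kafka Streams) that derives it from a store's history retention might look like this; only the min.compaction.lag.ms key and the 24-hour buffer come from the text, and the class and method names are illustrative.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams: derives the changelog
// min.compaction.lag.ms described above from a store's history retention.
final class ChangelogConfigSketch {

    // Extra buffer for the broker's use of wall-clock time during compaction.
    static final Duration WALL_CLOCK_BUFFER = Duration.ofHours(24);

    static long minCompactionLagMs(Duration historyRetention) {
        return historyRetention.plus(WALL_CLOCK_BUFFER).toMillis();
    }

    // Topic-level override for a versioned store's changelog (only this key is shown).
    static Map<String, String> changelogOverrides(Duration historyRetention) {
        return Map.of("min.compaction.lag.ms", Long.toString(minCompactionLagMs(historyRetention)));
    }

    public static void main(String[] args) {
        // 1 hour of history retention + 24 hours of buffer = 25 hours.
        System.out.println(changelogOverrides(Duration.ofHours(1)));  // {min.compaction.lag.ms=90000000}
    }
}
```

The same value is a reasonable starting point when manually reconfiguring an existing changelog topic during the upgrade procedure described next.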
In light of the above, users can use the following manual procedure to update an existing application with a non-versioned store to use a versioned store instead:
- Stop the application
- Delete all local state (for the store being updated) from all instances
- Update the changelog topic configurations to set min.compaction.lag.ms to a value suitable for the desired history retention (e.g., history retention plus some buffer to account for the broker's use of wall-clock time in topic cleanup)
- Update the application code to use a versioned store
- Restart the app.
There are no plans to support a non-manual upgrade procedure or a live migration procedure at this time. In the future, it could be nice to make versioned stores the default since a non-versioned store is simply a special case of a versioned store (with history retention 0) but that's far out of scope for this KIP.
Test Plan
The RocksDB-based versioned store implementation will be tested with the Processor API: put, get, and timestamp-based get methods will have their results validated.
The manual procedure described above for updating an application using a non-versioned store to use a versioned store will be tested as well.
Rejected Alternatives
Versioned Store Interface
History retention and get(key, asOfTimestamp)
In the event that get(key, asOfTimestamp) is called with a timestamp bound older than the specified history retention, instead of returning null (and logging a warning) as proposed above, other design options include (1) throwing an exception or (2) updating the return type from VersionedRecord<V> to Optional<VersionedRecord<V>> and returning an empty optional to indicate that the timestamp bound was invalid. The first option is not very user-friendly. The second option complicates the interface and makes the return types of get(key) and get(key, asOfTimestamp) diverge.
Regarding the edge case where get(key, asOfTimestamp) is called with a timestamp bound older than the specified history retention but the latest record version for the key satisfies the timestamp bound: the proposal above returns the latest record version in this case, rather than rejecting the timestamped query and returning null. Returning the record is preferable because its existence (as the latest value for the key) is guaranteed in the store, and it is accessible from get(key) anyway. The alternative of returning null, i.e., strictly enforcing the store's history retention, is not very user-friendly, as users would then have to determine whether to call get(key) instead of get(key, asOfTimestamp) to account for this edge case.
ValueAndTimestamp as return type of get(key, asOfTimestamp) / Additional return timestamps from get(key, asOfTimestamp)
The proposed return type of get(key, asOfTimestamp), VersionedRecord<V>, contains the record value and timestamp (i.e., validFrom timestamp) found for the given key (and timestamp bound). In some situations, it may be useful for users to also have the validTo timestamp associated with the record. To allow for this possibility in the future, get(key, asOfTimestamp) returns a new type, VersionedRecord<V>, rather than the existing ValueAndTimestamp<V> type, even though the two are largely the same today. We considered keeping the interface simple by not introducing a new type, but felt that the flexibility of evolving this type in the future was worth the addition of a new class. However, we will not add additional return timestamps at this time; they can be added once we have more confidence that they will be useful for users.
Return null with timestamp from get()
In the event that get(key, asOfTimestamp) finds that the latest record version associated with a particular key (and optional timestamp bound) is a tombstone, rather than returning null, the versioned store could instead return a non-null VersionedRecord with a null value (and the relevant timestamp). This would allow users to distinguish between the key not being found in the store at all (null VersionedRecord) and the key being found with a tombstone as the latest record (non-null VersionedRecord with null value). This proposal was rejected since the use cases for making such a distinction are limited.
Grace period separately configurable from history retention
"History retention" and "grace period" control how far back in time (relative to the current observed stream time) old reads and writes, respectively, will be accepted by the store. In the proposal above, users specify a single value which is used for both parameters, though in the future we could add an additional option for users to specify the two separately. (Today, users specify an explicit value for history retention, and grace period is automatically set to the same value. There are no compatibility concerns with introducing a new option for grace period in the future.)
Support for Upgrades
Additional support for upgrading a non-versioned store to a versioned store, beyond the manual steps above, was rejected on the basis of complexity. Automatic upgrades are too complex, and it's not clear that additional tooling for manual upgrades would be valuable to users at this time. It's better to get the new versioned interfaces out sooner in order to let them bake and iterate, rather than block the first version on additional complexity.
Versioned Windowed Stores
This KIP only proposes to introduce versioned stores for key-value stores and not windowed (or session) stores since use cases for versioned windowed stores are limited. By not introducing versioned windowed stores, there is also the potential to unify the underlying implementations of versioned stores and windowed stores in the future by leveraging the shared validFrom/validTo timestamp abstraction. Any such unification is a long way off and not in scope for this KIP.
As noted above, there are a number of additional features needed to complete the story around versioned stores. While these are important, they are all deferred to follow-up KIPs. Example features include providing an in-memory implementation of the versioned store interface, supporting interactive queries, new metrics/monitoring specific to versioned stores, updating DSL processors to use versioned stores (if provided) to address gaps in join semantics noted in the motivation section, and potentially adding additional methods to the versioned store interface.