Current state: Under Discussion

Discussion thread: Thread

JIRA: KAFKA-14412

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

The changes outlined in this KIP were originally part of KIP-892: Transactional Semantics for StateStores, but were broken out into a separate KIP to focus discussion and simplify contribution/review.

Motivation

Kafka Streams currently tracks the changelog offsets that correspond to local state in per-Task .checkpoint files. These files are maintained by the internal Streams engine, independently of the StateStore implementation being used. Allowing StateStores to manage their changelog offsets themselves instead enables them to optimise the storage and management of both the changelog offsets and the corresponding data.

When using RocksDBStores, users generally expect to configure RocksDB behaviour through the RocksDBConfigSetter interface. This includes the behaviour of memtable flushes, which has a major impact on RocksDB performance, especially for large databases. Under at-least-once (ALOS), however, Kafka Streams forcibly flushes memtables on Task commit, provided at least 10,000 records have been processed. This behaviour cannot be configured, and it is completely opaque to users, who would expect flushes to be dictated by their RocksDB configuration.

Under EOS, we don't currently force-flush memtables on commit. However, KIP-892 will require us to keep the data on disk in sync with the .checkpoint offsets. Doing this by flushing, either on every commit or every 10,000 records (as we do under ALOS), would be a performance regression compared with the current behaviour.
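As a rough illustration, the commit-time flush decision that exists today can be written as a small function. This is a hypothetical sketch: `ProcessingMode`, `CommitFlushPolicy` and `forceFlushOnCommit` are illustrative names, not Kafka Streams internals. Only the 10,000-record threshold and the ALOS/EOS split come from the text above.

```java
// Hypothetical sketch of today's commit-time memtable flush decision.
// Names are illustrative; they are not Kafka Streams internals.
enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE }

final class CommitFlushPolicy {
    // Threshold quoted in the text: under ALOS, a commit only forces a
    // flush once at least this many records have been processed.
    static final long ALOS_FLUSH_THRESHOLD = 10_000L;

    private CommitFlushPolicy() { }

    /**
     * Returns true if a Task commit would force a memtable flush today.
     * recordsProcessed is assumed to count records since the previous
     * forced flush; the exact internal counter is not specified here.
     */
    static boolean forceFlushOnCommit(final ProcessingMode mode, final long recordsProcessed) {
        if (mode == ProcessingMode.EXACTLY_ONCE) {
            // Under EOS, memtables are not force-flushed on commit today.
            return false;
        }
        return recordsProcessed >= ALOS_FLUSH_THRESHOLD;
    }
}
```

Nothing in this decision consults the user's RocksDBConfigSetter. That is the opacity problem described above.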

Consequently, this KIP is a hard dependency of KIP-892: transactional behaviour requires that we can atomically sync the changelog offsets with their corresponding state, which is not possible while those offsets are tracked in the separate .checkpoint file.

Public Interfaces

Changed Interfaces

  • org.apache.kafka.streams.processor.StateStore

    /**
     * Determines if this StateStore manages its own offsets.
     * <p>
     * If this method returns {@code true}, then offsets provided to {@link #commit(Map)} will be retrievable using
     * {@link #committedOffset(TopicPartition)}, even if the store is {@link #close() closed} and later re-opened.
     * <p>
     * If this method returns {@code false}, offsets provided to {@link #commit(Map)} will be ignored, and {@link
     * #committedOffset(TopicPartition)} will be expected to always return {@code null}.
     * <p>
     * This method is provided to enable custom StateStores to opt in to managing their own offsets. This is highly
     * recommended, if possible, to ensure that custom StateStores provide the consistency guarantees that Kafka Streams
     * expects when operating under the {@code exactly-once} {@code processing.mode}.
     * @return Whether this StateStore manages its own offsets.
     */
    default boolean managesOffsets() {
        return false;
    }

    /**
     * Flush any cached data.
     * @deprecated Use {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() ProcessorContext#commit()}
     *             instead.
     */
    @Deprecated
    default void flush() {
        // no-op
    }

    /**
     * Commit all written records to this StateStore.
     * <p>
     * This method <b>MUST NOT</b> be called by users from {@link org.apache.kafka.streams.processor.api.Processor
     * processors}, as doing so may violate the consistency guarantees provided by this store, and expected by Kafka
     * Streams.
     * <p>
     * Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit()
     * ProcessorContext#commit()} to request a Task commit.
     * <p>
     * If {@link #managesOffsets()} returns {@code true}, the given {@code changelogOffsets} will be guaranteed to be
     * persisted to disk along with the written records.
     * <p>
     * {@code changelogOffsets} will usually contain a single partition, in the case of a regular StateStore. However,
     * they may contain multiple partitions in the case of a Global StateStore with multiple partitions. All provided
     * partitions <em>MUST</em> be persisted to disk.
     * <p>
     * Implementations <em>SHOULD</em> ensure that {@code changelogOffsets} are committed to disk atomically with the
     * records they represent, if possible.
     * @param changelogOffsets The changelog offset(s) corresponding to the most recently written records.
     */
    default void commit(final Map<TopicPartition, Long> changelogOffsets) {
    }

    /**
     * Returns the most recently {@link #commit(Map) committed} offset for the given {@link TopicPartition}.
     * <p>
     * If {@link #managesOffsets()} and {@link #persistent()} both return {@code true}, this method will return the
     * offset that corresponds to the changelog record most recently written to this store, for the given {@code
     * partition}.
     * @param partition The partition to get the committed offset for.
     * @return The last {@link #commit(Map) committed} offset for the {@code partition}; or {@code null} if no offset
     *         has been committed for the partition, or if either {@link #persistent()} or {@link #managesOffsets()}
     *         return {@code false}.
     */
    default Long committedOffset(final TopicPartition partition) {
        return null;
    }

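To make the contract of these three methods concrete, here is a toy store that opts in to managing its own offsets. It is a hypothetical sketch that deliberately avoids Kafka dependencies:

  • `ChangelogPartition` stands in for `TopicPartition`.
  • `OffsetAwareStore` mirrors only the three new methods and their defaults.
  • `SimulatedDisk` plays the role of the store's on-disk files.

It shows the key property from the JavaDoc: offsets passed to `commit(Map)` remain visible through `committedOffset` after the store is closed and re-opened.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition, so the
// sketch compiles without Kafka on the classpath.
record ChangelogPartition(String topic, int partition) { }

// Hypothetical, trimmed-down mirror of the three new StateStore methods,
// with the same defaults as the proposed interface.
interface OffsetAwareStore {
    default boolean managesOffsets() {
        return false;
    }

    default void commit(final Map<ChangelogPartition, Long> changelogOffsets) {
    }

    default Long committedOffset(final ChangelogPartition partition) {
        return null;
    }
}

// Plays the role of the store's files: it outlives any single store instance.
final class SimulatedDisk {
    final Map<String, String> records = new HashMap<>();
    final Map<ChangelogPartition, Long> offsets = new HashMap<>();
}

// Toy persistent store that manages its own offsets.
final class ToyOffsetManagingStore implements OffsetAwareStore {
    private final SimulatedDisk disk;
    private final Map<String, String> pendingWrites = new HashMap<>();
    private boolean open = true;

    ToyOffsetManagingStore(final SimulatedDisk disk) {
        this.disk = Objects.requireNonNull(disk);
    }

    void put(final String key, final String value) {
        if (!open) {
            throw new IllegalStateException("store is closed");
        }
        pendingWrites.put(key, value);
    }

    @Override
    public boolean managesOffsets() {
        return true;
    }

    @Override
    public void commit(final Map<ChangelogPartition, Long> changelogOffsets) {
        // Persist the records and their offsets in one step. A real store
        // would make this atomic, e.g. with a single write batch.
        disk.records.putAll(pendingWrites);
        disk.offsets.putAll(changelogOffsets);
        pendingWrites.clear();
    }

    @Override
    public Long committedOffset(final ChangelogPartition partition) {
        // Answered from "disk", so it survives close() and re-opening.
        return disk.offsets.get(partition);
    }

    void close() {
        // Uncommitted writes are discarded in this sketch.
        open = false;
        pendingWrites.clear();
    }
}
```

Discarding uncommitted writes on close() is a simplification chosen for the sketch. The JavaDoc above only requires that committed offsets are durable and retrievable.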

New Metrics

Each new metric will be recorded at the store level and will have the following tags:

  • type = stream-state-metrics
  • thread-id = [thread ID]
  • task-id = [task ID]
  • [store type]-state-id = [store ID]    for key-value stores
  • [store type]-session-state-id = [store ID]    for session stores
  • [store type]-window-state-id = [store ID]    for window stores  
| Name | Recording Level | Metric Type | Description |
|---|---|---|---|
| commit-rate | DEBUG | Rate | The number of calls to StateStore#commit(Map) per second. |
| commit-latency-avg | DEBUG | Avg | The average time taken to call StateStore#commit(Map). |
| commit-latency-max | DEBUG | Max | The maximum time taken to call StateStore#commit(Map). |
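The following sketch shows what each of the three metrics summarises, assuming every commit is timed around the StateStore#commit(Map) call. `CommitMetricsSketch` is hypothetical. Kafka's metrics library records these values through sensors with windowed samples, whereas this version keeps simple running totals. Reporting latencies in nanoseconds is also an assumption of the sketch.

```java
// Hypothetical sketch of the bookkeeping behind commit-rate,
// commit-latency-avg and commit-latency-max. Kafka's metrics library uses
// windowed samples; this version keeps simple running totals.
final class CommitMetricsSketch {
    private long count;
    private long totalLatencyNanos;
    private long maxLatencyNanos;

    /** Times one call to the given commit action, e.g. a store's commit(offsets). */
    void timeCommit(final Runnable commitAction) {
        final long start = System.nanoTime();
        try {
            commitAction.run();
        } finally {
            record(System.nanoTime() - start);
        }
    }

    /** Records a single commit latency, in nanoseconds. */
    void record(final long latencyNanos) {
        count++;
        totalLatencyNanos += latencyNanos;
        maxLatencyNanos = Math.max(maxLatencyNanos, latencyNanos);
    }

    /** commit-rate: calls per second over an observation window. */
    double commitRate(final double windowSeconds) {
        return count / windowSeconds;
    }

    /** commit-latency-avg, in nanoseconds (NaN before the first commit). */
    double commitLatencyAvgNanos() {
        return count == 0 ? Double.NaN : (double) totalLatencyNanos / count;
    }

    /** commit-latency-max, in nanoseconds (0 before the first commit). */
    long commitLatencyMaxNanos() {
        return maxLatencyNanos;
    }
}
```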

Deprecated Metrics

  • stream-state-metrics 
    • flush-rate
    • flush-latency-avg 
    • flush-latency-max 

These changes are necessary to ensure that these metrics are not confused with orthogonal operations, such as RocksDB memtable flushes or cache flushes. The new metrics measure invocations of StateStore#commit, which replaces StateStore#flush.

While the flush metrics are only deprecated, they will no longer record any data under normal use, as Kafka Streams will no longer call StateStore#flush().

Proposed Changes

Three new methods will be added to StateStore. Each has a default implementation that ensures existing custom StateStores continue to function as they do today, guaranteeing compatibility. All internal StateStore implementations will be updated with whichever implementation of each method best suits them.

For persistent stores (i.e. StateStore#persistent() returns true) that do not manage their own offsets (i.e. StateStore#managesOffsets() returns false), the existing checkpointing behaviour will be provided internally, by continuing to store checkpoint offsets in the .checkpoint file. This ensures compatibility for custom stores that have not yet been upgraded to manage their own offsets.
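The resulting split of responsibilities can be summarised as a decision on the two flags. `OffsetLocation` and `OffsetLocations.locate` are hypothetical names used only for illustration. The non-persistent case follows the JavaDoc above, under which `committedOffset` returns null whenever `persistent()` returns false.

```java
// Hypothetical helper illustrating where a store's changelog offsets are
// tracked, based on its persistent() and managesOffsets() flags.
enum OffsetLocation {
    IN_STORE,          // the store persists offsets itself, via commit(Map)
    CHECKPOINT_FILE,   // the engine keeps using the per-Task .checkpoint file
    NOT_TRACKED        // non-persistent store: nothing survives a restart
}

final class OffsetLocations {
    private OffsetLocations() { }

    static OffsetLocation locate(final boolean persistent, final boolean managesOffsets) {
        if (!persistent) {
            // committedOffset() is expected to return null in this case.
            return OffsetLocation.NOT_TRACKED;
        }
        return managesOffsets ? OffsetLocation.IN_STORE : OffsetLocation.CHECKPOINT_FILE;
    }
}
```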

StateStore Implementations

RocksDBStore

RocksDBStore will manage its own offsets (i.e. managesOffsets will return true) by storing changelog offsets in a separate Column Family, and will be configured to atomically flush all its Column Families. This guarantees that the changelog offsets will always be flushed to disk together with the data they represent, irrespective of how that flush is triggered. This allows us to remove the explicit memtable flush(), letting RocksDB dictate when memtables are flushed to disk, instead of coupling flushes to Task commit.
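The following simulation illustrates why atomically flushing every Column Family matters. It is not RocksDB code. Per column family, one map stands in for the memtable and another for the SST files, and `TwoColumnFamilyStore` is a hypothetical name. In RocksDB itself, this behaviour corresponds to enabling the `atomic_flush` database option. The property being illustrated is that an offset found on disk is never ahead of the data on disk, so restoring from that offset cannot skip records.

```java
import java.util.HashMap;
import java.util.Map;

// Simulation (not RocksDB code) of flushing a data column family and an
// offsets column family atomically.
final class TwoColumnFamilyStore {
    // Unflushed writes: one "memtable" per column family.
    private final Map<String, String> dataMemtable = new HashMap<>();
    private final Map<String, Long> offsetsMemtable = new HashMap<>();
    // Flushed state, standing in for SST files that survive a crash.
    final Map<String, String> dataOnDisk = new HashMap<>();
    final Map<String, Long> offsetsOnDisk = new HashMap<>();

    void put(final String key, final String value) {
        dataMemtable.put(key, value);
    }

    /** Commit only records the changelog offset; it does not force a flush. */
    void commit(final String changelogPartition, final long offset) {
        offsetsMemtable.put(changelogPartition, offset);
    }

    /**
     * A flush chosen by the storage engine (e.g. a full memtable). Both
     * column families are flushed together, so any offset that reaches disk
     * arrives with all data written before it.
     */
    void atomicFlush() {
        dataOnDisk.putAll(dataMemtable);
        offsetsOnDisk.putAll(offsetsMemtable);
        dataMemtable.clear();
        offsetsMemtable.clear();
    }

    /** A crash loses everything that was never flushed. */
    void crash() {
        dataMemtable.clear();
        offsetsMemtable.clear();
    }
}
```

In this sketch, data written after the last commit may reach disk ahead of its offset. Restoring from the stored offset then simply re-applies those changelog records.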

Interactive Query .position Offsets

Input partition "Position" offsets, introduced by KIP-796: Interactive Query v2, are currently stored in a .position file directly by the RocksDBStore implementation. To ensure consistency with the committed data and changelog offsets, these position offsets will be stored in RocksDB, in the same column family as the changelog offsets, instead of the .position file. When a StateStore that manages its own offsets is first initialized, if a .position file exists in the store directory, its offsets will be automatically migrated into the store, and the file will be deleted.

When writing data to a RocksDBStore (via put, delete, etc.), the input partition offsets will be read from the changelog record metadata (as before), and written to the offsets column family.
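The one-off migration described above can be sketched as follows, with a plain map standing in for the store's offsets Column Family. Several pieces are assumptions made for illustration:

  • `LegacyOffsetsMigration` and `LegacyOffsetKey` are hypothetical names.
  • The file is assumed to hold one `topic partition offset` entry per line. The real .position and .checkpoint formats are not described in this KIP.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Hypothetical key for a migrated offset (stand-in for TopicPartition).
record LegacyOffsetKey(String topic, int partition) { }

// Hypothetical sketch of migrating a legacy offsets file into a store on
// first initialization. The "topic partition offset" line format is an
// assumption for illustration, not the real on-disk format.
final class LegacyOffsetsMigration {
    private LegacyOffsetsMigration() { }

    /**
     * If legacyFile exists, copies its offsets into storeOffsets and deletes
     * the file. Returns the number of migrated entries (0 if there is no file).
     */
    static int migrateIfPresent(final Path legacyFile, final Map<LegacyOffsetKey, Long> storeOffsets) {
        if (!Files.exists(legacyFile)) {
            return 0;
        }
        try {
            final Map<LegacyOffsetKey, Long> parsed = new HashMap<>();
            for (final String line : Files.readAllLines(legacyFile, StandardCharsets.UTF_8)) {
                final String trimmed = line.trim();
                if (trimmed.isEmpty()) {
                    continue;
                }
                final String[] parts = trimmed.split("\\s+");
                if (parts.length != 3) {
                    throw new IllegalArgumentException("Malformed offsets line: " + line);
                }
                parsed.put(new LegacyOffsetKey(parts[0], Integer.parseInt(parts[1])), Long.parseLong(parts[2]));
            }
            // Store the offsets first, then delete the file. A real store must
            // make this write durable before deleting, so that a crash in
            // between just repeats the migration on the next initialization.
            storeOffsets.putAll(parsed);
            Files.delete(legacyFile);
            return parsed.size();
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```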

InMemoryKeyValueStore

Since InMemoryKeyValueStore is not persistent, there is no on-disk state to synchronise the changelog offsets with on commit. Therefore, InMemoryKeyValueStore will not manage offsets (i.e. managesOffsets will return false), and no changes to its implementation are required.

Custom Persistent Stores

Any existing persistent StateStore that has not been updated to manage its own offsets will inherit the default implementation of all three new methods. This tells Kafka Streams to continue checkpointing its offsets using the legacy .checkpoint file. Maintainers of custom persistent stores may add offset management after upgrading.

Consumer Rebalance Metadata

When determining the partition assignment, StreamsPartitionAssignor considers the on-disk checkpoint offsets for all StateStores on each instance, even for Tasks that have not (yet) been assigned to that instance. This is done via TaskManager#getTaskOffsetSums(), which reads directly from the per-Task .checkpoint file.

With our new API, we need to continue to ensure that this file contains the offsets of stores that are closed, or have not yet been initialized. We will do this under EOS by updating the .checkpoint file whenever a store is close()d. Note: stores that do not manage their own offsets will continue to also update this file on-commit (provided at least 10,000 records have been written).
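A minimal sketch of these rules, assuming the engine decides per store whether to rewrite the .checkpoint file. `CheckpointMaintenance` and its method names are hypothetical, and the string keys stand in for TopicPartitions. `offsetSum` only approximates what TaskManager#getTaskOffsetSums() computes; it ignores any special cases the real method may handle, as well as the processing mode.

```java
import java.util.Map;

// Hypothetical sketch of the .checkpoint maintenance rules described above,
// plus the per-Task offset sum that feeds partition assignment.
final class CheckpointMaintenance {
    // Same 10,000-record threshold that applies to commits today.
    static final long RECORDS_BEFORE_CHECKPOINT = 10_000L;

    private CheckpointMaintenance() { }

    /** Whether this store's offsets should be written to the Task's .checkpoint file now. */
    static boolean shouldWriteCheckpoint(final boolean managesOffsets,
                                         final boolean closing,
                                         final long recordsSinceLastCheckpoint) {
        if (closing) {
            // Closed stores must stay visible to rebalances via the file.
            return true;
        }
        // On commit, only stores that don't manage offsets keep updating it.
        return !managesOffsets && recordsSinceLastCheckpoint >= RECORDS_BEFORE_CHECKPOINT;
    }

    /** Sums a Task's checkpointed changelog offsets into a single number. */
    static long offsetSum(final Map<String, Long> checkpointedOffsets) {
        return checkpointedOffsets.values().stream().mapToLong(Long::longValue).sum();
    }
}
```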

Compatibility, Deprecation, and Migration Plan

Kafka Streams will automatically migrate offsets found in an existing .checkpoint file and/or an existing .position file into the StateStore itself, if managesOffsets returns true. Users of the built-in store types will not need to make any changes. See Upgrading.

Flush deprecation

All internal usage of the StateStore#flush method has been removed or replaced. Therefore, the main concern is end-users calling StateStore#flush directly from custom Processors. Users cannot safely call StateStore#commit instead, because they do not know the changelog offsets that correspond to the locally written state. Forcibly flushing/fsyncing recently written records to disk may also violate any transactional guarantees that StateStores are providing. Therefore, the default implementation of flush has been changed to a no-op, and users are advised (via JavaDoc) to instead request an early commit via ProcessingContext#commit().

It's highly unlikely that users depend on the flush method forcibly persisting state to disk, as it does today, because Kafka Streams guarantees that, before the Processor is initialized, its task state will correspond with its changelog. Calling flush will also not impact restore performance in the event of an error. Under EOS, all task state will be wiped regardless, and under ALOS, task state will be rebuilt from the last committed changelog offset, irrespective of what state is already on disk.

Therefore, most or all user invocations of flush today likely have no effect, other than potentially causing performance problems (e.g. by forcing RocksDB to flush many small memtables prematurely).

Upgrading

When upgrading to a version of Kafka Streams that includes the changes outlined in this KIP, users will not be required to take any action.

The default implementation of all three methods has been designed to ensure backwards-compatibility for any custom store that does not yet support these methods.

Kafka Streams will automatically upgrade any RocksDB stores to manage offsets directly in the RocksDB database, by importing the offsets from any existing .checkpoint and/or .position files.

Downgrading

When downgrading from a version of Kafka Streams that includes the changes outlined in this KIP to a version that does not contain these changes, users will not be required to take any action. The older Kafka Streams version will be unable to open any RocksDB stores that were upgraded to store offsets (see Upgrading), which will cause Kafka Streams to wipe the state for those Tasks and restore the state, using an older RocksDB store format, from the changelogs.

Since downgrading is a low frequency event, and since restoring state from scratch is already an existing failure mode for older versions of Kafka Streams, we deem this an acceptable automatic downgrade strategy.

Test Plan

The existing tests that depend on .checkpoint files will either be removed (if they are no longer relevant), or modified to instead call StateStore#committedOffset().

Much of the existing behaviour, and all new behaviour, will be provided by specific StateStore implementations/wrappers, instead of implemented directly in the Streams engine. This should simplify testing, as much more of this behaviour can be tested with unit tests, with much less mocking required.

Rejected Alternatives

Periodic forced flush

We originally attempted to deliver KIP-892 without these changes, but ran into a major performance regression, caused by forcibly flush()ing RocksDBStores on every commit. Under EOS, we don't currently flush-on-commit, so changing the behaviour to flush on-commit (even after an interval) can cause performance problems, depending on the store.

An alternative approach was suggested that we force-flush, not on every commit, but only on some commits, like we do under ALOS. This would be configured by a new configuration, for example  and/or statestore.flush.interval.records. However, any default that we chose for these configurations would be arbitrary, and could result, under EOS, in more flushing than we do today. For some users, the defaults would be fine, and would likely have no impact. But for others, the defaults could be ill-suited to their workload, and could cause a performance regression on-upgrade.

We instead chose to take the opportunity to solve this with a more comprehensive set of changes to the StateStore API, which should have additional benefits.

StreamsPartitionAssignor to query stores for offsets

Instead of having TaskManager#getTaskOffsetSums() read from the .checkpoint file directly, we originally intended to have it call StateStore#committedOffset() on each store, and to make the StateStore responsible for tracking its offsets even when it is not initialized.

However, this is not possible, because a StateStore does not know its TaskId, and hence cannot determine its on-disk StateDirectory, until after StateStore#init() has been called. We could have added a StateStoreContext argument to committedOffset(), but decided against it, because doing so would make the API less clear and would make this method more difficult for StateStore maintainers to implement correctly.

Instead, we revert to the existing behaviour, and have the Streams engine continue to maintain the offsets for closed stores in the .checkpoint file.
