Table of Contents


Code Block
     * Determines if this StateStore manages its own offsets.
     * <p>
     * If this method returns {@code true}, then offsets provided to {@link #commit(Map)} will be retrievable using
     * {@link #committedOffset(TopicPartition)},.
 even if the store is {@link #close() closed} and later re-opened.
     * <p>
     * If this method returns {@code false}, offsets provided to {@link #commit(Map)} will be ignored, and {@link
     * #committedOffset(TopicPartition)} will be expected to always return {@code null}.
     * <p>
     * This method is provided to enable custom StateStores to opt-in to managing their own offsets. This is highlyrequired,
     * recommended, if possible, to ensure that custom StateStores provide the consistency guarantees that Kafka Streams
 expects when operating
  * expects when operating* under thean {@code exactly-once} {@code processing.modeguarantee}.
     * <p>
     * @deprecated New implementations shouldare alwaysrequired returnto {@code true}implement this method and managereturn their own offsets{@code true}. In the future,Existing implementations
     * should upgrade to managing their own offsets as soon as possible,  this method will be removed and itas the legacy offset management is deprecated
     * and will be assumedremoved toin alwaysa return {@code true}future version.
     * @return
 Whether this StateStore manages its* own@deprecated offsets.
New implementations should always return */
    default boolean managesOffsets() {{@code true} and manage their own offsets. In the future,
     *   return false;

 this method will be *removed Flushand anyit cachedwill data
be assumed to always return *{@code true}.
     * @deprecated@return UseWhether {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() ProcessorContext#commit()}
     *   this StateStore manages its own offsets.
    default  instead.
    default void flush(boolean managesOffsets() {
        // no-opreturn false;

     * CommitFlush allany written records to this StateStore.cached data
     * <p>
     * This@deprecated methodUse <b>CANNOT<b> be called by users from {@link org.apache.{@link org.apache.kafka.streams.processor.api.ProcessorProcessingContext#commit() ProcessorContext#commit()}
     * processors}. Doing so will throw an {@link java.lang.UnsupportedOperationException}             instead.
       * <p>/
 Instead, users should calldefault {@link org.apache.kafka.streams.processor.api.ProcessingContext#commitvoid flush() {
      * ProcessorContext#commit()} to request a Task commit.  // no-op

     /** <p>
     * IfCommit {@link #managesOffsets()} returns {@code true}, the given {@code changelogOffsets} will be guaranteed to be 
     * persisted to disk along with the written records.all written records to this StateStore.
     * <p>
     * This method <b>CANNOT<b> be called by users from {@link org.apache.kafka.streams.processor.api.Processor
     * <p>
processors}. Doing so will throw *an {@code changelogOffsets} will usually contain a single partition, in the case of a regular StateStore. However,
     * they may contain multiple partitions in the case of a Global StateStore with multiple partitions. All provided@link java.lang.UnsupportedOperationException}.
     * <p>
     * Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() 
     * ProcessorContext#commit()} to request a Task commit.
     * partitions <em>MUST</em> be persisted to disk.<p>
     * <p>
If {@link #managesOffsets()} returns {@code *true}, Implementations <em>SHOULD</em> ensure thatthe given {@code changelogOffsets} will arebe committedguaranteed to diskbe atomically
 with the
   * persisted *to disk recordsalong theywith represent,the ifwritten possiblerecords.
     * <p>
     * @param{@code changelogOffsets} will usually Thecontain changelog offset(s) corresponding to the most recently written records.a single partition, in the case of a regular StateStore. However,
 they may contain defaultmultiple void commit(final Map<TopicPartition, Long> changelogOffsets) {partitions in the case of a Global StateStore with multiple partitions. All provided
     * partitions  flush();

<em>MUST</em> be persisted to disk.
     /** <p>
     * ReturnsImplementations the<em>SHOULD</em> mostensure recentlythat {@link #commit(Map) committed} offset for the given {@link TopicPartition}@code changelogOffsets} are committed to disk atomically with the
     * records they represent, if possible.
     * <p>
     * If@param {@link #managesOffsets()} and {@link #persistent()} both return {@code true}, this method will return thechangelogOffsets The changelog offset(s) corresponding to the most recently written records.
 offset  that correspondsdefault tovoid the changelog record most recently written to this store, for the given {@code
     * partition}.commit(final Map<TopicPartition, Long> changelogOffsets) {

     * @paramReturns partitionthe Themost partition to get the committed offset for.
     * @return The last recently {@link #commit(Map) committed} offset for the given {@code@link partitionTopicPartition};.
 or {@code null} if no* offset<p>
     * If {@link #managesOffsets()} and {@link #persistent()} both return has been committed for the partition, or if either {@link #persistent()} or {@link #managesOffsets()}
     *         return {@code false}.{@code true}, this method will return the
     * offset that corresponds to the changelog record most recently written to this store, for the given {@code
     */ partition}.
    default Long committedOffset(final TopicPartition partition) { * 
     * @param partition The partition to get the committed offset for.
     * @return The last {@link  return null;
#commit(Map) committed} offset for the {@code partition}; or {@code null} if no offset
     *         has been committed for the partition, or if either {@link #persistent()} or {@link #managesOffsets()}
     *         return {@code false}.
    default Long committedOffset(final TopicPartition partition) {
        return null;



Each added metric will be on store-level and have the following tags:


NameRecordingLevelMetric TypeDescription
commit-rateDEBUGRateThe average number of calls to StateStore#commit(Map).commit per second
commit-latency-avgDEBUGAvgThe average time taken to call StateStore#commit(Map).latency of calls to commit
commit-latency-maxDEBUGMaxThe maximum time taken to call StateStore#commit(Map).latency of calls to commit


  • stream-state-metrics 
    • flush-rate
    • flush-latency-avg 
    • flush-latency-max 


Input partition "Position" offsets, introduced by KIP-796: Interactive Query v2, are currently stored in a .position file directly by the RocksDBStore implementation. To ensure consistency with the committed data and changelog offsets, these position offsets will be stored in RocksDB, in the same column family as the changelog offsets, instead of the .position file. When a StateStore that manages its own offsets is first initialized, if a .position file exists in the store directory, its offsets will be automatically migrated into the store, and the file will be deleted.

When writing data to a RocksDBStore (via put, delete, etc.), the input partition offsets will be automatically migrated into the store, and the file will be deleted.

When writing data to a RocksDBStore (via put, delete, etc.), the input partition offsets will be read from the changelog record metadata (as before), and written to the offsets column family.


Since InMemoryKeyValueStore  is not persistent, there is no state to synchronise the changelog offsets with on-commit. Therefore, InMemoryKeyValueStore  will not manage offsets (i.e. managesOffsets will return false), and therefore, no changes to its implementation is required.

Custom Persistent Stores

Any existing, persistent StateStore that has not been updated to manage its own offsets, will inherit the default implementation of all three new methods. This will indicate that Kafka Streams should continue to checkpoint its offsets using the legacy .checkpoint  file. Maintainers of custom, persistent stores may add offset management after upgrading.

Consumer Rebalance Metadata

When determining the partition assignment, StreamsPartitionsAssignor considers the checkpoint offsets on-disk for all StateStores on each instance, even for Tasks they have not (yet) been assigned. This is done via TaskManager#getTaskOffsetSums(), which directly reads from the per-Task .checkpoint file.


Compatibility, Deprecation, and Migration Plan


The newly introduced StateStore#managesOffsets method will be immediately deprecated, to allow for its removal in the next major release of Kafka Streamsa future release. It is being introduced as a transitional mechanism, to ensure that existing custom StateStores continue to work as expected without managing their own offsets. However, to encourage maintainers of custom stores to add support for offset management, we will emit a WARN level log message when initializing any StateStore that is persistent, but does not manage its own offsets.


Kafka Streams will automatically upgrade any RocksDB stores to manage offsets directly in the RocksDB database, by importing the offsets from any existing .checkpoint and/or .position files.


When downgrading from a version of Kafka Streams that includes the changes outlined in this KIP to a version that does not contain these changes, users RocksDBStore will not be required to take any action. The older Kafka Streams version will be unable to open any RocksDB stores that were upgraded to store offsets (see Upgrading), which will cause Kafka Streams to wipe the state for those Tasks and restore the state, using an older RocksDB store format, from the changelogs.

Since downgrading is a low frequency event, and since restoring state from scratch is already an existing failure mode for older versions of Kafka Streams, we deem this an acceptable automatic downgrade strategy.

Test Plan

The existing tests that depend on .checkpoint files will either be removed (if they are no longer relevant), or modified to instead call StateStore#committedOffset().

Much of the existing behaviour, and all new behaviour, will be provided by specific StateStore implementations/wrappers, instead of implemented directly in the Streams engine. This should simplify testing, as much more of this behaviour can be tested with unit tests, with much less mocking required.

Rejected Alternatives

Periodic forced flush

We originally attempted to deliver KIP-892 without these changes, but ran into a major performance regression, caused by forcibly flush()ing RocksDBStores on every commit. Under EOS, we don't currently flush-on-commit, so changing the behaviour to flush on-commit (even after an interval) can cause performance problems, depending on the store.

An alternative approach was suggested that we force-flush, not on every commit, but only on some commits, like we do under ALOS. This would be configured by a new configuration, for example  and/or statestore.flush.interval.records. However, any default that we chose for these configurations would be arbitrary, and could result, under EOS, in more flushing than we do today. For some users, the defaults would be fine, and would likely have no impact. But for others, the defaults could be ill-suited to their workload, and could cause a performance regression on-upgrade.

We instead chose to take the opportunity to solve this with a more comprehensive set of changes to the StateStore API , that should have additional benefits.

StreamsPartitionsAssignor to query stores for offsets

Instead of having TaskManager#getOffsetSums()  read from the .checkpoint  file directly, we originally intended to have it call StateStore#committedOffset() on each store, and make the StateStore responsible for tracking the offset for stores even when not initialized.

However, this is not possible, because a StateStore  does not know its TaskId , and hence cannot determine its on-disk StateDirectory  until after StateStore#init() has been called. We could have added a StateStoreContext  argument to committedOffset() , but we decided against it, because doing so would make the API less clear, and correctly implementing this method would be more difficult for StateStore implementation maintainers.

compatible with older versions of Kafka Streams, due to the extra offsets column family that older versions of Kafka Streams do not recognize. If a user downgrades from a version containing this KIP, to a version without it, by default the on-disk state for any Task containing a RocksDBStore will be wiped and restored from their changelogs.

However, we will support using the upgrade.from config to safely downgrade without losing local RocksDB state. When upgrade.from  is set to a version less than the version that introduced this KIP:

  • On RocksDBStore#close:
    • All changelogs stored in the RocksDB offsets column family will be written to the Task .checkpoint file.
    • All source topic offsets stored in the RocksDB offsets column family will be written to the store .position file.
    • The RocksDB offsets column family will be deleted from the RocksDB database, ensuring that older versions of Kafka Streams will be able to open the database successfully.

