Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: Under DiscussionAccepted

Discussion thread: Thread

JIRA:  

Jira
columns
serverASF JIRA
columnIdsissuekey,summary,issuetype,created,updated,duedate,assignee,reporter,priority,status,resolution
key,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-1441217411

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejava
firstline106
titleorg.apache.kafka.streams.processor.StateStore
linenumberstrue
    
    /**
     * Determines if this StateStore manages its own offsets.
     * <p>
     * If this method returns {@code true}, then offsets provided to {@link #commit(Map)} will be retrievable using
     * {@link #committedOffset(TopicPartition)},.
 even if the store is {@link #close() closed} and later re-opened.
     * <p>
     * If this method returns {@code false}, offsets provided to {@link #commit(Map)} will be ignored, and {@link
     * #committedOffset(TopicPartition)} will be expected to always return {@code null}.
     * <p>
     * This method is provided to enable custom StateStores to opt-in to managing their own offsets. This is highlyrequired,
     * recommended, if possible, to ensure that custom StateStores provide the consistency guarantees that Kafka Streams
 expects when operating
  * expects when operating* under thean {@code exactly-once} {@code processing.modeguarantee}.
     * <p>
     * @returnNew Whetherimplementations thisare StateStorerequired managesto implement itsthis ownmethod offsets.
and return {@code true}. Existing */implementations
    default boolean* managesOffsets() {
        return false;
    }

    /**should upgrade to managing their own offsets as soon as possible, as the legacy offset management is deprecated
     * Flush any cached dataand will be removed in a future version.
     * 
     * @deprecated Use {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() ProcessorContext#commit()} New implementations should always return {@code true} and manage their own offsets. In the future,
     *             instead.
this  method  will */
be removed and it @Deprecated
will be assumed to defaultalways void flush() {return {@code true}.
     * @return Whether // no-op
    }

this StateStore manages its own offsets.
     */**
    @Deprecated
 * Commit all written records to this StateStore.default boolean managesOffsets() {
        return false;
    }

    /**
     * <p> Flush any cached data
     * This
 method <b>MUST NOT<b> be called* by@deprecated usersUse from {@link org.apache.kafka.streams.processor.api.Processor
 ProcessingContext#commit() ProcessorContext#commit()}
     * processors}, as doing so may violate the consistency guarantees provided by this store, and expected by Kafkainstead.
     */
     * Streams.
@Deprecated
    default void *flush() <p>{
     * Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() // no-op
    }

    /**
     * ProcessorContext#commit()} to request a Task commit Commit all written records to this StateStore.
     * <p>
     * IfThis {@link #managesOffsets()} returns {@code true}, the given {@code changelogOffsets} will be guaranteed to be method <b>CANNOT<b> be called by users from {@link org.apache.kafka.streams.processor.api.Processor
     * persistedprocessors}. toDoing diskso alongwill withthrow thean written records{@link java.lang.UnsupportedOperationException}.
     * <p>
     * {@code changelogOffsets} will usually contain a single partition, in the case of a regular StateStore. However,Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() 
     * ProcessorContext#commit()} to request a Task commit.
     * they<p>
 may contain multiple partitions in* theIf case of a Global StateStore with multiple partitions. All provided
     * partitions <em>MUST</em> be{@link #managesOffsets()} returns {@code true}, the given {@code changelogOffsets} will be guaranteed to be 
     * persisted to disk along with the written records.
     * <p>
     * Implementations <em>SHOULD</em> ensure that {@code changelogOffsets} arewill committedusually contain toa disksingle atomicallypartition, within the
 case of a regular * records they represent, if possible.StateStore. However,
     * 
they may contain multiple partitions *in @paramthe changelogOffsetscase Theof changelog offset(s) corresponding to the most recently written records.a Global StateStore with multiple partitions. All provided
     * partitions <em>MUST</
em> be persisted to defaultdisk.
 void commit(final Map<TopicPartition, Long> changelogOffsets)* {<p>
     * Implementations  flush();
    }

    /**<em>SHOULD</em> ensure that {@code changelogOffsets} are committed to disk atomically with the
     * Returnsrecords thethey mostrepresent, recently {@link #commit(Map) committed} offset for the given {@link TopicPartition}if possible.
     * <p>
     * @param changelogOffsets IfThe {@linkchangelog #managesOffsetsoffset(s)} andcorresponding {@link #persistent()} both return {@code true}, this method will return theto the most recently written records.
     */
    default *void offset that corresponds to the changelog record most recently written to this store, for the given {@codecommit(final Map<TopicPartition, Long> changelogOffsets) {
        flush();
    }

     * partition}./**
     * <p>
Returns the most recently  * This method <i>MUST</i> return the correct{@link #commit(Map) committed} offset even iffor the store is not currently given {@link #isOpen() openTopicPartition}.
     * <p>
     * @paramIf partition The partition to get the committed offset for.
     * @return The last {@link #commit(Map) committed} offset for the {@code partition}; or {@code null} if no offset
     *         has been committed for the partition, or if either {@link #persistent()} or {@link #managesOffsets()}{@link #managesOffsets()} and {@link #persistent()} both return {@code true}, this method will return the
     * offset that corresponds to the changelog record most recently written to this store, for the given {@code
     * partition}.
     * 
     * @param partition The partition to get the committed return {@code false}offset for.
     */
 @return The  default Long committedOffset(final TopicPartition partition) {
last {@link #commit(Map) committed} offset for the {@code partition}; or {@code null} if no offset
     *         returnhas null;
been committed for  }

Metrics

New

...

the partition, or if either {@link #persistent()} or {@link #managesOffsets()}
     *         return {@code false}.
     */
    default Long committedOffset(final TopicPartition partition) {
        return null;
    }

Metrics

New

Each added metric will be on store-level and have the following tags:

  • type = stream-state-metrics
  • thread-id = [thread ID]
  • task-id = [task ID]
  • [store type]-state-id = [store ID]    for key-value stores
  • [store type]-session-state-id = [store ID]    for session stores
  • [store type]-window-state-id = [store ID]    for window stores  
NameRecordingLevelMetric TypeDescription
commit-rateDEBUGRateThe average number of calls to commit per second
commit-latency-avgDEBUGAvgThe average latency of calls to commit
commit-latency-maxDEBUGMaxThe maximum latency of calls to commit

...


Deprecated

  • stream-state-metrics 
    • flush-rate
    • flush-latency-avg 
    • flush-latency-max 

...

3 new methods will be added to StateStore, each defined with a default implementation that ensures existing custom StateStores continue to function as they do today, to guarantee compatibility. All internal StateStore implementations will be updated with implementations of each of these methods that best suits that implementation..

For persistent stores (i.e. StateStore#persistent()  returns true) that do not manage their own offsets (i.e. StateStore#managedOffsets() returns false), the existing checkpointing behaviour A new, WrappedStateStore implementation will be provided internally, which will provide the legacy .checkpoint file offset management behaviour used today. When opening state stores, the StateManager  will automatically wrap the store with this legacy wrapper iff it's both persistent()  and does not managesOffsets()by continuing to store checkpoint offsets in the .checkpoint file. This will ensure that existing, compatibility for custom stores that have not been upgraded to support the new API will function as they do today, without polluting the StateManager with a large amount of legacy code, which would make it difficult to testmanage their own offsets yet.

Calling StateStore#commit from inside a Processor would be dangerous, because it would cause the StateStore to believe that processed records have been committed when they have not (yet). Therefore, we will prevent this method from being called in this context by implementing it in AbstractReadWriteDecorator , which wraps all stores provided to Processors, and throwing an UnsupportedOperationException.

StateStore Implementations

...

RocksDBStore will manage its own offsets (i.e. managesOffsets  will return true), by storing changelog offsets in a separate Column Family, and will be configured to atomically flush all its Column Families. This guarantees that the changelog offsets will always be flushed to disk together with the data they represent, irrespective of how that flush is triggered. This allows us to remove the explicit memtable flush(), enabling RocksDB to dictate when memtables are flushed to disk, instead of coupling it to Task commit.

To ensure that offsets can be returned by committedOffset, even with the store is closed (i.e. after close() or before init()), we will write out the latest value for all changelog offsets to a separate file on-disk in the close() method, which will be read from when isOpen() returns false. This file will be deleted during init, and the offsets will be read directly from the column family while the database is open.

Interactive Query .position Offsets

Input partition "Position" offsets, introduced by KIP-796: Interactive Query v2, are currently stored in a .position file directly by the RocksDBStore implementation. To ensure consistency with the committed data and changelog offsets, these position offsets will be stored in RocksDB, in the same column family as the changelog offsets, instead of the .position file. When a StateStore that manages its own offsets is first initialized, if a .position file exists in the store directory, its offsets will be automatically migrated into the store, and the file will be deleted.

When writing data to a RocksDBStore (via put, delete, etc.), the input partition offsets will be read from the changelog record metadata (as before), and written to the offsets column family.

InMemoryKeyValueStore

Since InMemoryKeyValueStore  is not persistent, there is no state to synchronise the changelog offsets with on-commit. Therefore, InMemoryKeyValueStore  will not manage offsets (i.e. managesOffsets will return false), and therefore, no changes to its implementation is required.

Custom Persistent Stores

Any existing, persistent StateStore that has not been updated to manage its own offsets, will inherit the default implementation of all three new methods. This will indicate that Kafka Streams should continue to checkpoint its offsets using the legacy .checkpoint  file. Maintainers of custom, persistent stores may add offset management after upgrading.

Consumer Rebalance Metadata

When determining the partition assignment, StreamsPartitionsAssignor considers the checkpoint offsets on-disk for all StateStores on each instance, even for Tasks they have not (yet) been assigned. This is done via TaskManager#getTaskOffsetSums(), which directly reads from the per-Task .checkpoint file.

...

Interactive Query .position Offsets

Input partition "Position" offsets, introduced by KIP-796: Interactive Query v2, are currently stored in a .position file directly by the RocksDBStore implementation. To ensure consistency with the committed data and changelog offsets, these position offsets will be stored in RocksDB, in the same column family as the changelog offsets, instead of the .position file. When a StateStore that manages its own offsets is first initialized, if a .position file exists in the store directory, its offsets will be automatically migrated into the store, and the file will be deleted.

When writing data to a RocksDBStore (via put, delete, etc.), the input partition offsets will be read from the changelog record metadata (as before), and written to the offsets column family.

InMemoryKeyValueStore

Since InMemoryKeyValueStore  is not persistent, there is no state to synchronise the changelog offsets with on-commit. Therefore, InMemoryKeyValueStore  will not manage offsets (i.e. managesOffsets will return false), and therefore, no changes to its implementation is required.

Custom Persistent Stores

Any existing, persistent StateStore that has not been updated to manage its own offsets, will inherit the default implementation of all three new methods. This will indicate that Kafka Streams should continue to checkpoint its offsets using the legacy .checkpoint  file. Maintainers of custom, persistent stores may add offset management after upgrading.

Consumer Rebalance Metadata

When determining the partition assignment, StreamsPartitionsAssignor considers the checkpoint offsets on-disk for all StateStores on each instance, even for Tasks they have not (yet) been assigned. This is done via TaskManager#getTaskOffsetSums(), which, currently, directly reads from the per-Task .checkpoint file.

Since stores will now be managing their offsets internally, we will need to read offsets from the stores themselves instead of the .checkpoint file.

  • On start-up, we will construct and initialize state stores for every corresponding Task store directory we find on-disk, and call committedOffset to determine their stored offset(s). We will then cache these offsets in-memory. This cache will be shared among all StreamThreads, and will therefore be thread-safe.
  • On rebalance, we will consult this cache to compute our Task offset lags.
  • After state restore completes for an assigned Task, we will update the offset cache to Task.LATEST_OFFSET, to indicate that the Task now has the currently latest offset.
  • When closing a StateStore, we will update the offset cache with the current changelog offset for the store.
    • This will ensure that when a Task is reassigned to another instance, the Task lag for the local state matches what's on-disk, instead of using the sentinel value Task.LATEST_OFFSET, which is only valid for Tasks currently assigned to a thread on the local instance.

Compatibility, Deprecation, and Migration Plan

Kafka Streams will automatically migrate offsets found in an existing .checkpoint file, and/or an existing .position file, to store those offsets directly in the StateStore, if managesOffsets returns true. Users of the in-built store types will not need to make any changes. See Upgrading.

...

flush deprecation

All internal usage of the StateStore#flush method has been removed/replaced. Therefore, the main concern is with end-users calling StateStore#flush directly from custom Processors. Obviously, users cannot safely call StateStore#commit, because they do not know the changelog offsets that correspond to the locally written state. Forcibly flushing/fsyncing recently written records to disk may also violate any transactional guarantees that StateStores are providing. Therefore, the default implementation of flush has been updated to a no-op. Users are now advised (via JavaDoc) that should instead request an early commit via. ProcessingContext#commit().

It's highly unlikely that users are depending on the flush method forcibly persisting state to disk, as it does today, because Kafka Streams guarantees that before the Processor is initialized, its task state will correspond with its changelog. Calling flush will also not impact restore performance in the event of an error, because under EOS, all task state will be wiped regardless, and under ALOS task state will be rebuilt from the last committed changelog offset irrespective of what state is already on disk.Therefore, it's likely that most/all user invocations of flush today has no effect, other than to potentially cause performance problems (e.g. by forcing RocksDB to flush many small memtables prematurely)of an error, because under EOS, all task state will be wiped regardless, and under ALOS task state will be rebuilt from the last committed changelog offset irrespective of what state is already on disk.

Therefore, it's likely that most/all user invocations of flush today has no effect, other than to potentially cause performance problems (e.g. by forcing RocksDB to flush many small memtables prematurely).

managesOffsets deprecation

The newly introduced StateStore#managesOffsets method will be immediately deprecated, to allow for its removal in a future release. It is being introduced as a transitional mechanism, to ensure that existing custom StateStores continue to work as expected without managing their own offsets. However, to encourage maintainers of custom stores to add support for offset management, we will emit a WARN level log message when initializing any StateStore that is persistent, but does not manage its own offsets.

Upgrade

When upgrading to a version of Kafka Streams that includes the changes outlined in this KIP, users will not be required to take any action.

The default implementation of all three methods has been designed to ensure backwards-compatibility for any custom store that does not yet support these methods.

Kafka Streams will automatically upgrade any RocksDB stores to manage offsets directly in the RocksDB database, by importing the offsets from any existing .checkpoint and/or .position files.

Downgrade

When downgrading from a version of Kafka Streams that includes the changes outlined in this KIP to a version that does not contain these changes, users will not be required to take any action. The older Kafka Streams version will be unable to open any RocksDB stores that were upgraded to store offsets (see Upgrading), which will cause Kafka Streams to wipe the state for those Tasks and restore the state, using an older RocksDB store format, from the changelogs.

...

.

Kafka Streams will automatically upgrade any RocksDB stores to manage offsets directly in the RocksDB database, by importing the offsets from any existing .checkpoint and/or .position files.

Downgrade

RocksDBStore will not be compatible with older versions of Kafka Streams, due to the extra offsets column family that older versions of Kafka Streams do not recognize. If a user downgrades from a version containing this KIP, to a version without it, by default the on-disk state for any Task containing a RocksDBStore will be wiped and restored from their changelogs.

However, we will support using the upgrade.from config to safely downgrade without losing local RocksDB state. When upgrade.from  is set to a version less than the version that introduced this KIP:

  • On RocksDBStore#close:
    • All changelogs stored in the RocksDB offsets column family will be written to the Task .checkpoint file.
    • All source topic offsets stored in the RocksDB offsets column family will be written to the store .position file.
    • The RocksDB offsets column family will be deleted from the RocksDB database, ensuring that older versions of Kafka Streams will be able to open the database successfully.

Test Plan

The existing tests that depend on .checkpoint files will either be removed (if they are no longer relevant), or modified to instead call StateStore#committedOffset().

...