Status
Current state: Accepted
Discussion thread: Thread
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
```java
/**
 * Determines if this StateStore manages its own offsets.
 * <p>
 * If this method returns {@code true}, then offsets provided to {@link #commit(Map)} will be retrievable using
 * {@link #committedOffset(TopicPartition)}.
 * <p>
 * If this method returns {@code false}, offsets provided to {@link #commit(Map)} will be ignored, and {@link
 * #committedOffset(TopicPartition)} will be expected to always return {@code null}.
 * <p>
 * This method is provided to enable custom StateStores to opt-in to managing their own offsets. This is highly
 * recommended, if possible, to ensure that custom StateStores provide the consistency guarantees that Kafka Streams
 * expects when operating under the {@code exactly-once} {@code processing.guarantee}.
 *
 * @deprecated New implementations should always return {@code true} and manage their own offsets. In the future,
 *             this method will be removed and it will be assumed to always return {@code true}.
 * @return Whether this StateStore manages its own offsets.
 */
@Deprecated
default boolean managesOffsets() {
    return false;
}

/**
 * Flush any cached data.
 *
 * @deprecated Use {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() ProcessorContext#commit()}
 *             instead.
 */
@Deprecated
default void flush() {
    // no-op
}

/**
 * Commit all written records to this StateStore.
 * <p>
 * This method <b>CANNOT</b> be called by users from {@link org.apache.kafka.streams.processor.api.Processor
 * processors}. Doing so will throw an {@link java.lang.UnsupportedOperationException}.
 * <p>
 * Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit()
 * ProcessorContext#commit()} to request a Task commit.
 * <p>
 * If {@link #managesOffsets()} returns {@code true}, the given {@code changelogOffsets} will be guaranteed to be
 * persisted to disk along with the written records.
 * <p>
 * {@code changelogOffsets} will usually contain a single partition, in the case of a regular StateStore. However,
 * they may contain multiple partitions in the case of a Global StateStore with multiple partitions. All provided
 * partitions <em>MUST</em> be persisted to disk.
 * <p>
 * Implementations <em>SHOULD</em> ensure that {@code changelogOffsets} are committed to disk atomically with the
 * records they represent, if possible.
 *
 * @param changelogOffsets The changelog offset(s) corresponding to the most recently written records.
 */
default void commit(final Map<TopicPartition, Long> changelogOffsets) {
    flush();
}

/**
 * Returns the most recently {@link #commit(Map) committed} offset for the given {@link TopicPartition}.
 * <p>
 * If {@link #managesOffsets()} and {@link #persistent()} both return {@code true}, this method will return the
 * offset that corresponds to the changelog record most recently written to this store, for the given {@code
 * partition}.
 *
 * @param partition The partition to get the committed offset for.
 * @return The last {@link #commit(Map) committed} offset for the {@code partition}; or {@code null} if no offset
 *         has been committed for the partition, or if either {@link #persistent()} or {@link #managesOffsets()}
 *         return {@code false}.
 */
default Long committedOffset(final TopicPartition partition) {
    return null;
}
```
Metrics
New
Each added metric will be recorded at store-level and have the following tags:
...
Name | Recording Level | Metric Type | Description |
---|---|---|---|
commit-rate | DEBUG | Rate | The average number of calls to StateStore#commit(Map) per second |
commit-latency-avg | DEBUG | Avg | The average latency of calls to StateStore#commit(Map) |
commit-latency-max | DEBUG | Max | The maximum latency of calls to StateStore#commit(Map) |
Deprecated
stream-state-metrics
flush-rate
flush-latency-avg
flush-latency-max
...
- On start-up, we will construct and initialize state stores for every corresponding Task store directory we find on-disk, and call committedOffset to determine their stored offset(s). We will then cache these offsets in-memory and close() these stores. This cache will be shared among all StreamThreads, and will therefore be thread-safe.
- On rebalance, we will consult this cache to compute our Task offset lags.
- After state restore completes for an assigned Task, we will update the offset cache to Task.LATEST_OFFSET, to indicate that the Task now has the currently latest offset.
- When closing a StateStore, we will update the offset cache with the current changelog offset for the store.
  - This will ensure that when a Task is reassigned to another instance, the Task lag for the local state matches what's on-disk, instead of using the sentinel value Task.LATEST_OFFSET, which is only valid for Tasks currently assigned to a thread on the local instance.
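The shared offset cache described above can be sketched as a simple thread-safe map. The class and method names below are illustrative stand-ins, not Kafka Streams internals; only the LATEST_OFFSET sentinel semantics mirror the design:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the shared, thread-safe offset cache consulted on rebalance.
class TaskOffsetCache {
    // Sentinel mirroring Task.LATEST_OFFSET: the task is fully caught up locally.
    static final long LATEST_OFFSET = -2L;

    // ConcurrentHashMap makes the cache safe to share across StreamThreads.
    private final Map<String, Long> offsets = new ConcurrentHashMap<>();

    // Called on start-up after reading committedOffset() from each on-disk store.
    void recordCommittedOffset(final String taskAndPartition, final long offset) {
        offsets.put(taskAndPartition, offset);
    }

    // Called when restore completes: the task now has the latest offset.
    void markRestored(final String taskAndPartition) {
        offsets.put(taskAndPartition, LATEST_OFFSET);
    }

    // Consulted during rebalance to compute the task offset lag.
    long lag(final String taskAndPartition, final long changelogEndOffset) {
        final Long committed = offsets.get(taskAndPartition);
        if (committed == null) {
            return changelogEndOffset; // no local state: full restore needed
        }
        if (committed == LATEST_OFFSET) {
            return 0L; // assigned locally and fully caught up
        }
        return Math.max(0L, changelogEndOffset - committed);
    }
}
```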
...
Compatibility, Deprecation, and Migration Plan
...
All internal usage of the StateStore#flush method has been removed/replaced. Therefore, the main concern is with end-users calling StateStore#flush directly from custom Processors. Users cannot safely call StateStore#commit instead, because they do not know the changelog offsets that correspond to the locally written state. Forcibly flushing/fsyncing recently written records to disk may also violate any transactional guarantees that StateStores are providing. Therefore, the default implementation of flush has been updated to a no-op, and users are now advised (via JavaDoc) that they should instead request an early commit via ProcessingContext#commit().
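The advised pattern can be sketched as follows. SimpleContext, SimpleStore, and BatchingProcessor are simplified stand-ins for ProcessingContext, StateStore, and a user Processor, written here only to illustrate the control flow of requesting a commit instead of flushing:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for ProcessingContext: commit() requests a Task commit.
interface SimpleContext {
    void commit();
}

// Stand-in for a StateStore's write path.
interface SimpleStore {
    void put(String key, String value);
}

// A processor that used to call store.flush() periodically. Since flush is
// now a deprecated no-op, it requests an early Task commit instead.
class BatchingProcessor {
    private final SimpleContext context;
    private final SimpleStore store;
    private int uncommitted = 0;

    BatchingProcessor(final SimpleContext context, final SimpleStore store) {
        this.context = context;
        this.store = store;
    }

    void process(final String key, final String value) {
        store.put(key, value);
        // Previously: store.flush() -- now a no-op.
        // Instead, request an early commit once enough records have accumulated:
        if (++uncommitted >= 1000) {
            context.commit();
            uncommitted = 0;
        }
    }
}
```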
It's highly unlikely that users are depending on the flush method to forcibly persist state to disk, as it does today, because Kafka Streams guarantees that before a Processor is initialized, its task state will correspond with its changelog. Calling flush will also not impact restore performance in the event of an error: under EOS, all task state will be wiped regardless, and under ALOS, task state will be rebuilt from the last committed changelog offset irrespective of what state is already on disk.
Therefore, most/all user invocations of flush today likely have no effect, other than to potentially cause performance problems (e.g. by forcing RocksDB to flush many small memtables prematurely).
managesOffsets
deprecation
The newly introduced StateStore#managesOffsets method will be immediately deprecated, to allow for its removal in a future release of Kafka Streams. It is being introduced as a transitional mechanism, to ensure that existing custom StateStores continue to work as expected without managing their own offsets. However, to encourage maintainers of custom stores to add support for offset management, we will emit a WARN-level log message when initializing any StateStore that is persistent but does not manage its own offsets.
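A custom store opting in to offset management would follow the contract sketched below. This is not a full StateStore implementation: TopicPartition is modeled as a String and durability is simulated in memory, but the shape mirrors the new methods, with commit(Map) persisting records and offsets together and committedOffset() returning the last committed offset:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a custom store that opts in to managing its own offsets.
class OffsetManagingStore {
    private final Map<String, String> pending = new HashMap<>();
    private final Map<String, String> committedRecords = new HashMap<>();
    private final Map<String, Long> committedOffsets = new HashMap<>();

    // Opt in: offsets passed to commit() are retrievable via committedOffset().
    boolean managesOffsets() {
        return true;
    }

    void put(final String key, final String value) {
        pending.put(key, value);
    }

    // Records and their changelog offsets are made durable in a single step,
    // mirroring the "commit atomically with the records" recommendation.
    void commit(final Map<String, Long> changelogOffsets) {
        committedRecords.putAll(pending);
        pending.clear();
        committedOffsets.putAll(changelogOffsets);
    }

    // Returns null until an offset has been committed for the partition.
    Long committedOffset(final String partition) {
        return committedOffsets.get(partition);
    }
}
```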
Upgrade
When upgrading to a version of Kafka Streams that includes the changes outlined in this KIP, users will not be required to take any action.
The default implementation of all three methods has been designed to ensure backwards-compatibility for any custom store that does not yet support these methods.
Kafka Streams will automatically upgrade any RocksDB stores to manage offsets directly in the RocksDB database, by importing the offsets from any existing .checkpoint and/or .position files.
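The one-time import reads the legacy checkpoint files. The parser below is a sketch under the assumption that the file follows Kafka's OffsetCheckpoint layout (a version line, an entry count, then "topic partition offset" lines); the real import is performed internally by Kafka Streams:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of parsing a legacy .checkpoint file during the one-time upgrade.
class CheckpointReader {
    static Map<String, Long> parse(final List<String> lines) {
        final Map<String, Long> offsets = new HashMap<>();
        final int version = Integer.parseInt(lines.get(0).trim());
        if (version != 0) {
            throw new IllegalArgumentException("Unknown checkpoint version: " + version);
        }
        final int expected = Integer.parseInt(lines.get(1).trim());
        for (int i = 2; i < 2 + expected; i++) {
            // Each entry line: "<topic> <partition> <offset>"
            final String[] parts = lines.get(i).trim().split("\\s+");
            offsets.put(parts[0] + "-" + parts[1], Long.parseLong(parts[2]));
        }
        return offsets;
    }
}
```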
...
Downgrade
RocksDBStore will not be compatible with older versions of Kafka Streams, due to the extra offsets column family that older versions of Kafka Streams do not recognize. If a user downgrades from a version containing this KIP, to a version without it, by default the on-disk state for any Task containing a RocksDBStore will be wiped and restored from their changelogs.
However, we will support using the upgrade.from
config to safely downgrade without losing local RocksDB state. When upgrade.from
is set to a version less than the version that introduced this KIP:
- On RocksDBStore#close:
  - All changelog offsets stored in the RocksDB offsets column family will be written to the Task .checkpoint file.
  - All source topic offsets stored in the RocksDB offsets column family will be written to the store .position file.
  - The RocksDB offsets column family will be deleted from the RocksDB database, ensuring that older versions of Kafka Streams will be able to open the database successfully.
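The first downgrade step, writing the column-family offsets back out in the legacy format, can be sketched as below. The layout (version 0, entry count, then "topic partition offset" lines) is assumed to match the checkpoint files older versions read; file I/O and the column-family deletion are elided:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of formatting offsets back into the legacy .checkpoint layout on close.
class CheckpointWriter {
    static List<String> format(final Map<String, Map<Integer, Long>> offsetsByTopic) {
        final List<String> lines = new ArrayList<>();
        lines.add("0"); // checkpoint file version
        int count = 0;
        final List<String> entries = new ArrayList<>();
        for (final Map.Entry<String, Map<Integer, Long>> topic : offsetsByTopic.entrySet()) {
            for (final Map.Entry<Integer, Long> p : topic.getValue().entrySet()) {
                // Each entry line: "<topic> <partition> <offset>"
                entries.add(topic.getKey() + " " + p.getKey() + " " + p.getValue());
                count++;
            }
        }
        lines.add(Integer.toString(count)); // entry count precedes the entries
        lines.addAll(entries);
        return lines;
    }
}
```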
Test Plan
The existing tests that depend on .checkpoint files will either be removed (if they are no longer relevant), or modified to instead call StateStore#committedOffset().
...