Status
Current state: Accepted
Discussion thread: Thread
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
```java
    /**
     * Determines if this StateStore manages its own offsets.
     * <p>
     * If this method returns {@code true}, then offsets provided to {@link #commit(Map)} will be retrievable using
     * {@link #committedOffset(TopicPartition)}, even if the store is {@link #close() closed} and later re-opened.
     * <p>
     * If this method returns {@code false}, offsets provided to {@link #commit(Map)} will be ignored, and {@link
     * #committedOffset(TopicPartition)} will be expected to always return {@code null}.
     * <p>
     * This method is provided to enable custom StateStores to opt-in to managing their own offsets. This is required,
     * to ensure that custom StateStores provide the consistency guarantees that Kafka Streams expects when operating
     * under an {@code exactly-once} {@code processing.guarantee}.
     * <p>
     * New implementations are required to implement this method and return {@code true}. Existing implementations
     * should upgrade to managing their own offsets as soon as possible, as the legacy offset management is deprecated
     * and will be removed in a future version.
     *
     * @deprecated New implementations should always return {@code true} and manage their own offsets. In the future,
     *             this method will be removed and it will be assumed to always return {@code true}.
     * @return Whether this StateStore manages its own offsets.
     */
    @Deprecated
    default boolean managesOffsets() {
        return false;
    }

    /**
     * Flush any cached data
     *
     * @deprecated Use {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() ProcessorContext#commit()}
     *             instead.
     */
    @Deprecated
    default void flush() {
        // no-op
    }

    /**
     * Commit all written records to this StateStore.
     * <p>
     * This method <b>CANNOT</b> be called by users from {@link org.apache.kafka.streams.processor.api.Processor
     * processors}. Doing so will throw an {@link java.lang.UnsupportedOperationException}.
     * <p>
     * Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit()
     * ProcessorContext#commit()} to request a Task commit.
     * <p>
     * If {@link #managesOffsets()} returns {@code true}, the given {@code changelogOffsets} will be guaranteed to be
     * persisted to disk along with the written records.
     * <p>
     * {@code changelogOffsets} will usually contain a single partition, in the case of a regular StateStore. However,
     * they may contain multiple partitions in the case of a Global StateStore with multiple partitions. All provided
     * partitions <em>MUST</em> be persisted to disk.
     * <p>
     * Implementations <em>SHOULD</em> ensure that {@code changelogOffsets} are committed to disk atomically with the
     * records they represent, if possible.
     *
     * @param changelogOffsets The changelog offset(s) corresponding to the most recently written records.
     */
    default void commit(final Map<TopicPartition, Long> changelogOffsets) {
        flush();
    }

    /**
     * Returns the most recently {@link #commit(Map) committed} offset for the given {@link TopicPartition}.
     * <p>
     * If {@link #managesOffsets()} and {@link #persistent()} both return {@code true}, this method will return the
     * offset that corresponds to the changelog record most recently written to this store, for the given {@code
     * partition}.
     *
     * @param partition The partition to get the committed offset for.
     * @return The last {@link #commit(Map) committed} offset for the {@code partition}; or {@code null} if no offset
     *         has been committed for the partition, or if either {@link #persistent()} or {@link #managesOffsets()}
     *         return {@code false}.
     */
    default Long committedOffset(final TopicPartition partition) {
        return null;
    }
```
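To make the contract above concrete, here is a minimal, self-contained sketch. It does not use the real Kafka classes: `TopicPartition` and `OffsetAwareStore` are hypothetical stand-ins that mirror only the three methods and their default implementations, and `LegacyStore` and `OffsetManagingStore` are illustrative names. A real offset-managing store would persist the offsets durably, together with its data; this one only keeps them in memory.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartition(String topic, int partition) {}

// Hypothetical stand-in mirroring only the three new methods and their defaults.
interface OffsetAwareStore {
    boolean persistent();

    // Deprecated by the KIP; kept here because the default commit() delegates to it.
    default void flush() {
        // no-op
    }

    default boolean managesOffsets() {
        return false;
    }

    default void commit(final Map<TopicPartition, Long> changelogOffsets) {
        flush();
    }

    default Long committedOffset(final TopicPartition partition) {
        return null;
    }
}

// A custom store that has not been upgraded: it inherits every default.
class LegacyStore implements OffsetAwareStore {
    int flushes = 0;

    @Override
    public boolean persistent() {
        return true;
    }

    @Override
    public void flush() {
        flushes++;
    }
}

// A custom store that opts in to managing its own offsets.
// A real implementation would persist these offsets atomically with its data.
class OffsetManagingStore implements OffsetAwareStore {
    private final Map<TopicPartition, Long> committed = new HashMap<>();

    @Override
    public boolean persistent() {
        return true;
    }

    @Override
    public boolean managesOffsets() {
        return true;
    }

    @Override
    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
        committed.putAll(changelogOffsets);
    }

    @Override
    public Long committedOffset(final TopicPartition partition) {
        return committed.get(partition);
    }
}
```

The legacy store relies entirely on the defaults: its `commit` call becomes a `flush`, and `committedOffset` returns `null`, which is the signal for Kafka Streams to keep using the `.checkpoint` file for it.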
Metrics
New
Each added metric will be on store-level and have the following tags:
- type=stream-state-metrics
- thread-id=[thread ID]
- task-id=[task ID]
- [store type]-state-id=[store ID] for key-value stores
- [store type]-session-state-id=[store ID] for session stores
- [store type]-window-state-id=[store ID] for window stores
Name | RecordingLevel | Metric Type | Description |
---|---|---|---|
commit-rate | DEBUG | Rate | The average number of calls to commit per second |
commit-latency-avg | DEBUG | Avg | The average latency of calls to commit |
commit-latency-max | DEBUG | Max | The maximum latency of calls to commit |
Deprecated
- stream-state-metrics
  - flush-rate
  - flush-latency-avg
  - flush-latency-max
These changes are necessary to ensure these metrics are not confused with orthogonal operations, like RocksDB memtable flushes or cache flushes. They will measure the invocation of StateStore#commit, which replaces StateStore#flush.
While the flush metrics are only deprecated, they will no longer record any data under normal use, as Kafka Streams will no longer call StateStore#flush().
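The three new metrics only time calls to `StateStore#commit`. The sketch below shows what each value represents. It is a hypothetical stand-in (`CommitMetricsSketch`), not the Kafka metrics API, which records into sensors over sampled time windows; here the rate is computed over the object's whole lifetime, and the clock is injected so the example is deterministic.

```java
import java.util.function.LongSupplier;

class CommitMetricsSketch {
    private final LongSupplier nanoClock;
    private final long startNanos;
    private long calls;
    private long totalLatencyNanos;
    private long maxLatencyNanos;

    CommitMetricsSketch(final LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
        this.startNanos = nanoClock.getAsLong();
    }

    // Wraps one invocation of StateStore#commit (represented by a Runnable) and records its latency.
    void timeCommit(final Runnable commit) {
        final long before = nanoClock.getAsLong();
        try {
            commit.run();
        } finally {
            final long latency = nanoClock.getAsLong() - before;
            calls++;
            totalLatencyNanos += latency;
            maxLatencyNanos = Math.max(maxLatencyNanos, latency);
        }
    }

    // commit-rate: calls per second since this object was created.
    double commitRatePerSecond() {
        final double elapsedSeconds = (nanoClock.getAsLong() - startNanos) / 1e9;
        return elapsedSeconds > 0 ? calls / elapsedSeconds : 0.0;
    }

    // commit-latency-avg
    double commitLatencyAvgNanos() {
        return calls == 0 ? 0.0 : (double) totalLatencyNanos / calls;
    }

    // commit-latency-max
    long commitLatencyMaxNanos() {
        return maxLatencyNanos;
    }
}
```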
Proposed Changes
3 new methods will be added to StateStore, each defined with a default
implementation that ensures existing custom StateStores continue to function as they do today, to guarantee compatibility. All internal StateStore implementations will be updated with implementations of each of these methods that best suits that implementation.
For persistent stores (i.e. StateStore#persistent()
returns true
) that do not manage their own offsets (i.e. StateStore#managedOffsets()
returns false
), the existing checkpointing behaviour will be provided internally, by continuing to store checkpoint offsets in the .checkpoint
file. This will ensure compatibility for custom stores that have not been upgraded to manage their own offsets yet.
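The compatibility rule above amounts to a routing decision at commit time. The sketch below is illustrative only: `SketchStore`, `SimpleStore` and `CommitRouterSketch` are hypothetical types, a plain map stands in for the legacy `.checkpoint` file, and the treatment of non-persistent stores (not checkpointed) is an assumption, since the text above only specifies the persistent case.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified view of a store as seen by the engine.
interface SketchStore {
    boolean persistent();
    boolean managesOffsets();
    void commit(Map<String, Long> changelogOffsets);
}

// Test double that only counts commits.
class SimpleStore implements SketchStore {
    private final boolean persistent;
    private final boolean managesOffsets;
    int commits = 0;

    SimpleStore(final boolean persistent, final boolean managesOffsets) {
        this.persistent = persistent;
        this.managesOffsets = managesOffsets;
    }

    @Override public boolean persistent() { return persistent; }
    @Override public boolean managesOffsets() { return managesOffsets; }
    @Override public void commit(final Map<String, Long> changelogOffsets) { commits++; }
}

class CommitRouterSketch {
    // Stand-in for the contents of the legacy .checkpoint file.
    final Map<String, Long> legacyCheckpoint = new HashMap<>();

    void commitStore(final SketchStore store, final Map<String, Long> changelogOffsets) {
        // Every store is committed; stores that manage offsets persist them themselves.
        store.commit(changelogOffsets);
        // Legacy path: persistent stores that do not manage offsets keep being checkpointed by the engine.
        // Non-persistent stores are assumed not to be checkpointed.
        if (store.persistent() && !store.managesOffsets()) {
            legacyCheckpoint.putAll(changelogOffsets);
        }
    }
}
```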
Calling StateStore#commit from inside a Processor would be dangerous, because it would cause the StateStore to believe that processed records have been committed when they have not (yet). Therefore, we will prevent this method from being called in this context by implementing it in AbstractReadWriteDecorator, which wraps all stores provided to Processors, and throwing an UnsupportedOperationException.
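A minimal sketch of the guard described above, assuming a simplified store interface (`CommittableStore`) and a hypothetical `ReadWriteDecoratorSketch` in place of the real `AbstractReadWriteDecorator`. Only `commit` is shown; the real decorator delegates every other operation to the wrapped store, while the engine keeps direct access to the unwrapped store for its own commits.

```java
import java.util.Map;

// Hypothetical, minimal store interface: only the method relevant to the guard.
interface CommittableStore {
    void commit(Map<String, Long> changelogOffsets);
}

// Test double recording whether commit reached the underlying store.
class RecordingStore implements CommittableStore {
    int commits = 0;

    @Override
    public void commit(final Map<String, Long> changelogOffsets) {
        commits++;
    }
}

// Sketch of the wrapper handed to Processors: commit is always rejected.
class ReadWriteDecoratorSketch implements CommittableStore {
    private final CommittableStore inner;

    ReadWriteDecoratorSketch(final CommittableStore inner) {
        this.inner = inner;
    }

    // The engine (not the Processor) commits the unwrapped store.
    CommittableStore inner() {
        return inner;
    }

    @Override
    public void commit(final Map<String, Long> changelogOffsets) {
        throw new UnsupportedOperationException(
            "StateStore#commit cannot be called from a Processor; use ProcessorContext#commit() instead");
    }
}
```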
StateStore Implementations
RocksDBStore
RocksDBStore will manage its own offsets (i.e. managesOffsets will return true) by storing changelog offsets in a separate Column Family, and will be configured to atomically flush all its Column Families. This guarantees that the changelog offsets will always be flushed to disk together with the data they represent, irrespective of how that flush is triggered. This allows us to remove the explicit memtable flush(), enabling RocksDB to dictate when memtables are flushed to disk, instead of coupling it to Task commit.
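Why atomic flush is sufficient can be seen with a toy model. This is not RocksDB code: `ToyOffsetsStore` is a hypothetical class in which each "column family" is a memtable map plus an on-disk map, `flushAll` moves both memtables to disk in one step (the role played by RocksDB's atomic flush), and `crash` drops anything not yet flushed. Under these assumptions, an offset that survives a crash never refers to data that was lost.

```java
import java.util.HashMap;
import java.util.Map;

class ToyOffsetsStore {
    // "Data" column family: volatile memtable plus durable on-disk part.
    private final Map<String, String> dataMemtable = new HashMap<>();
    private Map<String, String> dataOnDisk = new HashMap<>();
    // "Offsets" column family: volatile memtable plus durable on-disk part.
    private final Map<String, Long> offsetsMemtable = new HashMap<>();
    private Map<String, Long> offsetsOnDisk = new HashMap<>();

    void put(final String key, final String value) {
        dataMemtable.put(key, value);
    }

    // commit only records offsets in the offsets column family; it does not force a flush.
    void commit(final Map<String, Long> changelogOffsets) {
        offsetsMemtable.putAll(changelogOffsets);
    }

    // Models atomic flush: both column families reach "disk" together, or neither does.
    void flushAll() {
        final Map<String, String> newData = new HashMap<>(dataOnDisk);
        newData.putAll(dataMemtable);
        final Map<String, Long> newOffsets = new HashMap<>(offsetsOnDisk);
        newOffsets.putAll(offsetsMemtable);
        dataOnDisk = newData;
        offsetsOnDisk = newOffsets;
        dataMemtable.clear();
        offsetsMemtable.clear();
    }

    // Models a crash: anything that was not flushed is lost.
    void crash() {
        dataMemtable.clear();
        offsetsMemtable.clear();
    }

    Long committedOffset(final String changelogPartition) {
        final Long pending = offsetsMemtable.get(changelogPartition);
        return pending != null ? pending : offsetsOnDisk.get(changelogPartition);
    }

    String get(final String key) {
        final String pending = dataMemtable.get(key);
        return pending != null ? pending : dataOnDisk.get(key);
    }
}
```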
Interactive Query .position Offsets
Input partition "Position" offsets, introduced by KIP-796: Interactive Query v2, are currently stored in a .position file directly by the RocksDBStore implementation. To ensure consistency with the committed data and changelog offsets, these position offsets will be stored in RocksDB, in the same column family as the changelog offsets, instead of the .position file. When a StateStore that manages its own offsets is first initialized, if a .position file exists in the store directory, its offsets will be automatically migrated into the store, and the file will be deleted.
When writing data to a RocksDBStore (via put, delete, etc.), the input partition offsets will be read from the changelog record metadata (as before), and written to the offsets column family.
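The one-off `.position` migration could look roughly like the sketch below. It assumes, without confirming it against the Kafka source, that the `.position` file uses the same plain-text layout as the legacy `.checkpoint` file: a version line (`0`), an entry count, then one `topic partition offset` line per entry. `PositionFileMigrationSketch`, the stand-in `TopicPartition` record, and the map standing in for the offsets column family are all hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartition(String topic, int partition) {}

class PositionFileMigrationSketch {
    // Parses the assumed layout: version "0", entry count, then "topic partition offset" lines.
    static Map<TopicPartition, Long> readOffsetFile(final Path file) throws IOException {
        final List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        if (lines.size() < 2 || !"0".equals(lines.get(0).trim())) {
            throw new IOException("Unsupported offset file: " + file);
        }
        final int count = Integer.parseInt(lines.get(1).trim());
        if (lines.size() < 2 + count) {
            throw new IOException("Truncated offset file: " + file);
        }
        final Map<TopicPartition, Long> offsets = new HashMap<>();
        for (int i = 2; i < 2 + count; i++) {
            final String[] parts = lines.get(i).trim().split("\\s+");
            offsets.put(new TopicPartition(parts[0], Integer.parseInt(parts[1])), Long.parseLong(parts[2]));
        }
        return offsets;
    }

    // Called when a store that manages its own offsets is first initialized.
    // Returns true if a .position file was found and migrated.
    static boolean migrateIfPresent(final Path positionFile,
                                    final Map<TopicPartition, Long> offsetsColumnFamily) throws IOException {
        if (!Files.exists(positionFile)) {
            return false;
        }
        offsetsColumnFamily.putAll(readOffsetFile(positionFile));
        // A real store would make the imported offsets durable before deleting the file.
        Files.delete(positionFile);
        return true;
    }
}
```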
InMemoryKeyValueStore
Since InMemoryKeyValueStore is not persistent, there is no state to synchronise the changelog offsets with on commit. Therefore, InMemoryKeyValueStore will not manage offsets (i.e. managesOffsets will return false), and no changes to its implementation are required.
Custom Persistent Stores
Any existing persistent StateStore that has not been updated to manage its own offsets will inherit the default implementation of all three new methods. This will indicate that Kafka Streams should continue to checkpoint its offsets using the legacy .checkpoint file. Maintainers of custom persistent stores may add offset management after upgrading.
Consumer Rebalance Metadata
When determining the partition assignment, StreamsPartitionAssignor considers the checkpoint offsets on-disk for all StateStores on each instance, even for Tasks they have not (yet) been assigned. This is done via TaskManager#getTaskOffsetSums(), which currently reads directly from the per-Task .checkpoint file.
Since stores will now be managing their offsets internally, we will need to read offsets from the stores themselves instead of the .checkpoint file.
- On start-up, we will construct and initialize state stores for every corresponding Task store directory we find on-disk, and call committedOffset to determine their stored offset(s). We will then cache these offsets in-memory. This cache will be shared among all StreamThreads, and will therefore be made thread-safe.
- On rebalance, we will consult this cache to compute our Task offset lags.
- After state restoration completes for an assigned Task, we will update the offset cache to Task.LATEST_OFFSET, to indicate that the Task now has the latest offset.
- When closing a StateStore, we will update the offset cache with the current changelog offset for the store.
  - This will ensure that when a Task is reassigned to another instance, the Task lag for the local state matches what's on-disk, instead of using the sentinel value Task.LATEST_OFFSET, which is only valid for Tasks currently assigned to a thread on the local instance.
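The offset cache lifecycle described in the list above could be sketched as follows. `TaskOffsetCacheSketch` is hypothetical: it keys offset sums by a task-id string, assumes all of a task's stores report their offsets together, and uses `-2L` as an assumed stand-in for `Task.LATEST_OFFSET`. A `ConcurrentHashMap` provides the thread safety needed to share the cache among `StreamThread`s.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class TaskOffsetCacheSketch {
    // Assumed sentinel, playing the role of Task.LATEST_OFFSET.
    static final long LATEST_OFFSET = -2L;

    // Shared by all StreamThreads, so it must be safe for concurrent use.
    private final ConcurrentHashMap<String, Long> offsetSums = new ConcurrentHashMap<>();

    private static long sum(final Map<String, Long> changelogOffsets) {
        long total = 0L;
        for (final Long offset : changelogOffsets.values()) {
            if (offset != null) { // committedOffset may return null
                total += offset;
            }
        }
        return total;
    }

    // Start-up: offsets obtained via committedOffset from each store found on disk for the task.
    void onStartup(final String taskId, final Map<String, Long> committedOffsets) {
        offsetSums.put(taskId, sum(committedOffsets));
    }

    // Restoration finished for an assigned task: it is now fully caught up.
    void onRestoreComplete(final String taskId) {
        offsetSums.put(taskId, LATEST_OFFSET);
    }

    // Store closed: replace the sentinel with the real on-disk changelog offsets.
    void onStoreClosed(final String taskId, final Map<String, Long> changelogOffsets) {
        offsetSums.put(taskId, sum(changelogOffsets));
    }

    // Consulted on rebalance to compute Task offset sums, and from them, lags.
    Map<String, Long> taskOffsetSums() {
        return Map.copyOf(offsetSums);
    }
}
```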
Compatibility, Deprecation, and Migration Plan
...
The newly introduced StateStore#managesOffsets method will be immediately deprecated, to allow for its removal in a future release. It is being introduced as a transitional mechanism, to ensure that existing custom StateStores continue to work as expected without managing their own offsets. However, to encourage maintainers of custom stores to add support for offset management, we will emit a WARN level log message when initializing any StateStore that is persistent but does not manage its own offsets.
Upgrade
When upgrading to a version of Kafka Streams that includes the changes outlined in this KIP, users will not be required to take any action.
The default implementation of all three methods has been designed to ensure backwards-compatibility for any custom store that does not yet support these methods.
Kafka Streams will automatically upgrade any RocksDB stores to manage offsets directly in the RocksDB database, by importing the offsets from any existing .checkpoint and/or .position files.
Downgrade
RocksDBStore will not be compatible with older versions of Kafka Streams, due to the extra offsets column family that older versions do not recognize. If a user downgrades from a version containing this KIP to a version without it, by default the on-disk state for any Task containing a RocksDBStore will be wiped and restored from its changelogs.
However, we will support using the upgrade.from config to safely downgrade without losing local RocksDB state. When upgrade.from is set to a version earlier than the version that introduced this KIP:
- On RocksDBStore#close:
  - All changelog offsets stored in the RocksDB offsets column family will be written to the Task .checkpoint file.
  - All source topic offsets stored in the RocksDB offsets column family will be written to the store .position file.
  - The RocksDB offsets column family will be deleted from the RocksDB database, ensuring that older versions of Kafka Streams will be able to open the database successfully.
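The close-time steps listed above could be sketched as follows, assuming that both files use the legacy plain-text offset layout (version `0`, entry count, then `topic partition offset` lines). The class names, the stand-in `TopicPartition` record, and the `Runnable` standing in for dropping the offsets column family are hypothetical; deciding when to call `onClose` (only when `upgrade.from` names an older version) is left to the caller.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartition(String topic, int partition) {}

class LegacyOffsetFileWriter {
    // Writes the assumed layout, via a temporary file so readers never see a half-written file.
    static void write(final Path file, final Map<TopicPartition, Long> offsets) throws IOException {
        final List<String> lines = new ArrayList<>();
        lines.add("0");                              // format version (assumed)
        lines.add(Integer.toString(offsets.size())); // number of entries
        for (final Map.Entry<TopicPartition, Long> e : offsets.entrySet()) {
            lines.add(e.getKey().topic() + " " + e.getKey().partition() + " " + e.getValue());
        }
        final Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, lines, StandardCharsets.UTF_8);
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }
}

class DowngradeOnCloseSketch {
    // Hypothetical hook, invoked from RocksDBStore#close only when downgrading via upgrade.from.
    static void onClose(final Path checkpointFile,
                        final Path positionFile,
                        final Map<TopicPartition, Long> changelogOffsets,
                        final Map<TopicPartition, Long> sourceOffsets,
                        final Runnable dropOffsetsColumnFamily) throws IOException {
        LegacyOffsetFileWriter.write(checkpointFile, changelogOffsets); // Task .checkpoint
        LegacyOffsetFileWriter.write(positionFile, sourceOffsets);      // store .position
        dropOffsetsColumnFamily.run();                                  // older versions can now open the DB
    }
}
```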
Test Plan
The existing tests that depend on .checkpoint files will either be removed (if they are no longer relevant), or modified to call StateStore#committedOffset() instead.
...
An alternative approach was suggested: force-flush not on every commit, but only on some commits, as we do under ALOS. This would be controlled by new configuration, for example statestore.flush.interval.ms and/or statestore.flush.interval.records. However, any default that we chose for these configurations would be arbitrary, and could result, under EOS, in more flushing than we do today. For some users, the defaults would be fine and would likely have no impact. But for others, the defaults could be ill-suited to their workload, and could cause a performance regression on upgrade.
We instead chose to take the opportunity to solve this with a more comprehensive set of changes to the StateStore API, which should have additional benefits.
StreamsPartitionAssignor to query stores for offsets
Instead of having TaskManager#getTaskOffsetSums() read from the .checkpoint file directly, we originally intended to have it call StateStore#committedOffset() on each store, and make the StateStore responsible for tracking the offset for stores even when not initialized.
However, this is not possible, because a StateStore does not know its TaskId, and hence cannot determine its on-disk StateDirectory until after StateStore#init() has been called. We could have added a StateStoreContext argument to committedOffset(), but we decided against it, because doing so would make the API less clear, and correctly implementing this method would be more difficult for StateStore implementation maintainers.
Instead, we revert to the existing behaviour, and have the Streams engine continue to maintain the offsets for closed stores in the .checkpoint file.