Status
Current state: Accepted
Discussion thread: Thread
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
    /**
     * Determines if this StateStore manages its own offsets.
     * <p>
     * If this method returns {@code true}, then offsets provided to {@link #commit(Map)} will be retrievable using
     * {@link #committedOffset(TopicPartition)}.
     * <p>
     * If this method returns {@code false}, offsets provided to {@link #commit(Map)} will be ignored, and {@link
     * #committedOffset(TopicPartition)} will be expected to always return {@code null}.
     * <p>
     * This method is provided to enable custom StateStores to opt-in to managing their own offsets. This is required,
     * to ensure that custom StateStores provide the consistency guarantees that Kafka Streams expects when operating
     * under an {@code exactly-once} {@code processing.guarantee}.
     * <p>
     * New implementations are required to implement this method and return {@code true}. Existing implementations
     * should upgrade to managing their own offsets as soon as possible, as the legacy offset management is deprecated
     * and will be removed in a future version.
     *
     * @deprecated New implementations should always return {@code true} and manage their own offsets. In the future,
     *             this method will be removed and it will be assumed to always return {@code true}.
     * @return Whether this StateStore manages its own offsets.
     */
    @Deprecated
    default boolean managesOffsets() {
        return false;
    }

    /**
     * Flush any cached data
     *
     * @deprecated Use {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() ProcessorContext#commit()}
     *             instead.
     */
    @Deprecated
    default void flush() {
        // no-op
    }

    /**
     * Commit all written records to this StateStore.
     * <p>
     * This method <b>CANNOT</b> be called by users from {@link org.apache.kafka.streams.processor.api.Processor
     * processors}. Doing so will throw an {@link java.lang.UnsupportedOperationException}.
     * <p>
     * Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit()
     * ProcessorContext#commit()} to request a Task commit.
     * <p>
     * If {@link #managesOffsets()} returns {@code true}, the given {@code changelogOffsets} will be guaranteed to be
     * persisted to disk along with the written records.
     * <p>
     * {@code changelogOffsets} will usually contain a single partition, in the case of a regular StateStore. However,
     * they may contain multiple partitions in the case of a Global StateStore with multiple partitions. All provided
     * partitions <em>MUST</em> be persisted to disk.
     * <p>
     * Implementations <em>SHOULD</em> ensure that {@code changelogOffsets} are committed to disk atomically with the
     * records they represent, if possible.
     *
     * @param changelogOffsets The changelog offset(s) corresponding to the most recently written records.
     */
    default void commit(final Map<TopicPartition, Long> changelogOffsets) {
        flush();
    }

    /**
     * Returns the most recently {@link #commit(Map) committed} offset for the given {@link TopicPartition}.
     * <p>
     * If {@link #managesOffsets()} and {@link #persistent()} both return {@code true}, this method will return the
     * offset that corresponds to the changelog record most recently written to this store, for the given {@code
     * partition}.
     *
     * @param partition The partition to get the committed offset for.
     * @return The last {@link #commit(Map) committed} offset for the {@code partition}; or {@code null} if no offset
     *         has been committed for the partition, or if either {@link #persistent()} or {@link #managesOffsets()}
     *         return {@code false}.
     */
    default Long committedOffset(final TopicPartition partition) {
        return null;
    }
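To illustrate the contract described in the Javadoc above, here is a minimal, self-contained sketch of a store that manages its own offsets. It is not a real StateStore implementation: `TopicPartition` is a local stand-in for the Kafka class so that the example compiles without Kafka on the classpath, `OffsetTrackingStore`, `put` and `get` are hypothetical names, and in-memory maps stand in for on-disk data.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartition(String topic, int partition) {}

// Hypothetical store showing only the offset-related methods of the proposal.
// Not thread-safe; a real store would persist to disk rather than to maps.
class OffsetTrackingStore {
    private final Map<String, String> pending = new HashMap<>();   // writes since the last commit
    private final Map<String, String> committed = new HashMap<>(); // stands in for on-disk records
    private final Map<TopicPartition, Long> committedOffsets = new HashMap<>();

    public boolean managesOffsets() {
        return true; // opt in to managing offsets
    }

    public void put(String key, String value) {
        pending.put(key, value);
    }

    public String get(String key) {
        // Reads see uncommitted writes first, then committed data.
        return pending.containsKey(key) ? pending.get(key) : committed.get(key);
    }

    // Records and offsets are moved to "disk" together, mimicking an atomic commit.
    public void commit(Map<TopicPartition, Long> changelogOffsets) {
        committed.putAll(pending);
        pending.clear();
        committedOffsets.putAll(changelogOffsets); // every provided partition is stored
    }

    // Returns null when nothing has been committed for the partition.
    public Long committedOffset(TopicPartition partition) {
        return committedOffsets.get(partition);
    }
}
```

A real persistent store would write the records and the offsets in a single atomic operation of its storage engine, so that a crash cannot leave them out of sync.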
...
Name | RecordingLevel | Metric Type | Description |
---|---|---|---|
commit-rate | DEBUG | Rate | The average number of calls to commit per second |
commit-latency-avg | DEBUG | Avg | The average latency of calls to commit |
commit-latency-max | DEBUG | Max | The maximum latency of calls to commit |
Deprecated
- stream-state-metrics
  - flush-rate
  - flush-latency-avg
  - flush-latency-max
...
- On start-up, we will construct and initialize state stores for every corresponding Task store directory we find on-disk, and call committedOffset to determine their stored offset(s). We will then cache these offsets in-memory and close() these stores. This cache will be shared among all StreamThreads, and will therefore be thread-safe.
- On rebalance, we will consult this cache to compute our Task offset lags.
- After state restore completes for an assigned Task, we will update the offset cache to Task.LATEST_OFFSET, to indicate that the Task now has the currently latest offset.
- When closing a StateStore, we will update the offset cache with the current changelog offset for the store.
  - This will ensure that when a Task is reassigned to another instance, the Task lag for the local state matches what's on-disk, instead of using the sentinel value Task.LATEST_OFFSET, which is only valid for Tasks currently assigned to a thread on the local instance.
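The cache lifecycle in the list above could be sketched roughly as follows. This is a simplified illustration, not the proposed implementation: `TaskOffsetCache` and its method names are hypothetical, task IDs are plain strings, each Task is assumed to have a single changelog partition, and the value used for `LATEST_OFFSET` is a placeholder for the sentinel `Task.LATEST_OFFSET`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical offset cache shared between threads; names are illustrative only.
class TaskOffsetCache {
    // Placeholder for the sentinel Task.LATEST_OFFSET.
    static final long LATEST_OFFSET = -2L;

    // ConcurrentHashMap keeps the cache safe to share among stream threads.
    private final Map<String, Long> offsets = new ConcurrentHashMap<>();

    // Start-up: store the offset read via committedOffset for a Task directory found on disk.
    void recordOnStartup(String taskId, long committedOffset) {
        offsets.put(taskId, committedOffset);
    }

    // After restore completes for an assigned Task, mark it as fully caught up.
    void markRestored(String taskId) {
        offsets.put(taskId, LATEST_OFFSET);
    }

    // On close, replace the sentinel with the store's actual changelog offset.
    void recordOnClose(String taskId, long changelogOffset) {
        offsets.put(taskId, changelogOffset);
    }

    // Rebalance: compute the lag for a Task against the changelog end offset.
    long lag(String taskId, long changelogEndOffset) {
        Long cached = offsets.get(taskId);
        if (cached == null) {
            return changelogEndOffset;  // no local state: everything must be restored
        }
        if (cached == LATEST_OFFSET) {
            return LATEST_OFFSET;       // Task is assigned locally and up to date
        }
        return Math.max(0L, changelogEndOffset - cached);
    }
}
```

Recording the real offset on close matters because the sentinel would otherwise survive after the Task moves to another instance and make stale local state look fully caught up.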
We will conduct performance testing for this strategy, and if we find that initializing and closing many on-disk stores on startup is prohibitively expensive, we will parallelize this process by forking a new thread for each Task directory.
Compatibility, Deprecation, and Migration Plan
...
The newly introduced StateStore#managesOffsets method will be immediately deprecated, to allow for its removal in a future release. It is being introduced as a transitional mechanism, to ensure that existing custom StateStores continue to work as expected without managing their own offsets. However, to encourage maintainers of custom stores to add support for offset management, we will emit a WARN level log message when initializing any StateStore that is persistent, but does not manage its own offsets.
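The warning condition described above can be expressed as a small check. The sketch below is only illustrative: `OffsetManagementCheck`, `warnIfLegacy` and the message wording are hypothetical, and it uses the JDK's `System.Logger` purely to stay dependency-free rather than whatever logging facade Kafka Streams uses internally.

```java
// Hypothetical helper; the class name, method name and message text are assumptions.
class OffsetManagementCheck {
    private static final System.Logger LOG = System.getLogger("OffsetManagementCheck");

    // Returns true (and logs at WARNING level) when a store is persistent
    // but relies on the legacy, deprecated offset management.
    static boolean warnIfLegacy(String storeName, boolean persistent, boolean managesOffsets) {
        boolean legacy = persistent && !managesOffsets;
        if (legacy) {
            LOG.log(System.Logger.Level.WARNING,
                    "StateStore {0} is persistent but does not manage its own offsets",
                    storeName);
        }
        return legacy;
    }
}
```

In-memory stores are exempt because they have no on-disk state whose offsets would need to be tracked.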
...