Status

Current state: Accepted

Discussion thread: Thread

JIRA: KAFKA-17411

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

org.apache.kafka.streams.processor.StateStore (Java):
    /**
     * Determines if this StateStore manages its own offsets.
     * <p>
     * If this method returns {@code true}, then offsets provided to {@link #commit(Map)} will be retrievable using
     * {@link #committedOffset(TopicPartition)}.
     * <p>
     * If this method returns {@code false}, offsets provided to {@link #commit(Map)} will be ignored, and {@link
     * #committedOffset(TopicPartition)} will be expected to always return {@code null}.
     * <p>
     * This method is provided to enable custom StateStores to opt in to managing their own offsets. This is required
     * to ensure that custom StateStores provide the consistency guarantees that Kafka Streams expects when operating
     * under an {@code exactly-once} {@code processing.guarantee}.
     * <p>
     * New implementations are required to implement this method and return {@code true}. Existing implementations
     * should upgrade to managing their own offsets as soon as possible, as the legacy offset management is deprecated
     * and will be removed in a future version.
     * 
     * @deprecated New implementations should always return {@code true} and manage their own offsets. In the future,
     *             this method will be removed and it will be assumed to always return {@code true}.
     * @return Whether this StateStore manages its own offsets.
     */
    @Deprecated
    default boolean managesOffsets() {
        return false;
    }

    /**
     * Flush any cached data.
     * 
     * @deprecated Use {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() ProcessorContext#commit()}
     *             instead.
     */
    @Deprecated
    default void flush() {
        // no-op
    }

    /**
     * Commit all written records to this StateStore.
     * <p>
     * This method <b>CANNOT</b> be called by users from {@link org.apache.kafka.streams.processor.api.Processor
     * processors}. Doing so will throw an {@link java.lang.UnsupportedOperationException}.
     * <p>
     * Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() 
     * ProcessorContext#commit()} to request a Task commit.
     * <p>
     * If {@link #managesOffsets()} returns {@code true}, the given {@code changelogOffsets} will be guaranteed to be 
     * persisted to disk along with the written records.
     * <p>
     * For a regular StateStore, {@code changelogOffsets} will usually contain a single partition. However, a Global
     * StateStore whose source topic has multiple partitions may receive several. All provided partitions
     * <em>MUST</em> be persisted to disk.
     * <p>
     * Implementations <em>SHOULD</em> ensure that {@code changelogOffsets} are committed to disk atomically with the
     * records they represent, if possible.
     * 
     * @param changelogOffsets The changelog offset(s) corresponding to the most recently written records.
     */
    default void commit(final Map<TopicPartition, Long> changelogOffsets) {
        flush();
    }

    /**
     * Returns the most recently {@link #commit(Map) committed} offset for the given {@link TopicPartition}.
     * <p>
     * If {@link #managesOffsets()} and {@link #persistent()} both return {@code true}, this method will return the
     * offset that corresponds to the changelog record most recently written to this store, for the given {@code
     * partition}.
     * 
     * @param partition The partition to get the committed offset for.
     * @return The last {@link #commit(Map) committed} offset for the {@code partition}; or {@code null} if no offset
     *         has been committed for the partition, or if either {@link #persistent()} or {@link #managesOffsets()}
     *         return {@code false}.
     */
    default Long committedOffset(final TopicPartition partition) {
        return null;
    }
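As a hedged illustration of the commit(Map)/committedOffset contract, the following is a minimal sketch of a store that manages its own offsets. It is not a real StateStore implementation: TopicPartition is a local stand-in record for org.apache.kafka.common.TopicPartition, OffsetManagingStore is a hypothetical class that only mirrors the relevant methods, and the on-disk state is simulated with in-memory maps so the example compiles without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition, so this sketch
// compiles without Kafka on the classpath.
record TopicPartition(String topic, int partition) { }

// Hypothetical sketch of a custom store that manages its own changelog offsets.
// It mirrors the StateStore methods above but does not implement the real interface.
final class OffsetManagingStore {
    // Uncommitted writes, buffered until the next commit.
    private final Map<String, String> pending = new HashMap<>();
    // Simulated "on-disk" contents: records and offsets only change together in commit().
    private final Map<String, String> durableRecords = new HashMap<>();
    private final Map<TopicPartition, Long> durableOffsets = new HashMap<>();

    boolean persistent() {
        return true;
    }

    boolean managesOffsets() {
        return true;
    }

    void put(final String key, final String value) {
        pending.put(key, value);
    }

    String get(final String key) {
        // Read-your-writes: prefer uncommitted data over committed data.
        return pending.containsKey(key) ? pending.get(key) : durableRecords.get(key);
    }

    void commit(final Map<TopicPartition, Long> changelogOffsets) {
        // A real store would write the records and the offsets in a single atomic batch
        // on its storage engine. Here both simulated maps are updated in the same call.
        // This sketch is not thread-safe.
        durableRecords.putAll(pending);
        durableOffsets.putAll(changelogOffsets);
        pending.clear();
    }

    Long committedOffset(final TopicPartition partition) {
        // Per the Javadoc above: null unless the store is persistent and manages offsets.
        if (!persistent() || !managesOffsets()) {
            return null;
        }
        return durableOffsets.get(partition);
    }
}
```

The design point is that records and their changelog offsets only ever change together in commit, so after a restart the store can report exactly which changelog position its data corresponds to.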

...

Name                 Recording Level  Metric Type  Description
commit-rate          DEBUG            Rate         The average number of calls to commit per second
commit-latency-avg   DEBUG            Avg          The average latency of calls to commit
commit-latency-max   DEBUG            Max          The maximum latency of calls to commit
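The three metrics above can be sketched as a simple recorder wrapped around each commit call. This is a hypothetical, simplified class (CommitMetricsSketch), not Kafka's metrics API: Kafka computes rates over sampled time windows, whereas this sketch aggregates everything recorded since it was created and asks the caller for the elapsed time. The table does not state a latency unit; reporting nanoseconds here is an assumption.

```java
// Hypothetical, simplified recorder for the commit-rate, commit-latency-avg and
// commit-latency-max metrics. Kafka's metrics library uses sampled time windows;
// this sketch aggregates everything recorded since it was created.
final class CommitMetricsSketch {
    private long count;
    private long totalLatencyNanos;
    private long maxLatencyNanos;

    // Times one call to the supplied commit action and records its latency,
    // even if the commit throws.
    void timedCommit(final Runnable commit) {
        final long start = System.nanoTime();
        try {
            commit.run();
        } finally {
            record(System.nanoTime() - start);
        }
    }

    void record(final long latencyNanos) {
        count++;
        totalLatencyNanos += latencyNanos;
        maxLatencyNanos = Math.max(maxLatencyNanos, latencyNanos);
    }

    // commit-rate: calls per second, given how long (in seconds) recording has been running.
    double commitRate(final double elapsedSeconds) {
        return count / elapsedSeconds;
    }

    // commit-latency-avg in nanoseconds; NaN when no commit has been recorded yet.
    double commitLatencyAvgNanos() {
        return count == 0 ? Double.NaN : (double) totalLatencyNanos / count;
    }

    // commit-latency-max in nanoseconds; 0 when no commit has been recorded yet.
    long commitLatencyMaxNanos() {
        return maxLatencyNanos;
    }
}
```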


Deprecated

  • stream-state-metrics 
    • flush-rate
    • flush-latency-avg 
    • flush-latency-max 

...

  • On start-up, we will construct and initialize state stores for every corresponding Task store directory we find on disk, and call committedOffset to determine their stored offset(s). We will then cache these offsets in memory and close() these stores. This cache will be shared among all StreamThreads, and must therefore be thread-safe.
  • On rebalance, we will consult this cache to compute our Task offset lags.
  • After state restoration completes for an assigned Task, we will update the offset cache to Task.LATEST_OFFSET, to indicate that the Task is now caught up to the latest changelog offset.
  • When closing a StateStore, we will update the offset cache with the current changelog offset for the store.
    • This ensures that, when a Task is reassigned to another instance, the Task lag for the local state matches what is on disk, instead of using the sentinel value Task.LATEST_OFFSET, which is only valid for Tasks currently assigned to a thread on the local instance.
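The offset cache lifecycle described in the list above can be sketched as follows. TaskOffsetCache and its method names are hypothetical, Task ids are plain strings such as "0_1", and the LATEST_OFFSET value is assumed for illustration rather than taken from Kafka Streams. The lag computation is deliberately simplified.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the shared Task offset cache. LATEST_OFFSET mirrors the role
// of Task.LATEST_OFFSET; its value here is an assumption of this sketch.
final class TaskOffsetCache {
    static final long LATEST_OFFSET = -2L;

    // ConcurrentHashMap makes the cache safe to share across StreamThreads.
    private final Map<String, Long> offsets = new ConcurrentHashMap<>();

    // Start-up: record the offset read via committedOffset() from an on-disk store.
    void onStoreScanned(final String taskId, final long committedOffset) {
        offsets.put(taskId, committedOffset);
    }

    // Restoration finished: the Task is now caught up with its changelog.
    void onRestoreComplete(final String taskId) {
        offsets.put(taskId, LATEST_OFFSET);
    }

    // Store closed: replace the sentinel with the real offset, so the lag stays
    // meaningful after the Task moves to another instance.
    void onStoreClosed(final String taskId, final long currentChangelogOffset) {
        offsets.put(taskId, currentChangelogOffset);
    }

    // Rebalance: simplified lag of the local state for a Task, given the changelog end offset.
    // Unknown Tasks are treated as having no local state at all.
    long lag(final String taskId, final long changelogEndOffset) {
        final Long offset = offsets.get(taskId);
        if (offset == null) {
            return changelogEndOffset;
        }
        if (offset == LATEST_OFFSET) {
            return 0L;
        }
        return Math.max(0L, changelogEndOffset - offset);
    }
}
```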

We will conduct performance testing for this strategy, and if we find that initializing and closing many on-disk stores on startup is prohibitively expensive, we will parallelize this process by forking a new thread for each Task directory.
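If parallelization turns out to be necessary, the start-up scan could look roughly like this. ParallelOffsetScan is hypothetical, Task directories are represented by plain strings, and readOffset stands in for opening the store in a given Task directory, calling committedOffset, and closing it again; a null result means no offset was found.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch of parallelising the start-up scan with one thread per Task directory.
final class ParallelOffsetScan {
    static Map<String, Long> scan(final List<String> taskDirs,
                                  final Function<String, Long> readOffset)
            throws InterruptedException, ExecutionException {
        final Map<String, Long> offsets = new ConcurrentHashMap<>();
        if (taskDirs.isEmpty()) {
            return offsets;
        }
        // One thread per Task directory, as described above.
        final ExecutorService pool = Executors.newFixedThreadPool(taskDirs.size());
        try {
            final List<Future<?>> futures = new ArrayList<>();
            for (final String dir : taskDirs) {
                futures.add(pool.submit(() -> {
                    final Long offset = readOffset.apply(dir);
                    // ConcurrentHashMap rejects null values; null means "no committed offset".
                    if (offset != null) {
                        offsets.put(dir, offset);
                    }
                }));
            }
            for (final Future<?> future : futures) {
                future.get(); // wait for completion and propagate any worker failure
            }
        } finally {
            pool.shutdown();
        }
        return offsets;
    }
}
```

Using one thread per directory matches the plan above; with very many Task directories, a bounded pool would cap thread creation at the cost of less parallelism.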

Compatibility, Deprecation, and Migration Plan

...

The newly introduced StateStore#managesOffsets method will be immediately deprecated, to allow for its removal in a future release. It is being introduced as a transitional mechanism, to ensure that existing custom StateStores continue to work as expected without managing their own offsets. However, to encourage maintainers of custom stores to add support for offset management, we will emit a WARN level log message when initializing any StateStore that is persistent but does not manage its own offsets.
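The start-up warning could be implemented as a simple check on each store as it is initialized. StoreInfo, SimpleStoreInfo and OffsetManagementCheck are hypothetical names, the message wording is illustrative only, and java.lang.System.Logger is used just so the sketch has no dependencies; Kafka Streams itself logs via SLF4J.

```java
import java.util.Optional;

// Hypothetical minimal view of a StateStore; the real interface has many more methods.
interface StoreInfo {
    String name();
    boolean persistent();
    boolean managesOffsets();
}

// Hypothetical record used only to illustrate the check.
record SimpleStoreInfo(String name, boolean persistent, boolean managesOffsets) implements StoreInfo { }

// Hypothetical sketch of the WARN-on-initialization check described above.
final class OffsetManagementCheck {
    private static final System.Logger LOG = System.getLogger(OffsetManagementCheck.class.getName());

    // Returns a warning only for stores that are persistent but do not manage offsets.
    static Optional<String> warningFor(final StoreInfo store) {
        if (store.persistent() && !store.managesOffsets()) {
            return Optional.of("State store " + store.name()
                    + " is persistent but does not manage its own offsets;"
                    + " legacy offset management is deprecated.");
        }
        return Optional.empty();
    }

    // Called when a store is initialized; logs at WARNING level if needed.
    static void onInit(final StoreInfo store) {
        warningFor(store).ifPresent(msg -> LOG.log(System.Logger.Level.WARNING, msg));
    }
}
```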

...