...

Code Block (java): org.apache.kafka.streams.processor.StateStore
    
    /**
     * Determines if this StateStore manages its own offsets.
     * <p>
     * If this method returns {@code true}, then offsets provided to {@link #commit(Map)} will be retrievable using
     * {@link #committedOffset(TopicPartition)}, even if the store is {@link #close() closed} and later re-opened.
     * <p>
     * If this method returns {@code false}, offsets provided to {@link #commit(Map)} will be ignored, and {@link
     * #committedOffset(TopicPartition)} will be expected to always return {@code null}.
     * <p>
     * This method is provided to enable custom StateStores to opt-in to managing their own offsets. This is highly
     * recommended, if possible, to ensure that custom StateStores provide the consistency guarantees that Kafka Streams
     * expects when operating under the {@code exactly-once} {@code processing.mode}.
     * 
     * @return Whether this StateStore manages its own offsets.
     */
    default boolean managesOffsets() {
        return false;
    }

    /**
     * Flush any cached data
     * 
     * @deprecated Use {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() ProcessorContext#commit()}
     *             instead.
     */
    @Deprecated
    default void flush() {
        // no-op
    }

    /**
     * Commit all written records to this StateStore.
     * <p>
     * This method <b>MUST NOT</b> be called by users from {@link org.apache.kafka.streams.processor.api.Processor
     * processors}, as doing so may violate the consistency guarantees provided by this store, and expected by Kafka
     * Streams.
     * <p>
     * Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() 
     * ProcessorContext#commit()} to request a Task commit.
     * <p>
     * If {@link #managesOffsets()} returns {@code true}, the given {@code changelogOffsets} will be guaranteed to be 
     * persisted to disk along with the written records.
     * <p>
     * {@code changelogOffsets} will usually contain a single partition, in the case of a regular StateStore. However,
     * they may contain multiple partitions in the case of a Global StateStore with multiple partitions. All provided
     * partitions <em>MUST</em> be persisted to disk.
     * <p>
     * Implementations <em>SHOULD</em> ensure that {@code changelogOffsets} are committed to disk atomically with the
     * records they represent, if possible.
     * 
     * @param changelogOffsets The changelog offset(s) corresponding to the most recently written records.
     */
    default void commit(final Map<TopicPartition, Long> changelogOffsets) {
        flush();
    }

    /**
     * Returns the most recently {@link #commit(Map) committed} offset for the given {@link TopicPartition}.
     * <p>
     * If {@link #managesOffsets()} and {@link #persistent()} both return {@code true}, this method will return the
     * offset that corresponds to the changelog record most recently written to this store, for the given {@code
     * partition}.
     * <p>
     * This method <i>MUST</i> return the correct offset even if the store is not currently {@link #isOpen() open}.
     * 
     * @param partition The partition to get the committed offset for.
     * @return The last {@link #commit(Map) committed} offset for the {@code partition}; or {@code null} if no offset
     *         has been committed for the partition, or if either {@link #persistent()} or {@link #managesOffsets()}
     *         return {@code false}.
     */
    default Long committedOffset(final TopicPartition partition) {
        return null;
    }

Metrics

New

Each added metric will be recorded at store level and will have the following tags:

  • type = stream-state-metrics
  • thread-id = [thread ID]
  • task-id = [task ID]
  • [store type]-state-id = [store ID] (for key-value stores)
  • [store type]-session-state-id = [store ID] (for session stores)
  • [store type]-window-state-id = [store ID] (for window stores)
Name                 Recording Level   Metric Type   Description
commit-rate          DEBUG             Rate          The number of calls to StateStore#commit(Map).
commit-latency-avg   DEBUG             Avg           The average time taken to call StateStore#commit(Map).
commit-latency-max   DEBUG             Max           The maximum time taken to call StateStore#commit(Map).
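
For illustration only, the sketch below registers metrics with the names, group, tags, and DEBUG recording level listed above using the public org.apache.kafka.common.metrics classes. It is not how Kafka Streams wires its store metrics internally, and the tag values shown are placeholders.

Code Block (java): commit metrics sketch (illustrative)

import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;

import java.util.LinkedHashMap;
import java.util.Map;

public class CommitMetricsSketch {
    public static void main(final String[] args) {
        // Enable DEBUG recording so the DEBUG-level sensor below actually records.
        final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));

        // Placeholder tag values; Streams fills these in per thread, task and store.
        final Map<String, String> tags = new LinkedHashMap<>();
        tags.put("thread-id", "stream-thread-1");
        tags.put("task-id", "0_0");
        tags.put("rocksdb-state-id", "my-store"); // [store type]-state-id, for a key-value store

        final Sensor commitSensor = metrics.sensor("commit", Sensor.RecordingLevel.DEBUG);
        commitSensor.add(metrics.metricName("commit-rate", "stream-state-metrics",
                "The number of calls to StateStore#commit(Map).", tags), new Rate());
        commitSensor.add(metrics.metricName("commit-latency-avg", "stream-state-metrics",
                "The average time taken to call StateStore#commit(Map).", tags), new Avg());
        commitSensor.add(metrics.metricName("commit-latency-max", "stream-state-metrics",
                "The maximum time taken to call StateStore#commit(Map).", tags), new Max());

        // Record the latency of each commit call, in milliseconds.
        final long start = System.currentTimeMillis();
        // ... store.commit(changelogOffsets) ...
        commitSensor.record(System.currentTimeMillis() - start);

        metrics.close();
    }
}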

...

RocksDBStore will manage its own offsets (i.e. managesOffsets  will return true), by storing changelog offsets in a separate Column Family, and will be configured to atomically flush all its Column Families. This guarantees that the changelog offsets will always be flushed to disk together with the data they represent, irrespective of how that flush is triggered. This allows us to remove the explicit memtable flush(), enabling RocksDB to dictate when memtables are flushed to disk, instead of coupling it to Task commit.
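
The following is a minimal sketch, not the actual RocksDBStore implementation, of how a dedicated offsets Column Family combined with RocksDB's atomic-flush option keeps changelog offsets and data consistent on disk. The column family name, key encoding, and offset serialization below are assumptions made for the example.

Code Block (java): offsets in a dedicated Column Family with atomic flush (sketch)

import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OffsetManagedRocksDbSketch implements AutoCloseable {
    // Hypothetical column family name; the real store may use a different one.
    private static final byte[] OFFSETS_CF_NAME = "__changelog-offsets".getBytes(StandardCharsets.UTF_8);

    private final RocksDB db;
    private final ColumnFamilyHandle dataCf;
    private final ColumnFamilyHandle offsetsCf;

    public OffsetManagedRocksDbSketch(final String path) throws RocksDBException {
        RocksDB.loadLibrary();
        final DBOptions dbOptions = new DBOptions()
                .setCreateIfMissing(true)
                .setCreateMissingColumnFamilies(true)
                // Flush all column families together, so offsets can never be persisted
                // ahead of, or behind, the records they describe.
                .setAtomicFlush(true);

        final List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, new ColumnFamilyOptions()),
                new ColumnFamilyDescriptor(OFFSETS_CF_NAME, new ColumnFamilyOptions()));
        final List<ColumnFamilyHandle> handles = new ArrayList<>();

        this.db = RocksDB.open(dbOptions, path, descriptors, handles);
        this.dataCf = handles.get(0);
        this.offsetsCf = handles.get(1);
    }

    // Write a record and its changelog offset in a single atomic batch.
    public void putWithOffset(final byte[] key, final byte[] value,
                              final String changelogTopic, final int partition,
                              final long offset) throws RocksDBException {
        try (final WriteBatch batch = new WriteBatch();
             final WriteOptions options = new WriteOptions()) {
            batch.put(dataCf, key, value);
            batch.put(offsetsCf,
                      (changelogTopic + "-" + partition).getBytes(StandardCharsets.UTF_8),
                      Long.toString(offset).getBytes(StandardCharsets.UTF_8));
            db.write(options, batch);
        }
    }

    // Read back the last written offset for a changelog partition, or null if none exists.
    public Long committedOffset(final String changelogTopic, final int partition) throws RocksDBException {
        final byte[] raw = db.get(offsetsCf,
                (changelogTopic + "-" + partition).getBytes(StandardCharsets.UTF_8));
        return raw == null ? null : Long.parseLong(new String(raw, StandardCharsets.UTF_8));
    }

    @Override
    public void close() {
        dataCf.close();
        offsetsCf.close();
        db.close();
    }
}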

To ensure that offsets can be returned by committedOffset even when the store is closed (i.e. after close() or before init()), we will write out the latest value of all changelog offsets to a separate file on-disk in the close() method. This file will be read whenever isOpen() returns false, and deleted during init(); while the database is open, offsets will be read directly from the column family.
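
A minimal sketch of this close()/init() handshake is shown below; the file name and line format are illustrative assumptions, not the format the real store will use.

Code Block (java): persisting offsets for a closed store (sketch)

import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class ClosedStoreOffsets {
    // Hypothetical file name; not the name the real implementation will use.
    private static final String FILE_NAME = "changelog-offsets";

    // Called from close(): dump the latest changelog offsets into the store directory.
    static void writeOnClose(final Path storeDir, final Map<TopicPartition, Long> offsets) {
        final String contents = offsets.entrySet().stream()
                .map(e -> e.getKey().topic() + " " + e.getKey().partition() + " " + e.getValue())
                .collect(Collectors.joining("\n"));
        try {
            Files.write(storeDir.resolve(FILE_NAME), contents.getBytes(StandardCharsets.UTF_8));
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Called from committedOffset() while isOpen() returns false.
    static Map<TopicPartition, Long> readWhileClosed(final Path storeDir) {
        final Map<TopicPartition, Long> offsets = new HashMap<>();
        final Path file = storeDir.resolve(FILE_NAME);
        if (!Files.exists(file)) {
            return offsets;
        }
        try {
            for (final String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                final String[] parts = line.split(" ");
                offsets.put(new TopicPartition(parts[0], Integer.parseInt(parts[1])), Long.parseLong(parts[2]));
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return offsets;
    }

    // Called from init(): once the database is open, it is the source of truth again.
    static void deleteOnInit(final Path storeDir) {
        try {
            Files.deleteIfExists(storeDir.resolve(FILE_NAME));
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}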

Interactive Query .position Offsets

Input partition "Position" offsets, introduced by KIP-796: Interactive Query v2, are currently stored in a .position file directly by the RocksDBStore implementation. To ensure consistency with the committed data and changelog offsets, these position offsets will be stored in RocksDB, in the same column family as the changelog offsets, instead of the .position file. When a StateStore that manages its own offsets is first initialized, if a .position file exists in the store directory, its offsets will be automatically migrated into the store, and the file will be deleted.
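
A sketch of this one-time migration follows. The .position file layout assumed here (one "topic partition offset" entry per line) and the key prefix used in the offsets column family are illustrative assumptions only.

Code Block (java): migrating .position offsets into the offsets column family (sketch)

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class PositionFileMigration {

    static void migrateIfPresent(final Path storeDir,
                                 final RocksDB db,
                                 final ColumnFamilyHandle offsetsCf) throws IOException, RocksDBException {
        final Path positionFile = storeDir.resolve(".position");
        if (!Files.exists(positionFile)) {
            return; // nothing to migrate
        }
        try (final WriteBatch batch = new WriteBatch();
             final WriteOptions options = new WriteOptions()) {
            for (final String line : Files.readAllLines(positionFile, StandardCharsets.UTF_8)) {
                final String[] parts = line.trim().split(" ");
                if (parts.length != 3) {
                    continue; // skip anything that does not look like "topic partition offset"
                }
                // Keep position offsets alongside changelog offsets, here under a "position:" key prefix.
                final byte[] key = ("position:" + parts[0] + "-" + parts[1]).getBytes(StandardCharsets.UTF_8);
                batch.put(offsetsCf, key, parts[2].getBytes(StandardCharsets.UTF_8));
            }
            db.write(options, batch);
        }
        // The store is now the single source of truth for position offsets.
        Files.delete(positionFile);
    }
}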

...

When determining the partition assignment, the StreamsPartitionAssignor considers the checkpoint offsets on-disk for all StateStores on each instance, even for Tasks that instance has not (yet) been assigned. This is done via TaskManager#getTaskOffsetSums(), which directly reads from the per-Task .checkpoint file.

With our new API, we need to ensure that this file continues to contain the offsets of stores that manage changelog offsets themselves, even when those stores are closed or have not yet been initialized. We will do this under EOS by updating the .checkpoint file whenever a store is close()d. Note: stores that do not manage their own offsets will continue to also update this file on-commit (provided at least 10,000 records have been written).
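
For context, the offset sums the assignor works with are conceptually just per-Task sums of the checkpointed changelog offsets, as in the simplified sketch below; the real TaskManager#getTaskOffsetSums() additionally handles sentinel offsets for active tasks, which is omitted here.

Code Block (java): computing per-Task offset sums from checkpointed offsets (simplified)

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;

import java.util.HashMap;
import java.util.Map;

final class OffsetSums {

    // Sum the checkpointed changelog offsets of each Task, ignoring negative sentinel values.
    static Map<TaskId, Long> taskOffsetSums(final Map<TaskId, Map<TopicPartition, Long>> checkpointed) {
        final Map<TaskId, Long> sums = new HashMap<>();
        for (final Map.Entry<TaskId, Map<TopicPartition, Long>> entry : checkpointed.entrySet()) {
            long sum = 0L;
            for (final long offset : entry.getValue().values()) {
                sum += Math.max(offset, 0L);
            }
            sums.put(entry.getKey(), sum);
        }
        return sums;
    }
}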

Compatibility, Deprecation, and Migration Plan

...

An alternative approach was suggested: force-flush not on every commit, but only on some commits, as we do under ALOS. This would be controlled by a new configuration, for example statestore.flush.interval.ms and/or statestore.flush.interval.records. However, any default we chose for these configurations would be arbitrary, and could result, under EOS, in more flushing than we do today. For some users the defaults would be fine, and would likely have no impact; but for others, the defaults could be ill-suited to their workload, and could cause a performance regression on upgrade.

We instead chose to take the opportunity to solve this with a more comprehensive set of changes to the StateStore API, which should have additional benefits.

StreamsPartitionAssignor to query stores for offsets

Instead of having TaskManager#getTaskOffsetSums() read from the .checkpoint file directly, we originally intended to have it call StateStore#committedOffset() on each store, and make the StateStore responsible for tracking its offsets even when not initialized.

However, this is not possible, because a StateStore does not know its TaskId, and hence cannot determine its on-disk StateDirectory, until after StateStore#init() has been called. We could have added a StateStoreContext argument to committedOffset(), but we decided against it, because doing so would make the API less clear, and correctly implementing this method would be more difficult for StateStore implementation maintainers.

Instead, we revert to the existing behaviour, and have the Streams engine continue to maintain the offsets for closed stores in the .checkpoint file.