Versions Compared

Comment: Deprecate the newly introduced managesOffsets

...

Code Block
language: java
firstline: 106
title: org.apache.kafka.streams.processor.StateStore
linenumbers: true
    
    /**
     * Determines if this StateStore manages its own offsets.
     * <p>
     * If this method returns {@code true}, then offsets provided to {@link #commit(Map)} will be retrievable using
     * {@link #committedOffset(TopicPartition)}, even if the store is {@link #close() closed} and later re-opened.
     * <p>
     * If this method returns {@code false}, offsets provided to {@link #commit(Map)} will be ignored, and {@link
     * #committedOffset(TopicPartition)} will be expected to always return {@code null}.
     * <p>
     * This method is provided to enable custom StateStores to opt-in to managing their own offsets. This is highly
     * recommended, if possible, to ensure that custom StateStores provide the consistency guarantees that Kafka Streams
     * expects when operating under the {@code exactly-once} {@code processing.mode}.
     * 
     * @deprecated New implementations should always return {@code true} and manage their own offsets. In the future,
     *             this method will be removed and it will be assumed to always return {@code true}.
     *
     * @return Whether this StateStore manages its own offsets.
     */
    @Deprecated
    default boolean managesOffsets() {
        return false;
    }

    /**
     * Flush any cached data.
     *
     * @deprecated Use {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() ProcessorContext#commit()}
     *             instead.
     */
    @Deprecated
    default void flush() {
        // no-op
    }

    /**
     * Commit all written records to this StateStore.
     * <p>
     * This method <b>MUST NOT</b> be called by users from {@link org.apache.kafka.streams.processor.api.Processor
     * processors}, as doing so may violate the consistency guarantees provided by this store, and expected by Kafka
     * Streams.
     * <p>
     * Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit()
     * ProcessorContext#commit()} to request a Task commit.
     * <p>
     * If {@link #managesOffsets()} returns {@code true}, the given {@code changelogOffsets} will be guaranteed to be
     * persisted to disk along with the written records.
     * <p>
     * {@code changelogOffsets} will usually contain a single partition, in the case of a regular StateStore. However,
     * they may contain multiple partitions in the case of a Global StateStore with multiple partitions. All provided
     * partitions <em>MUST</em> be persisted to disk.
     * <p>
     * Implementations <em>SHOULD</em> ensure that {@code changelogOffsets} are committed to disk atomically with the
     * records they represent, if possible.
     *
     * @param changelogOffsets The changelog offset(s) corresponding to the most recently written records.
     */
    default void commit(final Map<TopicPartition, Long> changelogOffsets) {
        flush();
    }

    /**
     * Returns the most recently {@link #commit(Map) committed} offset for the given {@link TopicPartition}.
     * <p>
     * If {@link #managesOffsets()} and {@link #persistent()} both return {@code true}, this method will return the
     * offset that corresponds to the changelog record most recently written to this store, for the given {@code
     * partition}.
     *
     * @param partition The partition to get the committed offset for.
     * @return The last {@link #commit(Map) committed} offset for the {@code partition}; or {@code null} if no offset
     *         has been committed for the partition, or if either {@link #persistent()} or {@link #managesOffsets()}
     *         return {@code false}.
     */
    default Long committedOffset(final TopicPartition partition) {
        return null;
    }

Metrics

New

Each added metric will be at the store level and have the following tags:

...

Kafka Streams will automatically migrate offsets found in an existing .checkpoint file, and/or an existing .position file, to store those offsets directly in the StateStore, if managesOffsets returns true. Users of the in-built store types will not need to make any changes. See Upgrading.
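A minimal, self-contained sketch of the commit/committedOffset contract a store that manages its own offsets would follow. This is illustrative only: a plain String stands in for Kafka's TopicPartition, and the class name is hypothetical, so the example compiles without kafka-clients on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the new contract; String stands in for TopicPartition.
final class OffsetManagingStoreSketch {
    private final Map<String, byte[]> records = new HashMap<>();
    // In a real store, these offsets would be persisted to disk atomically
    // with the records they represent.
    private final Map<String, Long> committedOffsets = new HashMap<>();

    // Opt in to offset management, as this KIP recommends.
    boolean managesOffsets() {
        return true;
    }

    void put(final String key, final byte[] value) {
        records.put(key, value);
    }

    // commit() supersedes flush(): persist written records together with the
    // changelog offsets that produced them.
    void commit(final Map<String, Long> changelogOffsets) {
        committedOffsets.putAll(changelogOffsets);
    }

    // Returns the last offset passed to commit() for the partition, or null.
    Long committedOffset(final String partition) {
        return committedOffsets.get(partition);
    }
}
```

Because the offsets live inside the store, a re-opened store can report exactly which changelog position its on-disk state corresponds to, which is what enables the checkpoint-file migration described above.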

...

flush deprecation

All internal usage of the StateStore#flush method has been removed or replaced. Therefore, the main concern is end-users calling StateStore#flush directly from custom Processors. Users cannot safely call StateStore#commit, because they do not know the changelog offsets that correspond to the locally written state. Forcibly flushing/fsyncing recently written records to disk may also violate any transactional guarantees that StateStores provide. Therefore, the default implementation of flush has been updated to a no-op. Users are now advised (via JavaDoc) that they should instead request an early commit via ProcessingContext#commit().
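The migration for such a Processor can be sketched as follows. The interface and class names here are hypothetical stand-ins (ProcessingContextSketch reduces the real ProcessingContext to the one relevant method) so the example is self-contained.

```java
// Hypothetical stand-in for org.apache.kafka.streams.processor.api.ProcessingContext,
// reduced to the one method relevant here.
interface ProcessingContextSketch {
    void commit(); // requests an early Task commit
}

// A processor that previously called store.flush() every N records can instead
// request a Task commit, which commits all of the task's stores consistently.
final class EarlyCommitProcessorSketch {
    private final ProcessingContextSketch context;
    private long processed = 0;

    EarlyCommitProcessorSketch(final ProcessingContextSketch context) {
        this.context = context;
    }

    void process(final String key, final String value) {
        processed++;
        // Before: store.flush();  -- now a deprecated no-op.
        // After: ask the task to commit at the next opportunity.
        if (processed % 1000 == 0) {
            context.commit();
        }
    }
}
```

The key difference is that the commit request is handled by the task, which knows the changelog offsets, rather than by the user, who does not.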

It's highly unlikely that users are depending on the flush method forcibly persisting state to disk, as it does today, because Kafka Streams guarantees that before the Processor is initialized, its task state will correspond with its changelog. Calling flush will also not impact restore performance in the event of an error, because under EOS, all task state will be wiped regardless, and under ALOS, task state will be rebuilt from the last committed changelog offset irrespective of what state is already on disk.

Therefore, it's likely that most/all user invocations of flush today have no effect, other than to potentially cause performance problems (e.g. by forcing RocksDB to flush many small memtables prematurely).

managesOffsets deprecation

The newly introduced StateStore#managesOffsets method will be immediately deprecated, to allow for its removal in the next major release of Kafka Streams. It is being introduced as a transitional mechanism, to ensure that existing custom StateStores continue to work as expected without managing their own offsets. However, to encourage maintainers of custom stores to add support for offset management, we will emit a WARN level log message when initializing any StateStore that is persistent, but does not manage its own offsets.
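The condition behind that WARN message can be sketched as below. The class, method, and message text are illustrative assumptions, not Kafka Streams' actual implementation; only the condition (persistent but not managing offsets) comes from the text above.

```java
// Hypothetical sketch of the initialization check described above: a persistent
// store that does not manage its own offsets triggers a WARN-level log message.
final class OffsetManagementWarning {
    static String check(final boolean persistent, final boolean managesOffsets, final String storeName) {
        if (persistent && !managesOffsets) {
            return "WARN: StateStore " + storeName
                + " is persistent but does not manage its own offsets; "
                + "offset management will be required in a future release.";
        }
        return null; // in-memory stores, and stores that manage offsets, are fine
    }
}
```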

Upgrade

When upgrading to a version of Kafka Streams that includes the changes outlined in this KIP, users will not be required to take any action.

...