Comment: Change StateStore#commit to throw UnsupportedOperationException when called from a Processor

...

org.apache.kafka.streams.processor.StateStore (Java, starting at line 106)
    
    /**
     * Determines if this StateStore manages its own offsets.
     * <p>
     * If this method returns {@code true}, then offsets provided to {@link #commit(Map)} will be retrievable using
     * {@link #committedOffset(TopicPartition)}, even if the store is {@link #close() closed} and later re-opened.
     * <p>
     * If this method returns {@code false}, offsets provided to {@link #commit(Map)} will be ignored, and {@link
     * #committedOffset(TopicPartition)} will be expected to always return {@code null}.
     * <p>
     * This method is provided to enable custom StateStores to opt-in to managing their own offsets. This is highly
     * recommended, if possible, to ensure that custom StateStores provide the consistency guarantees that Kafka Streams
     * expects when operating under the {@code exactly-once} {@code processing.mode}.
     * 
     * @deprecated New implementations should always return {@code true} and manage their own offsets. In the future,
     *             this method will be removed and it will be assumed to always return {@code true}.
     * @return Whether this StateStore manages its own offsets.
     */
    @Deprecated
    default boolean managesOffsets() {
        return false;
    }

    /**
     * Flush any cached data
     * 
     * @deprecated Use {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() ProcessorContext#commit()}
     *             instead.
     */
    @Deprecated
    default void flush() {
        // no-op
    }

    /**
     * Commit all written records to this StateStore.
     * <p>
     * This method <b>CANNOT</b> be called by users from {@link org.apache.kafka.streams.processor.api.Processor
     * processors}. Doing so will throw an {@link java.lang.UnsupportedOperationException}.
     * <p>
     * Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() 
     * ProcessorContext#commit()} to request a Task commit.
     * <p>
     * If {@link #managesOffsets()} returns {@code true}, the given {@code changelogOffsets} will be guaranteed to be 
     * persisted to disk along with the written records.
     * <p>
     * {@code changelogOffsets} will usually contain a single partition, in the case of a regular StateStore. However,
     * they may contain multiple partitions in the case of a Global StateStore with multiple partitions. All provided
     * partitions <em>MUST</em> be persisted to disk.
     * <p>
     * Implementations <em>SHOULD</em> ensure that {@code changelogOffsets} are committed to disk atomically with the
     * records they represent, if possible.
     * 
     * @param changelogOffsets The changelog offset(s) corresponding to the most recently written records.
     */
    default void commit(final Map<TopicPartition, Long> changelogOffsets) {
        flush();
    }

    /**
     * Returns the most recently {@link #commit(Map) committed} offset for the given {@link TopicPartition}.
     * <p>
     * If {@link #managesOffsets()} and {@link #persistent()} both return {@code true}, this method will return the
     * offset that corresponds to the changelog record most recently written to this store, for the given {@code
     * partition}.
     * 
     * @param partition The partition to get the committed offset for.
     * @return The last {@link #commit(Map) committed} offset for the {@code partition}; or {@code null} if no offset
     *         has been committed for the partition, or if either {@link #persistent()} or {@link #managesOffsets()}
     *         return {@code false}.
     */
    default Long committedOffset(final TopicPartition partition) {
        return null;
    }
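
As a rough illustration of the contract described in the Javadoc above, the following sketch shows a store that manages its own offsets and publishes records and offsets together on commit. It is not the real StateStore interface: the class OffsetManagingStoreSketch, its put and committedValue methods, and the use of String partition names in place of TopicPartition are all assumptions made for the example, and the "durable" maps merely stand in for on-disk state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a custom store; it does NOT implement
// the real org.apache.kafka.streams.processor.StateStore interface.
final class OffsetManagingStoreSketch {
    // Writes made since the last commit.
    private final Map<String, String> pendingRecords = new HashMap<>();
    // These two maps model what would be persisted to disk.
    private Map<String, String> durableRecords = new HashMap<>();
    private Map<String, Long> durableOffsets = new HashMap<>();

    // Opt in to managing offsets, as recommended for new implementations.
    boolean managesOffsets() {
        return true;
    }

    synchronized void put(final String key, final String value) {
        pendingRecords.put(key, value);
    }

    // Builds the next snapshot of records and offsets, then swaps both in
    // under the same lock so neither is visible without the other.
    synchronized void commit(final Map<String, Long> changelogOffsets) {
        final Map<String, String> nextRecords = new HashMap<>(durableRecords);
        nextRecords.putAll(pendingRecords);
        final Map<String, Long> nextOffsets = new HashMap<>(durableOffsets);
        nextOffsets.putAll(changelogOffsets);
        durableRecords = nextRecords;
        durableOffsets = nextOffsets;
        pendingRecords.clear();
    }

    // Returns null until an offset has been committed for the partition.
    synchronized Long committedOffset(final String partition) {
        return durableOffsets.get(partition);
    }

    synchronized String committedValue(final String key) {
        return durableRecords.get(key);
    }
}
```

A real persistent store would achieve the same effect with whatever atomic write mechanism its storage engine offers; the in-memory swap here only demonstrates the ordering guarantee.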

...

  • type = stream-state-metrics
  • thread-id = [thread ID]
  • task-id = [task ID]
  • [store type]-state-id = [store ID]    for key-value stores
  • [store type]-session-state-id = [store ID]    for session stores
  • [store type]-window-state-id = [store ID]    for window stores  
Name                 Recording Level   Metric Type   Description
commit-rate          DEBUG             Rate          The number of calls to StateStore#commit(Map).
commit-latency-avg   DEBUG             Avg           The average time taken to call StateStore#commit(Map).
commit-latency-max   DEBUG             Max           The maximum time taken to call StateStore#commit(Map).


Deprecated

  • stream-state-metrics 
    • flush-rate
    • flush-latency-avg 
    • flush-latency-max 

...

For persistent stores (i.e. StateStore#persistent() returns true) that do not manage their own offsets (i.e. StateStore#managesOffsets() returns false), the existing checkpointing behaviour will be provided internally, by continuing to store checkpoint offsets in the .checkpoint file. This ensures compatibility for custom stores that have not yet been upgraded to manage their own offsets.
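
The fallback described above can be pictured with a small sketch. The internal Kafka Streams implementation is not shown in this document, so everything here is an assumption for illustration: OffsetAwareStore is a cut-down stand-in for StateStore, FixedStore is a test double, LegacyCheckpointFallback is a hypothetical helper, and a plain map stands in for the contents of the .checkpoint file.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, cut-down stand-in for the relevant part of StateStore.
interface OffsetAwareStore {
    boolean persistent();
    boolean managesOffsets();
    void commit(Map<String, Long> changelogOffsets);
}

// Test double: keeps the offsets only when it claims to manage them,
// mirroring the "offsets will be ignored" wording in the Javadoc.
final class FixedStore implements OffsetAwareStore {
    private final boolean persistent;
    private final boolean managesOffsets;
    private final Map<String, Long> ownOffsets = new HashMap<>();

    FixedStore(final boolean persistent, final boolean managesOffsets) {
        this.persistent = persistent;
        this.managesOffsets = managesOffsets;
    }

    @Override public boolean persistent() { return persistent; }
    @Override public boolean managesOffsets() { return managesOffsets; }

    @Override public void commit(final Map<String, Long> changelogOffsets) {
        if (managesOffsets) {
            ownOffsets.putAll(changelogOffsets);
        }
    }

    Long committedOffset(final String partition) {
        return ownOffsets.get(partition);
    }
}

// Hypothetical helper showing where offsets end up for each kind of store.
final class LegacyCheckpointFallback {
    // Stand-in for the contents of the .checkpoint file.
    private final Map<String, Long> checkpointFile = new HashMap<>();

    void commit(final OffsetAwareStore store, final Map<String, Long> offsets) {
        // The store is always asked to commit its written records.
        store.commit(offsets);
        // Persistent stores that do not manage offsets keep the legacy checkpoint.
        if (store.persistent() && !store.managesOffsets()) {
            checkpointFile.putAll(offsets);
        }
    }

    Map<String, Long> checkpointed() {
        return checkpointFile;
    }
}
```

With this shape, a store that has been upgraded to return true from managesOffsets() never touches the checkpoint map, while a legacy persistent store continues to rely on it.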

Calling StateStore#commit from inside a Processor would be dangerous, because it would cause the StateStore to believe that processed records have been committed when they have not (yet). Therefore, we will prevent this method from being called in this context by implementing it in AbstractReadWriteDecorator, which wraps all stores provided to Processors, and throwing an UnsupportedOperationException.
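
A minimal sketch of this guarding pattern follows. It does not reproduce AbstractReadWriteDecorator itself: SketchStore, InMemorySketchStore and ProcessorFacingStore are hypothetical names, and the interface is reduced to the few methods needed to show that reads and writes pass through the wrapper while commit is rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for StateStore; not the real Kafka Streams interface.
interface SketchStore {
    void put(String key, String value);
    String get(String key);
    void commit(Map<String, Long> changelogOffsets);
}

// Plain in-memory store that remembers the offsets it was asked to commit.
final class InMemorySketchStore implements SketchStore {
    private final Map<String, String> data = new HashMap<>();
    private final Map<String, Long> committed = new HashMap<>();

    @Override public void put(final String key, final String value) { data.put(key, value); }
    @Override public String get(final String key) { return data.get(key); }
    @Override public void commit(final Map<String, Long> changelogOffsets) {
        committed.putAll(changelogOffsets);
    }

    Map<String, Long> committedOffsets() {
        return committed;
    }
}

// Wrapper handed to processors: reads and writes are delegated, commit is rejected.
final class ProcessorFacingStore implements SketchStore {
    private final SketchStore inner;

    ProcessorFacingStore(final SketchStore inner) {
        this.inner = inner;
    }

    @Override public void put(final String key, final String value) { inner.put(key, value); }
    @Override public String get(final String key) { return inner.get(key); }

    @Override public void commit(final Map<String, Long> changelogOffsets) {
        // A processor must request a Task commit through its context instead.
        throw new UnsupportedOperationException(
            "commit() cannot be called from a Processor; request a Task commit instead");
    }
}
```

Because only the wrapper is exposed to user code, the runtime can still call commit on the underlying store directly when a Task commit actually happens.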

StateStore Implementations

...