...

Code Block
languagejava
titleorg.apache.kafka.streams.kstream.Consumed
package org.apache.kafka.streams.kstream;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class Consumed<K, V> {

    // Method bodies are omitted; only the new signatures are proposed.
    public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
                                             final Serde<V> valueSerde,
                                             final TimestampExtractor timestampExtractor,
                                             final org.apache.kafka.streams.AutoOffsetReset resetPolicy) {
    }

    public static <K, V> Consumed<K, V> with(final org.apache.kafka.streams.AutoOffsetReset resetPolicy) {
    }

    public Consumed<K, V> withOffsetResetPolicy(final org.apache.kafka.streams.AutoOffsetReset resetPolicy) {
    }
}

org.apache.kafka.streams.scala.kstream.Consumed

Code Block
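languagescala
titleorg.apache.kafka.streams.scala.kstream.Consumed
package org.apache.kafka.streams.scala.kstream

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.AutoOffsetReset
import org.apache.kafka.streams.kstream.{Consumed => ConsumedJ}
import org.apache.kafka.streams.processor.TimestampExtractor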

object Consumed {

  def `with`[K, V](
    timestampExtractor: TimestampExtractor,
    resetPolicy: AutoOffsetReset
  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
    ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy)


  def `with`[K, V](
    resetPolicy: AutoOffsetReset
  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
    ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
}


Proposed Changes

KafkaConsumer

We will update the KafkaConsumer clients (Classic and Async) to support the config value proposed above. As in the current implementation, we will use ListOffsetsRequest to fetch the offset for the timestamp derived from the configured duration, and reset the fetch positions to that offset.
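The lookup relies on the documented ListOffsets contract, which is also exposed publicly as `KafkaConsumer#offsetsForTimes`. For each partition it returns the earliest offset whose record timestamp is greater than or equal to the requested timestamp. The stdlib-only sketch below models that contract over an in-memory list of records. The `Rec` record, the `TimestampLookup` class, and the fallback to the log-end offset when no record is recent enough are hypothetical. They are chosen for illustration and are not taken from the client code.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical model of one partition's log entry: an offset and its record timestamp.
record Rec(long offset, long timestampMs) {}

final class TimestampLookup {
    private TimestampLookup() {}

    // Earliest offset whose timestamp is >= targetMs, scanning records in offset order;
    // empty if no record is that recent. Mirrors the ListOffsets/offsetsForTimes contract.
    static OptionalLong offsetForTimestamp(List<Rec> log, long targetMs) {
        for (Rec r : log) {
            if (r.timestampMs() >= targetMs) {
                return OptionalLong.of(r.offset());
            }
        }
        return OptionalLong.empty();
    }

    // Assumption for illustration: if no record is recent enough, fall back to the
    // log-end offset (one past the last record), which behaves like "latest".
    static long resetPosition(List<Rec> log, long logEndOffset, long targetMs) {
        return offsetForTimestamp(log, targetMs).orElse(logEndOffset);
    }
}
```

Consider records at timestamps 1000, 2000 and 3000 ms. A target of 1500 ms resolves to offset 1. Under the stated fallback assumption, a target newer than every record resolves to the log-end offset 3.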

For by_duration:<duration>, we will subtract the configured duration from the current timestamp on the client, and look up the offsets using the resulting timestamp.
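Here is a minimal sketch of that arithmetic. It assumes two things, both for illustration only: the `<duration>` part is an ISO-8601 duration parsed with `java.time.Duration`, and negative durations are rejected. `DurationReset` is a hypothetical name. Injecting a `java.time.Clock` keeps the subtraction testable.

```java
import java.time.Clock;
import java.time.Duration;

// Hypothetical helper: turns "by_duration:<ISO-8601 duration>" into a target timestamp.
final class DurationReset {
    private static final String PREFIX = "by_duration:";

    private DurationReset() {}

    // Parses the duration part, e.g. "by_duration:PT1H" -> 1 hour.
    static Duration parse(String configValue) {
        if (!configValue.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a by_duration value: " + configValue);
        }
        Duration duration = Duration.parse(configValue.substring(PREFIX.length()));
        if (duration.isNegative()) {
            // Assumption: a negative duration would point into the future, so reject it.
            throw new IllegalArgumentException("Duration must not be negative: " + duration);
        }
        return duration;
    }

    // Target timestamp = current client time minus the configured duration.
    static long targetTimestampMs(Duration duration, Clock clock) {
        return clock.instant().minus(duration).toEpochMilli();
    }
}
```

The target timestamp comes from the consumer host's clock. Any skew between that clock and the record timestamps therefore shifts the reset point by the same amount.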

We will deprecate the OffsetResetStrategy enum. Although it is a public class, it is not used in any public API except MockConsumer. In its place, we will add an internal class to handle offset reset strategies.
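Below is one possible shape for the internal replacement, sketched with hypothetical names (`ResetStrategy`, `Type`, `fromConfig`). It is a small value type that carries the strategy kind plus an optional duration, which a plain enum like OffsetResetStrategy cannot represent. Case-insensitive matching and rejecting negative durations are assumptions, not confirmed behavior.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;
import java.util.Locale;
import java.util.Optional;

// Hypothetical stand-in for an internal class replacing the OffsetResetStrategy enum.
final class ResetStrategy {
    enum Type { EARLIEST, LATEST, NONE, BY_DURATION }

    private static final String BY_DURATION_PREFIX = "by_duration:";

    private final Type type;
    private final Duration duration; // non-null only for BY_DURATION

    private ResetStrategy(Type type, Duration duration) {
        this.type = type;
        this.duration = duration;
    }

    // Parses an auto.offset.reset value; case-insensitive matching is an assumption.
    static ResetStrategy fromConfig(String value) {
        String v = value.trim().toLowerCase(Locale.ROOT);
        if (v.startsWith(BY_DURATION_PREFIX)) {
            Duration d;
            try {
                // Duration.parse accepts upper- or lower-case ISO-8601 letters.
                d = Duration.parse(v.substring(BY_DURATION_PREFIX.length()));
            } catch (DateTimeParseException e) {
                throw new IllegalArgumentException("Invalid duration in: " + value, e);
            }
            if (d.isNegative()) {
                throw new IllegalArgumentException("Negative duration in: " + value);
            }
            return new ResetStrategy(Type.BY_DURATION, d);
        }
        switch (v) {
            case "earliest": return new ResetStrategy(Type.EARLIEST, null);
            case "latest":   return new ResetStrategy(Type.LATEST, null);
            case "none":     return new ResetStrategy(Type.NONE, null);
            default: throw new IllegalArgumentException("Unknown auto.offset.reset value: " + value);
        }
    }

    Type type() { return type; }

    Optional<Duration> duration() { return Optional.ofNullable(duration); }
}
```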

KafkaShareConsumer

As part of KIP-932, we are adding support for share consumer groups. Share consumer groups support the dynamic group configuration property share.auto.offset.reset, which sets the initial Share-Partition Start Offset (SPSO). Currently, share.auto.offset.reset supports the earliest and latest options for automatically resetting the offset.

As with KafkaConsumer, we will add support for the by_duration:<duration> config value to share.auto.offset.reset.
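The following sketch shows how the accepted values for share.auto.offset.reset could be checked once by_duration is added. It assumes the same ISO-8601 syntax as auto.offset.reset. `ShareResetValidator` is a hypothetical name. The validator rejects `none` only because the text above lists earliest and latest as the existing share.auto.offset.reset values.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;
import java.util.Locale;

// Hypothetical validator for share.auto.offset.reset values.
final class ShareResetValidator {
    private static final String PREFIX = "by_duration:";

    private ShareResetValidator() {}

    // Accepts "earliest", "latest" and "by_duration:<non-negative ISO-8601 duration>".
    static boolean isValid(String value) {
        String v = value.trim().toLowerCase(Locale.ROOT);
        if (v.equals("earliest") || v.equals("latest")) {
            return true;
        }
        if (!v.startsWith(PREFIX)) {
            return false; // includes "none", which is not an existing share option
        }
        try {
            return !Duration.parse(v.substring(PREFIX.length())).isNegative();
        } catch (DateTimeParseException e) {
            return false;
        }
    }
}
```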

Kafka Streams

Currently, Kafka Streams supports the EARLIEST and LATEST reset options. We will deprecate the existing Topology.AutoOffsetReset enum and add a new class, org.apache.kafka.streams.AutoOffsetReset, to capture the reset strategies. As part of the implementation, we will need to fetch the offset positions for the configured duration and seek to the fetched offsets.

We will add the new methods proposed above to the Topology and Consumed classes.
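The sketch below shows one plausible shape for the new Streams reset class, along with a migration path from the deprecated enum. All names here are hypothetical stand-ins for illustration: `StreamsResetPolicy`, `Kind`, `LegacyReset`, `earliest()`, `latest()`, `byDuration(Duration)` and `fromLegacy`. They are not the final org.apache.kafka.streams.AutoOffsetReset API.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch of a reset-policy value class for Kafka Streams.
final class StreamsResetPolicy {
    enum Kind { EARLIEST, LATEST, BY_DURATION }

    // Stand-in for the deprecated Topology.AutoOffsetReset enum (EARLIEST, LATEST).
    enum LegacyReset { EARLIEST, LATEST }

    private final Kind kind;
    private final Duration duration; // non-null only for BY_DURATION

    private StreamsResetPolicy(Kind kind, Duration duration) {
        this.kind = kind;
        this.duration = duration;
    }

    static StreamsResetPolicy earliest() { return new StreamsResetPolicy(Kind.EARLIEST, null); }

    static StreamsResetPolicy latest() { return new StreamsResetPolicy(Kind.LATEST, null); }

    static StreamsResetPolicy byDuration(Duration duration) {
        Objects.requireNonNull(duration, "duration");
        if (duration.isNegative()) {
            throw new IllegalArgumentException("Duration must not be negative: " + duration);
        }
        return new StreamsResetPolicy(Kind.BY_DURATION, duration);
    }

    // Migration helper: maps each deprecated enum constant onto the new policy.
    static StreamsResetPolicy fromLegacy(LegacyReset legacy) {
        switch (legacy) {
            case EARLIEST: return earliest();
            case LATEST:   return latest();
            default: throw new IllegalArgumentException("Unknown reset: " + legacy);
        }
    }

    Kind kind() { return kind; }

    Optional<Duration> duration() { return Optional.ofNullable(duration); }
}
```

A class rather than an enum lets the duration travel with the strategy. This is the same reason the enum-based Topology.AutoOffsetReset cannot express by_duration.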

Compatibility, Deprecation, and Migration Plan

  1. There is no change to the existing semantics or default value of the auto.offset.reset config. We are only adding a new config value.
  2. We will deprecate the OffsetResetStrategy enum.

    Code Block
    languagejava
    titleOffsetResetStrategy.java
    package org.apache.kafka.clients.consumer;
    
    import java.util.Locale;
    
    
    public enum OffsetResetStrategy {
        LATEST, EARLIEST, NONE;
    
        @Override
        public String toString() {
            return super.toString().toLowerCase(Locale.ROOT);
        }
    }
      


  3. We will deprecate the `public MockConsumer(OffsetResetStrategy offsetResetStrategy)` constructor.
  4. We will deprecate the Topology.AutoOffsetReset enum.

  5. We will deprecate the existing Topology.AutoOffsetReset-based methods in the Topology and Consumed classes.

    1. Code Block
      languagejava
      titleorg.apache.kafka.streams.Topology
      package org.apache.kafka.streams;

      import java.util.regex.Pattern;
      import org.apache.kafka.common.serialization.Deserializer;
      import org.apache.kafka.streams.processor.TimestampExtractor;

      public class Topology {

          // Existing nested enum, deprecated in favor of org.apache.kafka.streams.AutoOffsetReset.
          @Deprecated
          public enum AutoOffsetReset { EARLIEST, LATEST }

          // Method bodies are omitted; only the signatures being deprecated are shown.
          @Deprecated
          public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                                 final String name,
                                                 final String... topics) {
          }

          @Deprecated
          public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                                 final String name,
                                                 final Pattern topicPattern) {
          }

          @Deprecated
          public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                                 final TimestampExtractor timestampExtractor,
                                                 final String name,
                                                 final String... topics) {
          }

          @Deprecated
          public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                                 final TimestampExtractor timestampExtractor,
                                                 final String name,
                                                 final Pattern topicPattern) {
          }

          @Deprecated
          public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                                 final String name,
                                                 final Deserializer<?> keyDeserializer,
                                                 final Deserializer<?> valueDeserializer,
                                                 final String... topics) {
          }

          @Deprecated
          public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                                 final String name,
                                                 final Deserializer<?> keyDeserializer,
                                                 final Deserializer<?> valueDeserializer,
                                                 final Pattern topicPattern) {
          }

          @Deprecated
          public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                                 final String name,
                                                 final TimestampExtractor timestampExtractor,
                                                 final Deserializer<?> keyDeserializer,
                                                 final Deserializer<?> valueDeserializer,
                                                 final String... topics) {
          }

          @Deprecated
          public synchronized Topology addSource(final AutoOffsetReset offsetReset,
                                                 final String name,
                                                 final TimestampExtractor timestampExtractor,
                                                 final Deserializer<?> keyDeserializer,
                                                 final Deserializer<?> valueDeserializer,
                                                 final Pattern topicPattern) {
          }
      }



    2. Code Block
      languagejava
      titleorg.apache.kafka.streams.kstream.Consumed
      package org.apache.kafka.streams.kstream;

      import org.apache.kafka.common.serialization.Serde;
      import org.apache.kafka.streams.Topology;
      import org.apache.kafka.streams.processor.TimestampExtractor;

      public class Consumed<K, V> {

          // Method bodies are omitted; only the signatures being deprecated are shown.
          @Deprecated
          public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
                                                   final Serde<V> valueSerde,
                                                   final TimestampExtractor timestampExtractor,
                                                   final Topology.AutoOffsetReset resetPolicy) {
          }

          @Deprecated
          public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset resetPolicy) {
          }

          @Deprecated
          public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset resetPolicy) {
          }
      }


    3. org.apache.kafka.streams.scala.kstream.Consumed

      Code Block
      languagescala
      titleorg.apache.kafka.streams.scala.kstream.Consumed
      package org.apache.kafka.streams.scala.kstream

      import org.apache.kafka.common.serialization.Serde
      import org.apache.kafka.streams.Topology
      import org.apache.kafka.streams.kstream.{Consumed => ConsumedJ}
      import org.apache.kafka.streams.processor.TimestampExtractor

      object Consumed {

        @deprecated("Use `with` method that accepts `AutoOffsetReset` instead", "4.0.0")
        def `with`[K, V](
          timestampExtractor: TimestampExtractor,
          resetPolicy: Topology.AutoOffsetReset
        )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
          ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy)

        @deprecated("Use `with` method that accepts `AutoOffsetReset` instead", "4.0.0")
        def `with`[K, V](
          resetPolicy: Topology.AutoOffsetReset
        )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
          ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
      }


Rejected Alternatives

  • Add a separate config option, such as auto.offset.reset.by.duration, to configure the duration.
  • Add an earliest_local reset strategy to automatically reset the offset to the earliest message stored in the local log on the broker. Initially we felt earliest_local could be useful in certain use cases, but we can leave it out for now and add it back when there is a clear use case.
  • Use config values of the form minus-n-hours, minus-n-days, minus-n-months and minus-n-years. Following review suggestions, it is better to follow a standard such as the ISO-8601 duration format.
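The sketch below illustrates the ISO-8601 choice by parsing a few duration strings with `java.time.Duration`. `Duration.parse` accepts only days and smaller units (`PnDTnHnMn.nS`), so month and year designators such as `P1M` or `P1Y` are rejected. Whether the final config accepts them is not stated here. `IsoDurationExamples` is a hypothetical name.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;

// Hypothetical helper showing which ISO-8601 strings java.time.Duration accepts.
final class IsoDurationExamples {
    private IsoDurationExamples() {}

    // Returns the duration in milliseconds, or -1 if Duration.parse rejects the string.
    static long toMillisOrMinusOne(String iso) {
        try {
            return Duration.parse(iso).toMillis();
        } catch (DateTimeParseException e) {
            return -1L;
        }
    }
}
```

For example, `PT12H` maps to 43,200,000 ms and `P3D` maps to 259,200,000 ms. `P1M` yields -1 because months are not a fixed-length unit in `Duration`.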