...
Code Block language java
package org.apache.kafka.streams.kstream;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class Consumed<K, V> {
    // New overloads accepting the new org.apache.kafka.streams.AutoOffsetReset class (bodies omitted).
    public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
                                             final Serde<V> valueSerde,
                                             final TimestampExtractor timestampExtractor,
                                             final org.apache.kafka.streams.AutoOffsetReset resetPolicy) { }

    public static <K, V> Consumed<K, V> with(final org.apache.kafka.streams.AutoOffsetReset resetPolicy) { }

    public Consumed<K, V> withOffsetResetPolicy(final org.apache.kafka.streams.AutoOffsetReset resetPolicy) { }
}
org.apache.kafka.streams.scala.kstream.Consumed
Code Block language scala
package org.apache.kafka.streams.scala.kstream

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.AutoOffsetReset
import org.apache.kafka.streams.kstream.{Consumed => ConsumedJ}
import org.apache.kafka.streams.processor.TimestampExtractor

object Consumed {
  def `with`[K, V](
    timestampExtractor: TimestampExtractor,
    resetPolicy: AutoOffsetReset
  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
    ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy)

  def `with`[K, V](
    resetPolicy: AutoOffsetReset
  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
    ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
}
Proposed Changes
KafkaConsumer
We will update the KafkaConsumer clients (Classic and Async) to support the config values described above. As in the current implementation, we will use a ListOffsetsRequest to fetch the offset for the timestamp derived from the configured duration, and reset the fetch positions to that offset.
For by_duration:<duration>, we will subtract the configured duration from the current timestamp on the client and look up the offsets using the resulting timestamp.
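The timestamp arithmetic and the lookup can be sketched in plain Java. This is a minimal sketch, not the client implementation: it assumes a hypothetical in-memory partition (`ByDurationLookup.Entry` records in offset order) standing in for the broker, and mimics the documented `offsetsForTimes` rule of returning the earliest offset whose timestamp is greater than or equal to the target.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical helper illustrating the by_duration lookup; not Kafka client code.
final class ByDurationLookup {
    // One record of a hypothetical in-memory partition: its offset and timestamp (ms).
    record Entry(long offset, long timestampMs) { }

    // Target timestamp = client's current time minus the configured duration.
    static long targetTimestamp(Instant now, Duration duration) {
        return now.minus(duration).toEpochMilli();
    }

    // Mimics ListOffsets-by-timestamp semantics: the earliest offset whose
    // timestamp is >= targetMs. Entries are scanned in offset order, so the
    // first match is the earliest such offset. Empty if no record qualifies.
    static OptionalLong offsetForTimestamp(List<Entry> log, long targetMs) {
        for (Entry e : log) {
            if (e.timestampMs() >= targetMs) {
                return OptionalLong.of(e.offset());
            }
        }
        return OptionalLong.empty();
    }
}
```

For example, with a current time of 10,000 ms and a 5-second duration, the target timestamp is 5,000 ms, and the lookup returns the first record stamped at or after 5,000 ms. What the client should do when no record is recent enough is not specified here, so the sketch simply returns an empty result.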
We will deprecate the OffsetResetStrategy enum.
Even though this is a public class, it is not used in any public API (except MockConsumer). In its place, we will add an internal class to handle offset reset strategies.
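One possible shape for such an internal class is sketched below. The class name `ResetStrategySketch`, its fields, and the rejection of negative durations are all hypothetical; the sketch only illustrates parsing the four config values, and it assumes `java.time.Duration.parse` as the ISO-8601 parser for the `by_duration:` suffix.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.Optional;

// Hypothetical stand-in for the internal reset-strategy class; names are assumptions.
final class ResetStrategySketch {
    enum Type { EARLIEST, LATEST, NONE, BY_DURATION }

    final Type type;
    final Optional<Duration> duration; // present only for BY_DURATION

    private ResetStrategySketch(Type type, Duration duration) {
        this.type = type;
        this.duration = Optional.ofNullable(duration);
    }

    static ResetStrategySketch parse(String value) {
        String v = value.trim().toLowerCase(Locale.ROOT);
        final String prefix = "by_duration:";
        if (v.startsWith(prefix)) {
            // ISO-8601 duration after the prefix, e.g. "by_duration:PT1H".
            // Duration.parse accepts upper or lower case and throws
            // DateTimeParseException on malformed input.
            Duration d = Duration.parse(v.substring(prefix.length()));
            if (d.isNegative()) { // assumption: negative durations are rejected
                throw new IllegalArgumentException("negative duration: " + value);
            }
            return new ResetStrategySketch(Type.BY_DURATION, d);
        }
        switch (v) {
            case "earliest": return new ResetStrategySketch(Type.EARLIEST, null);
            case "latest":   return new ResetStrategySketch(Type.LATEST, null);
            case "none":     return new ResetStrategySketch(Type.NONE, null);
            default: throw new IllegalArgumentException("unknown reset strategy: " + value);
        }
    }
}
```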
KafkaShareConsumer
As part of KIP-932, we are adding support for share groups. Share groups support the dynamic group configuration property share.auto.offset.reset, which is used to set the initial Share-Partition Start Offset (SPSO). Currently share.auto.offset.reset supports the earliest and latest options to automatically reset the offset.
As with the KafkaConsumer, we will add support for the by_duration:<duration> config value to share.auto.offset.reset.
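To make the share-group case concrete, the sketch below models how an initial SPSO could be chosen for each value of share.auto.offset.reset. It is a hypothetical model, not broker code: the log start and end offsets and the timestamp lookup are passed in as plain values, and falling back to the log end offset when no record is recent enough is an assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.OptionalLong;
import java.util.function.LongFunction;

// Hypothetical model of initial SPSO selection; not the actual share-partition code.
final class InitialSpsoSketch {
    enum Reset { EARLIEST, LATEST, BY_DURATION }

    static long initialSpso(Reset reset, Duration duration, Instant now,
                            long logStartOffset, long logEndOffset,
                            LongFunction<OptionalLong> offsetForTimestamp) {
        switch (reset) {
            case EARLIEST:
                return logStartOffset;
            case LATEST:
                return logEndOffset;
            case BY_DURATION: {
                long target = now.minus(duration).toEpochMilli();
                // Assumption: if no record is at or after the target, start at the log end.
                return offsetForTimestamp.apply(target).orElse(logEndOffset);
            }
            default:
                throw new IllegalStateException("unexpected reset: " + reset);
        }
    }
}
```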
Kafka Streams
Currently Kafka Streams supports the EARLIEST and LATEST reset options. We will deprecate the existing Topology.AutoOffsetReset enum and add a new class, org.apache.kafka.streams.AutoOffsetReset, to capture the reset strategies. As part of the implementation, we will need to fetch the offsets corresponding to the configured duration and seek to them.
We will add the new methods proposed above to the Topology and Consumed classes.
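A class is needed rather than an enum because an enum has a fixed set of constants and cannot represent an arbitrary duration. The sketch below is a hypothetical stand-in for `org.apache.kafka.streams.AutoOffsetReset`: the class name `OffsetResetPolicySketch` and the factory names `none()`, `earliest()`, `latest()` and `byDuration(Duration)` are assumptions, since this section does not list the real class's members. `toConfigValue()` is a hypothetical helper that renders the policy in the consumer's `auto.offset.reset` syntax.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for org.apache.kafka.streams.AutoOffsetReset; names are assumptions.
final class OffsetResetPolicySketch {
    enum Kind { NONE, EARLIEST, LATEST, BY_DURATION }

    private final Kind kind;
    private final Duration duration; // non-null only for BY_DURATION

    private OffsetResetPolicySketch(Kind kind, Duration duration) {
        this.kind = kind;
        this.duration = duration;
    }

    static OffsetResetPolicySketch none()     { return new OffsetResetPolicySketch(Kind.NONE, null); }
    static OffsetResetPolicySketch earliest() { return new OffsetResetPolicySketch(Kind.EARLIEST, null); }
    static OffsetResetPolicySketch latest()   { return new OffsetResetPolicySketch(Kind.LATEST, null); }

    // The duration-based variant carries data, which a plain enum constant cannot.
    static OffsetResetPolicySketch byDuration(Duration duration) {
        Objects.requireNonNull(duration, "duration");
        return new OffsetResetPolicySketch(Kind.BY_DURATION, duration);
    }

    Kind kind() { return kind; }

    Optional<Duration> duration() { return Optional.ofNullable(duration); }

    // Hypothetical helper: render the policy as an auto.offset.reset value.
    String toConfigValue() {
        return kind == Kind.BY_DURATION
                ? "by_duration:" + duration
                : kind.name().toLowerCase(Locale.ROOT);
    }
}
```

For example, `OffsetResetPolicySketch.byDuration(Duration.ofHours(1)).toConfigValue()` yields `by_duration:PT1H`. `Duration.toString()` normalizes days to hours, so `Duration.ofDays(3)` renders as `PT72H`.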
Compatibility, Deprecation, and Migration Plan
- There is no change to the existing semantics or the default value of the auto.offset.reset config. We are only adding a new config value.
- We will deprecate the OffsetResetStrategy enum.
Code Block language java title OffsetResetStrategy.java
package org.apache.kafka.clients.consumer;

import java.util.Locale;

public enum OffsetResetStrategy {
    LATEST, EARLIEST, NONE;

    @Override
    public String toString() {
        return super.toString().toLowerCase(Locale.ROOT);
    }
}
- We will deprecate the `public MockConsumer(OffsetResetStrategy offsetResetStrategy)` constructor.
- We will deprecate the Topology.AutoOffsetReset enum.
- We will deprecate the existing Topology.AutoOffsetReset-based methods in the Topology and Consumed classes.
Code Block language java title org.apache.kafka.streams.Topology
package org.apache.kafka.streams;

import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class Topology {
    // Existing methods taking the Topology.AutoOffsetReset enum, to be deprecated (bodies omitted).
    public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name,
                                           final String... topics) { }
    public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name,
                                           final Pattern topicPattern) { }
    public synchronized Topology addSource(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor,
                                           final String name, final String... topics) { }
    public synchronized Topology addSource(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor,
                                           final String name, final Pattern topicPattern) { }
    public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name,
                                           final Deserializer<?> keyDeserializer, final Deserializer<?> valueDeserializer,
                                           final String... topics) { }
    public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name,
                                           final Deserializer<?> keyDeserializer, final Deserializer<?> valueDeserializer,
                                           final Pattern topicPattern) { }
    public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name,
                                           final TimestampExtractor timestampExtractor,
                                           final Deserializer<?> keyDeserializer, final Deserializer<?> valueDeserializer,
                                           final String... topics) { }
    public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name,
                                           final TimestampExtractor timestampExtractor,
                                           final Deserializer<?> keyDeserializer, final Deserializer<?> valueDeserializer,
                                           final Pattern topicPattern) { }
}
Code Block language java title org.apache.kafka.streams.kstream.Consumed
package org.apache.kafka.streams.kstream;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class Consumed<K, V> {
    // Existing methods taking the Topology.AutoOffsetReset enum, to be deprecated (bodies omitted).
    public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
                                             final Serde<V> valueSerde,
                                             final TimestampExtractor timestampExtractor,
                                             final Topology.AutoOffsetReset resetPolicy) { }

    public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset resetPolicy) { }

    public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset resetPolicy) { }
}
org.apache.kafka.streams.scala.kstream.Consumed
Code Block language scala
package org.apache.kafka.streams.scala.kstream

object Consumed {
  @deprecated("Use `with` method that accepts `AutoOffsetReset` instead", "4.0.0")
  def `with`[K, V](
    timestampExtractor: TimestampExtractor,
    resetPolicy: Topology.AutoOffsetReset
  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
    ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy)

  @deprecated("Use `with` method that accepts `AutoOffsetReset` instead", "4.0.0")
  def `with`[K, V](
    resetPolicy: Topology.AutoOffsetReset
  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
    ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
}
Rejected Alternatives
- Add a separate config option, such as auto.offset.reset.by.duration, to configure the duration.
- Add an earliest_local reset strategy that automatically resets the offset to the earliest message stored in the local log on the broker. Initially we felt earliest_local could be useful in certain use cases, but I think we can leave it out for now and add it back when there is a clear use case.
- Use config values of the form minus-n-hours, minus-n-days, minus-n-months and minus-n-years. Based on the feedback, it is better to follow a standard such as the ISO-8601 duration format.
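The ISO-8601 choice can be checked against `java.time.Duration.parse`, which is assumed here (this section does not confirm it) to be the parser used for the duration. The sketch shows which strings it accepts. `Duration.parse` handles only days and smaller units, so `P1M` (one month in ISO-8601) is rejected while `PT1M` (one minute) is accepted.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;

// Illustrative helper around java.time.Duration.parse; the class name is hypothetical.
final class Iso8601DurationExamples {
    // Returns the parsed duration, or null if java.time rejects the text.
    static Duration tryParse(String text) {
        try {
            return Duration.parse(text);
        } catch (DateTimeParseException e) {
            return null;
        }
    }
}
```

With this helper, `PT1H` parses to 3,600,000 ms, `P3D` to 259,200,000 ms and `P1DT12H` to 129,600,000 ms, while both `P1M` and the rejected `minus-3-days` style return null.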