Status
Current state: "Accepted"
Discussion thread: here
Voting thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-405 introduced tiered storage in Apache Kafka. With the tiered storage feature, Kafka can be used as a long-term storage service with the option of effectively infinite retention. This allows users not only to consume data in real time but also gives them the flexibility to fetch older data based on retention policies.
The Kafka consumer supports the auto.offset.reset config option, which is used when there is no initial offset in Kafka or when the current offset no longer exists on the server. This config currently supports the earliest/latest/none options. Today, consumer resets might force applications to reprocess large amounts of data from earlier offsets. With infinite storage, it is beneficial to have a duration-based offset reset strategy. This allows applications to consume or initialise from a fixed duration in the past when there is no initial offset in Kafka.
Public Interfaces
Configuration
Kafka Consumer
We propose to update the auto.offset.reset config option to support a new config value of the format by_duration:<duration>.
by_duration:<duration>: automatically resets the offsets back by the configured duration.
Configuration | Description | Values
---|---|---
auto.offset.reset | What to do when there is no initial offset in Kafka or when the current offset no longer exists on the server. | Current valid values are "latest" (default), "earliest" and "none". We add support for a new value of the format `by_duration:<duration>`, which automatically resets the offsets back by the configured duration. The duration must be specified in ISO-8601 format (PnDTnHnMn.nS); negative durations are not allowed. The client subtracts the duration from its current timestamp, finds the offsets using that subtracted timestamp, and resets to those offsets. The duration specified does not account for daylight saving changes.
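For illustration, the sketch below shows a consumer opting into the proposed reset behaviour. The bootstrap address, group id and the seven-day duration (P7D) are illustrative values, not part of the KIP.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ByDurationResetExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // When there is no committed offset, reset to the offsets found for
        // (current client time - 7 days), expressed as an ISO-8601 duration.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "by_duration:P7D");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe/poll as usual; the reset strategy only applies when
            // there is no valid offset for a partition.
        }
    }
}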
Share Groups
Similar to consumer groups, the following dynamic group configuration property change is proposed.
Configuration | Description | Values
---|---|---
share.auto.offset.reset | How to initialize the share-partition start offset. | Current valid values are "latest" (default) and "earliest". We add support for a new value of the format `by_duration:<duration>`, which automatically resets the offsets back by the configured duration. The duration must be specified in ISO-8601 format (PnDTnHnMn.nS); negative durations are not allowed. This option subtracts the duration from the current timestamp on the server, finds the offsets using that subtracted timestamp, and resets to those offsets. The duration specified does not account for daylight saving changes.
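As a rough sketch, an operator could set this dynamic group configuration through the Admin client as shown below. The group name, duration and the use of the ConfigResource.Type.GROUP resource type for group-level configs are assumptions for illustration only.

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class ShareGroupResetConfigExample {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(Map.of("bootstrap.servers", "localhost:9092"))) {
            // Hypothetical share group name; GROUP resource type assumed for group configs.
            ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "my-share-group");
            AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("share.auto.offset.reset", "by_duration:P1D"),
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(group, List.of(op))).all().get();
        }
    }
}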
Interfaces
MockConsumer
Add the below public constructor to MockConsumer:
/**
 * A mock consumer is instantiated by providing the ConsumerConfig.AUTO_OFFSET_RESET_CONFIG value as the input.
 *
 * @param offsetResetStrategy the offset reset strategy to use
 */
public MockConsumer(String offsetResetStrategy) { }
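Once the KIP is implemented, a test might construct the mock as in the sketch below; the duration value is illustrative.

import org.apache.kafka.clients.consumer.MockConsumer;

public class MockConsumerConstructionExample {
    public static void main(String[] args) {
        // Construct the mock with the same string value accepted by auto.offset.reset.
        MockConsumer<String, String> consumer = new MockConsumer<>("by_duration:PT6H");
        System.out.println(consumer.subscription());
    }
}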
Kafka Streams changes
AutoOffsetReset
package org.apache.kafka.streams;

import java.time.Duration;
import java.util.Optional;

/**
 * Sets the {@code auto.offset.reset} configuration when
 * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream}
 * or {@link KTable} via {@link StreamsBuilder}.
 */
public class AutoOffsetReset {
    protected final Optional<Long> duration;

    protected AutoOffsetReset(Optional<Long> duration) {
        this.duration = duration;
    }

    public static AutoOffsetReset latest() { }

    public static AutoOffsetReset earliest() { }

    public static AutoOffsetReset none() { }

    /** The duration cannot be negative. */
    public static AutoOffsetReset byDuration(Duration duration) { }
}
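For example, once this class exists, a duration-based strategy could be obtained as in the sketch below; the seven-day window is illustrative.

import java.time.Duration;

import org.apache.kafka.streams.AutoOffsetReset;

public class AutoOffsetResetExample {
    public static void main(String[] args) {
        AutoOffsetReset earliest = AutoOffsetReset.earliest();
        // byDuration rejects negative durations, mirroring the by_duration config value.
        AutoOffsetReset lastWeek = AutoOffsetReset.byDuration(Duration.ofDays(7));
    }
}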
org.apache.kafka.streams.Topology
package org.apache.kafka.streams;

public class Topology {
    public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, final String name, final String... topics) { }
    public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, final String name, final Pattern topicPattern) { }
    public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String name, final String... topics) { }
    public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String name, final Pattern topicPattern) { }
    public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, final String name, final Deserializer<?> keyDeserializer, final Deserializer<?> valueDeserializer, final String... topics) { }
    public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, final String name, final Deserializer<?> keyDeserializer, final Deserializer<?> valueDeserializer, final Pattern topicPattern) { }
    public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, final Deserializer<?> keyDeserializer, final Deserializer<?> valueDeserializer, final String... topics) { }
    public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, final Deserializer<?> keyDeserializer, final Deserializer<?> valueDeserializer, final Pattern topicPattern) { }
}
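A sketch of wiring one of the new overloads into a topology, assuming the KIP is implemented; the topic, processor name and 30-day duration are illustrative.

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.AutoOffsetReset;
import org.apache.kafka.streams.Topology;

public class TopologyByDurationExample {
    public static void main(String[] args) {
        Topology topology = new Topology();
        // Reset the source back by 30 days when no committed offset is found.
        topology.addSource(
                AutoOffsetReset.byDuration(Duration.ofDays(30)),
                "orders-source",
                Serdes.String().deserializer(),
                Serdes.String().deserializer(),
                "orders");
    }
}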
org.apache.kafka.streams.kstream.Consumed
package org.apache.kafka.streams.kstream;

public class Consumed<K, V> {
    public static <K, V> Consumed<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde, final TimestampExtractor timestampExtractor, final org.apache.kafka.streams.AutoOffsetReset resetPolicy) { }
    public static <K, V> Consumed<K, V> with(final org.apache.kafka.streams.AutoOffsetReset resetPolicy) { }
    public Consumed<K, V> withOffsetResetPolicy(final org.apache.kafka.streams.AutoOffsetReset resetPolicy) { }
}
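A sketch of using the new Consumed overloads from the DSL, assuming the KIP is implemented; the topic name and two-day duration are illustrative.

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.AutoOffsetReset;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class ConsumedByDurationExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Start roughly two days back when no committed offset exists for the source partitions.
        KStream<String, String> clicks = builder.stream(
                "clicks",
                Consumed.with(Serdes.String(), Serdes.String())
                        .withOffsetResetPolicy(AutoOffsetReset.byDuration(Duration.ofDays(2))));
    }
}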
org.apache.kafka.streams.scala.kstream.Consumed
package org.apache.kafka.streams.scala.kstream

object Consumed {

  def `with`[K, V](
    timestampExtractor: TimestampExtractor,
    resetPolicy: AutoOffsetReset
  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
    ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy)

  def `with`[K, V](
    resetPolicy: AutoOffsetReset
  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
    ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
}
Proposed Changes
KafkaConsumer
Update the KafkaConsumer clients (Classic/Async) to support the above config value. Similar to the current implementation, we will use ListOffsetRequest to fetch the offsets for the timestamp derived from the configured duration and reset the fetch positions.
For by_duration:<duration>, we will subtract the configured duration from the current timestamp on the client and find the offsets using that subtracted timestamp.
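The intended semantics can be approximated with today's public consumer API as in the sketch below. This is only an illustration of the timestamp arithmetic, not the internal implementation, and the fallback when no offset is found is a simplification.

import java.time.Duration;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class DurationResetSketch {
    // Subtract the duration from the client's wall-clock time and seek to the
    // earliest offset whose timestamp is at or after that point.
    static void resetByDuration(Consumer<?, ?> consumer, TopicPartition tp, Duration duration) {
        long targetTimestamp = System.currentTimeMillis() - duration.toMillis();
        Map<TopicPartition, OffsetAndTimestamp> offsets =
                consumer.offsetsForTimes(Map.of(tp, targetTimestamp));
        OffsetAndTimestamp found = offsets.get(tp);
        if (found != null) {
            consumer.seek(tp, found.offset());
        } else {
            // No record at or after the timestamp; this sketch simply falls back to the end.
            consumer.seekToEnd(List.of(tp));
        }
    }
}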
We will deprecate the OffsetResetStrategy enum.
Even though this is a public class, it is not used in any public APIs (except MockConsumer). In its place, we will add an internal class to handle offset reset strategies.
KafkaShareConsumer
As part of KIP-932, we are adding support for share consumer groups. Share consumer groups support the dynamic group configuration property share.auto.offset.reset, which is used to set the initial Share-Partition Start Offset (SPSO). Currently, share.auto.offset.reset supports the earliest and latest options to automatically reset the offset.
Similar to the Kafka consumer, we will add support for the by_duration:<duration> config value for share.auto.offset.reset.
Kafka Streams
Currently, Kafka Streams supports the EARLIEST and LATEST offset reset options. We will deprecate the existing Topology.AutoOffsetReset enum and add a new class org.apache.kafka.streams.AutoOffsetReset to capture the reset strategies. As part of the implementation, we will need to fetch the offset positions for the configured duration and seek to the fetched offsets.
We will implement the new methods proposed above on the Topology and Consumed classes.
Compatibility, Deprecation, and Migration Plan
- There is no change to the existing semantics and default value of the auto.offset.reset config; we will only be adding the new config value.
- We will deprecate the OffsetResetStrategy enum.
OffsetResetStrategy.java
package org.apache.kafka.clients.consumer;

import java.util.Locale;

public enum OffsetResetStrategy {
    LATEST, EARLIEST, NONE;

    @Override
    public String toString() {
        return super.toString().toLowerCase(Locale.ROOT);
    }
}
- Deprecate the `public MockConsumer(OffsetResetStrategy offsetResetStrategy)` constructor.
- We will deprecate the Topology.AutoOffsetReset enum.
- We will deprecate the existing Topology.AutoOffsetReset based methods from the Topology and Consumed classes:
- org.apache.kafka.streams.Topology
package org.apache.kafka.streams;

public class Topology {
    public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name, final String... topics) { }
    public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name, final Pattern topicPattern) { }
    public synchronized Topology addSource(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String name, final String... topics) { }
    public synchronized Topology addSource(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String name, final Pattern topicPattern) { }
    public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer<?> keyDeserializer, final Deserializer<?> valueDeserializer, final String... topics) { }
    public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer<?> keyDeserializer, final Deserializer<?> valueDeserializer, final Pattern topicPattern) { }
    public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, final Deserializer<?> keyDeserializer, final Deserializer<?> valueDeserializer, final String... topics) { }
    public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, final Deserializer<?> keyDeserializer, final Deserializer<?> valueDeserializer, final Pattern topicPattern) { }
}
- org.apache.kafka.streams.kstream.Consumed
package org.apache.kafka.streams.kstream;

public class Consumed<K, V> {
    public static <K, V> Consumed<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde, final TimestampExtractor timestampExtractor, final Topology.AutoOffsetReset resetPolicy) { }
    public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset resetPolicy) { }
    public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset resetPolicy) { }
}
- org.apache.kafka.streams.scala.kstream.Consumed
package org.apache.kafka.streams.scala.kstream

object Consumed {

  @deprecated("Use `with` method that accepts `AutoOffsetReset` instead", "4.0.0")
  def `with`[K, V](
    timestampExtractor: TimestampExtractor,
    resetPolicy: Topology.AutoOffsetReset
  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
    ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy)

  @deprecated("Use `with` method that accepts `AutoOffsetReset` instead", "4.0.0")
  def `with`[K, V](
    resetPolicy: Topology.AutoOffsetReset
  )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
    ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
}
Rejected Alternatives
- Add a separate config option to configure the duration like auto.offset.reset.by.duration
- Add an earliest_local reset strategy that automatically resets the offset to the earliest message stored in the local log on the broker. Initially we felt earliest_local could be useful in certain use cases, but we can leave it out for now and add it back when there is a clear use case.
- Use config values of the format minus-n-hours, minus-n-days, minus-n-months and minus-n-years. As per the review suggestions, it is better to follow a standard such as the ISO-8601 format for defining durations.