Status
Current state: Under Discussion
Discussion thread: https://lists.apache.org/thread/ztj43f2w3brzdqof8nollfy1zkx9by4w
JIRA: KAFKA-20035
Motivation
The Semantic Gap
Current auto.offset.reset policies fail to support a common and critical use case: "start consuming from this moment forward, but don't miss anything after I start."
| Policy | Semantic | Behavior | Problem |
|---|---|---|---|
| earliest | Process all historical data | Always start from the beginning of the partition | Forces consumers to reprocess potentially millions of old messages, which is impractical for many use cases |
| latest | Skip to the current end of the partition | Always start from the current end offset at the time of partition discovery | Causes silent data loss during partition expansion or log truncation |
| none | Fail if no offset exists | Throw exception when no committed offset is found | Requires manual intervention; doesn't provide automatic recovery |
| by_duration:<duration> | Start from a relative time in the past | Compute now() - duration at partition discovery time and seek to that timestamp | Dynamic calculation causes data loss during partition expansion |
Data Loss Scenarios:
Scenario 1: Partition Expansion Data Loss
When a topic undergoes partition expansion, consumers configured with auto.offset.reset=latest face a critical data loss window:
- Existing topic usage: The topic is already actively used by producers and consumers
- Partition creation: New partitions are added to the existing topic
- Producer awareness: Producers quickly discover the new partitions via metadata refresh and begin producing records to them
- Consumer lag: Consumers may take longer to detect the partition expansion
- Offset initialization: When consumers eventually discover the new partitions, auto.offset.reset=latest is applied, causing them to skip all records produced to those partitions during the discovery window
Result: Messages produced between partition creation and consumer discovery are silently lost.
Scenario 2: Log Truncation Data Loss
When log segments are deleted due to retention policies or manual truncation, consumers with auto.offset.reset=latest may skip to the end instead of starting from the earliest available data after truncation, causing additional data loss.
The Missing Semantic
Users need a policy that:
- Skips historical data that existed before the consumer started (like latest)
- Captures all data produced after the consumer started (unlike latest)
- Handles partition expansion gracefully (starts from the earliest available offset when the startup timestamp predates partition creation)
- Handles log truncation gracefully (after segments are deleted, the consumer can still consume any remaining data whose timestamps fall at or after the startup time, rather than skipping to the end of the partition)
Public Interfaces
We propose updating the auto.offset.reset configuration to support a new value: by_start_time. This option automatically resets offsets based on the consumer’s startup timestamp.
When set to by_start_time, the consumer captures its startup timestamp once during initialization. For partitions without committed offsets, the consumer seeks to the first message with a timestamp at or after the startup time. For partitions where all messages postdate the startup time (e.g., newly created partitions), the consumer naturally starts from the beginning, preventing data loss.
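If adopted, enabling the policy would be a one-line configuration change. The sketch below shows what a consumer configuration might look like; note that the by_start_time value is the proposal of this KIP and does not exist in released Kafka clients, and the bootstrap server and group id are placeholders.

```java
import java.util.Properties;

public class ByStartTimeConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "demo-group");              // placeholder
        // Proposed value: for partitions without committed offsets, seek to the
        // first record whose timestamp is at or after the consumer's startup time.
        props.setProperty("auto.offset.reset", "by_start_time");
        return props;
    }
}
```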
Proposed Changes
KafkaConsumer
Update KafkaConsumer clients (classic/Async consumer) to support the new by_start_time configuration value. Similar to the current by_duration implementation, we will use the existing ListOffsetRequest to fetch the offset for a given timestamp to reset the offset fetch positions.
- Calculate the target timestamp once at consumer initialization: startupTimestamp = now()
- Store this timestamp in SubscriptionState as an immutable field
- All partitions without committed offsets use this same startupTimestamp, regardless of when they are discovered
- This prevents data loss because the timestamp is anchored to consumer startup, not partition discovery
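The anchoring rule above can be sketched as plain logic. The class and method names here are illustrative only (not the actual SubscriptionState code): by_start_time resolves every partition to the same immutable startup timestamp, while by_duration recomputes its target from the discovery time.

```java
// Illustrative sketch of the timestamp-anchoring difference; not Kafka client source.
class ResetTimestampPolicy {
    private final long startupTimestampMs; // captured once at consumer initialization

    ResetTimestampPolicy(long nowMs) {
        this.startupTimestampMs = nowMs; // immutable anchor
    }

    // by_start_time: every partition resolves to the same startup timestamp,
    // no matter when the partition is discovered.
    long byStartTimeTarget(long discoveryTimeMs) {
        return startupTimestampMs;
    }

    // by_duration (current behavior): the target drifts with discovery time.
    long byDurationTarget(long discoveryTimeMs, long durationMs) {
        return discoveryTimeMs - durationMs;
    }
}
```

For example, with startup at t=1000 ms, a partition discovered at t=9000 ms still resolves to t=1000 under by_start_time, but to 9000 - duration under by_duration, which can skip records produced during the discovery window.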
Key difference between by_duration and by_start_time
The fundamental difference lies in when and how the reference timestamp is calculated:
| | by_duration:<duration> | by_start_time |
|---|---|---|
| Timestamp Calculation | now() - duration | consumer_startup_time |
| When Calculated | Dynamically at partition discovery | Once at consumer initialization |
| Consistency | Different timestamp per partition | Same timestamp for all partitions |
| Reference Point | Partition discovery time | Consumer startup time |
Related Issue: KAFKA-19236
It is worth noting that the dynamic timestamp recalculation behavior of by_duration (i.e., re-computing now() - duration on every retry at partition discovery time) is itself a known issue, tracked separately in KAFKA-19236. That issue proposes fixing by_duration to compute the seek timestamp only once, rather than drifting on each retry.
This KIP does not attempt to fix by_duration. Instead, by_start_time is introduced as a complementary policy with an explicitly stable reference point — the consumer's startup time — making data-loss prevention a first-class guarantee rather than a side effect of a bug fix.
Compatibility, Deprecation, and Migration Plan
We discuss this improvement separately for different consumer types:
Classic/Async consumer
This is a new feature, so it does not introduce any backward compatibility issues.
Share consumer
This KIP does not include changes for share consumer. Any future requirements will be addressed in a separate KIP.
Test Plan
All existing tests should continue to pass, and new unit and integration tests will be added to ensure the new behavior works as expected.
Rejected Alternatives
Track Initial Partition Count in Group Metadata
An earlier approach proposed tracking the initial partition count for each topic in the consumer group metadata and using it to distinguish between original and newly added partitions at the protocol level.
Key mechanism:
- The coordinator records partition counts when a group first subscribes to a topic
- This information is persisted in ConsumerGroupMetadataValue with a new InitialTopicPartitions field
- The ConsumerGroupHeartbeatResponse is extended with InitialPartitionCount to communicate this to consumers
- Consumers apply different offset reset strategies based on whether a partition is "new" or "old":
  - Partitions with id >= InitialPartitionCount: Force EARLIEST offset reset
  - Partitions with id < InitialPartitionCount: Use the configured auto.offset.reset
Why rejected:
This approach was rejected due to fundamental design concerns:
- Protocol-level partition distinction is confusing: Introducing a protocol mechanism to distinguish "new" vs "old" partitions creates unnecessary complexity and cognitive overhead for users. Partitions should be treated uniformly from the consumer's perspective.
- Inconsistent offset reset behavior: Having the client apply different offset reset strategies based on partition age is counterintuitive and makes debugging difficult. Users expect auto.offset.reset to behave consistently across all partitions.