Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread/ztj43f2w3brzdqof8nollfy1zkx9by4w 

JIRA: KAFKA-20035

Motivation

The Semantic Gap

Current auto.offset.reset policies fail to support a common and critical use case: "start consuming from this moment forward, but don't miss anything after I start."

Policy | Semantic | Behavior | Problem
earliest | Process all historical data | Always start from the beginning of the partition | Forces consumers to reprocess potentially millions of old messages, which is impractical for many use cases
latest | Skip to the current end of the partition | Always start from the current end offset at the time of partition discovery | Causes silent data loss during partition expansion or log truncation
none | Fail if no offset exists | Throw an exception when no committed offset is found | Requires manual intervention; doesn't provide automatic recovery
by_duration:<duration> | Start from a relative time in the past | Calculate now() - duration at partition discovery time, then seek to that timestamp | Dynamic calculation causes data loss during partition expansion

Data Loss Scenarios

Scenario 1: Partition Expansion Data Loss

When a topic undergoes partition expansion, consumers configured with auto.offset.reset=latest face a critical data loss window:

  1. Existing topic usage: The topic is already actively used by producers and consumers
  2. Partition creation: New partitions are added to the existing topic
  3. Producer awareness: Producers quickly discover the new partitions via metadata refresh and begin producing records to them
  4. Consumer lag: Consumers may take longer to detect the partition expansion
  5. Offset initialization: When consumers eventually discover the new partitions, auto.offset.reset=latest is applied, causing them to skip all records produced to those partitions during the discovery window

Result: Messages produced between partition creation and consumer discovery are silently lost.

Scenario 2: Log Truncation Data Loss

When log segments are deleted due to retention policies or manual truncation, consumers with auto.offset.reset=latest may skip to the end instead of starting from the earliest available data after truncation, causing additional data loss.

The Missing Semantic

Users need a policy that:

  • Skips historical data that existed before the consumer started (like latest)
  • Captures all data produced after the consumer started (unlike latest)
  • Handles partition expansion gracefully (starts from earliest available when timestamp predates partition creation)
  • Handles log truncation gracefully (after segments are deleted, the consumer can still consume any remaining data whose timestamps fall at or after the startup time, rather than skipping to the end of the partition)

Public Interfaces

We propose updating the auto.offset.reset configuration to support a new value: by_start_time. This option automatically resets offsets based on the consumer’s startup timestamp.

When set to by_start_time, the consumer captures its startup timestamp once during initialization. For partitions without committed offsets, the consumer seeks to the first message with a timestamp at or after the startup time. For partitions where all messages postdate the startup time (e.g., newly created partitions), the consumer naturally starts from the beginning, preventing data loss.
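If this KIP is adopted, enabling the policy would look like any other auto.offset.reset value. A minimal consumer configuration sketch (the by_start_time literal is the proposed addition; the other values are placeholders):

```properties
bootstrap.servers=localhost:9092
group.id=my-consumer-group
auto.offset.reset=by_start_time
```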

Proposed Changes

KafkaConsumer

Update the KafkaConsumer clients (classic and async consumers) to support the new by_start_time configuration value. As with the current by_duration implementation, we will use the existing ListOffsetRequest to fetch the offset for a given timestamp when resetting fetch positions.

  1. Calculate the target timestamp once at consumer initialization: startupTimestamp = now()
  2. Store this timestamp in SubscriptionState as an immutable field
  3. All partitions without committed offsets use this same startupTimestamp, regardless of when they are discovered
  4. This prevents data loss because the timestamp is anchored to consumer startup, not partition discovery
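The anchoring logic in the steps above can be sketched in plain Java. This is an illustrative simulation, not the actual client internals: the class name, method name, and the use of a timestamp list to stand in for a partition's log are all assumptions made for the example.

```java
import java.util.List;

// Illustrative sketch of the proposed anchoring: the reference timestamp is
// captured once at construction and reused for every partition, no matter
// when that partition is discovered.
public class ByStartTimeReset {

    private final long startupTimestampMs; // immutable, set once at consumer init

    public ByStartTimeReset(long nowMs) {
        this.startupTimestampMs = nowMs;   // anchored to consumer startup
    }

    /**
     * Resolve the starting offset for a partition without a committed offset.
     * recordTimestamps holds the ascending record timestamps of the partition;
     * the result is the offset of the first record at or after the startup
     * timestamp. If every record postdates startup (e.g. a newly created
     * partition), this naturally returns offset 0, so nothing is skipped.
     */
    public long resolveStartingOffset(List<Long> recordTimestamps) {
        for (int offset = 0; offset < recordTimestamps.size(); offset++) {
            if (recordTimestamps.get(offset) >= startupTimestampMs) {
                return offset;
            }
        }
        // No record at or after startup yet: start at the log end offset.
        return recordTimestamps.size();
    }
}
```

Note how a partition discovered late still uses the same startup anchor, which is exactly what closes the discovery-window data loss described in Scenario 1.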

Key differences between by_duration and by_start_time

The fundamental difference lies in when and how the reference timestamp is calculated:


 | by_duration:<duration> | by_start_time
Timestamp Calculation | now() - duration | consumer_startup_time
When Calculated | Dynamically at partition discovery | Once at consumer initialization
Consistency | Different timestamp per partition | Same timestamp for all partitions
Reference Point | Partition discovery time | Consumer startup time
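A small deterministic simulation illustrates the contrast. The helper names are made up for illustration, and the clock values are injected rather than read from the system, so the drift is visible:

```java
import java.time.Duration;

// Contrast the two policies with injected clock values. by_duration recomputes
// its reference from "now" at each partition discovery; by_start_time reuses
// the single timestamp captured at consumer startup.
public class ResetTimestampDemo {

    // by_duration: reference recomputed from the discovery time
    static long byDurationReference(long discoveryTimeMs, Duration duration) {
        return discoveryTimeMs - duration.toMillis();
    }

    // by_start_time: reference fixed at startup; discovery time is irrelevant
    static long byStartTimeReference(long startupTimeMs, long discoveryTimeMs) {
        return startupTimeMs;
    }

    public static void main(String[] args) {
        long startup = 1_000_000L;
        long p0Discovery = startup;            // partition seen at startup
        long p1Discovery = startup + 60_000L;  // partition discovered 60s later
        Duration d = Duration.ofMillis(30_000L);

        // by_duration: the two partitions get different reference points, so
        // records produced to p1 during the 60s discovery gap can be skipped.
        System.out.println(byDurationReference(p0Discovery, d)); // 970000
        System.out.println(byDurationReference(p1Discovery, d)); // 1030000

        // by_start_time: both partitions share the startup anchor.
        System.out.println(byStartTimeReference(startup, p0Discovery)); // 1000000
        System.out.println(byStartTimeReference(startup, p1Discovery)); // 1000000
    }
}
```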

Related Issue: KAFKA-19236

It is worth noting that the dynamic timestamp recalculation behavior of by_duration (i.e., re-computing now() - duration on every retry at partition discovery time) is itself a known issue, tracked separately in KAFKA-19236. That issue proposes fixing by_duration to compute the seek timestamp only once, rather than drifting on each retry.

This KIP does not attempt to fix by_duration. Instead, by_start_time is introduced as a complementary policy with an explicitly stable reference point — the consumer's startup time — making data-loss prevention a first-class guarantee rather than a side effect of a bug fix.

Compatibility, Deprecation, and Migration Plan

We discuss this improvement separately for different consumer types:

Classic/Async consumer

This is a new feature, so it introduces no backward compatibility issues.

Share consumer 

This KIP does not include changes for share consumer. Any future requirements will be addressed in a separate KIP.

Test Plan

All existing tests should continue to pass, and new unit and integration tests will be added to verify the new behavior.

Rejected Alternatives

Track Initial Partition Count in Group Metadata

An earlier approach proposed tracking the initial partition count for each topic in the consumer group metadata and using it to distinguish between original and newly added partitions at the protocol level.

Key mechanism:

  • The coordinator records partition counts when a group first subscribes to a topic
  • This information is persisted in ConsumerGroupMetadataValue with a new InitialTopicPartitions field
  • The ConsumerGroupHeartbeatResponse is extended with InitialPartitionCount to communicate this to consumers
  • Consumers apply different offset reset strategies based on whether a partition is "new" or "old":
    • Partitions with id >= InitialPartitionCount: Force EARLIEST offset reset
    • Partitions with id < InitialPartitionCount: Use configured auto.offset.reset
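For reference, the rejected client-side branching would have looked roughly like the sketch below. The class name, enum, and method are illustrative only; InitialPartitionCount is the proposed (and rejected) protocol field, not an existing one:

```java
// Sketch of the rejected approach: the reset strategy depends on whether a
// partition's id falls below the group's recorded initial partition count.
public class PartitionAgeReset {

    enum Strategy { EARLIEST, CONFIGURED }

    static Strategy strategyFor(int partitionId, int initialPartitionCount) {
        // Partitions added after the group first subscribed are forced to
        // EARLIEST; original partitions follow the configured auto.offset.reset.
        return partitionId >= initialPartitionCount
                ? Strategy.EARLIEST
                : Strategy.CONFIGURED;
    }
}
```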

Why rejected:

This approach was rejected due to fundamental design concerns:

  1. Protocol-level partition distinction is confusing: Introducing a protocol mechanism to distinguish "new" vs "old" partitions creates unnecessary complexity and cognitive overhead for users. Partitions should be treated uniformly from the consumer's perspective.
  2. Inconsistent offset reset behavior: Having the client apply different offset reset strategies based on partition age is counterintuitive and makes debugging difficult. Users expect auto.offset.reset to behave consistently across all partitions.


