Status
Current state: Under Discussion
Discussion thread: https://lists.apache.org/thread/ztj43f2w3brzdqof8nollfy1zkx9by4w
Vote thread: https://lists.apache.org/thread/0z10kzlldwwqctx8p7l42r6o1h4nxo01
JIRA: KAFKA-20035
Motivation
The Semantic Gap
Current auto.offset.reset policies fail to support a common and critical use case: "start consuming from this moment forward, but don't miss anything after I start."
| Policy | Semantic | Behavior | Problem |
|---|---|---|---|
| earliest | Process all historical data | Always start from the beginning of the partition | Forces consumers to reprocess potentially millions of old messages, which is impractical for many use cases |
| latest | Skip to the current end of the partition | Always start from the current end offset at the time of partition discovery | Causes silent data loss during partition expansion or log truncation |
| none | Fail if no offset exists | Throw exception when no committed offset is found | Requires manual intervention; doesn't provide automatic recovery |
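| by_duration:<duration> | Start from a relative time in the past | Compute now() - duration when a partition is discovered and seek to the first offset at or after that timestamp | Dynamic calculation causes data loss during partition expansion |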
Data Loss Scenarios:
Scenario 1: Partition Expansion Data Loss
When a topic undergoes partition expansion, consumers configured with auto.offset.reset=latest face a critical data loss window:
- Existing topic usage: The topic is already actively used by producers and consumers
- Partition creation: New partitions are added to the existing topic
- Producer awareness: Producers quickly discover the new partitions via metadata refresh and begin producing records to them
- Consumer lag: Consumers may take longer to detect the partition expansion
- Offset initialization: When consumers eventually discover the new partitions, auto.offset.reset=latest is applied, causing them to skip all records produced to those partitions during the discovery window
Result: Messages produced between partition creation and consumer discovery are silently lost.
Scenario 2: Log Truncation Data Loss
When log segments are deleted due to retention policies or manual truncation, consumers with auto.offset.reset=latest may skip to the end instead of starting from the earliest available data after truncation, causing additional data loss.
The Missing Semantic
Users need a policy that:
- Skips historical data that existed before the consumer started (like latest)
- Captures all data produced after the consumer started (unlike latest)
- Handles partition expansion gracefully (the group creation timestamp predates the new partition, so the lookup naturally resolves to the beginning of the partition)
- Handles log truncation gracefully (the consumer uses the same group creation timestamp to seek, consuming any remaining data with timestamps at or after the group's creation rather than skipping to the end of the partition)
Public Interfaces
Consumer configuration
We propose updating the auto.offset.reset configuration to support a new value: to_start_time. When set, the consumer determines the starting offset for all partitions based on the group creation timestamp provided by the broker. The consumer always issues a ListOffsetsRequest with this timestamp, ensuring a single, predictable rule across all scenarios.
The behaviour will be:
| Scenario | Condition | Behavior |
|---|---|---|
| No committed offset | groupCreationTime is known | Issue ListOffsetsRequest with groupCreationTime — seek to the first message at or after the group's creation. If the partition was created after the group, the lookup naturally resolves to the beginning of that partition. |
| No committed offset, timestamps unknown | groupCreationTime == -1 | Throw exception — cannot safely determine the correct offset without server-side timestamps |
| Committed offset is out of range | e.g., AdminClient.deleteRecords or log segment deletion | Issue ListOffsetsRequest with groupCreationTime — seek to the first message at or after the group's creation, consuming all surviving data from that point forward |
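As a configuration sketch, a Java consumer opting into the proposed policy might be set up as follows. Only the to_start_time value is new; the keys are existing consumer configs, group.protocol=consumer is included because classic groups reject this policy, and the broker address and group name are placeholders. The class and method names are illustrative, not part of any Kafka API.

```java
import java.util.Properties;

class ToStartTimeConsumerConfig {
    // Builds consumer properties for the proposed to_start_time policy.
    // The keys are existing consumer configs; only the "to_start_time" value is new.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // to_start_time requires the KIP-848 consumer group protocol;
        // with a classic group the consumer would throw ConfigException at startup.
        props.setProperty("group.protocol", "consumer");
        props.setProperty("auto.offset.reset", "to_start_time");
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

The resulting Properties would be passed to new KafkaConsumer<>(props) as usual. Clients that do not implement this KIP should reject the unknown value with a ConfigException, and if the anchor timestamp is unavailable the consumer surfaces GroupCreationTimeNotAvailableException from poll() instead of falling back to another policy.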
Admin client
AdminClient.describeConsumerGroups() now exposes the group creation timestamp. Callers can invoke groupCreationTimeMs() on the returned ConsumerGroupDescription to retrieve the epoch-milliseconds when the group was first created on the broker. This value is empty if the broker does not support KIP-1282 or the group predates the upgrade.
ConsumerGroupDescription gains a new method groupCreationTimeMs()
groupCreationTimeMs() (OptionalLong): The creation time of the consumer group on the broker. OptionalLong.empty() if unknown (e.g., older broker version, or group predates the upgrade).
Similarly, AdminClient.describeStreamsGroups() also exposes the group creation timestamp.
StreamsGroupDescription gains a new method groupCreationTimeMs().
groupCreationTimeMs() (OptionalLong): The creation time of the streams group on the broker. OptionalLong.empty() if unknown (e.g., older broker version, or group predates the upgrade).
Kafka Streams
The public class org.apache.kafka.streams.AutoOffsetReset gains a new factory method toStartTime() that corresponds to the new to_start_time reset policy. This allows Kafka Streams users to configure to_start_time on a per-source-topic basis via Consumed.withOffsetResetPolicy(), consistent with how earliest(), latest(), none(), and byDuration() work today.
kafka-consumer-groups.sh/kafka-streams-groups.sh
--describe --state --verbose
Adds a new GROUP-CREATION-TIME column showing the group creation timestamp formatted as ISO-8601 UTC (yyyy-MM-dd'T'HH:mm:ss.SSSXXX). If the value is unavailable (e.g., older broker or group predates the KIP-1282 upgrade), the column displays -.
Example output:
./bin/kafka-consumer-groups.sh --describe --state --verbose
GROUP         COORDINATOR (ID)   ASSIGNMENT-STRATEGY  STATE   GROUP-EPOCH  TARGET-ASSIGNMENT-EPOCH  #MEMBERS  GROUP-CREATION-TIME
my-group      broker-1:9092 (1)  range                Stable  5            5                        3         2025-01-15T10:30:00.000Z
legacy-group  broker-2:9092 (2)  range                Stable  2            2                        2         -
./bin/kafka-streams-groups.sh --describe --state --verbose
GROUP           COORDINATOR (ID)   STATE   GROUP-EPOCH  TARGET-ASSIGNMENT-EPOCH  #MEMBERS  GROUP-CREATION-TIME
my-streams-app  broker-1:9092 (1)  Stable  3            3                        2         2025-01-15T10:30:00.000Z
legacy-app      broker-2:9092 (2)  Stable  1            1                        1         -
--reset-offsets --to-start-time
--reset-offsets gains a new --to-start-time option. It uses the group creation timestamp as the target timestamp for ListOffsets, resetting each partition to the first message at or after the group's creation time.
./bin/kafka-consumer-groups.sh --bootstrap-server <broker> \
--group <group> \
--reset-offsets --to-start-time \
--topic <topic> \
--execute
./bin/kafka-streams-groups.sh --bootstrap-server <broker> \
--group <group> \
--reset-offsets --to-start-time \
--topic <topic> \
--execute
If the group creation timestamp is unavailable, the tool exits immediately with an error and performs no offset reset. The error message distinguishes between two cases, consistent with GroupCreationTimeNotAvailableException:
- If the broker does not support KIP-1282: Error: Cannot reset offsets using --to-start-time. The broker does not support to_start_time (requires ConsumerGroupDescribe v2+ or StreamsGroupDescribe v1+). Please upgrade the broker.
- If the broker supports KIP-1282 but the group has no recorded creation time: Error: Cannot reset offsets using --to-start-time. The group creation time is not available for group '<group>'. The group may have been created before KIP-1282 was enabled. Please delete and recreate the group.
Note: --to-start-time is not added to kafka-streams-application-reset.sh. This tool has been superseded by kafka-streams-groups.sh for streams group management. Users should use kafka-streams-groups.sh --reset-offsets --to-start-time instead.
Protocol changes
ConsumerGroupHeartbeatRequest (version 2): no new fields will be added.
ConsumerGroupHeartbeatResponse (version 2) gains a new field:
- GroupCreationTimeMs (int64): The creation time of the consumer group on the broker. -1 if unknown.
ConsumerGroupDescribeRequest (version 2): no new fields will be added.
ConsumerGroupDescribeResponse (version 2) gains a new field in DescribedGroup:
- GroupCreationTimeMs (int64): The creation time of the consumer group on the broker. -1 if unknown (e.g., old broker, or group predates this feature). This allows operators to inspect the semantic anchor being used by to_start_time consumers via kafka-consumer-groups.sh --describe or the Admin API, without needing to be the consumer itself.
StreamsGroupHeartbeatRequest (version 1): no new fields will be added.
StreamsGroupHeartbeatResponse (version 1) gains a new field:
- GroupCreationTimeMs (int64): The creation time of the streams group on the broker. -1 if unknown.
StreamsGroupDescribeRequest (version 1): no new fields will be added.
StreamsGroupDescribeResponse (version 1) gains a new field in DescribedGroup:
- GroupCreationTimeMs (int64): The creation time of the streams group on the broker. -1 if unknown (e.g., old broker, or group predates this feature).
Consumer Offsets Record Changes
ConsumerGroupMetadataValue gains a new tagged field
- CreationTimeMs (int64, default -1): recorded once when the consumer group is first created.
StreamsGroupMetadataValue gains a new tagged field
- CreationTimeMs (int64, default -1): recorded once when the streams group is first created.
Exception changes
A new exception GroupCreationTimeNotAvailableException is introduced to signal that the consumer cannot determine the correct starting offset because the group creation timestamp is unavailable from the broker.
Proposed Changes
Consumer configuration
to_start_time instructs the consumer to always use the group creation timestamp as the reference point when no committed offset exists or when the committed offset is out of range:
- Group creation time is recorded by the broker when the group is first created and returned to the consumer via ConsumerGroupHeartbeatResponse.GroupCreationTimeMs (StreamsGroupHeartbeatResponse.GroupCreationTimeMs for Kafka Streams). This timestamp serves as the sole anchor, representing the moment the consumer group "started".
When performing offset reset, the consumer always issues a ListOffsetsRequest with groupCreationTime as the target timestamp, seeking to the first message at or after the group's creation. This single rule applies uniformly to all scenarios:
- New partitions (created after the group): Since all messages in the partition postdate the group creation time, the ListOffsetsRequest naturally resolves to the beginning of the partition. No data is lost.
- Existing partitions (created before the group): The lookup finds the first message at or after the group's creation time, skipping historical data.
- Out-of-range offsets (e.g., due to AdminClient.deleteRecords or log segment deletion): The same ListOffsetsRequest with groupCreationTime is issued, ensuring the consumer seeks to the correct position and consumes as much surviving data as possible.
This approach is semantically accurate: the name to_start_time implies "start consuming from when my group started." Using the group creation timestamp as the sole reference point for all scenarios is exactly what this name promises — no hidden branching, no conditional comparisons. The behavior directly reflects the name.
Clarification
It is worth noting that to_start_time is not equivalent to latest, even when they appear to resolve to the same offset. When groupCreationTime is strictly greater than the timestamp of the latest record in the partition, ListOffsetsRequest will resolve to the Log End Offset — not because the policy falls back to latest, but because no records satisfy the timestamp condition. The consumer will begin consuming any new records produced after this point normally. The distinction matters: latest is a direct seek to the Log End Offset, whereas to_start_time always anchors to the group creation timestamp and lets the lookup result follow naturally.
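To make the single rule concrete, here is a sketch that models the timestamp lookup against an in-memory partition. The Partition record and offsetForTimestamp method are hypothetical illustrations, not Kafka APIs; the fallback to the log end offset mirrors the behavior described above rather than a specific broker code path, and the linear scan assumes nothing about timestamp ordering.

```java
import java.util.List;

class ToStartTimeLookup {
    // Hypothetical in-memory model of one partition: the record at index i has
    // offset logStartOffset + i and timestamp timestamps.get(i).
    record Partition(long logStartOffset, List<Long> timestamps) {
        long logEndOffset() {
            return logStartOffset + timestamps.size();
        }
    }

    // Models the timestamp lookup: the first offset whose timestamp is
    // >= targetMs, or the log end offset if no surviving record qualifies.
    static long offsetForTimestamp(Partition partition, long targetMs) {
        List<Long> ts = partition.timestamps();
        for (int i = 0; i < ts.size(); i++) {
            if (ts.get(i) >= targetMs) {
                return partition.logStartOffset() + i;
            }
        }
        return partition.logEndOffset();
    }
}
```

With a group created at t=1000: an existing partition with timestamps [100, 500, 1200, 1500] resolves to offset 2 (history skipped); a partition created after the group with [1100, 1300] resolves to offset 0; a partition truncated to start at offset 50 with surviving [1250, 1400] resolves to 50; and a partition whose records [100, 200] all predate the group resolves to 2, its log end offset, without any separate "latest" branch.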
Kafka Streams
auto.offset.reset only affects source topics and repartition topics in a Kafka Streams application. Changelog topics and global state topics have their offset management hardcoded to "none" and are handled separately via StoreChangelogReader, so to_start_time does not interfere with state restoration.
At the StreamsConfig level, users can set auto.offset.reset=to_start_time to apply the policy to all source topics in the application. For per-source-topic overrides, the new AutoOffsetReset.toStartTime() factory method can be used with Consumed.withOffsetResetPolicy().
/**
* Sets the {@code auto.offset.reset} configuration when
* {@link Topology#addSource(AutoOffsetReset, String, String...) adding a source processor}
* or when creating {@link KStream} or {@link KTable} via {@link StreamsBuilder}.
*/
public class AutoOffsetReset {
// skip
/**
* Creates an {@code AutoOffsetReset} instance representing "to_start_time".
* The source topic will be reset using the consumer group's creation
* timestamp as the reference point. For newly added partitions whose
* messages all postdate the group creation time, the lookup naturally
* resolves to the beginning of the partition, preventing data loss
* during partition expansion.
*
* @return An {@code AutoOffsetReset} instance for the "to_start_time" reset policy.
*/
public static AutoOffsetReset toStartTime() {
return new AutoOffsetReset(StrategyType.TO_START_TIME, Optional.empty());
}
}
kafka-consumer-groups.sh/kafka-streams-groups.sh
--describe --state --verbose: Displaying GROUP-CREATION-TIME
--describe --state --verbose fetches group metadata via AdminClient.describeConsumerGroups()/AdminClient.describeStreamsGroups() and calls groupCreationTimeMs() on the returned ConsumerGroupDescription/StreamsGroupDescription:
- If the returned OptionalLong is non-empty, the millisecond timestamp is formatted using SimpleDateFormat with pattern yyyy-MM-dd'T'HH:mm:ss.SSSXXX (consistent with Utils.getDateTime(), which is the existing datetime format used by --to-datetime) and displayed in the GROUP-CREATION-TIME column.
- If the returned value is OptionalLong.empty(), the column displays "-".
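The column formatting can be sketched with the JDK alone. This assumes the formatter's time zone is set explicitly to UTC (the column is specified as UTC); GroupCreationTimeColumn is a hypothetical helper, not the tool's actual class.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.OptionalLong;
import java.util.TimeZone;

class GroupCreationTimeColumn {
    // Pattern from the KIP; in UTC the XXX zone field renders a zero offset as "Z".
    private static final String PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    // Renders the GROUP-CREATION-TIME column: ISO-8601 in UTC, or "-" if unknown.
    static String format(OptionalLong groupCreationTimeMs) {
        if (groupCreationTimeMs.isEmpty()) {
            return "-";
        }
        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
        SimpleDateFormat fmt = new SimpleDateFormat(PATTERN);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return fmt.format(new Date(groupCreationTimeMs.getAsLong()));
    }
}
```

For example, format(OptionalLong.of(1736937000000L)) yields 2025-01-15T10:30:00.000Z, matching the my-group row in the example output, and format(OptionalLong.empty()) yields -.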
--reset-offsets --to-start-time
The execution flow is as follows:
- Fetch the group creation timestamp: Call AdminClient.describeConsumerGroups()/AdminClient.describeStreamsGroups() and read groupCreationTimeMs() from the returned ConsumerGroupDescription/StreamsGroupDescription.
  - If the value is OptionalLong.empty(), exit immediately with one of the following errors:
    - If the negotiated version does not support GroupCreationTimeMs (ConsumerGroupDescribe < 2 or StreamsGroupDescribe < 1): the broker is too old to support this feature. Error: Cannot reset offsets using --to-start-time. The broker does not support to_start_time (requires ConsumerGroupDescribe v2+ or StreamsGroupDescribe v1+). Please upgrade the broker.
    - If the negotiated version supports GroupCreationTimeMs but the broker returned -1: the group was created before KIP-1282 was enabled. Error: Cannot reset offsets using --to-start-time. The group creation time is not available for group '<group>'. The group may have been created before KIP-1282 was enabled. Please delete and recreate the group.
- Issue ListOffsets: Use groupCreationTimeMs as the target timestamp and send a ListOffsetsRequest for all specified partitions, resolving the first offset at or after the group creation time for each partition.
- Commit offsets:
  - With --execute: commit the resolved offsets.
  - Without --execute (dry-run): print the resolved offsets without committing.
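The reset flow can be sketched as a plan step followed by an apply step. The ListOffsets call is replaced by a hypothetical listOffsets function, partitions are plain strings such as orders-0, and the committed map stands in for the group's committed offsets; none of these names come from the actual tool.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.function.ToLongBiFunction;

class ToStartTimeReset {
    // Either an error message (nothing is reset) or the resolved target offsets.
    record Plan(String error, Map<String, Long> offsets) {}

    private static final String PREFIX = "Error: Cannot reset offsets using --to-start-time. ";

    // describeSupportsField: true if the negotiated ConsumerGroupDescribe (v2+) or
    // StreamsGroupDescribe (v1+) version carries GroupCreationTimeMs.
    // listOffsets: hypothetical stand-in for a ListOffsets lookup by timestamp.
    static Plan plan(String group, boolean describeSupportsField, OptionalLong creationTimeMs,
                     List<String> partitions, ToLongBiFunction<String, Long> listOffsets) {
        if (creationTimeMs.isEmpty()) {
            String error = describeSupportsField
                ? PREFIX + "The group creation time is not available for group '" + group + "'. "
                    + "The group may have been created before KIP-1282 was enabled. "
                    + "Please delete and recreate the group."
                : PREFIX + "The broker does not support to_start_time (requires "
                    + "ConsumerGroupDescribe v2+ or StreamsGroupDescribe v1+). Please upgrade the broker.";
            return new Plan(error, Map.of());
        }
        long anchorMs = creationTimeMs.getAsLong();
        Map<String, Long> offsets = new LinkedHashMap<>();
        for (String partition : partitions) {
            // Same anchor for every partition: first offset at or after the group creation time.
            offsets.put(partition, listOffsets.applyAsLong(partition, anchorMs));
        }
        return new Plan(null, offsets);
    }

    // Prints the plan and commits it only when --execute was given (otherwise a dry run).
    // Returns false for an error plan, in which case no offsets are touched.
    static boolean apply(Plan plan, boolean execute, Map<String, Long> committed) {
        if (plan.error() != null) {
            System.err.println(plan.error());
            return false;
        }
        plan.offsets().forEach((partition, offset) -> System.out.println(partition + " -> " + offset));
        if (execute) {
            committed.putAll(plan.offsets());
        }
        return true;
    }
}
```

Keeping planning separate from applying means a missing anchor is detected before any partition is touched, which matches the requirement that the tool performs no offset reset on error.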
Protocol changes
ConsumerGroupHeartbeatResponse bumps from version 1 to version 2. Version 2 adds GroupCreationTimeMs (int64): the time at which this consumer group was first created on the broker, or -1 if unknown.
{
"apiKey": 68,
"type": "response",
"name": "ConsumerGroupHeartbeatResponse",
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
// skip
{ "name": "GroupCreationTimeMs", "versions": "2+", "type": "int64", "default": -1, "ignorable": true,
"about": "The creation time of the consumer group." },
{ "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not provided; the assignment otherwise.", "fields": [
{ "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
"about": "The partitions assigned to the member that can be used immediately." }
]}
],
"commonStructs": [
{ "name": "TopicPartitions", "versions": "0+", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic ID." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions." }
]}
]
}
StreamsGroupHeartbeatResponse bumps from version 0 to version 1. Version 1 adds GroupCreationTimeMs (int64): the time at which this streams group was first created on the broker, or -1 if unknown.
{
"apiKey": 88,
"type": "response",
"name": "StreamsGroupHeartbeatResponse",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
// skip
{ "name": "TaskOffsetIntervalMs", "type": "int32", "versions": "0+",
"about": "The interval in which the task changelog offsets on a client are updated on the broker. The offsets are sent with the next heartbeat after this time has passed." },
{ "name": "GroupCreationTimeMs", "versions": "1+", "type": "int64", "default": -1, "ignorable": true,
"about": "The creation time of the streams group." },
// skip
}
ConsumerGroupDescribeResponse bumps from version 1 to version 2. Version 2 adds GroupCreationTimeMs (int64) in DescribedGroup: the time at which this consumer group was first created on the broker, or -1 if unknown (e.g., old broker, or group predates this feature).
{
"apiKey": 69,
"type": "response",
"name": "ConsumerGroupDescribeResponse",
// Version 1 adds MemberType field (KIP-1099).
// Version 2 adds GroupCreationTimeMs field (KIP-1282).
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
"about": "Each described group.",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The describe error, or 0 if there was no error." },
// skip
{ "name": "GroupCreationTimeMs", "type": "int64", "versions": "2+", "default": "-1", "ignorable": true,
"about": "The creation time of the consumer group. -1 if unknown." },
{ "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this group." }
]
}
],
// skip
}
StreamsGroupDescribeResponse bumps from version 0 to version 1. Version 1 adds GroupCreationTimeMs (int64) in DescribedGroup: the time at which this streams group was first created on the broker, or -1 if unknown (e.g., old broker, or group predates this feature).
{
"apiKey": 89,
"type": "response",
"name": "StreamsGroupDescribeResponse",
// Version 1 adds GroupCreationTimeMs field (KIP-1282).
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
"about": "Each described group.",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The describe error, or 0 if there was no error." },
// skip
{ "name": "GroupCreationTimeMs", "type": "int64", "versions": "1+", "default": "-1", "ignorable": true,
"about": "The creation time of the streams group. -1 if unknown." },
{ "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this group." }
]
}
],
// skip
}
Consumer Offsets Record Changes
ConsumerGroupMetadataValue gains a new tagged field CreationTimeMs (int64, default -1): recorded once when the consumer group is first created.
{
"apiKey": 3,
"type": "coordinator-value",
"name": "ConsumerGroupMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Epoch", "versions": "0+", "type": "int32",
"about": "The group epoch." },
// The MetadataHash is added in 4.1 (KIP-1101). It's used to track
// subscribed topics in the group. When subscribed topics change,
// like partition count or rack change, the hash will be different.
// It indicates that the group should be rebalanced.
{ "name": "MetadataHash", "versions": "0+", "type": "int64",
"default": 0, "taggedVersions": "0+", "tag": 0,
"about": "The hash of all topics in the group." },
{ "name": "CreationTimeMs", "versions": "0+", "type": "int64",
"default": -1, "taggedVersions": "0+", "tag": 1,
"about": "The creation time of the consumer group." }
]
}
StreamsGroupMetadataValue gains a new tagged field CreationTimeMs (int64, default -1): recorded once when the streams group is first created.
{
"apiKey": 17,
"type": "coordinator-value",
"name": "StreamsGroupMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Epoch", "versions": "0+", "type": "int32",
"about": "The group epoch." },
// skip
{ "name": "CreationTimeMs", "versions": "0+", "type": "int64",
"default": -1, "taggedVersions": "0+", "tag": 2,
"about": "The creation time of the streams group." }
],
// skip
}
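The write-once semantics of CreationTimeMs can be sketched as a small class. It is a hypothetical model of the coordinator's in-memory group state, not the coordinator's actual code: the creation time is stamped from the broker clock once, carried unchanged across epoch bumps, and left at the tagged-field default of -1 for records written before the upgrade.

```java
import java.util.function.LongSupplier;

class GroupMetadataSketch {
    static final long UNKNOWN = -1L; // default of the CreationTimeMs tagged field

    private int epoch;
    private final long creationTimeMs; // final: can never be overwritten

    private GroupMetadataSketch(int epoch, long creationTimeMs) {
        this.epoch = epoch;
        this.creationTimeMs = creationTimeMs;
    }

    // Group created on an upgraded coordinator: stamp the broker clock exactly once.
    static GroupMetadataSketch create(LongSupplier brokerClockMs) {
        return new GroupMetadataSketch(0, brokerClockMs.getAsLong());
    }

    // Replaying a record written before the upgrade: the tagged field is absent,
    // so the default (-1) is used and is not backfilled.
    static GroupMetadataSketch replayLegacy(int epoch) {
        return new GroupMetadataSketch(epoch, UNKNOWN);
    }

    // Epoch bumps rewrite the metadata record but carry CreationTimeMs over unchanged.
    void bumpEpoch() {
        epoch++;
    }

    int epoch() {
        return epoch;
    }

    long creationTimeMs() {
        return creationTimeMs;
    }
}
```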
Admin client
ConsumerGroupDescription.groupCreationTimeMs() returns the epoch-milliseconds when the consumer group was first created on the broker, sourced from the GroupCreationTimeMs field in ConsumerGroupDescribeResponse.
The return value follows these rules:
- Returns a non-empty OptionalLong containing the group creation timestamp when the broker supports KIP-1282 and the group was created after the upgrade
- Returns OptionalLong.empty() when GroupCreationTimeMs == -1, which may occur in the following situations:
- The broker does not support KIP-1282 yet (e.g., older broker version)
- The group predates the upgrade and the creation time was never recorded
public class ConsumerGroupDescription {
/**
* The creation time of the consumer group on the broker, in milliseconds since epoch.
* Returns OptionalLong.empty() if the broker does not support KIP-1282,
* or if the group predates the upgrade and the creation time was never recorded.
*/
public OptionalLong groupCreationTimeMs() { ... }
}
Similarly, StreamsGroupDescription.groupCreationTimeMs() returns the epoch-milliseconds when the streams group was first created on the broker, sourced from the GroupCreationTimeMs field in StreamsGroupDescribeResponse. The return value follows the same rules as ConsumerGroupDescription.groupCreationTimeMs().
public class StreamsGroupDescription {
/**
* The creation time of the streams group on the broker, in milliseconds since epoch.
* Returns OptionalLong.empty() if the broker does not support
* KIP-1282, or if the group predates the upgrade and the
* creation time was never recorded.
*/
public OptionalLong groupCreationTimeMs() { ... }
}
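The mapping from the wire value to OptionalLong can be sketched as a single function. The helper name is hypothetical, and it assumes that a response decoded at a version without the field leaves GroupCreationTimeMs at its schema default of -1, which is why both unavailable cases collapse to OptionalLong.empty().

```java
import java.util.OptionalLong;

class GroupCreationTimes {
    // Sentinel carried in GroupCreationTimeMs when the creation time is unknown.
    static final long UNKNOWN_CREATION_TIME_MS = -1L;

    // Maps the int64 response field onto the Admin API's OptionalLong. Assumes a
    // response decoded at an older version (ConsumerGroupDescribe < 2,
    // StreamsGroupDescribe < 1) leaves the field at its schema default of -1.
    static OptionalLong toOptional(long groupCreationTimeMs) {
        return groupCreationTimeMs == UNKNOWN_CREATION_TIME_MS
            ? OptionalLong.empty()
            : OptionalLong.of(groupCreationTimeMs);
    }
}
```

Because both cases look identical at this level, callers that need to tell them apart, such as the CLI error messages, must also consult the negotiated API version.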
Exception
GroupCreationTimeNotAvailableException is thrown when to_start_time is configured and the group creation timestamp is not available. Both cases use the same exception type, distinguished by the error message:
- If the negotiated protocol version does not support GroupCreationTimeMs (ConsumerGroupHeartbeat < 2 or StreamsGroupHeartbeat < 1): the broker is too old to support this feature. Example message: "The broker does not support to_start_time (requires ConsumerGroupHeartbeat v2+ or StreamsGroupHeartbeat v1+)."
- If the negotiated version supports GroupCreationTimeMs but groupCreationTimeMs == -1: the broker supports this feature, but the group was created before KIP-1282 was enabled and the creation time was never recorded. Example message: "The group creation time is not available for group 'my-group'. The group may have been created before KIP-1282 was enabled. Please delete and recreate the group."
This exception is intentionally distinct from NoOffsetForPartitionException, which signals the absence of a committed offset. GroupCreationTimeNotAvailableException signals a different root cause: the anchor timestamp required by to_start_time is not available, making it impossible to safely determine the correct offset without risking data loss.
public class GroupCreationTimeNotAvailableException extends KafkaException {
public GroupCreationTimeNotAvailableException(String message) {
super(message);
}
}
Upon throwing GroupCreationTimeNotAvailableException, the consumer does not attempt any automatic fallback. The exception is propagated to the caller's poll() invocation, leaving the recovery strategy to the user. This design is intentional — any automatic fallback without a known anchor timestamp risks silent data loss, which contradicts the safety guarantee that to_start_time aims to provide.
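A sketch of the consumer-side check, assuming the negotiated heartbeat version is available as a boolean and using a local stand-in for the proposed exception (the real class would extend KafkaException, which is not available in this self-contained example). ToStartTimeAnchor.anchorOrThrow is a hypothetical name.

```java
class ToStartTimeAnchor {
    // Local stand-in for the proposed exception. The KIP's class extends
    // KafkaException, which is not on the classpath of this sketch.
    static class GroupCreationTimeNotAvailableException extends RuntimeException {
        GroupCreationTimeNotAvailableException(String message) {
            super(message);
        }
    }

    // Returns the anchor timestamp for ListOffsets, or throws. There is no
    // fallback branch: an unknown anchor always surfaces to the caller.
    // heartbeatSupportsField: negotiated ConsumerGroupHeartbeat >= 2
    // (or StreamsGroupHeartbeat >= 1) for this connection.
    static long anchorOrThrow(String groupId, boolean heartbeatSupportsField, long groupCreationTimeMs) {
        if (!heartbeatSupportsField) {
            throw new GroupCreationTimeNotAvailableException(
                "The broker does not support to_start_time "
                    + "(requires ConsumerGroupHeartbeat v2+ or StreamsGroupHeartbeat v1+).");
        }
        if (groupCreationTimeMs == -1L) {
            throw new GroupCreationTimeNotAvailableException(
                "The group creation time is not available for group '" + groupId + "'. "
                    + "The group may have been created before KIP-1282 was enabled. "
                    + "Please delete and recreate the group.");
        }
        return groupCreationTimeMs;
    }
}
```

Because the method either returns the broker-recorded value or throws, there is no code path where the consumer silently picks earliest or latest, which is the safety property described above.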
Known Limitations
Server Clock Accuracy
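The group creation timestamp is recorded by the broker, while message timestamps may originate from producers on different machines (with the default message.timestamp.type=CreateTime). If there is clock drift between the broker and producers, the ListOffsetsRequest may return slightly imprecise results. For example, if producer clocks lag behind the broker clock, records produced shortly after the group was created may carry timestamps earlier than groupCreationTime and be skipped. However, this is an inherent limitation of any timestamp-based approach and is not unique to to_start_time. In practice, moderate clock skew (within seconds) is unlikely to cause meaningful data loss.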
Producer-set Timestamps
If producers explicitly set custom timestamps on their records that do not reflect real time (e.g., event-time semantics with historical dates), the ListOffsetsRequest may return unexpected results. This is fundamentally a data quality issue that users are responsible for.
Key Difference Between by_duration and to_start_time
The fundamental difference lies in when and how the reference timestamp is determined:
| | by_duration | to_start_time |
|---|---|---|
| Timestamp Calculation | now() - duration | Group creation time (recorded by broker) |
| When Determined | Dynamically at partition discovery | Once when the group is first created |
| Consistency | Different timestamp per partition | Same timestamp for all partitions |
| Reference Point | Partition discovery time (client-side) | Group creation time (server-side) |
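A worked example makes the consistency row concrete. The numbers are illustrative: a group created at t=1,000,000 ms discovers partition p0 at 1,000,500 ms, a newly added partition p1 receives a record at 1,020,000 ms, and the consumer only discovers p1 at 1,060,000 ms. With a 10-second by_duration, the target for p1 becomes 1,050,000 ms and the record is skipped; with to_start_time both partitions use 1,000,000 ms and the record is kept. The helper names are hypothetical.

```java
import java.time.Duration;

class AnchorComparison {
    // by_duration: the target is now() - duration, evaluated on the client each
    // time a partition is discovered, so later discoveries get later targets.
    static long byDurationTargetMs(long discoveryTimeMs, Duration duration) {
        return discoveryTimeMs - duration.toMillis();
    }

    // to_start_time: the broker-recorded group creation time, identical for
    // every partition no matter when it is discovered.
    static long toStartTimeTargetMs(long groupCreationTimeMs) {
        return groupCreationTimeMs;
    }

    // A timestamp-based reset skips a record whose timestamp predates the target.
    static boolean skipped(long recordTimestampMs, long targetMs) {
        return recordTimestampMs < targetMs;
    }
}
```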
Related Issue: KAFKA-19236
It is worth noting that the dynamic timestamp recalculation behavior of by_duration (i.e., re-computing now() - duration on every retry at partition discovery time) is itself a known issue, tracked separately in KAFKA-19236. That issue proposes fixing by_duration to compute the seek timestamp only once, rather than drifting on each retry.
This KIP does not attempt to fix by_duration. Instead, to_start_time is introduced as a complementary policy with a stable, server-side reference point — the group creation time — making data-loss prevention a first-class guarantee rather than a side effect of a bug fix.
Compatibility, Deprecation, and Migration Plan
We discuss this improvement separately for different consumer types:
Modern consumer (KIP-848 consumer group protocol) / Kafka Streams
to_start_time is a new, opt-in auto.offset.reset value, so it introduces no backward compatibility issues for existing configurations.
Classic consumer
This KIP does not add to_start_time support to classic consumer groups. If a user configures to_start_time with a classic consumer group, the consumer will throw ConfigException at startup, clearly indicating that the policy is not available for classic consumer groups.
Classic consumer groups that are upgraded to modern consumer groups also cannot use to_start_time immediately, because the group creation timestamp was not recorded at the time the group was originally created. The consumer will throw GroupCreationTimeNotAvailableException in this case. Users who wish to use to_start_time after upgrading must delete and recreate the group so that the broker can record a fresh creation timestamp. This is the intended behavior — silently falling back to another policy would risk data loss, which contradicts the safety guarantee that to_start_time aims to provide.
Share consumer
This KIP does not include changes for share consumer groups. Share groups do not require this feature because the "latest" policy already starts at offset 0 for newly added partitions in share groups, avoiding the partition-expansion data-loss scenario that this KIP addresses.
Future Work: Default Value Change
We believe to_start_time better matches what most users expect from a default offset reset policy. However, changing the default value is out of scope for this KIP. We intend to first ship to_start_time as an opt-in feature, gather community adoption experience and feedback, and then open a separate KIP to discuss changing the default value of auto.offset.reset in a future release.
Test Plan
All existing tests must continue to pass. New unit and integration tests will be added to cover:
- to_start_time resolves to the correct offset when no committed offset exists, when new partitions are added after group creation, and when committed offsets go out of range.
- The consumer throws GroupCreationTimeNotAvailableException when the broker returns groupCreationTimeMs == -1, with no automatic fallback.
- Configuring to_start_time with a classic consumer group throws ConfigException at startup.
- A classic group upgraded to a modern group throws GroupCreationTimeNotAvailableException when to_start_time is configured.
- ConsumerGroupDescription.groupCreationTimeMs() returns the correct value for supported and unsupported brokers.
- CreationTimeMs is persisted once in ConsumerGroupMetadataValue and is not overwritten on subsequent epoch bumps.
- StreamsGroupDescription.groupCreationTimeMs() returns the correct value for supported and unsupported brokers.
- AutoOffsetReset.toStartTime() correctly configures to_start_time for per-source-topic overrides in Kafka Streams applications.
- kafka-consumer-groups.sh and kafka-streams-groups.sh --describe --state --verbose and --reset-offsets --to-start-time behave correctly for both supported and unsupported brokers.
Rejected Alternatives
KIP-842: Separate Initial and Invalid Offset Reset Strategies
KIP-842 proposed addressing the partition-expansion data-loss problem by introducing two new consumer configurations, auto.offset.reset.on.initial.offset and auto.offset.reset.on.invalid.offset, along with new strategy values such as latest_on_start, earliest_on_start, and nearest.
Key mechanism:
- latest_on_start resets to the latest offset only at application startup, decoupled from the out-of-range handling
- nearest selects earliest or latest based on proximity when an out-of-range error occurs
- Users would combine settings (e.g., auto.offset.reset.on.initial.offset=latest_on_start + auto.offset.reset=earliest) to recover from partition expansion
Why rejected:
- No stable anchor point: KIP-842 provides no server-side reference timestamp. Without a fixed anchor (such as group creation time), it is impossible to guarantee consistent, reproducible seeking across all partitions, including those added after the group was created.
- Configuration complexity: Introducing two additional config keys with interdependent fallback semantics increases cognitive overhead for users and makes misconfiguration more likely. to_start_time achieves the same goal with a single, self-explanatory value under the existing auto.offset.reset config.
Track Initial Partition Count in Group Metadata
An earlier approach proposed tracking the initial partition count for each topic in the consumer group metadata and using it to distinguish between original and newly added partitions at the protocol level.
Key mechanism:
- The coordinator records partition counts when a group first subscribes to a topic
- This information is persisted in ConsumerGroupMetadataValue with a new InitialTopicPartitions field
- The ConsumerGroupHeartbeatResponse is extended with InitialPartitionCount to communicate this to consumers
- Consumers apply different offset reset strategies based on whether a partition is "new" or "old":
  - Partitions with id >= InitialPartitionCount: Force EARLIEST offset reset
  - Partitions with id < InitialPartitionCount: Use the configured auto.offset.reset
Why rejected:
This approach was rejected due to fundamental design concerns:
- Protocol-level partition distinction is confusing: Introducing a protocol mechanism to distinguish "new" vs "old" partitions creates unnecessary complexity and cognitive overhead for users. Partitions should be treated uniformly from the consumer's perspective.
- Inconsistent offset reset behavior: Having the client apply different offset reset strategies based on partition age is counterintuitive and makes debugging difficult. Users expect auto.offset.reset to behave consistently across all partitions.
Client-side Startup Timestamp
An earlier version of this proposal captured startupTimestamp = now() on the client at consumer initialization and used it as the ListOffsets timestamp for all partitions without committed offsets.
Key mechanism:
- The consumer records startupTimestamp = now() once during initialization
- All partitions without committed offsets use this same timestamp, regardless of when they are discovered
- For partitions where all messages postdate the startup time (e.g., newly created partitions), the consumer starts from the beginning
Why rejected:
- Static member crash: When a static member restarts, the consumer re-initializes with a new startup timestamp. Any records produced to newly assigned partitions between the original startup and the restart are silently skipped, defeating the purpose of to_start_time.
- Clock skew: The client clock may diverge from the broker clock, causing the timestamp-based ListOffsets lookup to return incorrect results.
Two-Timestamp Comparison (Group Creation Time vs Partition Creation Time)
An earlier revision of this KIP proposed comparing two server-side timestamps — the group creation time and the partition creation time — to determine whether to reset to EARLIEST or seek by timestamp.
Key mechanism:
- The broker records both group creation time and partition creation time
- Both timestamps are returned to the consumer via ConsumerGroupHeartbeatResponse
- The consumer compares the two:
  - If partitionCreationTime > groupCreationTime: Reset to EARLIEST
  - Otherwise: Issue ListOffsetsRequest with groupCreationTime
- For out-of-range offsets: Reset to EARLIEST
Why rejected:
- Unnecessary complexity: The comparison introduces conditional branching that makes the behavior harder for users to understand and predict. Different scenarios trigger different reset strategies (EARLIEST vs timestamp-based lookup vs EARLIEST again for out-of-range), creating cognitive overhead.
- Larger protocol change: Passing partition creation timestamps in the heartbeat response increases the payload size and requires changes to PartitionRecord in the metadata log. The simplified approach only requires the group creation timestamp, reducing the scope of protocol and metadata changes.
- Inconsistent out-of-range handling: The two-timestamp approach used EARLIEST for out-of-range offsets, which is inconsistent with the timestamp-based lookup used for other cases. The simplified approach applies the same ListOffsetsRequest with groupCreationTime uniformly across all scenarios.
- Semantically redundant: Using groupCreationTime with ListOffsetsRequest already handles the new-partition case naturally: if all messages in a partition postdate the group creation time, the lookup resolves to the beginning of the partition. The explicit EARLIEST branch is unnecessary.