...
We propose updating the auto.offset.reset configuration to support a new value: to_start_time. When set, the consumer determines the starting offset for all partitions based on the group creation timestamp provided by the broker. The consumer always issues a ListOffsetsRequest with this timestamp, ensuring a single, predictable rule across all scenarios.
...
The public class org.apache.kafka.streams.AutoOffsetReset gains a new factory method, toStartTime(), that corresponds to the new to_start_time reset policy. This allows Kafka Streams users to configure to_start_time on a per-source-topic basis via Consumed.withOffsetResetPolicy(), consistent with how earliest(), latest(), none(), and byDuration() work today.
kafka-consumer-groups.sh/kafka-streams-groups.sh
...
```shell
./bin/kafka-streams-groups.sh --bootstrap-server <broker> \
  --group <group> \
  --reset-offsets --to-start-time \
  --topic <topic> \
  --execute
```
If the group creation timestamp is unavailable, the tool exits immediately with an error and performs no offset reset. The error message distinguishes between two cases, consistent with GroupCreationTimeNotAvailableException:
- If the broker does not support KIP-1282: Error: Cannot reset offsets using --to-start-time. The broker does not support to_start_time (requires ConsumerGroupDescribe v2+ or StreamsGroupDescribe v1+). Please upgrade the broker.
- If the broker supports KIP-1282 but the group has no recorded creation time: Error: Cannot reset offsets using --to-start-time. The group creation time is not available for group '<group>'. The group may have been created before KIP-1282 was enabled. Please delete and recreate the group.
Note: --to-start-time is not added to kafka-streams-application-reset.sh. This tool has been superseded by kafka-streams-groups.sh for streams group management. Users should use kafka-streams-groups.sh --reset-offsets --to-start-time instead.
...
- GroupCreationTimeMs (int64): The creation time of the consumer group on the broker, or -1 if unknown (e.g., old broker, or group predates this feature). This allows operators to inspect the semantic anchor used by to_start_time consumers via kafka-consumer-groups.sh --describe or the Admin API, without needing to be the consumer itself.
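A minimal sketch of how the -1 wire sentinel could surface as the OptionalLong that groupCreationTimeMs() returns on ConsumerGroupDescription/StreamsGroupDescription. It assumes the client treats a negotiated describe version that lacks the field the same way as an explicit -1; GroupCreationTime and fromResponse are hypothetical names, not Kafka APIs.

```java
import java.util.OptionalLong;

/** Hypothetical helper: maps the GroupCreationTimeMs wire field to an OptionalLong. */
final class GroupCreationTime {
    /** Sentinel the broker uses when the creation time is unknown. */
    static final long UNKNOWN = -1L;

    private GroupCreationTime() { }

    /**
     * @param fieldSupported whether the negotiated describe version carries GroupCreationTimeMs
     * @param rawValue       GroupCreationTimeMs from the response (ignored when unsupported)
     */
    static OptionalLong fromResponse(boolean fieldSupported, long rawValue) {
        // Assumption: an old broker and a -1 value both mean "no anchor available".
        if (!fieldSupported || rawValue == UNKNOWN) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(rawValue);
    }
}
```

Collapsing both cases into OptionalLong.empty() keeps callers simple; a caller that needs to explain *why* the value is missing still has to consult the negotiated version separately.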
...
Proposed Changes
Consumer configuration
to_start_time instructs the consumer to always use the group creation timestamp as the reference point when no committed offset exists or when the committed offset is out of range:
...
This approach is semantically accurate: the name to_start_time implies "start consuming from when my group started." Using the group creation timestamp as the sole reference point for all scenarios is exactly what this name promises: no hidden branching, no conditional comparisons. The behavior directly reflects the name.
Clarification
It is worth noting that to_start_time is not equivalent to latest, even when the two appear to resolve to the same offset. When groupCreationTime is strictly greater than the timestamp of the latest record in the partition, the ListOffsetsRequest resolves to the Log End Offset, not because the policy falls back to latest, but because no records satisfy the timestamp condition. The consumer then consumes any new records produced after this point normally. The distinction matters: latest is a direct seek to the Log End Offset, whereas to_start_time always anchors to the group creation timestamp and lets the lookup result follow naturally.
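The lookup rule described above can be modeled in a few lines. This is a sketch, not the broker's ListOffsets implementation: a partition is represented as an array of record timestamps starting at a caller-supplied log start offset, and ToStartTimeLookup, resolve, and latest are illustrative names. It shows that to_start_time picks the first offset whose timestamp is at or after the group creation time and lands on the Log End Offset only when no record qualifies, whereas latest always returns the Log End Offset.

```java
/**
 * Illustrative model of the to_start_time offset lookup (not a Kafka API).
 * Element i of timestamps is the record at offset logStartOffset + i, and the
 * Log End Offset (LEO) is logStartOffset + timestamps.length.
 */
final class ToStartTimeLookup {
    private ToStartTimeLookup() { }

    /** to_start_time: first offset whose timestamp is >= groupCreationTimeMs, else the LEO. */
    static long resolve(long logStartOffset, long[] timestamps, long groupCreationTimeMs) {
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] >= groupCreationTimeMs) {
                return logStartOffset + i;
            }
        }
        // No record satisfies the timestamp condition, so the lookup lands on the LEO.
        // This is a consequence of the lookup, not a fallback to "latest".
        return logStartOffset + timestamps.length;
    }

    /** latest: a direct seek to the LEO, ignoring timestamps entirely. */
    static long latest(long logStartOffset, long[] timestamps) {
        return logStartOffset + timestamps.length;
    }
}
```

For timestamps {100, 200, 300} starting at offset 0, a creation time of 150 resolves to offset 1 under to_start_time, while latest returns 3. With a creation time of 400 both return 3, but only because no record has a timestamp at or after 400.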
...
auto.offset.reset only affects source topics and repartition topics in a Kafka Streams application. Changelog topics and global state topics have their offset management hardcoded to "none" and are handled separately via StoreChangelogReader, so to_start_time does not interfere with state restoration.
At the StreamsConfig level, users can set auto.offset.reset=to_start_time to apply the policy to all source topics in the application. For per-source-topic overrides, the new AutoOffsetReset.toStartTime() factory method can be used with Consumed.withOffsetResetPolicy().
```java
/**
 * Sets the {@code auto.offset.reset} configuration when
 * {@link Topology#addSource(AutoOffsetReset, String, String...) adding a source processor}
 * or when creating {@link KStream} or {@link KTable} via {@link StreamsBuilder}.
 */
public class AutoOffsetReset {
    // existing members omitted

    /**
     * Creates an {@code AutoOffsetReset} instance representing "to_start_time".
     * The source topic will be reset using the consumer group's creation
     * timestamp as the reference point. For newly added partitions whose
     * messages all postdate the group creation time, the lookup naturally
     * resolves to the beginning of the partition, preventing data loss
     * during partition expansion.
     *
     * @return An {@code AutoOffsetReset} instance for the "to_start_time" reset policy.
     */
    public static AutoOffsetReset toStartTime() {
        return new AutoOffsetReset(StrategyType.TO_START_TIME, Optional.empty());
    }
}
```
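To illustrate how the StreamsConfig-level and per-source-topic settings described above interact, here is a hedged model of their precedence: a policy set through Consumed.withOffsetResetPolicy(AutoOffsetReset.toStartTime()) wins over the application-wide auto.offset.reset value. Plain strings stand in for Kafka types so the sketch runs without Kafka on the classpath; ResetPolicyResolver and effectivePolicy are hypothetical names, and the "earliest" fallback reflects the Kafka Streams consumer default.

```java
import java.util.Map;
import java.util.Properties;

/**
 * Hypothetical model of reset-policy precedence in a Kafka Streams application.
 * Real applications express the override with
 * Consumed.withOffsetResetPolicy(AutoOffsetReset.toStartTime()).
 */
final class ResetPolicyResolver {
    private ResetPolicyResolver() { }

    static String effectivePolicy(Properties streamsConfig,
                                  Map<String, String> perTopicOverrides,
                                  String sourceTopic) {
        // A per-source-topic policy takes precedence over the application-wide value.
        String override = perTopicOverrides.get(sourceTopic);
        if (override != null) {
            return override;
        }
        // Otherwise use auto.offset.reset from the Streams config ("earliest" if unset).
        return streamsConfig.getProperty("auto.offset.reset", "earliest");
    }
}
```

With auto.offset.reset=to_start_time in the Streams config and an "earliest" override on one topic, every other source topic resolves to to_start_time.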
...
- Fetch the group creation timestamp: Call AdminClient.describeConsumerGroups()/AdminClient.describeStreamsGroups() and read groupCreationTimeMs() from the returned ConsumerGroupDescription/StreamsGroupDescription.
- If the value is OptionalLong.empty(), exit immediately with one of the following errors:
  - If the negotiated version does not support GroupCreationTimeMs (ConsumerGroupDescribe < 2 or StreamsGroupDescribe < 1): the broker is too old to support this feature. Error: Cannot reset offsets using --to-start-time. The broker does not support to_start_time (requires ConsumerGroupDescribe v2+ or StreamsGroupDescribe v1+). Please upgrade the broker.
  - If the negotiated version supports GroupCreationTimeMs but the returned value is -1: the group was created before KIP-1282 was enabled. Error: Cannot reset offsets using --to-start-time. The group creation time is not available for group '<group>'. The group may have been created before KIP-1282 was enabled. Please delete and recreate the group.
- Issue ListOffsets: Use groupCreationTimeMs as the target timestamp and send a ListOffsetsRequest for all specified partitions, resolving the first offset at or after the group creation time for each partition.
- Commit offsets:
  - With --execute: commit the resolved offsets.
  - Without --execute (dry-run): print the resolved offsets without committing.
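The steps above can be sketched as follows. This is an illustrative model rather than the tool's implementation: the describe result is passed in as an OptionalLong plus a flag saying whether the negotiated describe version carries GroupCreationTimeMs, partitions are modeled as arrays of record timestamps starting at offset 0, committing writes into a map, and "exit with an error" is represented by an IllegalStateException. ToStartTimeResetTool and its methods are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical model of kafka-*-groups.sh --reset-offsets --to-start-time. */
final class ToStartTimeResetTool {
    private static final String PREFIX = "Error: Cannot reset offsets using --to-start-time. ";

    private ToStartTimeResetTool() { }

    /** Choose the error message when the creation time is absent. */
    static String missingCreationTimeError(boolean versionSupportsField, String group) {
        if (!versionSupportsField) {
            return PREFIX + "The broker does not support to_start_time "
                    + "(requires ConsumerGroupDescribe v2+ or StreamsGroupDescribe v1+). "
                    + "Please upgrade the broker.";
        }
        return PREFIX + "The group creation time is not available for group '" + group + "'. "
                + "The group may have been created before KIP-1282 was enabled. "
                + "Please delete and recreate the group.";
    }

    /**
     * partitionTimestamps maps a partition name to its record timestamps (offset i holds
     * timestamps[i]); committed receives the resolved offsets only when execute is true.
     */
    static Map<String, Long> resetToStartTime(String group,
                                              boolean versionSupportsField,
                                              OptionalLong groupCreationTimeMs,
                                              Map<String, long[]> partitionTimestamps,
                                              boolean execute,
                                              Map<String, Long> committed) {
        // No anchor, no reset: nothing is resolved or committed.
        if (!groupCreationTimeMs.isPresent()) {
            throw new IllegalStateException(missingCreationTimeError(versionSupportsField, group));
        }
        long target = groupCreationTimeMs.getAsLong();

        // One timestamp lookup per partition, all using the same anchor.
        Map<String, Long> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, long[]> entry : partitionTimestamps.entrySet()) {
            resolved.put(entry.getKey(), firstOffsetAtOrAfter(entry.getValue(), target));
        }

        // Commit with --execute; otherwise only print the plan (dry run).
        if (execute) {
            committed.putAll(resolved);
        } else {
            resolved.forEach((partition, offset) ->
                    System.out.println(group + " " + partition + " -> " + offset + " (dry run)"));
        }
        return resolved;
    }

    private static long firstOffsetAtOrAfter(long[] timestamps, long target) {
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] >= target) {
                return i;
            }
        }
        return timestamps.length; // no record qualifies: Log End Offset
    }
}
```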
...
GroupCreationTimeNotAvailableException is thrown when to_start_time is configured and the group creation timestamp is not available. Both cases use the same exception type, distinguished by the error message:
- If the negotiated protocol version does not support GroupCreationTimeMs (ConsumerGroupHeartbeat < 2 or StreamsGroupHeartbeat < 1): the broker is too old to support this feature. Example message: "The broker does not support to_start_time (requires ConsumerGroupHeartbeat v2+ or StreamsGroupHeartbeat v1+)."
- If the negotiated version supports GroupCreationTimeMs but groupCreationTimeMs == -1: the broker supports this feature, but the group was created before KIP-1282 was enabled and the creation time was never recorded. Example message: "The group creation time is not available for group 'my-group'. The group may have been created before KIP-1282 was enabled. Please delete and recreate the group."
This exception is intentionally distinct from NoOffsetForPartitionException, which signals the absence of a committed offset. GroupCreationTimeNotAvailableException signals a different root cause: the anchor timestamp required by to_start_time is not available, making it impossible to safely determine the correct offset without risking data loss.
...
Upon throwing GroupCreationTimeNotAvailableException, the consumer does not attempt any automatic fallback. The exception is propagated to the caller's poll() invocation, leaving the recovery strategy to the user. This design is intentional: any automatic fallback without a known anchor timestamp risks silent data loss, which contradicts the safety guarantee that to_start_time aims to provide.
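A hedged sketch of the consumer-side check described above: the anchor is either returned or the call fails with GroupCreationTimeNotAvailableException, and there is no code path that substitutes another policy. The exception class here is a local stand-in (its superclass in the real client is not specified in this section), StartTimeAnchor and anchorOrThrow are hypothetical names, and the version thresholds follow the ConsumerGroupHeartbeat v2+ / StreamsGroupHeartbeat v1+ requirement stated above.

```java
/** Local stand-in for the exception proposed by this KIP. */
class GroupCreationTimeNotAvailableException extends RuntimeException {
    GroupCreationTimeNotAvailableException(String message) {
        super(message);
    }
}

/** Hypothetical consumer-side resolution of the to_start_time anchor. */
final class StartTimeAnchor {
    private StartTimeAnchor() { }

    /** ConsumerGroupHeartbeat v2+ or StreamsGroupHeartbeat v1+ carry GroupCreationTimeMs. */
    static boolean versionCarriesCreationTime(boolean streamsGroup, short heartbeatVersion) {
        return streamsGroup ? heartbeatVersion >= 1 : heartbeatVersion >= 2;
    }

    /** Returns the anchor timestamp or throws; there is deliberately no fallback policy. */
    static long anchorOrThrow(boolean streamsGroup, short heartbeatVersion,
                              long groupCreationTimeMs, String groupId) {
        if (!versionCarriesCreationTime(streamsGroup, heartbeatVersion)) {
            throw new GroupCreationTimeNotAvailableException(
                    "The broker does not support to_start_time "
                    + "(requires ConsumerGroupHeartbeat v2+ or StreamsGroupHeartbeat v1+).");
        }
        if (groupCreationTimeMs == -1L) {
            throw new GroupCreationTimeNotAvailableException(
                    "The group creation time is not available for group '" + groupId + "'. "
                    + "The group may have been created before KIP-1282 was enabled. "
                    + "Please delete and recreate the group.");
        }
        return groupCreationTimeMs;
    }
}
```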
Known Limitations
...
The group creation timestamp is recorded by the broker, while message timestamps may originate from producers on different machines. If there is clock drift between the broker and producers, the ListOffsetsRequest may return slightly imprecise results. However, this is an inherent limitation of any timestamp-based approach and is not unique to to_start_time. In practice, moderate clock skew (within seconds) is unlikely to cause meaningful data loss.
...
If producers explicitly set custom timestamps on their records that do not reflect real time (e.g., event-time semantics with historical dates), the ListOffsetsRequest may return unexpected results. This is fundamentally a data quality issue that users are responsible for.
Key Difference Between by_duration and
...
to_start_time
The fundamental difference lies in when and how the reference timestamp is determined:
| | by_duration | to_start_time |
|---|---|---|
| Timestamp Calculation | now() - duration | Group creation time (recorded by broker) |
| When Determined | Dynamically at partition discovery | Once when the group is first created |
| Consistency | Different timestamp per partition | Same timestamp for all partitions |
| Reference Point | Partition discovery time (client-side) | Group creation time (server-side) |
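A worked example makes the Timestamp Calculation and When Determined rows concrete. The sketch assumes a hypothetical timeline: the group is created at 10:00, a partition is added at 12:00 and discovered by the consumer at 13:30, and by_duration is configured as one hour. ReferenceTimestamps is an illustrative name, not a Kafka class.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative comparison of the reference timestamps used by by_duration and to_start_time. */
final class ReferenceTimestamps {
    private ReferenceTimestamps() { }

    /** by_duration: now() - duration, computed on the client when the partition is discovered. */
    static long byDuration(Instant partitionDiscoveredAt, Duration duration) {
        return partitionDiscoveredAt.minus(duration).toEpochMilli();
    }

    /** to_start_time: the broker-recorded group creation time, the same for every partition. */
    static long toStartTime(Instant groupCreatedAt) {
        return groupCreatedAt.toEpochMilli();
    }
}
```

In this timeline by_duration anchors the new partition at 12:30, so records produced to it between 12:00 and 12:30 fall before the lookup timestamp and are skipped. to_start_time anchors every partition at 10:00, which predates all records in the new partition, so the lookup starts at its beginning.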
...
This KIP does not attempt to fix by_duration. Instead, to_start_time is introduced as a complementary policy with a stable, server-side reference point (the group creation time), making data-loss prevention a first-class guarantee rather than a side effect of a bug fix.
...
We discuss this improvement separately for different consumer types:
Async consumer
This is a new feature, so it introduces no backward-compatibility issues.
Modern consumer (KIP-848 consumer group protocol) / Kafka Streams
This is a new feature, so it introduces no backward-compatibility issues.
Classic consumer
This KIP does not add to_start_time support to classic consumer groups. If a user configures to_start_time with a classic consumer group, the consumer throws ConfigException at startup, clearly indicating that the policy is not available for classic consumer groups.
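The startup check can be sketched as a small validator. This is an assumption-laden model, not the consumer's actual validation code: it reads the real auto.offset.reset and group.protocol keys from a plain map, assumes the client defaults latest and classic when they are unset, and throws a local ConfigException stand-in so it runs without Kafka on the classpath. ResetPolicyValidator and the exception message are hypothetical.

```java
import java.util.Map;

/** Local stand-in for org.apache.kafka.common.config.ConfigException. */
class ConfigException extends RuntimeException {
    ConfigException(String message) {
        super(message);
    }
}

/** Hypothetical startup validation for to_start_time with classic consumer groups. */
final class ResetPolicyValidator {
    private ResetPolicyValidator() { }

    static void validate(Map<String, String> consumerConfig) {
        // Assumed client defaults when the keys are absent.
        String reset = consumerConfig.getOrDefault("auto.offset.reset", "latest");
        String protocol = consumerConfig.getOrDefault("group.protocol", "classic");
        if ("to_start_time".equalsIgnoreCase(reset) && "classic".equalsIgnoreCase(protocol)) {
            // Fail fast at startup instead of silently picking another policy.
            throw new ConfigException(
                    "auto.offset.reset=to_start_time is not available for classic consumer groups.");
        }
    }
}
```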
Classic consumer groups that are upgraded to modern consumer groups also cannot use to_start_time immediately, because the group creation timestamp was not recorded when the group was originally created. The consumer throws GroupCreationTimeNotAvailableException in this case. Users who wish to use to_start_time after upgrading must delete and recreate the group so that the broker can record a fresh creation timestamp. This is the intended behavior: silently falling back to another policy would risk data loss, which contradicts the safety guarantee that to_start_time aims to provide.
Share consumer
...
Future Work: Default Value Change
We believe to_start_time better matches what most users expect from a default offset reset policy. However, changing the default value is out of scope for this KIP. We intend to first ship to_start_time as an opt-in feature, gather community adoption experience and feedback, and then open a separate KIP to discuss changing the default value of auto.offset.reset in a future release.
...
All existing tests must continue to pass. New unit and integration tests will be added to cover:
- to_start_time resolves to the correct offset when no committed offset exists, when new partitions are added after group creation, and when committed offsets go out of range.
- The consumer throws GroupCreationTimeNotAvailableException when the broker returns groupCreationTimeMs == -1, with no automatic fallback.
- Configuring to_start_time with a classic consumer group throws ConfigException at startup.
- A classic group upgraded to a modern group throws GroupCreationTimeNotAvailableException when to_start_time is configured.
- ConsumerGroupDescription.groupCreationTimeMs() returns the correct value for supported and unsupported brokers.
- CreationTimeMs is persisted once in ConsumerGroupMetadataValue and is not overwritten on subsequent epoch bumps.
- StreamsGroupDescription.groupCreationTimeMs() returns the correct value for supported and unsupported brokers.
- AutoOffsetReset.toStartTime() correctly configures to_start_time for per-source-topic overrides in Kafka Streams applications.
- kafka-consumer-groups.sh and kafka-streams-groups.sh --describe --state --verbose and --reset-offsets --to-start-time behave correctly for both supported and unsupported brokers.
...
- Configuration complexity: Introducing two additional config keys with interdependent fallback semantics increases cognitive overhead for users and makes misconfiguration more likely. to_start_time achieves the same goal with a single, self-explanatory value under the existing auto.offset.reset config.
...
- Static member crash: When a static member restarts, the consumer re-initializes with a new startup timestamp. Any records produced to newly assigned partitions between the original startup and the restart are silently skipped, defeating the purpose of to_start_time.
- Clock skew: The client clock may diverge from the broker clock, causing the timestamp-based ListOffsets lookup to return incorrect results.
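The static-member problem is easiest to see with concrete numbers. In this hypothetical timeline (all values illustrative), the group is created at t=1000, the member first starts at t=1200, a newly assigned partition receives records at t=1500, 1800 and 2500, and the member restarts at t=2000 before committing an offset for that partition. StartupAnchorComparison and recordsSkipped are illustrative names, not Kafka APIs.

```java
/** Illustrative count of records that a timestamp-anchored lookup never reads. */
final class StartupAnchorComparison {
    private StartupAnchorComparison() { }

    /**
     * The lookup starts at the first record whose timestamp is >= anchorMs, so every
     * record before that offset is skipped. timestamps[i] is the record at offset i.
     */
    static int recordsSkipped(long[] timestamps, long anchorMs) {
        int offset = 0;
        while (offset < timestamps.length && timestamps[offset] < anchorMs) {
            offset++;
        }
        return offset;
    }
}
```

Anchoring at the restart time (t=2000) skips the first two records, while anchoring at the group creation time (t=1000) skips none, because the creation time does not move when the member restarts.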
...