Status
Current state: Accepted
Discussion thread: https://lists.apache.org/thread/bp4zk31zr1sdxjsspg7b7bqddmm9t4gn
Vote Thread: https://lists.apache.org/thread/dgs3t9xmmldof5mmhwp21rpl3mfvpfw1
JIRA: KAFKA-20539
Motivation
When a Kafka topic is expanded with new partitions, consumers configured with auto.offset.reset=latest will silently miss every record produced to those new partitions. This data loss occurs within the "metadata blindness window", the temporal gap between the partition's creation on the broker and the consumer's discovery of it during its next periodic metadata refresh.
This failure mode is entirely silent: no exceptions are thrown, no warnings are emitted, and no log entries are generated. For active, high-throughput topics, every partition expansion can result in data loss for consumer groups configured with auto.offset.reset=latest.
Because partition expansion is routine, this behavior represents a systemic and recurring source of silent data loss in production environments. In practice, topics are typically created with a conservative partition count to balance resource utilization and cost. As workloads grow, operators routinely increase partition counts to accommodate higher throughput. Consequently, nearly every long-lived, high-volume topic will undergo partition expansion at some point in its lifecycle. Under the current behavior, each such expansion creates another opportunity for consumer groups using auto.offset.reset=latest to silently miss records, making data loss an expected outcome of a common operational workflow rather than an isolated edge case.
The root cause is that today's auto.offset.reset applies a single policy uniformly to every partition without a committed offset, regardless of when that partition came into existence relative to the consumer group. From the user's perspective, however, two cases are fundamentally different:
- Partitions that existed before the consumer group was created: The user's choice of auto.offset.reset reflects an informed decision about how to handle the historical backlog of partitions that predate the group. When the user selects latest, they have explicitly accepted that pre-group history will not be consumed.
- Partitions added to the topic after the consumer group was created: Every record on such a partition was produced during the group's active lifetime. Skipping these records with latest is not an informed choice; it is silent data loss occurring within a window where the user expects continuous, gap-free consumption.
Today there is no way to express different reset behavior for these two cases. The existing auto.offset.reset is applied uniformly; neither latest, earliest, none, nor by_duration allows scoping the policy to one of these two categories. Workarounds at the application layer — such as custom ConsumerRebalanceListener.onPartitionsAssigned() logic — lack the temporal metadata required to reliably distinguish the two cases.
This KIP introduces a complementary configuration, auto.offset.reset.new.partitions, that decouples the policy for partitions added after group creation from the policy for partitions that predate it. The new configuration accepts the same values as auto.offset.reset except none (that is, earliest, latest, and by_duration:<duration>), allowing users to express their actual intent. For example, auto.offset.reset=latest combined with auto.offset.reset.new.partitions=earliest ensures that pre-group history is skipped while no record on a newly expanded partition is silently lost.
The classification itself uses the consumer group's creation timestamp, recorded by the broker once when the group is first created and propagated to consumers via ConsumerGroupHeartbeatResponse. A partition whose creation time postdates the group creation time is classified as new; all other partitions are classified as pre-existing. This signal is server-side, deterministic, and does not require users to reason about client-side mechanics such as metadata refresh intervals.
Why Existing Alternatives are Insufficient
Why not Custom ConsumerRebalanceListener.onPartitionsAssigned() Logic
- Lack of Temporal Metadata: The client currently has no way to determine when a partition was created. Partition creation time is completely absent from the existing MetadataResponse, a protocol gap this KIP proposes to bridge. Without this timestamp, user code lacks a reliable signal to distinguish newly expanded partitions from pre-existing ones.
- Fragile Heuristics: Naive heuristics, such as "if this partition ID is previously unseen by this consumer instance, treat it as newly expanded", fall apart under standard operational scenarios. Consumer restarts, group migrations, partition reassignments, and topic re-subscriptions all surface "unseen" partitions that are, in fact, pre-existing. A heuristic that misclassifies pre-existing partitions as newly expanded would trigger exactly the massive, unnecessary historical reprocessing we seek to avoid.
- Framework Fragmentation: Preventing silent data loss during partition expansion is a correctness-critical default behavior. Leaving this to application-level workarounds forces every user and every downstream framework to reinvent and maintain this complex state independently. This logic fundamentally belongs in the core client.
Why not use by_duration?
The existing by_duration policy may appear to address partition expansion data loss, but it has several limitations that make it unsuitable:
- It computes the seek target client-side as now() - duration, which introduces clock skew across consumers and forces operators to choose overly large durations, causing unnecessary reprocessing.
- The target timestamp is recomputed on each retry, so failed ListOffsetsRequest retries can shift the target forward and potentially miss records produced between attempts.
- It applies uniformly to all partitions without committed offsets, and cannot distinguish newly expanded partitions from long-existing partitions newly assigned to the group, leading to unnecessary replay.
auto.offset.reset.new.partitions, combined with the group creation time classifier, addresses all three issues. Classification is computed entirely from two server-recorded timestamps — partition.creationTime (recorded once when the partition is created) and group.creationTime (recorded once when the consumer group is created) — eliminating any dependency on the consumer client's clock. Because both timestamps are fixed at the moment of creation and never updated thereafter, the classification result is deterministic and stable across retries. And because classification is per-partition, only genuinely newly expanded partitions are affected; pre-existing partitions newly assigned to the group continue to follow the base auto.offset.reset.
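To make the retry drift concrete, the following sketch contrasts a by_duration target, which is recomputed from the client clock on every attempt, with the timestamp comparison proposed here. The class name, method names, and all timestamps are hypothetical values chosen for illustration; none of this is Kafka client code.

```java
// Hypothetical illustration only; not part of the Kafka client.
final class ResetTargetDrift {

    // by_duration: the seek target depends on the client clock at call time.
    static long byDurationTargetMs(long clientNowMs, long durationMs) {
        return clientNowMs - durationMs;
    }

    // Proposed classifier: depends only on two timestamps that are fixed at creation.
    static boolean isNewlyExpanded(long partitionCreationMs, long groupCreationMs) {
        return partitionCreationMs > groupCreationMs;
    }

    public static void main(String[] args) {
        long durationMs = 5_000L;
        // A retry issued 3 seconds after the first attempt moves the target by 3 seconds.
        long firstAttempt = byDurationTargetMs(100_000L, durationMs);
        long retry = byDurationTargetMs(103_000L, durationMs);
        System.out.println("by_duration target moved forward by " + (retry - firstAttempt) + " ms");

        // The classification inputs do not change between attempts, so the answer is stable.
        System.out.println("newly expanded on attempt 1: " + isNewlyExpanded(90_000L, 50_000L));
        System.out.println("newly expanded on attempt 2: " + isNewlyExpanded(90_000L, 50_000L));
    }
}
```

Records appended in the interval that the target skips over are the ones at risk under by_duration; the comparison-based classifier has no such interval.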
Furthermore, with the decoupled design introduced in this KIP, even users who choose auto.offset.reset=by_duration as their base policy benefit from auto.offset.reset.new.partitions: by setting auto.offset.reset.new.partitions=earliest (or any explicit value), newly expanded partitions are classified and handled separately from pre-existing ones. The base by_duration policy applies only to pre-existing partitions, where its seek-back semantics are appropriate, while newly expanded partitions follow the dedicated new-partition policy.
Public Interfaces
New Consumer Configuration
auto.offset.reset.new.partitions
| Property | Value |
|---|---|
| Type | String |
| Default | null |
| Valid Values | earliest, latest, by_duration:<duration> |
Specifies the offset reset policy to apply to newly expanded partitions, that is, partitions whose creation timestamp on the broker postdates the consumer group's creation timestamp. When a partition has a committed offset that is out of the available range (e.g., due to log truncation), the consumer applies the base auto.offset.reset policy regardless of the partition's creation timestamp. This config only affects partitions with no committed offset at all.
- If a partition has a committed offset, this config has no effect; the consumer resumes from the committed offset as usual.
- If a partition has no committed offset and is classified as newly expanded (partition.creationTime > group.creationTime), the consumer applies auto.offset.reset.new.partitions.
- If a partition has no committed offset and is classified as pre-existing (partition.creationTime ≤ group.creationTime), the consumer applies the base auto.offset.reset.
- If either the partition creation timestamp or the group creation timestamp is unavailable (e.g., old broker, or the partition/group predates the upgrade), the consumer falls back to the base auto.offset.reset for that partition. No exception is thrown.
When this config is unset (default), the consumer applies the base auto.offset.reset uniformly to all partitions without committed offsets.
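As a sketch of how the opt-in might look in application code, the snippet below builds consumer properties that combine the two policies. The bootstrap address and group name are placeholders, deserializer settings are omitted for brevity, and auto.offset.reset.new.partitions is the key proposed by this KIP rather than one available in released clients.

```java
import java.util.Properties;

// Illustrative configuration only; the class name is hypothetical.
final class NewPartitionsConfigExample {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "my-group");                // placeholder group name
        props.setProperty("group.protocol", "consumer");          // modern protocol (KIP-848)
        // Skip the backlog that existed before the group was created ...
        props.setProperty("auto.offset.reset", "latest");
        // ... but read partitions added after group creation from the beginning (proposed key).
        props.setProperty("auto.offset.reset.new.partitions", "earliest");
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```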
Admin client
AdminClient.describeConsumerGroups() now exposes the group creation timestamp. Callers can invoke groupCreationTimeMs() on the returned ConsumerGroupDescription to retrieve the epoch-milliseconds when the group was first created on the broker. This value is empty if the broker does not support KIP-1327 or the group predates the upgrade.
ConsumerGroupDescription gains a new method groupCreationTimeMs()
- groupCreationTimeMs() (OptionalLong): The creation time of the consumer group on the broker. OptionalLong.empty() if unknown (e.g., older broker version, or group predates the upgrade).
AdminClient.describeTopics() now exposes the partition creation timestamp. Callers can invoke creationTimeMs() on each TopicPartitionInfo in the returned TopicDescription to retrieve the epoch-milliseconds when the partition was first created on the broker. This value is empty if the broker does not support KIP-1327 or the partition predates the upgrade.
TopicPartitionInfo gains a new method creationTimeMs()
- creationTimeMs() (OptionalLong): The creation time of the partition on the broker. OptionalLong.empty() if unknown (e.g., older broker version, or partition predates the upgrade).
kafka-topics.sh
kafka-topics.sh --describe adds a new CreationTimeMs field to the per-partition output line, showing the partition creation timestamp formatted as ISO-8601 UTC (yyyy-MM-dd'T'HH:mm:ss.SSSXXX). If the value is unavailable, the field displays N/A.
./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic foo
Topic: foo TopicId: abc123 PartitionCount: 2 ReplicationFactor: 2 Configs:
Topic: foo Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Elr: N/A LastKnownElr: N/A CreationTimeMs: 2025-01-15T10:30:00.000Z
Topic: foo Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1 Elr: N/A LastKnownElr: N/A CreationTimeMs: N/A
kafka-consumer-groups.sh
--describe --state --verbose
Adds a new GROUP-CREATION-TIME column showing the group creation timestamp formatted as ISO-8601 UTC (yyyy-MM-dd'T'HH:mm:ss.SSSXXX). If the value is unavailable (e.g., older broker or group predates the KIP-1327 upgrade), the column displays N/A.
Example output:
./bin/kafka-consumer-groups.sh --describe --state --verbose
GROUP         COORDINATOR (ID)   ASSIGNMENT-STRATEGY  STATE   GROUP-EPOCH  TARGET-ASSIGNMENT-EPOCH  #MEMBERS  GROUP-CREATION-TIME
my-group      broker-1:9092 (1)  range                Stable  5            5                        3         2025-01-15T10:30:00.000Z
legacy-group  broker-2:9092 (2)  range                Stable  2            2                        2         N/A
Protocol changes
ConsumerGroupHeartbeatRequest (version 2): no new fields are added.
ConsumerGroupHeartbeatResponse (version 2) gains a new field:
- GroupCreationTimeMs (int64): The creation time of the consumer group on the broker. -1 if unknown.
ConsumerGroupDescribeRequest (version 2): no new fields are added.
ConsumerGroupDescribeResponse (version 2) gains a new field in DescribedGroup:
- GroupCreationTimeMs (int64): The creation time of the consumer group on the broker. -1 if unknown.
DescribeTopicPartitionsRequest (version 1): no new fields are added.
DescribeTopicPartitionsResponse (version 1) gains a new field in DescribeTopicPartitionsResponsePartition:
- CreationTimeMs (int64): The creation time of the partition on the broker. -1 if unknown.
MetadataRequest (version 14): no new fields are added.
MetadataResponse (version 14) gains a new field in PartitionMetadata:
- CreationTimeMs (int64): The creation time of the partition on the broker. -1 if unknown.
Partition Record Changes
PartitionRecord (tagged field):
- CreationTimeMs (int64, default -1): recorded once when the partition is first created.
Consumer Offsets Record Changes
ConsumerGroupMetadataValue gains a new tagged field
- CreationTimeMs (int64, default -1): recorded once when the consumer group is first created.
Proposed Changes
Consumer Configuration
When auto.offset.reset.new.partitions is set to a non-null value, the consumer activates per-partition classification. For each partition assigned to the consumer that has no committed offset, the consumer compares:
- partition.creationTime: sourced from MetadataResponse.PartitionMetadata.CreationTimeMs
- group.creationTime: sourced from ConsumerGroupHeartbeatResponse.GroupCreationTimeMs
If partition.creationTime > group.creationTime, the partition is classified as newly expanded, and the consumer applies the policy specified by auto.offset.reset.new.partitions. Otherwise, the consumer applies the base auto.offset.reset.
The full interaction matrix between the base policy and the new-partition policy is:
| Base auto.offset.reset | auto.offset.reset.new.partitions | Pre-existing partition | Newly expanded partition |
|---|---|---|---|
| latest | null | LEO | LEO |
| latest | earliest | LEO | offset 0 |
| latest | latest | LEO | LEO |
| latest | by_duration:5s | LEO | offset at now() - 5s |
| earliest | null | offset 0 | offset 0 |
| earliest | earliest | offset 0 | offset 0 |
| earliest | latest | offset 0 | LEO |
| earliest | by_duration:5s | offset 0 | offset at now() - 5s |
| by_duration:1h | null | offset at now() - 1h | offset at now() - 1h |
| by_duration:1h | earliest | offset at now() - 1h | offset 0 |
| by_duration:1h | latest | offset at now() - 1h | LEO |
| by_duration:1h | by_duration:5s | offset at now() - 1h | offset at now() - 5s |
| none | any value | NoOffsetForPartitionException | auto.offset.reset.new.partitions applied |
A note on by_duration as the new-partition policy: while the base by_duration policy has the limitations documented in Why Existing Alternatives are Insufficient, using it scoped to newly expanded partitions is substantially safer because the application is narrowed to a small set of genuinely new partitions, not to every partition assigned to the group without a committed offset.
Classification Flow
Classification happens client-side. When the consumer needs to resolve the initial offset for a partition, the flow is:
- Check committed offset. If OffsetFetchResponse returns a valid committed offset within the log range, use it. Classification does not occur.
- Check out-of-range. If a committed offset exists but falls outside the log's available range (e.g., due to log truncation or AdminClient.deleteRecords), the consumer applies the base auto.offset.reset. Classification does not occur, since the partition is unambiguously pre-existing (it had a committed offset previously).
- Check auto.offset.reset.new.partitions. If unset (default), the consumer applies the base auto.offset.reset. Classification does not occur.
- Read timestamps.
  - group.creationTime: already available in the consumer, extracted from ConsumerGroupHeartbeatResponse.GroupCreationTimeMs on every heartbeat and cached.
  - partition.creationTime: extracted from MetadataResponse.PartitionMetadata.CreationTimeMs. If the partition was not present in the most recent metadata fetch (typically the case for newly expanded partitions), the consumer forces a fresh metadata refresh; see the next section.
- Classify and apply.
  - If either timestamp is -1 (unavailable), fall back to the base auto.offset.reset for that partition. No exception is thrown.
  - Otherwise, compare the two timestamps:
    - partition.creationTime > group.creationTime → apply auto.offset.reset.new.partitions.
    - partition.creationTime ≤ group.creationTime → apply the base auto.offset.reset.
Classification is performed independently per partition. Within a single poll() cycle, some partitions may be classified as newly expanded and others as pre-existing, each resolving its initial offset according to the matching policy.
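The flow above can be sketched as a single pure function. This is a minimal illustration under stated assumptions, not the consumer's actual implementation: the class name, the method signature, and the use of the string "committed" to mean "resume from the committed offset" are all hypothetical, and policies are passed as plain strings.

```java
import java.util.OptionalLong;

// Hypothetical sketch of the per-partition decision; not Kafka client code.
final class OffsetResetClassifier {
    static final long UNKNOWN = -1L; // sentinel used by the proposed protocol fields

    /** Returns the reset policy to apply, or "committed" when the committed offset is used. */
    static String resolve(OptionalLong committedOffset, boolean committedInRange,
                          String basePolicy, String newPartitionsPolicy,
                          long partitionCreationMs, long groupCreationMs) {
        if (committedOffset.isPresent()) {
            // A valid committed offset wins; an out-of-range one always uses the base policy.
            return committedInRange ? "committed" : basePolicy;
        }
        if (newPartitionsPolicy == null) {
            return basePolicy; // feature not enabled, no classification
        }
        if (partitionCreationMs == UNKNOWN || groupCreationMs == UNKNOWN) {
            return basePolicy; // timestamps unavailable, silent fallback
        }
        return partitionCreationMs > groupCreationMs ? newPartitionsPolicy : basePolicy;
    }

    public static void main(String[] args) {
        // Partition created after the group: the new-partition policy applies.
        System.out.println(resolve(OptionalLong.empty(), false, "latest", "earliest", 200L, 100L));
        // Partition created before the group: the base policy applies.
        System.out.println(resolve(OptionalLong.empty(), false, "latest", "earliest", 50L, 100L));
    }
}
```

Keeping the decision free of clock reads and I/O is a deliberate choice in this sketch: it makes the result depend only on the inputs, which matches the determinism argument made for the classifier.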
Metadata Refresh for Newly Assigned Partitions
When the consumer receives a heartbeat assignment containing partitions not present in the current metadata cache, it must obtain CreationTimeMs for those partitions before resolving the initial offset. The standard metadata cache cannot serve this need: even if the topic is already cached, a newly expanded partition did not exist at the time of the last metadata fetch and therefore has no cached CreationTimeMs.
To handle this:
- MembershipManager detects when a heartbeat assignment includes partitions whose CreationTimeMs is not in the current cache.
- It invalidates the metadata for the affected topic and forces an immediate MetadataRequest, regardless of whether the topic ID is already cached.
- The offset reset decision for the new partitions is deferred until the fresh MetadataResponse is received and CreationTimeMs is populated.
This forced refresh applies only when auto.offset.reset.new.partitions is set. When the config is unset, no refresh is triggered, preserving the existing assignment-reconciliation cost for users not opting into the feature.
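The detection step might look roughly like the following. This is a sketch under simplifying assumptions: topics are keyed by name rather than topic ID, the metadata cache is modeled as a plain map, and the class and method names are hypothetical rather than taken from MembershipManager.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch; the real consumer tracks assignments by topic ID.
final class CreationTimeRefreshPlanner {

    /** Returns the topics whose metadata must be refreshed before offsets can be reset. */
    static Set<String> topicsNeedingRefresh(Map<String, Set<Integer>> assigned,
                                            Map<String, Map<Integer, Long>> cachedCreationTimes,
                                            boolean newPartitionsPolicySet) {
        Set<String> topics = new TreeSet<>();
        if (!newPartitionsPolicySet) {
            return topics; // feature disabled: never force a refresh
        }
        for (Map.Entry<String, Set<Integer>> entry : assigned.entrySet()) {
            Map<Integer, Long> cached = cachedCreationTimes.getOrDefault(entry.getKey(), Map.of());
            for (int partition : entry.getValue()) {
                if (!cached.containsKey(partition)) {
                    topics.add(entry.getKey()); // at least one partition lacks CreationTimeMs
                    break;
                }
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> assigned = Map.of("foo", Set.of(0, 1, 2));
        Map<String, Map<Integer, Long>> cache = Map.of("foo", Map.of(0, 10L, 1, 10L));
        System.out.println(topicsNeedingRefresh(assigned, cache, true)); // partition 2 is uncached
    }
}
```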
Admin Client
ConsumerGroupDescription.groupCreationTimeMs() returns the epoch-milliseconds when the consumer group was first created on the broker, sourced from the GroupCreationTimeMs field in ConsumerGroupDescribeResponse.
The return value follows these rules:
- Returns a non-empty OptionalLong containing the group creation timestamp when the broker supports KIP-1327 and the group was created after the upgrade
- Returns OptionalLong.empty() when GroupCreationTimeMs == -1, which may occur in the following situations:
- The broker does not support KIP-1327 yet (e.g., older broker version)
- The group predates the upgrade and the creation time was never recorded
public class ConsumerGroupDescription {
/**
* The creation time of the consumer group on the broker, in milliseconds since epoch.
* Returns an OptionalLong.empty() if the broker does not support KIP-1327,
* or if the group predates the upgrade and the creation time was never recorded.
*/
public OptionalLong groupCreationTimeMs() { ... }
}
TopicPartitionInfo.creationTimeMs() returns the epoch-milliseconds when the partition was first created on the broker, sourced from the CreationTimeMs field in DescribeTopicPartitionsResponse. The return value follows these rules:
- Returns a non-empty OptionalLong containing the partition creation timestamp when the broker supports KIP-1327 and the partition was created after the upgrade.
- Returns OptionalLong.empty() when CreationTimeMs == -1, which may occur in the following situations:
- The broker does not support KIP-1327 yet.
- The partition predates the upgrade and the creation time was never recorded
public class TopicPartitionInfo {
/**
* The creation time of the partition on the broker, in milliseconds since epoch.
* Returns an OptionalLong.empty() if the broker does not support KIP-1327,
* or if the partition predates the upgrade and the creation time was never recorded.
*/
public OptionalLong creationTimeMs() { ... }
}
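Both accessors share the same rule for translating the wire-level sentinel into an OptionalLong. A minimal sketch of that mapping follows; the helper class and method name are hypothetical and only illustrate the rule described above.

```java
import java.util.OptionalLong;

// Hypothetical helper illustrating the -1 sentinel mapping; not part of the Admin API.
final class CreationTimeSentinel {
    static final long UNKNOWN = -1L;

    /** Maps the wire value (-1 meaning unknown) to the OptionalLong exposed to callers. */
    static OptionalLong fromWire(long rawCreationTimeMs) {
        return rawCreationTimeMs == UNKNOWN
                ? OptionalLong.empty()
                : OptionalLong.of(rawCreationTimeMs);
    }

    public static void main(String[] args) {
        System.out.println(fromWire(-1L).isPresent());            // false
        System.out.println(fromWire(1736937000000L).getAsLong()); // 1736937000000
    }
}
```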
kafka-topics.sh --describe: Displaying CreationTimeMs
--describe fetches partition metadata via AdminClient.describeTopics() and calls creationTimeMs() on each TopicPartitionInfo in the returned TopicDescription:
- If the returned OptionalLong is non-empty and the value is a valid timestamp (≥ 0), the millisecond timestamp is formatted using SimpleDateFormat with pattern yyyy-MM-dd'T'HH:mm:ss.SSSXXX and appended as CreationTimeMs: <formatted-time> to the partition output line.
- When TopicPartitionInfo.creationTimeMs() is empty (broker does not support KIP-1327, or the partition predates the upgrade), the field displays N/A.
kafka-consumer-groups.sh --describe --state --verbose: Displaying GROUP-CREATION-TIME
--describe --state --verbose fetches group metadata via AdminClient.describeConsumerGroups() and calls groupCreationTimeMs() on the returned ConsumerGroupDescription:
- If the returned OptionalLong is non-empty, the millisecond timestamp is formatted using SimpleDateFormat with pattern yyyy-MM-dd'T'HH:mm:ss.SSSXXX (consistent with Utils.getDateTime(), which is the existing datetime format used by --to-datetime) and displayed in the GROUP-CREATION-TIME column.
- If the returned value is OptionalLong.empty(), the column displays "N/A".
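The display rule shared by both tools can be sketched as below, using the SimpleDateFormat pattern named above with the time zone pinned to UTC. The class and method names are hypothetical, and fixing the locale to Locale.ROOT is an assumption made here so the output does not vary with the host's default locale.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.OptionalLong;
import java.util.TimeZone;

// Hypothetical formatting helper; not the actual CLI code.
final class CreationTimeFormatter {

    /** Formats a creation time as ISO-8601 UTC, or returns "N/A" when it is unknown. */
    static String format(OptionalLong creationTimeMs) {
        if (!creationTimeMs.isPresent()) {
            return "N/A";
        }
        // SimpleDateFormat is not thread-safe, so a new instance is created per call.
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX", Locale.ROOT);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return fmt.format(new Date(creationTimeMs.getAsLong()));
    }

    public static void main(String[] args) {
        System.out.println(format(OptionalLong.of(1736937000000L))); // 2025-01-15T10:30:00.000Z
        System.out.println(format(OptionalLong.empty()));            // N/A
    }
}
```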
Protocol Changes
ConsumerGroupHeartbeatResponse bumps from version 1 to version 2. Version 2 adds GroupCreationTimeMs (int64): the creation time when this consumer group was first created on the broker. -1 if unknown.
{
"apiKey": 68,
"type": "response",
"name": "ConsumerGroupHeartbeatResponse",
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
// skip
{ "name": "GroupCreationTimeMs", "versions": "2+", "type": "int64", "default": -1, "ignorable": true,
"about": "The creation time of the consumer group." },
{ "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "null if not provided; the assignment otherwise.", "fields": [
{ "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
"about": "The partitions assigned to the member that can be used immediately." }
]}
],
"commonStructs": [
{ "name": "TopicPartitions", "versions": "0+", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic ID." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions." }
]}
]
}
ConsumerGroupDescribeResponse bumps from version 1 to version 2. Version 2 adds GroupCreationTimeMs (int64) in DescribedGroup: the creation time when this consumer group was first created on the broker. -1 if unknown (e.g., old broker, or group predates this feature).
{
"apiKey": 69,
"type": "response",
"name": "ConsumerGroupDescribeResponse",
// Version 1 adds MemberType field (KIP-1099).
// Version 2 adds GroupCreationTimeMs field (KIP-1327).
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
"about": "Each described group.",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The describe error, or 0 if there was no error." },
// skip
{ "name": "GroupCreationTimeMs", "type": "int64", "versions": "2+", "default": "-1", "ignorable": true,
"about": "The creation time of the consumer group. -1 if unknown." },
{ "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this group." }
]
}
],
// skip
}
MetadataResponse bumps from version 13 to version 14. The PartitionMetadata struct gains a new field CreationTimeMs, the creation time of the partition on the broker. -1 if unknown.
{
"apiKey": 3,
"type": "response",
"name": "MetadataResponse",
"validVersions": "0-14",
"flexibleVersions": "9+",
"fields": [
{
"name": "Topics",
"type": "[]MetadataResponseTopic",
"versions": "0+",
"fields": [
{
"name": "Partitions",
"type": "[]MetadataResponsePartition",
"versions": "0+",
"fields": [
// ... existing fields unchanged ...
{
"name": "CreationTimeMs", "type": "int64", "versions": "14+", "taggedVersions": "14+", "tag": 0, "default": "-1", "ignorable": true,
"about": "The creation time of the partition. -1 if unknown."
}
]
}
]
}
]
}
DescribeTopicPartitionsResponse bumps from version 0 to version 1. Version 1 adds CreationTimeMs as a tagged field inside DescribeTopicPartitionsResponsePartition, populated by the broker from PartitionRecord.CreationTimeMs:
{
"apiKey": 75,
"type": "response",
"name": "DescribeTopicPartitionsResponse",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "Topics", "type": "[]DescribeTopicPartitionsResponseTopic", "versions": "0+",
"about": "Each topic in the response.", "fields": [
// skip
{ "name": "Partitions", "type": "[]DescribeTopicPartitionsResponsePartition", "versions": "0+",
"about": "Each partition in the topic.", "fields": [
{ "name": "CreationTimeMs", "type": "int64", "versions": "1+",
"taggedVersions": "1+", "tag": 0, "default": "-1", "ignorable": true,
"about": "The time in milliseconds when this partition was first created. -1 if unknown." },
{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the leader broker." },
// skip
]}
]}
]
}
Partition Record Changes
PartitionRecord gains a new tagged field, CreationTimeMs. This timestamp is persisted exactly once by the controller when the partition is initially created and is never updated thereafter; it defaults to -1 (indicating the creation time is unknown). By leveraging a tagged field (tag: 3), this change remains backward compatible and does not require bumping the PartitionRecord serialization version. Existing brokers that do not recognize this tag will safely ignore and skip it, while upgraded brokers will populate it for every partition created from then on.
{
"apiKey": 3,
"type": "metadata",
"name": "PartitionRecord",
// Version 1 adds Directories for KIP-858.
// Version 2 implements Eligible Leader Replicas and LastKnownElr as described in KIP-966.
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
// ... existing fields unchanged ...
{ "name": "CreationTimeMs", "type": "int64", "versions": "0+",
"taggedVersions": "0+", "tag": 3, "default": "-1",
"about": "The time in milliseconds when this partition was first created. -1 if unknown." }
]
}
Consumer Offsets Record Changes
ConsumerGroupMetadataValue gains a new tagged field CreationTimeMs (int64, default -1): recorded once when the consumer group is first created.
{
"apiKey": 3,
"type": "coordinator-value",
"name": "ConsumerGroupMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Epoch", "versions": "0+", "type": "int32",
"about": "The group epoch." },
// The MetadataHash is added in 4.1 (KIP-1101). It's used to track
// subscribed topics in the group. When subscribed topics change,
// like partition count or rack change, the hash will be different.
// It indicates that the group should be rebalanced.
{ "name": "MetadataHash", "versions": "0+", "type": "int64",
"default": 0, "taggedVersions": "0+", "tag": 0,
"about": "The hash of all topics in the group." },
{ "name": "CreationTimeMs", "versions": "0+", "type": "int64",
"default": -1, "taggedVersions": "0+", "tag": 1,
"about": "The creation time of the consumer group." }
]
}
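Both CreationTimeMs fields are described as written once and never overwritten. The sketch below illustrates that write-once rule in isolation; it is a hypothetical in-memory holder, not the controller's or group coordinator's record-handling code.

```java
// Hypothetical illustration of the write-once semantics; not broker code.
final class WriteOnceCreationTime {
    static final long UNKNOWN = -1L;

    private long creationTimeMs = UNKNOWN; // matches the field default of -1

    /** Records the creation time only if none has been recorded; returns the stored value. */
    long recordIfAbsent(long nowMs) {
        if (creationTimeMs == UNKNOWN) {
            creationTimeMs = nowMs;
        }
        return creationTimeMs;
    }

    long get() {
        return creationTimeMs;
    }

    public static void main(String[] args) {
        WriteOnceCreationTime holder = new WriteOnceCreationTime();
        System.out.println(holder.get());                 // -1
        System.out.println(holder.recordIfAbsent(1_000)); // 1000
        System.out.println(holder.recordIfAbsent(2_000)); // 1000, a later update is ignored
    }
}
```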
Compatibility, Deprecation, and Migration Plan
We discuss this improvement separately for different consumer types:
Modern consumer (KIP-848)
This is a new, opt-in feature. The config auto.offset.reset.new.partitions defaults to null (unset), so existing deployments are unaffected — the consumer continues to apply the base auto.offset.reset uniformly to all partitions without committed offsets. There is no behavior change for any current user until the config is explicitly set.
When the config is set on a cluster that has not yet been upgraded to support partition / group creation timestamps, classification cannot occur because the required fields are absent or -1. In this case, the consumer falls back to the base auto.offset.reset for the affected partitions. No exception is thrown, and no operational disruption results. The feature becomes effective gradually as partitions and groups created after the upgrade accumulate.
Classic consumer
This KIP does not add auto.offset.reset.new.partitions support to classic consumer groups. If a user configures this option while using a classic consumer group, the consumer will throw a ConfigException at startup, clearly indicating that the feature requires the modern consumer group protocol.
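A startup check along these lines could enforce the restriction. The sketch is illustrative only: it uses IllegalArgumentException as a stand-in for Kafka's ConfigException so that it compiles without the client library, it assumes group.protocol is treated as classic when unset, and the class name and error message are hypothetical.

```java
import java.util.Map;

// Hypothetical validation sketch; the real client would throw ConfigException.
final class NewPartitionsConfigValidator {
    static final String NEW_PARTITIONS_KEY = "auto.offset.reset.new.partitions";

    static void validate(Map<String, String> configs) {
        // Assumption: an unset group.protocol means the classic protocol.
        String protocol = configs.getOrDefault("group.protocol", "classic");
        boolean newPartitionsPolicySet = configs.get(NEW_PARTITIONS_KEY) != null;
        if (newPartitionsPolicySet && protocol.equalsIgnoreCase("classic")) {
            throw new IllegalArgumentException(
                NEW_PARTITIONS_KEY + " requires group.protocol=consumer");
        }
    }

    public static void main(String[] args) {
        validate(Map.of("group.protocol", "consumer", NEW_PARTITIONS_KEY, "earliest")); // accepted
        try {
            validate(Map.of("group.protocol", "classic", NEW_PARTITIONS_KEY, "earliest"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```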
Kafka Streams
This KIP does not introduce changes to the Streams rebalance protocol.
The partition expansion data loss problem this KIP addresses is specific to consumers using the modern consumer group protocol (KIP-848). Kafka Streams uses its own assignment protocol and is out of scope for this KIP; support for Streams may be considered in a future KIP.
To prevent silent misconfiguration, auto.offset.reset.new.partitions is added to StreamsConfig.NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS. If a user sets auto.offset.reset.new.partitions in a Streams application, the Streams client will throw a ConfigException.
Share consumer
This KIP does not include changes for share consumer groups. Share groups do not require this feature because the "latest" policy already starts at offset 0 for newly added partitions in share groups, avoiding the partition-expansion data-loss scenario that this KIP addresses.
Future default consideration (Kafka 5.0)
For backward compatibility, this KIP defaults auto.offset.reset.new.partitions to null, meaning the consumer applies the base auto.offset.reset uniformly and partition-expansion protection is opt-in.
A future KIP may revisit this default in a major release such as Kafka 5.0, changing it to earliest so that partition-expansion safety becomes the out-of-the-box behavior. The rationale is as follows:
- Silent data loss is the current default failure mode. Users running auto.offset.reset=latest (the most common base policy) silently miss records on every partition expansion unless protection has been explicitly enabled. A safe default eliminates this footgun.
- earliest aligns with user intent in nearly all cases. A user who selects latest is typically expressing "I don't want to reprocess history that already existed when I started." Newly expanded partitions carry no such history: every record on them was produced during the consumer group's active lifetime, so reading from the beginning is consistent with that same intent.
- Opt-out is straightforward. Users who prefer the previous uniform behavior can set auto.offset.reset.new.partitions=latest (or any value matching their base policy), which is semantically a no-op overlay on the base policy.
Test Plan
All existing tests must continue to pass. New tests will cover:
- Classification logic: partition.creationTime > group.creationTime triggers auto.offset.reset.new.partitions; otherwise the base auto.offset.reset applies.
- Fallback: When group.creationTime == -1 or partition.creationTime == -1, the consumer falls back to the base policy without throwing.
- Each base × new.partitions combination (latest / earliest / by_duration / none × earliest / latest / by_duration) resolves to the expected offset.
- Out-of-range offsets apply the base policy, bypassing classification.
- Config validation: Setting auto.offset.reset.new.partitions with group.protocol=classic or in Kafka Streams throws ConfigException at startup.
- Persistence: ConsumerGroupMetadataValue.CreationTimeMs and PartitionRecord.CreationTimeMs are written once and never overwritten across epoch bumps, leader changes, ISR changes, or reassignments.
- Admin client: ConsumerGroupDescription.groupCreationTimeMs() and TopicPartitionInfo.creationTimeMs() return correct values for supported brokers, and OptionalLong.empty() for unsupported brokers.
- CLI: kafka-topics.sh --describe and kafka-consumer-groups.sh --describe --state --verbose correctly display CreationTimeMs and GROUP-CREATION-TIME, or N/A when unavailable.
Rejected Alternatives
New auto.offset.reset=to_start_time policy (KIP-1282)
The controller records a group creation timestamp (GroupCreationTimeMs) when a consumer group is first created. This timestamp is returned to the consumer via the ConsumerGroupHeartbeatResponse.
When auto.offset.reset=to_start_time and the consumer encounters a partition with no committed offset or an out-of-range offset, it issues a ListOffsetsRequest using the group creation timestamp as the target. The broker returns the earliest offset at or after that timestamp. This produces the following behavior:
- New group, first start: the group creation timestamp is approximately "now", so ListOffsets resolves to the log end offset, equivalent to latest. The consumer skips the historical backlog.
- Partition expansion: the group creation timestamp predates the new partition, so ListOffsets resolves to offset 0. The consumer reads all records from the new partition, preventing data loss.
- Out-of-range (stale offset, log truncation): ListOffsets resolves to the earliest surviving offset at or after the group creation time, effectively falling back to earliest for the group's lifetime.
Why rejected:
- No consensus on out-of-range semantics: The community could not agree on how to_start_time should behave for out-of-range offsets. The main issue is whether it should remain consistent with latest (skip backlog) or fall back to earliest (ensure completeness). to_start_time applies different behaviors in these cases, which some see as inconsistent. A proposed "Smarter Latest" variant was rejected for introducing branching logic and violating the single-rule model.
- Overkill for the core problem: The goal, preventing data loss during partition expansion, does not require a new reset policy with additional semantics. A simpler approach, such as refining latest via configuration, would be more focused and easier to adopt. Introducing a new policy was considered disproportionate to the problem.
auto.offset.reset.max.age.ms based on partition age
This alternative proposed classifying partitions using a different signal: partition age, computed server-side by the broker as broker_current_time - partition_creation_time and returned via MetadataResponse.PartitionAgeMs. A new consumer config, auto.offset.reset.max.age.ms, would compare this age against a user-set threshold:
- partition_age ≤ threshold → newly expanded → apply a safe policy (auto.offset.reset.new.partitions)
- partition_age > threshold → pre-existing → apply the base auto.offset.reset
Why rejected:
- Threshold tuning leaks implementation details: The correct threshold value is determined by the consumer's metadata refresh delay, which is an internal client-side concern. Users would be forced to reason about internal mechanics (refresh intervals, retry behavior) simply to set the config correctly.
- Boundary instability: A partition near the threshold transitions from "new" to "pre-existing" between successive metadata fetches as the broker's clock advances. Classification can flip based purely on the timing of the next metadata refresh, with no corresponding user action.
- Group creation time is the more intuitive classifier: users naturally reason about "new vs. existing partition" relative to the lifetime of their consumer group, not relative to broker wall-clock time. The current design adopts this framing directly.