Status
Current state: "Under Discussion"
Discussion thread: https://lists.apache.org/thread/bp4zk31zr1sdxjsspg7b7bqddmm9t4gn
JIRA: KAFKA-20539
Motivation
When a Kafka topic is expanded with new partitions, consumers using the latest auto offset reset policy will silently miss all records produced to those partitions before the consumer discovers them. This is a long-standing data loss issue that is particularly painful for active, "hot" partitions — partitions that are newly created and receiving records at the time of discovery.
The current workaround is to use auto.offset.reset=earliest, but this forces the consumer to reprocess the entire historical backlog, which is unacceptable for most production pipelines. The existing by_duration policy is also insufficient because:
- The calculated seek time (now() - duration) varies across nodes due to clock skew. To be safe, users must set an overly large duration, causing unnecessary reprocessing.
- On network errors, the client recalculates the seek time on retry, shifting the target timestamp forward and risking data loss.
The core insight is that not all partitions without a committed offset are the same. A newly expanded partition (hot) is fundamentally different from a partition the consumer has never seen because it predates the group (cold). The latest policy today treats both the same, which causes silent data loss in the hot case.
This KIP introduces a new consumer configuration auto.offset.reset.latest.max.age that allows the latest policy to distinguish between hot and cold partitions, falling back to earliest only for recently created partitions within a user-defined age threshold.
Public Interfaces
New Consumer Configuration
auto.offset.reset.latest.max.age
| Property | Value |
|---|---|
| Type | long |
| Default | -1 (disabled) |
| Unit | milliseconds |
| Validator | atLeast(-1) |
| Applies | only when auto.offset.reset=latest |
When set, this threshold controls the behavior of the latest policy when a consumer encounters a partition with no committed offset:
- If the partition's age (PartitionAgeMs from MetadataResponse, computed server-side by the broker as broker_current_time - partition_creation_time) is within the threshold → fall back to earliest to capture hot data.
- If the partition's age exceeds the threshold, or if the partition creation time is unavailable (e.g., older broker) → strictly follow latest.
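As an illustration, a consumer that opts in to the new behavior might be configured as in the following minimal sketch. It only builds a java.util.Properties object, so it runs without the Kafka client on the classpath. The class name, broker address, group id, and the 10-minute threshold are assumptions chosen for the example, not values prescribed by this KIP.

```java
import java.util.Properties;

/** Hypothetical helper that assembles consumer properties for the proposed config. */
final class MaxAgeConfigSketch {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed address
        props.setProperty("group.id", "example-group");           // assumed group id
        // The KIP only supports the modern consumer group protocol.
        props.setProperty("group.protocol", "consumer");
        // The new config applies only when the reset policy is "latest".
        props.setProperty("auto.offset.reset", "latest");
        // Proposed config: partitions younger than 10 minutes are read from the start.
        props.setProperty("auto.offset.reset.latest.max.age", "600000");
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Leaving auto.offset.reset.latest.max.age unset (or at -1) keeps today's latest behavior.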
Admin client
AdminClient.describeTopics() now exposes the partition creation timestamp. Callers can invoke creationTimeMs() on each TopicPartitionInfo in the returned TopicDescription to retrieve the epoch-milliseconds when the partition was first created on the broker. This value is empty if the broker does not support KIP-1327 or the partition predates the upgrade.
TopicPartitionInfo gains a new method creationTimeMs()
- creationTimeMs() (OptionalLong): The creation time of the partition on the broker. OptionalLong.empty() if unknown (e.g., older broker version, or partition predates the upgrade).
kafka-topics.sh
kafka-topics.sh --describe adds a new CreationTimeMs field to the per-partition output line, showing the partition creation timestamp formatted as ISO-8601 UTC (yyyy-MM-dd'T'HH:mm:ss.SSSXXX). If the value is unavailable, the field displays -. If the broker does not support DescribeTopicPartitionsResponse v1 at all, the field is omitted entirely.
./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic foo
Topic: foo TopicId: abc123 PartitionCount: 2 ReplicationFactor: 2 Configs:
Topic: foo Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Elr: N/A LastKnownElr: N/A CreationTimeMs: 2025-01-15T10:30:00.000Z
Topic: foo Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1 Elr: N/A LastKnownElr: N/A CreationTimeMs: -
Protocol changes
MetadataRequest (version 14): no new fields are added.
MetadataResponse (version 14):
- PartitionAgeMs (int64, default -1): The age of this partition in milliseconds, computed server-side by the broker as broker_current_time - partition_creation_time. Returns -1 if the broker does not support this feature or the partition creation time is unknown.
DescribeTopicPartitionsRequest (version 1): no new fields are added.
DescribeTopicPartitionsResponse (version 1):
- CreationTimeMs (int64, default -1): a field inside DescribeTopicPartitionsResponsePartition, populated by the broker from PartitionRecord.CreationTimeMs.
Partition Record Changes
PartitionRecord (tagged field):
- CreationTimeMs (int64, default -1): recorded once when the partition is first created.
Proposed Changes
Consumer Configuration
auto.offset.reset.latest.max.age extends the behavior of the latest reset policy to handle newly expanded partitions without silent data loss.
When this config is set to a positive value (in milliseconds), the consumer reads PartitionAgeMs directly from the MetadataResponse, which is already fetched during assignment reconciliation as part of the normal KIP-848 flow. The age is computed server-side by the broker as broker_current_time - partition_creation_time, so no client-side time computation is required. This age is then compared against the configured threshold to classify each partition with no committed offset as either hot or cold:
- Hot partition (partitionAge <= auto.offset.reset.latest.max.age): the partition was recently created or expanded. The consumer resolves the initial fetch position to the start offset for that partition.
- Cold partition (partitionAge > auto.offset.reset.latest.max.age): the partition predates the threshold. The consumer strictly follows latest and skips the historical backlog, which is the existing behavior.
- Age unavailable (partitionAge == -1): the broker does not support this feature, or the partition was created before the cluster was upgraded to the required metadata version. The consumer falls back to latest, preserving full backward compatibility. No exception is thrown.
| Scenario | Partition Age | Behavior |
|---|---|---|
| Newly expanded partition (hot) | ≤ max.age | earliest |
| Pre-existing partition, consumer group is new | > max.age | latest |
| Out-of-range offset | N/A | latest |
| Age unavailable (old broker or partition predates feature) | -1 | latest |
This config is ignored when auto.offset.reset is set to any value other than latest. It is also only supported with the modern consumer group protocol (group.protocol=consumer). Configuring it alongside the classic protocol will throw a ConfigException at startup.
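The classification rules above can be sketched as a single pure function. This is a minimal illustration under stated assumptions, not the actual consumer implementation: the class, enum, and method names are hypothetical, and the KIP does not spell out the behavior for a threshold of 0, so the sketch simply applies the same comparison in that case.

```java
/** Hypothetical sketch of the hot/cold decision for a partition with no committed offset. */
final class LatestMaxAgeResetSketch {

    /** The two outcomes described for the latest policy with a max-age threshold. */
    enum Reset { EARLIEST, LATEST }

    /** Sentinel meaning "config disabled" for the threshold and "age unknown" for the age. */
    static final long UNSET = -1L;

    static Reset resolve(long partitionAgeMs, long maxAgeMs) {
        if (maxAgeMs == UNSET) {
            return Reset.LATEST; // feature disabled: existing latest behavior
        }
        if (partitionAgeMs == UNSET) {
            return Reset.LATEST; // age unavailable: fall back to latest
        }
        // Hot if the age is within the threshold (inclusive), cold otherwise.
        return partitionAgeMs <= maxAgeMs ? Reset.EARLIEST : Reset.LATEST;
    }

    public static void main(String[] args) {
        long maxAgeMs = 600_000L; // assumed 10-minute threshold
        System.out.println(resolve(30_000L, maxAgeMs));    // prints EARLIEST (hot)
        System.out.println(resolve(3_600_000L, maxAgeMs)); // prints LATEST (cold)
        System.out.println(resolve(UNSET, maxAgeMs));      // prints LATEST (age unknown)
    }
}
```

Keeping the decision free of any clock access reflects the design above: the age arrives precomputed from the broker, so the client only compares two numbers.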
Metadata Refresh Flow for Newly Assigned Partitions
When the consumer receives a new partition assignment via HeartbeatResponse, it must obtain a fresh PartitionAgeMs before making the offset reset decision. The standard metadata cache cannot be used directly: even if the topic is already known, the newly expanded partition did not exist at the time of the last fetch and therefore has no cached PartitionAgeMs.
To handle this, when MembershipManager detects that a heartbeat assignment contains partitions not previously assigned, it explicitly invalidates the metadata for the affected topic and forces an immediate MetadataRequest, regardless of whether the topic ID is already in the cache. The offset reset decision is deferred until the fresh MetadataResponse is received and the PartitionAgeMs for the new partition is available.
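The detection step can be pictured as a set difference between the previous and the new assignment. The sketch below is a simplified model, not the MembershipManager code: partitions are represented as (topic, partition) entries, and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model of "which topics need a forced metadata refresh". */
final class NewAssignmentRefreshSketch {

    /**
     * Returns the topics that own at least one partition present in the new
     * assignment but absent from the previous one.
     */
    static Set<String> topicsNeedingRefresh(Set<Map.Entry<String, Integer>> previous,
                                            Set<Map.Entry<String, Integer>> assigned) {
        Set<String> topics = new TreeSet<>();
        for (Map.Entry<String, Integer> partition : assigned) {
            if (!previous.contains(partition)) {
                // Newly assigned partition: its age cannot be in the cached metadata.
                topics.add(partition.getKey());
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        Set<Map.Entry<String, Integer>> previous =
            Set.of(Map.entry("foo", 0), Map.entry("foo", 1), Map.entry("bar", 0));
        Set<Map.Entry<String, Integer>> assigned =
            Set.of(Map.entry("foo", 0), Map.entry("foo", 1), Map.entry("foo", 2), Map.entry("bar", 0));
        System.out.println(topicsNeedingRefresh(previous, assigned)); // prints [foo]
    }
}
```

In this model the offset reset decision for foo-2 would wait until the refreshed metadata for topic foo arrives, while bar needs no refresh.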
Admin Client
TopicPartitionInfo.creationTimeMs() returns the epoch-milliseconds when the partition was first created on the broker, sourced from the CreationTimeMs field in DescribeTopicPartitionsResponse. The return value follows these rules:
- Returns a non-empty OptionalLong containing the partition creation timestamp when the broker supports KIP-1327 and the partition was created after the upgrade.
- Returns OptionalLong.empty() when CreationTimeMs == -1, which may occur in the following situations:
- The broker does not support KIP-1327 yet (e.g., older broker version, client falls back to MetadataResponse)
- The partition predates the upgrade and the creation time was never recorded
public class TopicPartitionInfo {
/**
* The creation time of the partition on the broker, in milliseconds since epoch.
* Returns an OptionalLong.empty() if the broker does not support KIP-1327,
* or if the partition predates the upgrade and the creation time was never recorded.
*/
public OptionalLong creationTimeMs() { ... }
}
kafka-topics.sh --describe: Displaying CreationTimeMs
--describe fetches partition metadata via AdminClient.describeTopics() and calls creationTimeMs() on each TopicPartitionInfo in the returned TopicDescription:
- If the returned OptionalLong is non-empty and the value is a valid timestamp (≥ 0), the millisecond timestamp is formatted using SimpleDateFormat with pattern yyyy-MM-dd'T'HH:mm:ss.SSSXXX and appended as CreationTimeMs: <formatted-time> to the partition output line.
- If the returned OptionalLong is non-empty but the value is -1, the field is appended as CreationTimeMs: -.
- If the returned value is OptionalLong.empty(), the CreationTimeMs field is not appended at all.
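The three display rules above can be sketched as follows. This is an illustration only: the class and method names are hypothetical, the formatter is pinned to UTC and Locale.ROOT as an assumption so the output is stable across machines, and any negative value (not just -1) is treated as unknown.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.TimeZone;

/** Hypothetical sketch of how the CreationTimeMs field could be rendered. */
final class CreationTimeFieldSketch {

    /** Returns the text to append to the partition line, or empty to omit the field. */
    static Optional<String> describeField(OptionalLong creationTimeMs) {
        if (!creationTimeMs.isPresent()) {
            return Optional.empty(); // field is not appended at all
        }
        long value = creationTimeMs.getAsLong();
        if (value < 0) {
            return Optional.of("CreationTimeMs: -"); // unknown creation time
        }
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX", Locale.ROOT);
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return Optional.of("CreationTimeMs: " + format.format(new Date(value)));
    }

    public static void main(String[] args) {
        System.out.println(describeField(OptionalLong.of(0L)).orElse("<omitted>"));
        System.out.println(describeField(OptionalLong.of(-1L)).orElse("<omitted>"));
        System.out.println(describeField(OptionalLong.empty()).orElse("<omitted>"));
    }
}
```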
Protocol Changes
MetadataResponse bumps from version 13 to version 14. The PartitionMetadata struct gains a new tagged field PartitionAgeMs, computed server-side by the broker as broker_current_time - PartitionRecord.CreationTimeMs:
{
"apiKey": 3,
"type": "response",
"name": "MetadataResponse",
"validVersions": "0-14",
"flexibleVersions": "9+",
"fields": [
{
"name": "Topics",
"type": "[]MetadataResponseTopic",
"versions": "0+",
"fields": [
{
"name": "Partitions",
"type": "[]MetadataResponsePartition",
"versions": "0+",
"fields": [
// ... existing fields unchanged ...
{
"name": "PartitionAgeMs", "type": "int64", "versions": "14+", "taggedVersions": "14+", "tag": 0, "default": "-1", "ignorable": true,
"about": "The age of this partition in milliseconds, computed as broker_current_time - partition_creation_time. -1 if the partition creation time is unknown (partition predates the required MetadataVersion or broker does not support this feature)."
}
]
}
]
}
]
}
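The broker-side computation of PartitionAgeMs is simple enough to sketch directly. The names below are hypothetical, and clamping a negative difference to 0 is an assumption of this sketch (the KIP does not specify it); it keeps a clock that briefly moves backwards from producing a negative age that could be confused with the -1 sentinel.

```java
/** Hypothetical sketch of the server-side age computation for MetadataResponse v14. */
final class PartitionAgeSketch {

    /** Sentinel for an unknown creation time or unknown age. */
    static final long UNKNOWN = -1L;

    static long partitionAgeMs(long brokerNowMs, long creationTimeMs) {
        if (creationTimeMs == UNKNOWN) {
            return UNKNOWN; // partition predates the feature: age stays unknown
        }
        // Assumption: never report a negative age.
        return Math.max(0L, brokerNowMs - creationTimeMs);
    }

    public static void main(String[] args) {
        System.out.println(partitionAgeMs(1_000_000L, 940_000L)); // prints 60000
        System.out.println(partitionAgeMs(1_000_000L, UNKNOWN));  // prints -1
    }
}
```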
DescribeTopicPartitionsResponse bumps from version 0 to version 1. Version 1 adds CreationTimeMs as a tagged field inside DescribeTopicPartitionsResponsePartition, populated by the broker from PartitionRecord.CreationTimeMs:
{
"apiKey": 75,
"type": "response",
"name": "DescribeTopicPartitionsResponse",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "Topics", "type": "[]DescribeTopicPartitionsResponseTopic", "versions": "0+",
"about": "Each topic in the response.", "fields": [
// skip
{ "name": "Partitions", "type": "[]DescribeTopicPartitionsResponsePartition", "versions": "0+",
"about": "Each partition in the topic.", "fields": [
{ "name": "CreationTimeMs", "type": "int64", "versions": "1+",
"taggedVersions": "1+", "tag": 0, "default": "-1", "ignorable": true,
"about": "The time in milliseconds when this partition was first created. -1 if unknown (partition predates the required MetadataVersion)." },
{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the leader broker." },
// skip
}
]
}
Partition Record Changes
PartitionRecord gains a new tagged field CreationTimeMs. It is written once by the controller when the partition is first created and is never updated thereafter. For partitions whose creation time was never recorded, the field defaults to -1 (unknown), and the broker returns -1 for those partitions: as PartitionAgeMs in MetadataResponse and as CreationTimeMs in DescribeTopicPartitionsResponse.
{
"apiKey": 3,
"type": "metadata",
"name": "PartitionRecord",
// Version 1 adds Directories for KIP-858.
// Version 2 implements Eligible Leader Replicas and LastKnownElr as described in KIP-966.
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
// ... existing fields unchanged ...
{ "name": "CreationTimeMs", "type": "int64", "versions": "0+",
"taggedVersions": "0+", "tag": 3, "default": "-1",
"about": "The time in milliseconds when this partition was first created. -1 if unknown." }
]
}
Why not use by_duration?
The existing by_duration policy may appear to address partition expansion data loss, but it has several limitations that make it unsuitable:
- It computes the seek target client-side as now() - duration, which introduces clock skew across consumers and forces operators to choose overly large durations, causing unnecessary reprocessing.
- The target timestamp is recomputed on each retry, so failed ListOffsetsRequest retries can shift the target forward and potentially miss records produced between attempts.
- It applies uniformly to all partitions without committed offsets, and cannot distinguish newly created (hot) partitions from long-existing (cold) partitions newly assigned to the group, leading to unnecessary replay.
auto.offset.reset.latest.max.age addresses all three issues. Partition age is computed server-side by the broker as broker_current_time - partition_creation_time and returned via MetadataResponse, eliminating any dependency on the consumer client's clock. The seek target itself — earliest or latest — is a stable sentinel value and does not drift across retries, unlike by_duration where the ListOffsets target timestamp shifts forward on each retry.
Partition age does increase over time as the broker recomputes it on each MetadataResponse, which means a partition near the threshold boundary may transition from "hot" to "cold" between metadata fetches. In practice this is benign: once a partition exceeds the threshold it is no longer newly created, and latest is the intended fallback. Users should configure the threshold with sufficient margin above their expected partition discovery latency. Because age is tracked per partition, only genuinely young partitions fall back to earliest while cold partitions continue to follow latest.
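To make the retry drift concrete, the following small sketch computes the by_duration target for two attempts that are five seconds apart. All numbers and names are invented for illustration; the point is only that a target derived from the client clock moves with every attempt, whereas an earliest or latest sentinel does not.

```java
/** Hypothetical arithmetic showing how a client-computed seek target drifts on retry. */
final class SeekTargetDriftSketch {

    /** by_duration style target: depends on the client clock at the time of the attempt. */
    static long byDurationTargetMs(long clientNowMs, long durationMs) {
        return clientNowMs - durationMs;
    }

    /** How far the target moved forward between the first attempt and the retry. */
    static long driftMs(long firstAttemptNowMs, long retryNowMs, long durationMs) {
        return byDurationTargetMs(retryNowMs, durationMs)
            - byDurationTargetMs(firstAttemptNowMs, durationMs);
    }

    public static void main(String[] args) {
        long durationMs = 60_000L;       // assumed duration of 1 minute
        long firstAttempt = 1_000_000L;  // client clock at the first attempt
        long retry = 1_005_000L;         // client clock at the retry, 5 seconds later
        System.out.println(byDurationTargetMs(firstAttempt, durationMs)); // prints 940000
        System.out.println(byDurationTargetMs(retry, durationMs));        // prints 945000
        System.out.println(driftMs(firstAttempt, retry, durationMs));     // prints 5000
    }
}
```

In this example, records with timestamps between 940000 and 945000 fall before the retried target and may be skipped.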
Compatibility, Deprecation, and Migration Plan
We discuss this improvement separately for different consumer types:
Modern consumer (KIP-848 consumer group protocol)
This is a new feature. The config auto.offset.reset.latest.max.age defaults to -1 (disabled), so existing deployments that do not enable it are unaffected. There is no behavior change for current latest users.
When enabled on a cluster that does not yet support partition timestamps, partitions will not have a recorded creation time (CreationTimeMs = -1). In this case, the consumer receives -1 as the partition age and falls back to latest, preserving existing behavior. No migration is required.
The feature becomes effective gradually as new partitions are created after the upgrade.
Consumer/Broker behaviour Matrix
| Consumer | Broker | auto.offset.reset.latest.max.age | Metadata Response | Behavior |
|---|---|---|---|---|
| New | New | non -1 | age known | earliest (hot), latest (cold) |
| New | New | non -1 | -1 (partition predates upgrade) | latest |
| New | New | -1 (disabled) | ignored | latest |
| New | Old (MetadataResponse v0–13) | non -1 | field absent | UnsupportedVersionException |
| New | Old (MetadataResponse v0–13) | -1 (disabled) | field absent | latest |
When auto.offset.reset.latest.max.age is configured and the broker does not support MetadataResponse v14, the feature cannot function correctly — silently falling back to latest would defeat the purpose of the config and risk data loss without any indication to the operator. Therefore, the consumer throws an UnsupportedVersionException at runtime when API version negotiation determines that the broker's maximum supported MetadataResponse version is below 14 and auto.offset.reset.latest.max.age is set to a non -1 value.
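A minimal sketch of this check, under stated assumptions, is shown below. The class and method names are hypothetical, and a local stand-in exception is used so the example compiles without the Kafka client; the real consumer would throw org.apache.kafka.common.errors.UnsupportedVersionException.

```java
/** Hypothetical sketch of the runtime version gate for the new config. */
final class MetadataVersionGateSketch {

    /** MetadataResponse version that introduces PartitionAgeMs. */
    static final short REQUIRED_METADATA_VERSION = 14;

    /** Local stand-in for Kafka's UnsupportedVersionException. */
    static final class UnsupportedVersionSketchException extends RuntimeException {
        UnsupportedVersionSketchException(String message) {
            super(message);
        }
    }

    /** Throws if the feature is enabled but the broker cannot report partition age. */
    static void check(short brokerMaxMetadataVersion, long maxAgeMs) {
        boolean featureEnabled = maxAgeMs != -1L;
        if (featureEnabled && brokerMaxMetadataVersion < REQUIRED_METADATA_VERSION) {
            throw new UnsupportedVersionSketchException(
                "auto.offset.reset.latest.max.age requires MetadataResponse v"
                    + REQUIRED_METADATA_VERSION + ", broker supports up to v"
                    + brokerMaxMetadataVersion);
        }
    }

    public static void main(String[] args) {
        check((short) 13, -1L);      // feature disabled: no exception
        check((short) 14, 600_000L); // broker new enough: no exception
        try {
            check((short) 13, 600_000L);
        } catch (UnsupportedVersionSketchException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```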
Classic consumer
This KIP does not add auto.offset.reset.latest.max.age support to classic consumer groups. If a user configures this option while using a classic consumer group, the consumer will throw a ConfigException at startup, clearly indicating that the feature requires the modern consumer group protocol.
Kafka Streams
This KIP does not introduce changes to the Streams rebalance protocol. The partition expansion data loss problem this KIP addresses is specific to consumers using the modern consumer group protocol (KIP-848). Kafka Streams uses its own assignment protocol and is out of scope for this KIP; support for Streams may be considered in a future KIP.
To prevent silent misconfiguration, auto.offset.reset.latest.max.age is added to StreamsConfig.NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS. If a user sets this config in a Streams application, the Streams client will throw a ConfigException.
Share consumer
This KIP does not include changes for share consumer groups. Share groups do not require this feature because the "latest" policy already starts at offset 0 for newly added partitions in share groups, avoiding the partition-expansion data-loss scenario that this KIP addresses.
Future default consideration (Kafka 5.0)
This KIP defaults auto.offset.reset.latest.max.age to -1 (disabled) for backward compatibility. However, a future KIP may consider changing the default to a positive value (e.g., 3600000 ms / 1 hour) in a major release such as Kafka 5.0. The rationale is that preventing silent data loss during partition expansion should be safe-by-default behavior. A 1-hour window provides generous safety margin: partition expansion followed by consumer discovery typically takes seconds to minutes. Users who explicitly want pure latest behavior for all partitions — including newly created ones — can set auto.offset.reset.latest.max.age=-1 to disable the feature.
Test Plan
- Unit tests: Verify the age comparison logic for all combinations of partition age and threshold value, including boundary conditions and unknown age.
- Integration tests: Verify that a consumer correctly seeks to earliest for hot partitions and latest for cold partitions after topic expansion, with and without the config set.
Rejected Alternatives
New auto.offset.reset=to_start_time policy (KIP-1282)
The controller records a group creation timestamp (GroupCreationTimeMs) when a consumer group is first created. This timestamp is returned to the consumer via the ConsumerGroupHeartbeatResponse.
When auto.offset.reset=to_start_time and the consumer encounters a partition with no committed offset or an out-of-range offset, it issues a ListOffsetsRequest using the group creation timestamp as the target. The broker returns the earliest offset at or after that timestamp. This produces the following behavior:
- New group, first start: the group creation timestamp is approximately "now", so ListOffsets resolves to the log end offset, equivalent to latest. The consumer skips the historical backlog.
- Partition expansion (hot partition): the group creation timestamp predates the new partition, so ListOffsets resolves to offset 0. The consumer reads all records from the new partition, preventing data loss.
- Out-of-range (stale offset, log truncation): ListOffsets resolves to the earliest surviving offset at or after the group creation time, effectively falling back to earliest for the group's lifetime.
Why reject
- No consensus on out-of-range semantics: The community could not agree on how to_start_time should behave for out-of-range offsets. The main issue is whether it should remain consistent with latest (skip backlog) or fall back to earliest (ensure completeness). to_start_time applies different behaviors in these cases, which some see as inconsistent. A proposed “Smarter Latest” variant was rejected for introducing branching logic and violating the single-rule model.
- Overkill for the core problem: The goal, preventing data loss during partition expansion, does not require a new reset policy with additional semantics. A simpler approach, such as refining latest via configuration, would be more focused and easier to adopt. Introducing a new policy was considered disproportionate to the problem.
