Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-20033
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Background: What is __remote_log_metadata and how is it used?
Tiered storage: When local segments close, brokers upload them to remote storage (S3/Azure/GCS).
__remote_log_metadata: An internal topic used by TopicBasedRemoteLogMetadataManager to record remote-segment lifecycle events per topic-partition (COPY* and DELETE*), including start/end offsets, leader epoch and timestamps.
Why it matters: Brokers maintain an in-memory RemoteLogMetadataCache to answer “which segment contains offset X?” and “what is the highest remote offset?” This cache drives remote fetches and cleanup.
How state is built today: On startup/leadership changes, brokers consume the relevant metadata partitions from earliest available offset, replay all events, and rebuild the cache (apply STARTED → FINISHED → optional DELETE states; index by endOffset and leaderEpoch). Because the topic is append-only, full replay is required to compute current state.
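The replay described above can be sketched as follows. This is a simplified illustration, not the actual TopicBasedRemoteLogMetadataManager code; the ReplaySketch/Event names and the single-map state are assumptions made for clarity.

```java
import java.util.*;

// Hypothetical sketch: replay all lifecycle events in order to rebuild the
// current view. A segment becomes visible once COPY_SEGMENT_FINISHED is its
// latest state, and disappears once DELETE_SEGMENT_FINISHED is observed.
public class ReplaySketch {
    enum State { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED }

    record Event(String segmentId, long endOffset, State state) {}

    static NavigableMap<Long, String> rebuild(List<Event> events) {
        Map<String, State> latest = new HashMap<>();
        Map<String, Long> endOffsets = new HashMap<>();
        for (Event e : events) {            // full replay: later events win per segment
            latest.put(e.segmentId(), e.state());
            endOffsets.put(e.segmentId(), e.endOffset());
        }
        NavigableMap<Long, String> byEndOffset = new TreeMap<>();
        for (var entry : latest.entrySet()) {
            if (entry.getValue() == State.COPY_SEGMENT_FINISHED) {
                byEndOffset.put(endOffsets.get(entry.getKey()), entry.getKey());
            }
        }
        return byEndOffset;                 // index by endOffset, as the cache does
    }

    // "Which segment contains offset X?" -> first segment whose endOffset >= X.
    static String segmentContaining(NavigableMap<Long, String> cache, long offset) {
        Map.Entry<Long, String> e = cache.ceilingEntry(offset);
        return e == null ? null : e.getValue();
    }
}
```

Because the topic is append-only, every event from the earliest available offset must flow through this loop before the cache can answer queries, which is exactly why bootstrap time grows with topic size.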
Operational challenges
Problem 1: Unbounded Growth
cleanup.policy=delete with infinite retention (-1)
All lifecycle events retained forever
~1.2M metadata records/year at 1 TB/day of uploads; the topic can grow without bound, stressing I/O and the page cache
Problem 2: No Cleanup Mechanism
Metadata persists after remote segments expire
No removal for segments already deleted in remote storage
Orphaned records from failed/retried uploads accumulate indefinitely
Problem 3: Slow Bootstrap
Full replay from offset 0 to rebuild cache
Bootstrap time grows with topic size; restarts and leadership changes take increasingly long
Problem 4: Audit Trail vs Operational Efficiency
Design favors complete audit history
No option to prefer bounded state and fast bootstrap
Cannot trade audit completeness for performance today
This KIP aims to solve these issues by introducing another compacted topic from which brokers bootstrap their state, while preserving the full remote log metadata audit history in the existing topic __remote_log_metadata.
Goals
Enable bounded metadata storage to support faster bootstrap on broker restarts/leadership changes: the number of messages in the remote metadata topic is proportional to active remote segments, not the total history.
Support metadata cleanup: remove metadata when segments expire from remote storage.
Preserve audit trail in separate topic: keep full lifecycle history per segment (all state transitions).
Non-Goals
Wire-level backward compatibility for keys in the compacted projection (breaking change there is acceptable).
Online migration with zero operational steps: clusters that already have tiered storage enabled must go through an automated migration to adopt the compacted projection change; clusters that do not have tiered storage enabled can adopt the change directly without any migration.
Support for other RLMM implementations is not considered: focus is on TopicBasedRemoteLogMetadataManager.
Proposed Changes
Topics and Configurations
This KIP updates the behavior and usage of the existing internal topic __remote_log_metadata. The current default configuration for this topic is:
cleanup.policy=delete
retention.ms=-1
With this change, the topic's configuration can be updated to enable compaction:
cleanup.policy=compact
min.cleanable.dirty.ratio=0.1
delete.retention.ms=86400000 // 1 day
segment.ms=3600000 // 1 hour
Once the topic becomes compacted, it holds only a bounded number of messages for cache bootstrap and steady-state reads, and it supports tombstones.
Message Key and Value
All metadata records in __remote_log_metadata become keyed. The value carries the detailed metadata; the key defines the uniqueness and compaction behavior.
Key format: TopicIdPartition:EndOffset:BrokerLeaderEpoch
Example: "T8fJ9Kz3RyWxP2mQ4nL7vB:0:1999:5"
Components:
TopicIdPartition: the topic's UUID and partition number, identifying which topic-partition the segment belongs to
endOffset: last offset in the segment (deterministic, since tiered storage only uploads closed segments)
brokerLeaderEpoch: the leader epoch of the broker that owned the topic-partition during the upload. It disambiguates concurrent/retry cases. Since a single broker attempts to upload a given segment only once at a time, only one writer will try to write the value at any time.
Properties:
For the same segment under the same leader epoch, retries or updates share the same key (compaction keeps the latest state).
For the same segment under different leader epochs, records use different keys, allowing the system to distinguish concurrent or overlapping uploads across different brokers.
For partition-level delete metadata, the same key structure is used, with an endOffset that represents the partition end offset at deletion time.
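For illustration, the key layout above could be built and parsed like this. MetadataKey and its helper methods are hypothetical names used only in this sketch, not part of any proposed public API.

```java
// Hypothetical helpers for the proposed compaction key layout:
// TopicIdPartition:EndOffset:BrokerLeaderEpoch, where TopicIdPartition itself
// is rendered as "topicId:partition".
public class MetadataKey {
    static String format(String topicId, int partition, long endOffset, int brokerLeaderEpoch) {
        return topicId + ":" + partition + ":" + endOffset + ":" + brokerLeaderEpoch;
    }

    // Prefix shared by every upload attempt for the same segment, regardless of epoch.
    static String segmentPrefix(String topicId, int partition, long endOffset) {
        return topicId + ":" + partition + ":" + endOffset + ":";
    }

    // The epoch is always the last colon-separated component.
    static int epochOf(String key) {
        return Integer.parseInt(key.substring(key.lastIndexOf(':') + 1));
    }
}
```

Retries under the same epoch reproduce the same string (compaction keeps the latest value), while a new leader epoch yields a new key under the same segmentPrefix.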
Broker Write Path
The broker continues to write lifecycle events for remote log segments and partitions, but now with:
Deterministic keys.
Tombstone semantics once the log segment is expired.
Lifecycle events
Segment upload:
COPY_SEGMENT_STARTED
COPY_SEGMENT_FINISHED
Segment deletion:
DELETE_SEGMENT_STARTED
DELETE_SEGMENT_FINISHED
Partition deletion:
DELETE_PARTITION_STARTED
DELETE_PARTITION_FINISHED
It will also receive tombstone messages for expired metadata. Expired metadata messages are defined as messages that belong to log segments already in the DELETE_SEGMENT_FINISHED state.
A series of tombstone messages will be published for all keys that share the same topicIdPartition:endOffset prefix and whose brokerLeaderEpoch is no greater than the current one. While the original storeRemoteLogMetadata() method waits for the consumer of __remote_log_metadata to finish consuming the latest DELETE_SEGMENT_FINISHED message, the tombstone messages are consumed asynchronously to minimize the performance impact. In addition, to guarantee that expired messages are tombstoned, a background cleanup thread run by RemoteMetadataCleanupManager periodically scans __remote_log_metadata for orphaned messages that can be deleted.
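The tombstone selection rule above (same topicIdPartition:endOffset prefix, brokerLeaderEpoch no greater than the current one) can be sketched as below. TombstoneSelector is a hypothetical helper for illustration, not the actual RemoteMetadataCleanupManager code.

```java
import java.util.*;

// Hypothetical sketch: once a segment reaches DELETE_SEGMENT_FINISHED, select
// every live key sharing the topicIdPartition:endOffset prefix whose
// brokerLeaderEpoch does not exceed the current leader epoch.
public class TombstoneSelector {
    static List<String> keysToTombstone(Collection<String> liveKeys,
                                        String segmentPrefix, int currentEpoch) {
        List<String> out = new ArrayList<>();
        for (String key : liveKeys) {
            if (key.startsWith(segmentPrefix)) {
                int epoch = Integer.parseInt(key.substring(key.lastIndexOf(':') + 1));
                if (epoch <= currentEpoch) {
                    out.add(key);   // each selected key is produced with a null value
                }
            }
        }
        return out;
    }
}
```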
Broker Read Path
There is no change in how the ConsumerTask reads the messages and builds the state in TopicBasedRemoteLogMetadataManager, since the value format changes are minimal.
There is no change to RemoteLogMetadataTopicPartitioner either.
User Scenarios
RemoteLogSegmentState Enum Values
| Value | State | Description |
|---|---|---|
| 0 | COPY_SEGMENT_STARTED | Upload in progress |
| 1 | COPY_SEGMENT_FINISHED | Upload completed successfully |
| 2 | DELETE_SEGMENT_STARTED | Deletion in progress |
| 3 | DELETE_SEGMENT_FINISHED | Deletion completed |
Scenario 1: Normal Segment Upload
Context: Topic orders (ID: abc123)
Partition 0, Broker 101 at epoch 3
Timeline & Messages
| Time | Event | Message Key in the Compacted Topic | Value |
|---|---|---|---|
| T1 | Start upload | abc123:0:1000:3 | apiKey=0, uuid=UUID-A, state=0 (STARTED) |
| T2 | Upload completes | abc123:0:1000:3 | apiKey=1, uuid=UUID-A, state=1 (FINISHED) |
After Compaction
Audit topic: Retains both messages (T1 STARTED + T2 FINISHED)
Compacted topic: Retains only latest → abc123:0:1000:3 = FINISHED (T1 compacted away)
Cache result: 1 segment with uuid=UUID-A, endOffset=1000, state=FINISHED
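The compaction outcome in this scenario can be reproduced with a small simulation of the cleaner's latest-value-per-key behavior. CompactionSim and Rec are illustrative names; note that a real log cleaner also retains tombstones for delete.retention.ms before physically dropping them.

```java
import java.util.*;

// Hypothetical simulation of log compaction: for each key only the latest
// value survives; a null value (tombstone) eventually removes the key.
public class CompactionSim {
    record Rec(String key, String value) {}   // value == null models a tombstone

    static Map<String, String> compact(List<Rec> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Rec rec : log) {
            if (rec.value() == null) {
                latest.remove(rec.key());     // tombstone: key disappears
            } else {
                latest.put(rec.key(), rec.value());
            }
        }
        return latest;
    }
}
```

Feeding in the two Scenario 1 records (STARTED then FINISHED under the same key) leaves a single FINISHED entry, matching the "T1 compacted away" outcome above.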
Scenario 2: Leadership Change During Upload
Context: Broker 101 (epoch 3) starts upload, leadership changes to Broker 102 (epoch 4), both complete
Timeline & Messages
| Time | Event | Key | Value |
|---|---|---|---|
| T1 | Broker 101 starts | abc123:0:2000:3 | apiKey=0, uuid=UUID-A, state=0, epoch=3 |
| T2 | Leadership → 102 | - | - |
| T3 | Broker 102 starts | abc123:0:2000:4 | apiKey=0, uuid=UUID-B, state=0, epoch=4 ← Different key! |
| T4 | Broker 101 finishes | abc123:0:2000:3 | apiKey=1, uuid=UUID-A, state=1, epoch=3 |
| T5 | Broker 102 finishes | abc123:0:2000:4 | apiKey=1, uuid=UUID-B, state=1, epoch=4 |
After Compaction
Audit topic: Retains all 4 messages
Compacted topic: 2 keys, each with latest state
abc123:0:2000:3 → FINISHED (UUID-A, orphaned)
abc123:0:2000:4 → FINISHED (UUID-B, active)
Cache result: Returns both segments, selects UUID-B (epoch 4 > 3) for reads
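The read-side selection rule in this scenario (prefer the highest leader epoch among duplicate keys for the same segment) might be sketched as below; EpochSelection is a hypothetical name for illustration only.

```java
import java.util.*;

// Hypothetical sketch of Scenario 2's read-side rule: when several FINISHED
// entries exist for the same topicIdPartition:endOffset (one per leader
// epoch), the reader selects the key with the highest epoch.
public class EpochSelection {
    static Optional<String> activeKey(Collection<String> finishedKeys, String segmentPrefix) {
        return finishedKeys.stream()
                .filter(k -> k.startsWith(segmentPrefix))
                // the epoch is the last colon-separated component of the key
                .max(Comparator.comparingInt(k -> Integer.parseInt(k.substring(k.lastIndexOf(':') + 1))));
    }
}
```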
Scenario 3: Failed Upload Under the Same Leader Epoch
Context: Broker 101 (epoch 5) tries upload, fails, retries with new UUID
Timeline & Messages
| Time | Event | Key | Value |
|---|---|---|---|
| T1 | First attempt | abc123:0:3000:5 | apiKey=0, uuid=UUID-A, state=0 |
| T2 | Network timeout | - | (no message) |
| T3 | Retry | abc123:0:3000:5 | apiKey=0, uuid=UUID-B, state=0 ← Same key, different UUID! |
| T4 | Success | abc123:0:3000:5 | apiKey=1, uuid=UUID-B, state=1 |
After Compaction
Audit topic: Retains all 3 messages (T1 failed, T3 retry, T4 success)
Compacted topic: Retains only latest → abc123:0:3000:5 = FINISHED (UUID-B)
Cache result: Only UUID-B appears (UUID-A compacted away)
Key insight: Same epoch = same key, but new UUID for each attempt
Scenario 4: Segment Deletion
Context: Segment at endOffset=1000 has 3 upload keys (epochs 3,4,5), now being deleted by current leader (epoch 6)
Initial State
abc123:0:1000:3 → FINISHED (UUID-A, orphaned)
abc123:0:1000:4 → FINISHED (UUID-B, orphaned)
abc123:0:1000:5 → FINISHED (UUID-C, active)
Timeline & Messages
| Time | Event | Key | Value |
|---|---|---|---|
| T1 | Retention triggers | - | (identify segment) |
| T2 | Delete from S3 | - | (physical deletion) |
| T3 | Mark deletion | abc123:0:1000:6 | apiKey=1, state=2 (DELETE_STARTED) |
| T4 | Deletion finished | abc123:0:1000:6 | apiKey=1, state=3 (DELETE_FINISHED) |
| T5 | Tombstone epoch 3 | abc123:0:1000:3 | null |
| T6 | Tombstone epoch 4 | abc123:0:1000:4 | null |
| T7 | Tombstone epoch 5 | abc123:0:1000:5 | null |
| T8 | Tombstone epoch 6 | abc123:0:1000:6 | null |
| T5+24Hour | Message with key abc123:0:1000:3 is physically removed | - | - |
| T6+24Hour | Message with key abc123:0:1000:4 is physically removed | - | - |
| T7+24Hour | Message with key abc123:0:1000:5 is physically removed | - | - |
| T8+24Hour | Message with key abc123:0:1000:6 is physically removed | - | - |
Tombstone Details (Compacted Topic ONLY)
After DELETE_FINISHED (T4), tombstones written for ALL 4 keys:
abc123:0:1000:3 → null (epoch 3, UUID-A)
abc123:0:1000:4 → null (epoch 4, UUID-B)
abc123:0:1000:5 → null (epoch 5, UUID-C)
abc123:0:1000:6 → null (epoch 6, deletion marker)
After Compaction
Audit topic: Retains all messages (STARTED, FINISHED, DELETE_STARTED, DELETE_FINISHED)
Compacted topic: All 4 keys removed (segment completely forgotten)
Cache result: Empty (segment no longer exists)
Why tombstone all 4 keys?
Remove all historical metadata (successful + orphaned uploads + deletion marker)
Ensures compacted topic completely "forgets" this segment
Scenario 5: Partition Deletion
Context: Entire partition deleted, cleanup all segments
Timeline & Messages
| Time | Event | Key | Value |
|---|---|---|---|
| T1 | Start partition delete | abc123:0 | apiKey=2, state=DELETE_PARTITION_STARTED |
| T2 | Partition delete done | abc123:0 | apiKey=2, state=DELETE_PARTITION_FINISHED |
Cleanup Process
Compacted topic: All segment keys for abc123:0:*:* get tombstones, partition delete marker remains
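The partition-wide cleanup can be sketched as enumerating every segment key under the partition prefix. PartitionCleanup is an illustrative name; note that the partition delete marker (abc123:0, with no trailing segment components) does not match the segment-key prefix and therefore survives, as described above.

```java
import java.util.*;

// Hypothetical sketch of Scenario 5's cleanup: once DELETE_PARTITION_FINISHED
// is recorded, every remaining segment key of the partition gets a tombstone,
// while the partition-level delete marker itself is kept.
public class PartitionCleanup {
    static List<String> segmentKeysToTombstone(Collection<String> liveKeys,
                                               String topicId, int partition) {
        String partitionPrefix = topicId + ":" + partition + ":";
        List<String> out = new ArrayList<>();
        for (String key : liveKeys) {
            if (key.startsWith(partitionPrefix)) {
                out.add(key);   // each selected key is produced with a null value
            }
        }
        return out;
    }
}
```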
Public Interfaces
This KIP adds two new fields to the following records to help construct the message key published to the compacted topic: the log segment's endOffset and the brokerLeaderEpoch.
A backfill strategy that converts current messages to the new format will be shared in the migration plan section, to make sure there are no publicly visible changes for Kafka users.
RemoteLogSegmentMetadataRecord.json
[
//...
{
"name": "BrokerId",
"type": "int32",
"versions": "0+"
},
{
"name": "RemoteLogSegmentId",
"type": "RemoteLogSegmentIdEntry",
"versions": "0+"
},
{
"name": "StartOffset",
"type": "int64",
"versions": "0+"
},
{
"name": "EndOffset",
"type": "int64",
"versions": "0+"
},
{
"name": "BrokerLeaderEpoch",
"type": "int32",
"versions": "1+",
"about": "The leader epoch of the broker (partition leader epoch) at the time this update is being published.",
"taggedVersions": "1+",
"tag": 0
}
]
RemoteLogSegmentMetadataUpdateRecord.json
[
{
"name": "RemoteLogSegmentId",
"type": "RemoteLogSegmentIdEntry",
"versions": "0+"
},
{
"name": "BrokerId",
"type": "int32",
"versions": "0+"
},
//...
{
"name": "RemoteLogSegmentState",
"type": "int8",
"versions": "0+"
},
{
"name": "BrokerLeaderEpoch",
"type": "int32",
"versions": "1+",
"about": "The leader epoch of the broker (partition leader epoch) at the time this update is being published.",
"taggedVersions": "1+",
"tag": 0
},
{
"name": "EndOffset",
"type": "int64",
"versions": "1+",
"about": "End offset of the segment being updated.",
"taggedVersions": "1+",
"tag": 1
}
]
RemotePartitionDeleteMetadataRecord.json
[
{
"name": "TopicIdPartition",
"type": "TopicIdPartitionEntry",
"versions": "0+"
},
{
"name": "BrokerId",
"type": "int32",
"versions": "0+"
},
{
"name": "EventTimestampMs",
"type": "int64",
"versions": "0+"
},
{
"name": "RemotePartitionDeleteState",
"type": "int8",
"versions": "0+"
},
{
"name": "BrokerLeaderEpoch",
"type": "int32",
"versions": "1+",
"about": "The leader epoch of the broker (partition leader epoch) at the time this update is being published.",
"taggedVersions": "1+",
"tag": 0
},
{
"name": "EndOffset",
"type": "int64",
"versions": "1+",
"about": "End offset of the segment being updated.",
"taggedVersions": "1+",
"tag": 1
}
]
Compatibility, Deprecation, and Migration Plan
For message compatibility in the remote metadata topic, since the new fields are tagged fields, there will be no forward compatibility issue.
For feature compatibility, the remote.log.metadata.compaction feature is disabled by default and can be enabled by upgrading the feature level using:
kafka-features.sh --bootstrap-server localhost:9092 \
  --upgrade remote.log.metadata.compaction=1
Once the feature level is upgraded, downgrading or disabling the feature is not supported. This is a forward-only change and clusters that enable the feature must remain at feature level 1 or higher.
For clusters that already run with tiered storage enabled, the following migration plan must be executed before the broker's bootstrap finishes, so that the broker can execute the new code path.
Migration Plan
New clusters
For clusters that do not yet have a __remote_log_metadata topic (i.e., tiered storage is being enabled for the first time on this cluster with this KIP’s implementation):
- When tiered storage is first used, the broker/controller will automatically create __remote_log_metadata with the following configuration:
cleanup.policy=compact
min.cleanable.dirty.ratio=0.1
delete.retention.ms=86400000 # 1 day
segment.ms=3600000 # 1 hour
Notes:
- cleanup.policy=compact ensures that:
- Only the latest record per key is retained.
- Tombstone records eventually remove fully deleted segments and partitions.
- min.cleanable.dirty.ratio=0.1 allows the cleaner to run once at least 10% of the log is dirty, keeping the topic fairly compact.
- delete.retention.ms=86400000 (1 day) controls how long tombstone markers are retained before they become eligible for physical deletion.
- segment.ms=3600000 (1 hour) bounds the time span of a single segment, helping compaction and cleanup proceed in smaller units.
For new clusters:
- Brokers immediately:
- Produce keyed metadata records using the new schema (including BrokerLeaderEpoch and EndOffset).
- Emit tombstones for segments and partitions when they are deleted.
- No changes are required; the topic is created with the correct configuration by default.
Existing clusters
For clusters that already have tiered storage enabled and an existing __remote_log_metadata topic populated with historical metadata, the migration is incremental and non-disruptive.
Step 1: Ensure finite retention
Before or during the upgrade, operators must ensure that __remote_log_metadata has finite retention, so that very old records can eventually be removed:
- Configure a finite value for retention.ms
This step bounds how far back the audit history can go and ensures that legacy data will age out over time.
Step 2: Rolling upgrade brokers
Perform a standard rolling upgrade of all brokers (and controllers) to the new version that implements this KIP.
As brokers are upgraded and become leaders for partitions:
- They start writing keyed metadata records to __remote_log_metadata, using the key format topicId:partition:endOffset:brokerLeaderEpoch.
- They emit tombstones for segments and partitions when:
- A segment reaches DELETE_SEGMENT_FINISHED and is deleted from remote storage.
- A partition reaches DELETE_PARTITION_FINISHED.
During this step:
- Older records (written by previous versions) remain readable and are still used by consumers.
- Tiered storage remains fully functional; no freeze or special coordination is required.
Step 3: Let retention prune legacy data
After the entire cluster is running the new version:
- Continue normal operation and wait for at least one full retention.ms window for __remote_log_metadata.
Over this period:
- Records older than retention.ms are removed according to the configured retention policy.
- The topic gradually becomes dominated by:
- New keyed records (with BrokerLeaderEpoch and EndOffset), and
- Their corresponding tombstones.
At the end of this period, most or all legacy records that do not follow the new keyed/tombstone pattern will have expired.
Step 4: Compaction configuration
For existing clusters, compaction needs to be enabled (or confirmed) on __remote_log_metadata so that the log remains bounded and efficient.
After at least one retention window has passed (Step 3) and operators are comfortable that the topic is mostly populated with new keyed records, they should update the topic configuration to:
cleanup.policy=compact
// the following 3 parameters can be configured to the appropriate values as the user needs
min.cleanable.dirty.ratio=0.1
delete.retention.ms=86400000
segment.ms=3600000
With this configuration, the log cleaner:
- Compacts multiple updates per key to the latest value.
- Eventually removes tombstoned segments and partitions fully from the log.
- Maintains a stable topic size proportional to the active remote segments and the bounded audit horizon.
Test Plan
Topic Functionality Tests
Confirm all metadata events are properly written to both topics with correct key formats
Validate that the compacted topic maintains only the latest state per segment while the original topic preserves full history
Migration Verification
Migration (Existing Clusters)
- With finite retention.ms, upgrade brokers.
- Confirm new records are keyed and readable.
- Rebuild RemoteLogMetadataCache from __remote_log_metadata and verify:
- All expected remote segments are present.
- Offsets and states are correct.
- After ≥ one retention window and with compaction enabled (if not already), confirm:
- Logical state (live segments, states) is unchanged.
- Topic only contains keyed records and tombstones.
Durability and Recovery Tests
Simulate broker failures during various stages of segment lifecycle (upload, deletion)
Test leader changes during metadata operations to verify correct handling of concurrent operations
Verify metadata consistency after unclean shutdowns and broker restarts
Performance Benchmarks
Measure broker startup time before and after implementation
Compare metadata topic size growth over time between current and new implementation
Rejected Alternatives
Alternative A: Snapshot Approach (KAFKA-19265)
Rejected Reasons:
Distributed Coordination Complexity: Creating consistent snapshots across distributed brokers with different partition leaders is extremely challenging and error-prone.
Leader Transition Reliability: Ensuring new leaders receive the latest snapshot during transitions is difficult to guarantee, especially during network partitions or failures.
Recovery Vulnerabilities: Corrupted or incomplete snapshots would require complex recovery procedures, potentially defeating the purpose of having snapshots.
Operational Overhead: Additional storage requirements and snapshot management create a significant operational burden.
Alternative B: Adjust Retention Time Based on Maximum Tiered Storage Topic Retention
Rejected Reasons:
Fails to Address Core Problem: This approach doesn't solve the unbounded metadata growth issue; it merely postpones it by extending retention periods.
Resource Inefficiency and Lack of Granularity: Most metadata records would be retained far longer than necessary, wasting the storage and impacting query and bootstrap performance.
Implementation Complexity: Creating a mechanism to track and dynamically update retention across all tiered storage topics introduces unnecessary complexity.
Risk of Irrecoverable Metadata Loss: If the remote storage module experiences downtime, truncating the __remote_log_metadata topic based on retention policies could lead to permanent loss of remote log metadata even when the remote storage still holds those remote log segments, causing a discrepancy between the remote storage and the remote log segment metadata state the broker preserves.