Status

Current state: Under Discussion

Discussion thread: here
JIRA: KAFKA-20033

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Background: What is __remote_log_metadata and how is it used?

  • Tiered storage: When local segments close, brokers upload them to remote storage (S3/Azure/GCS).

  • __remote_log_metadata: An internal topic used by TopicBasedRemoteLogMetadataManager to record remote-segment lifecycle events per topic-partition (COPY* and DELETE*), including start/end offsets, leader epoch, and timestamps.

  • Why it matters: Brokers maintain an in-memory RemoteLogMetadataCache to answer “which segment contains offset X?” and “what is the highest remote offset?” This cache drives remote fetches and cleanup.

  • How state is built today: On startup/leadership changes, brokers consume the relevant metadata partitions from earliest available offset, replay all events, and rebuild the cache (apply STARTED → FINISHED → optional DELETE states; index by endOffset and leaderEpoch). Because the topic is append-only, full replay is required to compute current state.
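The replay described above can be sketched as follows. This is a hypothetical simplification: the names and shapes are ours, not the actual RemoteLogMetadataCache implementation, but it shows why full history must be scanned to compute current state.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of bootstrap-by-replay: consume the full event history
// from the earliest offset and keep only the latest state per segment id.
public class ReplaySketch {
    // state: 0=COPY_STARTED, 1=COPY_FINISHED, 2=DELETE_STARTED, 3=DELETE_FINISHED
    public record Event(String segmentId, long endOffset, int state) {}

    public static Map<String, Event> rebuild(List<Event> fullHistory) {
        Map<String, Event> cache = new LinkedHashMap<>();
        for (Event e : fullHistory) {
            if (e.state() == 3) {
                cache.remove(e.segmentId()); // segment fully deleted
            } else {
                cache.put(e.segmentId(), e); // latest event wins
            }
        }
        return cache; // backs "which segment contains offset X?" queries
    }

    public static void main(String[] args) {
        List<Event> history = List.of(
            new Event("UUID-A", 999L, 0), new Event("UUID-A", 999L, 1),
            new Event("UUID-A", 999L, 2), new Event("UUID-A", 999L, 3),
            new Event("UUID-B", 1999L, 0), new Event("UUID-B", 1999L, 1));
        System.out.println(rebuild(history).keySet()); // [UUID-B]
    }
}
```

Even though only the final map matters, the append-only topic forces the broker to scan every historical event on each rebuild.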

Operational challenges

Problem 1: Unbounded Growth

  • cleanup.policy=delete with infinite retention (-1)

  • All lifecycle events retained forever

  • ~1.2M metadata records/year at 1 TB/day upload; the topic can grow without bound, stressing I/O and the page cache
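As a back-of-envelope check on that figure (assuming 1 GiB segments and roughly three lifecycle events per segment; both assumptions are ours, not values from this KIP):

```java
// Hypothetical sizing arithmetic for Problem 1; segment size and events per
// segment are illustrative assumptions, not values from the KIP.
public class GrowthEstimate {
    public static void main(String[] args) {
        long bytesPerDay = 1L << 40;                      // ~1 TB uploaded per day
        long segmentBytes = 1L << 30;                     // assumed 1 GiB segments
        long segmentsPerDay = bytesPerDay / segmentBytes; // 1024 segments/day
        long eventsPerSegment = 3;                        // e.g. STARTED, FINISHED, DELETE*
        long recordsPerYear = segmentsPerDay * eventsPerSegment * 365;
        System.out.println(recordsPerYear);               // 1121280, i.e. ~1.1M/year
    }
}
```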

Problem 2: No Cleanup Mechanism

  • Metadata persists after remote segments expire

  • No removal for segments already deleted in remote storage

  • Orphaned records from failed/retried uploads accumulate indefinitely

Problem 3: Slow Bootstrap

  • Full replay from offset 0 to rebuild cache

  • Bootstrap time grows with topic size; restarts and leadership changes take increasingly long

Problem 4: Audit Trail vs Operational Efficiency

  • Design favors complete audit history

  • No option to prefer bounded state and fast bootstrap

  • Cannot trade audit completeness for performance today

This KIP aims to solve these issues by introducing a compacted topic from which brokers bootstrap their state, while preserving the complete remote log metadata audit history in the existing topic __remote_log_metadata.

Goals

  1. Enable bounded metadata storage to support faster bootstrap on broker restarts and leadership changes: the number of messages in the remote metadata topic becomes proportional to active remote segments, not the total history.

  2. Support metadata cleanup: remove metadata when segments expire from remote storage.

  3. Preserve audit trail in separate topic: keep full lifecycle history per segment (all state transitions).

Non-Goals

  1. Wire-level backward compatibility for keys in the compacted projection (breaking change there is acceptable).

  2. Online migration with zero operational steps: clusters that already have tiered storage enabled must go through an automated migration to adopt the compacted projection; clusters without tiered storage enabled can adopt the change directly, with no migration.

  3. Support for other RLMM implementations: the focus is on TopicBasedRemoteLogMetadataManager.

Proposed Changes

  1. Topics and Configurations
    This KIP updates the behavior and usage of the existing internal topic __remote_log_metadata. The current default configuration for this topic is:

    • cleanup.policy=delete

    • retention.ms=-1 

      With this change, the topic's configuration can be updated to be compacted:

    • cleanup.policy=compact

    • min.cleanable.dirty.ratio=0.1

    • delete.retention.ms=86400000 (1 day)

    • segment.ms=3600000 (1 hour)

      Once the topic becomes compacted, it holds only a bounded number of messages for cache bootstrap and steady-state reads, and it supports tombstones.

  2. Message Key and Value
    All metadata records in __remote_log_metadata become keyed. The value carries the detailed metadata; the key defines the uniqueness and compaction behavior.

    1. Key format: TopicIdPartition:EndOffset:BrokerLeaderEpoch (TopicIdPartition serializes as topicId:partition, so keys have four components)

    2. Example: "T8fJ9Kz3RyWxP2mQ4nL7vB:0:1999:5"

    3. Components:

      • TopicIdPartition: the topic's UUID plus the partition number, identifying the topic-partition

      • endOffset: last offset in the segment (deterministic, since tiered storage only uploads closed segments)

      • brokerLeaderEpoch: the leader epoch of the broker that owned the topic-partition during the upload. It disambiguates concurrent/retry cases: since a single broker attempts to upload a given segment only once at a time, only one writer produces a value for a given key at any moment.

    4. Properties:

      • For the same segment under the same leader epoch, retries or updates share the same key (compaction keeps the latest state).

      • For the same segment under different leader epochs, records use different keys, allowing the system to distinguish concurrent or overlapping uploads across different brokers.

      For partition-level delete metadata, the same key structure is used, with an endOffset that represents the partition end offset at deletion time.
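A minimal sketch of how such a key could be assembled (the helper name is ours; the actual serialization in the implementation may differ):

```java
// Illustrative key builder for the TopicIdPartition:EndOffset:BrokerLeaderEpoch
// scheme; a TopicIdPartition serializes here as topicId:partition.
public class MetadataKey {
    public static String segmentKey(String topicId, int partition,
                                    long endOffset, int brokerLeaderEpoch) {
        return topicId + ":" + partition + ":" + endOffset + ":" + brokerLeaderEpoch;
    }

    public static void main(String[] args) {
        // Same segment, same epoch -> same key, so retries compact together.
        System.out.println(segmentKey("T8fJ9Kz3RyWxP2mQ4nL7vB", 0, 1999, 5));
        // Same segment, new epoch -> distinct key, so overlapping uploads stay apart.
        System.out.println(segmentKey("T8fJ9Kz3RyWxP2mQ4nL7vB", 0, 1999, 6));
    }
}
```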

  3. Broker Write Path
    The broker continues to write lifecycle events for remote log segments and partitions, but now with:

    • Deterministic keys.

    • Tombstone semantics once the log segment is expired.

           Lifecycle events

    • Segment upload:

      • COPY_SEGMENT_STARTED

      • COPY_SEGMENT_FINISHED

    • Segment deletion:

      • DELETE_SEGMENT_STARTED

      • DELETE_SEGMENT_FINISHED

    • Partition deletion:

      • DELETE_PARTITION_STARTED

      • DELETE_PARTITION_FINISHED

The write path also produces tombstone messages for expired metadata. Expired metadata messages are defined as messages belonging to log segments that have already reached the DELETE_SEGMENT_FINISHED state.

A series of tombstone messages is published for all keys that share the prefix topicIdPartition:endOffset and whose brokerLeaderEpoch is no greater than the current one. While the original storeRemoteLogMetadata() method waits for the consumer of __remote_log_metadata to finish consuming the latest DELETE_SEGMENT_FINISHED message, tombstone messages are consumed asynchronously to minimize the performance impact. In addition, to guarantee that expired messages are eventually tombstoned, a background cleanup thread run by RemoteMetadataCleanupManager periodically scans __remote_log_metadata for orphaned messages that can be deleted.
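The tombstone fan-out described above can be sketched as follows. This is a simplified model under our own assumptions (flat string keys of the form topicId:partition:endOffset:epoch); the real cleanup logic lives in the broker:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Sketch of selecting which keys to tombstone after DELETE_SEGMENT_FINISHED:
// every live key with the same topicIdPartition:endOffset prefix and a
// brokerLeaderEpoch no greater than the current one gets a null value.
public class TombstoneSketch {
    public static List<String> tombstoneKeys(Set<String> liveKeys,
                                             String segmentPrefix, int currentEpoch) {
        List<String> out = new ArrayList<>();
        for (String key : liveKeys) {
            if (!key.startsWith(segmentPrefix + ":")) continue;
            int epoch = Integer.parseInt(key.substring(key.lastIndexOf(':') + 1));
            if (epoch <= currentEpoch) out.add(key); // produce (key, null) for each
        }
        Collections.sort(out);
        return out;
    }

    public static void main(String[] args) {
        Set<String> live = Set.of("abc123:0:1000:3", "abc123:0:1000:4",
                                  "abc123:0:1000:6", "abc123:0:2000:4");
        System.out.println(tombstoneKeys(live, "abc123:0:1000", 6));
        // [abc123:0:1000:3, abc123:0:1000:4, abc123:0:1000:6]
    }
}
```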

4. Broker Read Path

There is no change to how the ConsumerTask reads messages and builds state in TopicBasedRemoteLogMetadataManager, since the value format is essentially unchanged.

There is no change to RemoteLogMetadataTopicPartitioner either.

User Scenario

RemoteLogSegmentState Enum Values

| Value | State | Description |
|---|---|---|
| 0 | COPY_SEGMENT_STARTED | Upload in progress |
| 1 | COPY_SEGMENT_FINISHED | Upload completed successfully |
| 2 | DELETE_SEGMENT_STARTED | Deletion in progress |
| 3 | DELETE_SEGMENT_FINISHED | Deletion completed |

Scenario 1: Normal Segment Upload

Context: Topic orders (ID: abc123), Partition 0, Broker 101 at epoch 3

Timeline & Messages

| Time | Event | Message Key in the Compacted Topic | Value |
|---|---|---|---|
| T1 | Start upload | abc123:0:1000:3 | apiKey=0, uuid=UUID-A, state=0 (STARTED) |
| T2 | Upload completes | abc123:0:1000:3 | apiKey=1, uuid=UUID-A, state=1 (FINISHED) |

After Compaction

Audit topic: Retains both messages (T1 STARTED + T2 FINISHED)

Compacted topic: Retains only latest → abc123:0:1000:3  = FINISHED (T1 compacted away)

Cache result: 1 segment with uuid=UUID-A, endOffset=1000, state=FINISHED

Scenario 2: Leadership Change During Upload

Context: Broker 101 (epoch 3) starts upload, leadership changes to Broker 102 (epoch 4), both complete

Timeline & Messages

| Time | Event | Key | Value |
|---|---|---|---|
| T1 | Broker 101 starts | abc123:0:2000:3 | apiKey=0, uuid=UUID-A, state=0, epoch=3 |
| T2 | Leadership → 102 | - | - |
| T3 | Broker 102 starts | abc123:0:2000:4 | apiKey=0, uuid=UUID-B, state=0, epoch=4 ← Different key! |
| T4 | Broker 101 finishes | abc123:0:2000:3 | apiKey=1, uuid=UUID-A, state=1, epoch=3 |
| T5 | Broker 102 finishes | abc123:0:2000:4 | apiKey=1, uuid=UUID-B, state=1, epoch=4 |

After Compaction

Audit topic: Retains all 4 messages

Compacted topic: 2 keys, each with latest state

  • abc123:0:2000:3 → FINISHED (UUID-A, orphaned)

  •  abc123:0:2000:4 → FINISHED (UUID-B, active)

Cache result: Returns both segments, selects UUID-B (epoch 4 > 3) for reads
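The epoch-based selection in this scenario can be sketched as follows (illustrative only; the actual logic in RemoteLogMetadataCache may differ):

```java
import java.util.Comparator;
import java.util.List;

// When multiple FINISHED records exist for the same endOffset under different
// leader epochs, the record with the highest epoch is treated as active.
public class EpochSelection {
    public record Segment(String uuid, long endOffset, int epoch) {}

    public static Segment activeFor(long endOffset, List<Segment> candidates) {
        return candidates.stream()
            .filter(s -> s.endOffset() == endOffset)
            .max(Comparator.comparingInt(Segment::epoch))
            .orElseThrow();
    }

    public static void main(String[] args) {
        List<Segment> segs = List.of(new Segment("UUID-A", 2000L, 3),
                                     new Segment("UUID-B", 2000L, 4));
        System.out.println(activeFor(2000L, segs).uuid()); // UUID-B
    }
}
```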


Scenario 3: Failed Upload Under the Same Leader Epoch

Context: Broker 101 (epoch 5) tries upload, fails, retries with new UUID

Timeline & Messages

| Time | Event | Key | Value |
|---|---|---|---|
| T1 | First attempt | abc123:0:3000:5 | apiKey=0, uuid=UUID-A, state=0 |
| T2 | Network timeout | - | (no message) |
| T3 | Retry | abc123:0:3000:5 | apiKey=0, uuid=UUID-B, state=0 ← Same key, different UUID! |
| T4 | Success | abc123:0:3000:5 | apiKey=1, uuid=UUID-B, state=1 |

After Compaction

Audit topic: Retains all 3 messages (T1 failed, T3 retry, T4 success)

Compacted topic: Retains only latest → abc123:0:3000:5  = FINISHED (UUID-B)

Cache result: Only UUID-B appears (UUID-A compacted away)

Key insight: Same epoch = same key, but new UUID for each attempt


Scenario 4: Segment Deletion

Context: Segment at endOffset=1000 has 3 upload keys (epochs 3,4,5), now being deleted by current leader (epoch 6)

Initial State

abc123:0:1000:3 → FINISHED (UUID-A, orphaned)

abc123:0:1000:4 → FINISHED (UUID-B, orphaned)

abc123:0:1000:5 → FINISHED (UUID-C, active)

Timeline & Messages

| Time | Event | Key | Value |
|---|---|---|---|
| T1 | Retention triggers | - | (identify segment) |
| T2 | Delete from S3 | - | (physical deletion) |
| T3 | Mark deletion | abc123:0:1000:6 | apiKey=1, state=2 (DELETE_STARTED) |
| T4 | Deletion finished | abc123:0:1000:6 | apiKey=1, state=3 (DELETE_FINISHED) |
| T5 | Tombstone epoch 3 | abc123:0:1000:3 | null |
| T6 | Tombstone epoch 4 | abc123:0:1000:4 | null |
| T7 | Tombstone epoch 5 | abc123:0:1000:5 | null |
| T8 | Tombstone epoch 6 | abc123:0:1000:6 | null |
| T5+24h | Record with key abc123:0:1000:3 physically removed | - | - |
| T6+24h | Record with key abc123:0:1000:4 physically removed | - | - |
| T7+24h | Record with key abc123:0:1000:5 physically removed | - | - |
| T8+24h | Record with key abc123:0:1000:6 physically removed | - | - |

Tombstone Details (Compacted Topic ONLY)

After DELETE_FINISHED (T4), tombstones written for ALL 4 keys:

abc123:0:1000:3 → null (epoch 3, UUID-A)

abc123:0:1000:4 → null (epoch 4, UUID-B)

abc123:0:1000:5 → null (epoch 5, UUID-C)

abc123:0:1000:6 → null (epoch 6, deletion marker)

After Compaction

Audit topic: Retains all messages (STARTED, FINISHED, DELETE_STARTED, DELETE_FINISHED)

Compacted topic: All 4 keys removed (segment completely forgotten)

Cache result: Empty (segment no longer exists)

Why tombstone all 4 keys?

  • Remove all historical metadata (successful + orphaned uploads + deletion marker)

  • Ensures compacted topic completely "forgets" this segment


Scenario 5: Partition Deletion

Context: Entire partition deleted, cleanup all segments

Timeline & Messages

| Time | Event | Key | Value |
|---|---|---|---|
| T1 | Start partition delete | abc123:0 | apiKey=2, state=DELETE_PARTITION_STARTED |
| T2 | Partition delete done | abc123:0 | apiKey=2, state=DELETE_PARTITION_FINISHED |

Cleanup Process

Compacted topic: all segment keys matching abc123:0:*:* receive tombstones; the partition delete marker remains


Public Interfaces

This KIP adds two new fields to the following records to help construct the message key published to the compacted topic: the log segment's endOffset and the brokerLeaderEpoch.

A backfill strategy to convert current messages to the new format will be described in the migration plan section, ensuring there are no publicly visible changes for Kafka users.

RemoteLogSegmentMetadataRecord.json

[
//...
  {
    "name": "RemoteLogSegmentId",
    "type": "RemoteLogSegmentIdEntry",
    "versions": "0+"
  },
  {
    "name": "StartOffset",
    "type": "int64",
    "versions": "0+"
  },
  {
    "name": "EndOffset",
    "type": "int64",
    "versions": "0+"
  },
  {
    "name": "BrokerId",
    "type": "int32",
    "versions": "0+"
  },
  {
    "name": "BrokerLeaderEpoch",
    "type": "int32",
    "versions": "1+",
    "about": "The leader epoch of the broker (partition leader epoch) at the time this update is being published.",
    "taggedVersions": "1+",
    "tag": 0
  }
]


RemoteLogSegmentMetadataUpdateRecord.json

[
  {
    "name": "RemoteLogSegmentId",
    "type": "RemoteLogSegmentIdEntry",
    "versions": "0+"
  },
  {
    "name": "BrokerId",
    "type": "int32",
    "versions": "0+"
  },
  //...
  {
    "name": "RemoteLogSegmentState",
    "type": "int8",
    "versions": "0+"
  },
  {
    "name": "BrokerLeaderEpoch",
    "type": "int32",
    "versions": "1+",
    "about": "The leader epoch of the broker (partition leader epoch) at the time this update is being published.",
    "taggedVersions": "1+",
    "tag": 0
  },
  {
    "name": "EndOffset",
    "type": "int64",
    "versions": "1+",
    "about": "End offset of the segment being updated.",
    "taggedVersions": "1+",
    "tag": 1
  }
]


RemotePartitionDeleteMetadataRecord.json

[
  {
    "name": "TopicIdPartition",
    "type": "TopicIdPartitionEntry",
    "versions": "0+"
  },
  {
    "name": "BrokerId",
    "type": "int32",
    "versions": "0+"
  },
  {
    "name": "EventTimestampMs",
    "type": "int64",
    "versions": "0+"
  },
  {
    "name": "RemotePartitionDeleteState",
    "type": "int8",
    "versions": "0+"
  },
  {
    "name": "BrokerLeaderEpoch",
    "type": "int32",
    "versions": "1+",
    "about": "The leader epoch of the broker (partition leader epoch) at the time this update is being published.",
    "taggedVersions": "1+",
    "tag": 0
  },
  {
    "name": "EndOffset",
    "type": "int64",
    "versions": "1+",
    "about": "End offset of the partition at the time of deletion.",
    "taggedVersions": "1+",
    "tag": 1
  }
]

Compatibility, Deprecation, and Migration Plan

For message compatibility in the remote metadata topic: since the new fields are tagged fields, older readers simply skip the unknown fields, so there is no compatibility issue.

For feature compatibility, the remote.log.metadata.compaction feature is disabled by default and can be enabled by upgrading the feature level using:

kafka-features.sh --bootstrap-server localhost:9092 \
  --upgrade remote.log.metadata.compaction=1

Once the feature level is upgraded, downgrading or disabling the feature is not supported. This is a forward-only change and clusters that enable the feature must remain at feature level 1 or higher.

For clusters that already run with tiered storage, the following migration plan is executed before the broker's bootstrap finishes, so that the broker can execute the new code path.

Migration Plan

New clusters

For clusters that do not yet have a __remote_log_metadata topic (i.e., tiered storage is being enabled for the first time on this cluster with this KIP’s implementation):

  • When tiered storage is first used, the broker/controller will automatically create __remote_log_metadata with the following configuration:
cleanup.policy=compact
min.cleanable.dirty.ratio=0.1
delete.retention.ms=86400000    # 1 day
segment.ms=3600000              # 1 hour



Notes:

  • cleanup.policy=compact ensures that:
    • Only the latest record per key is retained.
    • Tombstone records eventually remove fully deleted segments and partitions.
  • min.cleanable.dirty.ratio=0.1 allows the cleaner to run once at least 10% of the log is dirty, keeping the topic fairly compact.
  • delete.retention.ms=86400000 (1 day) controls how long tombstone markers are retained before they become eligible for physical deletion.
  • segment.ms=3600000 (1 hour) bounds the time span of a single segment, helping compaction and cleanup proceed in smaller units.

For new clusters:

  • Brokers immediately:
    • Produce keyed metadata records using the new schema (including BrokerLeaderEpoch and EndOffset).
    • Emit tombstones for segments and partitions when they are deleted.
  • No changes are required; the topic is created with the correct configuration by default.

Existing clusters

For clusters that already have tiered storage enabled and an existing  __remote_log_metadata topic populated with historical metadata, the migration is incremental and non-disruptive.

Step 1: Ensure finite retention

Before or during the upgrade, operators must ensure that __remote_log_metadata has finite retention, so that very old records can eventually be removed:

  • Configure a finite value for retention.ms

This step bounds how far back the audit history can go and ensures that legacy data will age out over time.

Step 2: Rolling upgrade brokers

Perform a standard rolling upgrade of all brokers (and controllers) to the new version that implements this KIP.

As brokers are upgraded and become leaders for partitions:

  • They start writing keyed metadata records to __remote_log_metadata, using keys of the form:

    topicId:partition:endOffset:brokerLeaderEpoch

  • They emit tombstones for segments and partitions when:

    • A segment reaches DELETE_SEGMENT_FINISHED and is deleted from remote storage.
    • A partition reaches DELETE_PARTITION_FINISHED.

During this step:

  • Older records (written by previous versions) remain readable and are still used by consumers.
  • Tiered storage remains fully functional; no freeze or special coordination is required.

Step 3: Let retention prune legacy data

After the entire cluster is running the new version:

  • Continue normal operation and wait for at least one full retention.ms window for __remote_log_metadata.

Over this period:

  • Records older than retention.ms are removed according to the configured retention policy.
  • The topic gradually becomes dominated by:
    • New keyed records (with BrokerLeaderEpoch and EndOffset), and
    • Their corresponding tombstones.

At the end of this period, most or all legacy records that do not follow the new keyed/tombstone pattern will have expired.

Step 4: Compaction configuration

For existing clusters, compaction needs to be enabled (or confirmed) on __remote_log_metadata so that the log remains bounded and efficient.

  • After at least one retention window has passed (Step 3) and operators are comfortable that the topic is mostly populated with new keyed records, they should update the topic configuration to:

cleanup.policy=compact

# the following 3 parameters can be configured to appropriate values as needed
min.cleanable.dirty.ratio=0.1
delete.retention.ms=86400000
segment.ms=3600000


With this configuration, the log cleaner:

    • Compacts multiple updates per key to the latest value.
    • Eventually removes tombstoned segments and partitions fully from the log.
    • Maintains a stable topic size proportional to the active remote segments and the bounded audit horizon.

Test Plan

Topic Functionality Tests

  • Confirm all metadata events are properly written to both topics with correct key formats

  • Validate that the compacted topic maintains only the latest state per segment while the original topic preserves full history

Migration Verification

  • Migration (Existing Clusters)
    • With finite retention.ms, upgrade brokers.
    • Confirm new records are keyed and readable.
    • Rebuild RemoteLogMetadataCache from __remote_log_metadata and verify:
      • All expected remote segments are present.
      • Offsets and states are correct.
    • After ≥ one retention window and with compaction enabled (if not already), confirm:
      • Logical state (live segments, states) is unchanged.
      • Topic only contains keyed records and tombstones.

Durability and Recovery Tests

  • Simulate broker failures during various stages of segment lifecycle (upload, deletion)

  • Test leader changes during metadata operations to verify correct handling of concurrent operations

  • Verify metadata consistency after unclean shutdowns and broker restarts

Performance Benchmarks

  • Measure broker startup time before and after implementation

  • Compare metadata topic size growth over time between current and new implementation

Rejected Alternatives

Alternative A: Snapshot Approach (KAFKA-19265)

Rejected Reasons:

  1. Distributed Coordination Complexity: Creating consistent snapshots across distributed brokers with different partition leaders is extremely challenging and error-prone.

  2. Leader Transition Reliability: Ensuring new leaders receive the latest snapshot during transitions is difficult to guarantee, especially during network partitions or failures.

  3. Recovery Vulnerabilities: Corrupted or incomplete snapshots would require complex recovery procedures, potentially defeating the purpose of having snapshots.

  4. Operational Overhead: Additional storage requirements and snapshot management create a significant operational burden.

Alternative B: Adjust Retention Time Based on Maximum Tiered Storage Topic Retention

Rejected Reasons:

  1. Fails to Address Core Problem: This approach doesn't solve the unbounded metadata growth issue - it merely postpones it by extending retention periods.

  2. Resource Inefficiency and Lack of Granularity: Most metadata records would be retained far longer than necessary, wasting storage and impacting query and bootstrap performance.

  3. Implementation Complexity: Creating a mechanism to track and dynamically update retention across all tiered storage topics introduces unnecessary complexity.

  4. Risk of Irrecoverable Metadata Loss: If the remote storage module experiences downtime, truncating the __remote_log_metadata topic based on retention policies could lead to permanent loss of remote log metadata even while the remote storage still holds those segments, causing a discrepancy between remote storage and the remote log segment metadata state the broker preserves.


