Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Authors: Henry Cai, Thomas Thornton, Greg Harris
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-405 introduced remote tiered storage for Kafka for regular append-only logs. Support for compacted topics (logs representing compacted K/V storage) was deferred to future KIPs. Here we propose a design to support compacted topics in tiered storage.
Public Interfaces
Configurations: This feature enables the combination of remote.storage.enable=true with cleanup.policy=compact. Previously, this combination would cause an exception to be thrown.
Interfaces: No changes to the client protocol or binary log format. No changes to existing CLI tools.
Background
First, we provide background on how current log compaction works and on the lifecycle of log segment files. In the next section we describe how we enhance some of these steps.
Lifecycle for append-only logs in tiered storage
We briefly describe the lifecycle of append-only logs in tiered storage:
Creation of remote log segment file and index files
- A log rotation event occurs when the current active local log segment file is full and LogManager decides to roll a new log segment file.
- The existing active log segment file (and its accompanying index files) will be flushed and closed.
- RemoteLogManager will publish a RemoteLogMetadata event to RemoteLogMetadataManager
- RemoteLogManager will invoke RemoteStorageManager.copyLogSegmentData() to upload the local log segment file onto remote object storage;
- RemoteLogManager will repeat the same sequences to upload related index files (OffsetIndex, TimeIndex, TransactionIndex files) onto remote object storage;
- A new local log segment file (and its accompanying index files) will be created on the local disk.
Deletion of local log segment file and indexes on local.retention.ms/bytes trigger
The LogRetention thread will periodically delete log segment files (and their accompanying index files) that exceed local.retention.ms/bytes
Deletion of remote log segment file and indexes on total retention.ms trigger
- The LogRetention thread will periodically delete from remote object storage, through RemoteStorageManager.deleteLogSegmentData(), the log segment files (and their accompanying index files) that exceed the overall retention.ms
- The retention.ms value is larger than local.retention.ms
How log compaction works for local topics
This Confluent wiki has details on how log compaction works. In summary, these are the high-level steps:
- LogCleaner thread runs periodically in the background;
- At each run interval, the cleaner thread finds dirty log segments and cleans them
- The dirty ratio is calculated as cleanableBytes / totalBytes, where cleanableBytes is the range from firstDirtyOffset (the offset checkpoint from the last cleaner run + 1) to uncleanableOffset (e.g. the LSO boundary)
- Logs whose dirty ratio is larger than min.cleanable.dirty.ratio will be cleaned during this run
- When the log needs to be cleaned, the cleaner will do 2 passes on the log segment file:
- The first pass is to build an in-memory hashmap from message key to the last message offset for the messages having the same key
- The second pass is to read the messages in sequential order and skip all messages whose offset is less than the offset recorded for the same key in the in-memory hashmap
- The output of the second pass will be written to a new log segment file with the file suffix ‘.cleaned’
- The index file (OffsetIndex, TimeIndex, TransactionIndex) will also be generated at the same time
- Swap Log segment files
- At the end of cleaning, a swap of log segment file (and its accompanying index files) will occur:
- The new log segment file will be renamed to ‘.swap’
- The old log segment file will be deleted by renaming it to ‘.deleted’; the ‘.deleted’ file is then deleted asynchronously;
- The ‘.swap’ file will then be renamed to original log segment file name;
- If the swap fails midway, leaving ‘.swap’ files on disk, the LogLoader process will recover those ‘.swap’ files on the next startup.
- Processing multiple Log segment files in a group
- To make sure the new log segment file does not shrink too small, LogCleaner groups several log segment files together, processes them as a unit, and generates only one new log segment file (and accompanying index files)
- For example, if the old log segments are 0001.log (all messages having offsets >= 1) and 1000.log (all messages having offsets >= 1000), LogCleaner can group them together for cleaning; the output is a single file, 0001.log
- Message Deletion
- If there are tombstone records (value=null) in the file, the LogCleaner can remove earlier messages with the same key during cleaning;
- Handling transactional messages and idempotent producer
- In order to maintain sequence number continuity for active producers, we always retain the last batch from each producerId, even if all the records in the batch have been removed. The batch is removed once the producer either expires due to inactivity or writes a new batch.
- We do not clean beyond the last stable offset. This ensures that all records observed by the cleaner have been decided (i.e. committed or aborted). In particular, this allows us to use the transaction index to collect the aborted transactions ahead of time.
- Records from aborted transactions are removed by the cleaner immediately without regard to record keys.
- Transaction markers are retained until all record batches from the same transaction have been removed and a sufficient amount of time has passed.
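As a rough illustration of the two passes described above, the following sketch shows the key-to-last-offset map and the filtering pass. This is not Kafka's actual LogCleaner code; Msg, buildOffsetMap, and compact are made-up names, and tombstone/transaction handling is omitted.

```java
import java.util.*;

// Illustrative sketch of two-pass log compaction; not Kafka's LogCleaner.
public class TwoPassCompaction {
    public record Msg(String key, long offset, String value) {}

    // Pass 1: map each key to the offset of the last message with that key.
    public static Map<String, Long> buildOffsetMap(List<Msg> segment) {
        Map<String, Long> map = new HashMap<>();
        for (Msg m : segment) map.put(m.key(), m.offset());
        return map;
    }

    // Pass 2: keep only messages whose offset is not behind the map entry
    // for the same key, i.e. drop superseded duplicates.
    public static List<Msg> compact(List<Msg> segment, Map<String, Long> offsetMap) {
        List<Msg> out = new ArrayList<>();
        for (Msg m : segment) {
            Long last = offsetMap.get(m.key());
            if (last == null || m.offset() >= last) out.add(m);
        }
        return out;
    }
}
```

For example, a log of (k1@0, k2@1, k1@2) compacts to (k2@1, k1@2): the k1 record at offset 0 is skipped because the map records k1's last offset as 2.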
Proposed Changes
High-Level Design
Log compaction on local disks is a well-established process. However, supporting it in tiered storage introduces complexity regarding remote storage, transaction handling and multi-segment processing.
To ensure stability and correctness, we opt for a design that maintains the existing log cleaning logic with minimal changes. The LogCleaner will:
- Download the remote log segment to the local broker before cleaning starts
- Clean the segment using the existing local cleaning logic
- Upload the newly compacted segment back to remote storage
This approach avoids implementing complex logic specific to tiered storage, and uses already well-tested local compaction code.
In short, the changes to LogCleaner code are minimal: we download the remote log segment to local storage before cleaning starts and upload the cleaned log segment after cleaning is done.
The above diagram shows the local/remote log segments before and after the log compaction.
For each log for a topic partition, from the perspective of log compaction we can look at the log as composed of 5 contiguous regions:
- Cleaned: log segments that were cleaned in previous log cleaning cycles; the checkpoint file remembers the boundary between the cleaned and dirty sections;
- Dirty: dirty log segments which are ready to be cleaned
- Uncleanable: log segments which are dirty but not ready to be cleaned, usually because min.compaction.lag.ms has not yet elapsed;
- Unstable: messages beyond the latest stable offset (LSO) are not yet stable for read-committed consumers to read;
- Active: the active log segment, which is not ready to be uploaded to remote storage;
Before compaction, log segments (A/B/C) have already been uploaded to remote storage, and some log segments (B2/C/D/E) are currently stored on the local filesystem. Note that a portion of the dirty region (B2) is still in local storage.
During compaction, we first build an in-memory index map whose key is the message key and whose value is the last offset of a message with that key. We then rewrite the log segments for all of A+B using this map: a message whose key is present in the map is written out only if its offset matches the offset value in the map. Note that we only need to build the index map from the messages in region B, but the log rewriting needs to happen for regions A and B.
After compaction, both regions A and B are cleaned with no duplicate message keys, and the checkpoint location is moved to the end of B. Note that the log segments of A and B after compaction might have different content than before, since some messages may have been compacted out.
For log segments which still have a local presence (e.g. B2), we don’t have to fetch the remote copies. For remote log segments that are being changed, a remote file delete followed by a remote file create/upload will occur; this is detailed in a later section. There is also an optimization to avoid downloading a remote log segment if its content has no interaction with the index map.
Change on Local Segment Retention/Deletion for Compacted Topics
For the current compacted topic setup with cleanup.policy=compact, Kafka never deletes local log segments based on time (only compaction reduces size). If we maintained this behavior, tiered storage would offer no benefit since all log segments are present on the local filesystem and there is no need to look up data in remote storage.
To make tiered storage usable in this setup, we will also perform time-based local retention for tiered compacted topics:
cleanup.policy=compact,delete
Local segments are deleted when they exceed local.retention.ms/bytes, same as append-only logs. The remote compacted segments persist according to the overall retention.ms
cleanup.policy=compact
Local segments are deleted when they exceed local.retention.ms/bytes, regardless of compaction status. This means:
- Local storage contains only recent segments (compacted or not)
- Remote storage accumulates all segment history
- Compaction happens opportunistically when segments are still available locally
- If compaction is delayed beyond local retention, segments are downloaded from remote for compaction.
Local deletion is time-based only and does not wait for compaction to complete. This ensures:
- Local disk management is predictable and not blocked by the async compaction process
- No durability risk (we never block deletion on an async background task)
- Compaction and local retention operate independently
We make this trade-off intentionally to ensure durability is not compromised. This may increase downloads if local retention is consistently shorter than compaction. However, this is acceptable since compaction inherently requires re-processing old segments from remote storage (see later sections in Detailed Design on how we optimize this).
Detailed Design
We will discuss some of the detailed design choices. First we will walk through the lifecycle for compacted topics in tiered storage and compare the flow with the local topic compaction.
Lifecycle for compacted log segments in tiered storage
When a compacted topic is enabled for remote tiered storage:
Creation of remote log segment file and index files
When the current active local log segment file is full and LogManager rolls to a new log segment file, the log rotation event occurs. LogManager will close the log segment file and its index files and upload them to remote tiered storage through RemoteStorageManager.
The process is the same as the handling for append-only logs.
Cleaning of Log Segment Files
The LogCleaner thread runs at regular intervals:
Find the Dirty Logs to Clean
- Calculate dirty ratio as cleanableBytes / (localBytes + remoteBytes) for tiered topics
- Find logs whose dirty ratio exceeds min.cleanable.dirty.ratio
- The calculation uses:
- Local segment metadata from UnifiedLog object in memory
- Remote segment sizes cached in LogCleanerManager (retrieved from RemoteLogSegmentMetadata)
- Cleaner-checkpoint file
- No expensive remote storage access is needed (only metadata query)
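The dirty-ratio check above can be sketched as follows. The class and method names are illustrative assumptions, not actual Kafka code; the point is that remote bytes enter the denominator alongside local bytes.

```java
// Sketch of the extended dirty-ratio check for tiered topics; names are
// illustrative, not Kafka's LogToClean implementation.
public class TieredDirtyRatio {
    // Dirty ratio = cleanableBytes / (localBytes + remoteBytes).
    public static double dirtyRatio(long cleanableBytes, long localBytes, long remoteBytes) {
        long total = localBytes + remoteBytes;
        return total == 0 ? 0.0 : (double) cleanableBytes / total;
    }

    // The log is cleaned when the ratio reaches min.cleanable.dirty.ratio.
    public static boolean shouldClean(long cleanableBytes, long localBytes, long remoteBytes,
                                      double minCleanableDirtyRatio) {
        return dirtyRatio(cleanableBytes, localBytes, remoteBytes) >= minCleanableDirtyRatio;
    }
}
```

With 600 cleanable bytes out of 400 local plus 600 remote bytes, the ratio is 0.6, so a log with min.cleanable.dirty.ratio=0.5 would be picked up in this run.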
Group The Log Segments
For each log to clean, group its log segments. This logic is the same as the existing code.
Chunked Two Pass Compaction
To prevent disk exhaustion when downloading multiple remote segments for compaction, we process the work in bounded chunks.
Chunk size is defined as min(log.segment.bytes, available_disk_space / 3). This ensures there is room for downloaded segments, offset map, and output segments.
Algorithm Per Chunk
- Select next N segments (local or remote) fitting within chunk size
- Fetch remote segments that are not present locally
- If the log segment only lives on remote tiered storage, fetch the file content from remote tiered storage through RemoteStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPosition, endPosition)
- remoteLogSegmentMetadata, startPosition, endPosition can be resolved through the caches in RemoteLogManager given the cleaning offset
- Store the downloaded content into a local temporary file since we are going to do a second-pass reading shortly afterwards
- If the log segment is present locally on the broker, we skip the remote fetch steps, and can simply fallback to the existing logic with the local segment
- Build offset map from these segments
- Perform second pass reading and processing/skipping message
- This logic is the same as existing code
- The second pass uses the previously built in-memory hashmap and reads from the local log segment file or from the InputStream from RemoteStorageManager.fetchLogSegment()
- If the log segment is present locally on the broker, it uses that file in the second pass
- Note during the second pass we need to reprocess all previous cleaned log segment files from the start of the log, most of those files will only be available in remote tiered storage. There is an optimization using bloom filter (mentioned below) to avoid processing those remote log segment files if the keys don’t overlap with the offset map.
- Delete temporary downloaded files
- Move on to the next chunk
- This iteration order makes sure we will only use limited disk space to process the segment files in this group;
- This is a little different than the existing code where the first pass reads out all log segment files in the cleanable range to build one offset map;
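A minimal sketch of the chunk sizing and segment grouping described above. The names are illustrative, not actual Kafka code, and the sketch only plans chunk membership by byte size; download, map building, and cleanup per chunk are elided.

```java
import java.util.*;

// Sketch of the per-chunk planning for chunked two-pass compaction.
public class ChunkedCleaner {
    // Chunk size = min(log.segment.bytes, available_disk_space / 3), leaving
    // room for downloaded segments, the offset map, and output segments.
    public static long chunkSize(long logSegmentBytes, long availableDiskSpace) {
        return Math.min(logSegmentBytes, availableDiskSpace / 3);
    }

    // Group segment sizes (in offset order) into chunks that each fit
    // within the chunk size; an oversized segment gets its own chunk.
    public static List<List<Long>> planChunks(List<Long> segmentSizes, long maxChunkBytes) {
        List<List<Long>> chunks = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long used = 0;
        for (long size : segmentSizes) {
            if (!current.isEmpty() && used + size > maxChunkBytes) {
                chunks.add(current);        // close the full chunk
                current = new ArrayList<>();
                used = 0;
            }
            current.add(size);
            used += size;
        }
        if (!current.isEmpty()) chunks.add(current);
        return chunks;
    }
}
```

Each planned chunk is then downloaded, compacted, and its temporary files deleted before the next chunk starts, which is what bounds peak local disk usage.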
Bloom Filter Index on Skip Optimization
To avoid downloading remote segments that contain no keys in the current offset map, we will build a bloom filter index per remote log segment.
- Lifecycle
- Lazy initialization on first download of a remote segment for compaction
- Invalidated and rebuilt when segment is re-compacted
- Configurable false positive rate (default 1%), sized based on estimated key cardinality (segment size / average record size)
- Before downloading a remote segment for the second pass, check if any key in the current offset map exists in the segment’s bloom filter
- If no keys match, skip download and copy segment metadata forward
- If any key matches, download and compact normally
- Small storage overhead (~1-2% of segment size). Avoids downloads for segments with no dirty keys
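A minimal bloom-filter sketch of the skip check above. This is illustrative only, not the proposed on-disk index format; the hash scheme and sizing are assumptions made for the example.

```java
import java.util.*;

// Minimal bloom filter sketch for the download-skip check; illustrative only.
public class SegmentBloomFilter {
    private final BitSet bits;
    private final int size;
    private final int hashes;

    public SegmentBloomFilter(int size, int hashes) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashes = hashes;
    }

    // Simple derived hash; a real implementation would use stronger hashing.
    private int index(String key, int i) {
        int h = key.hashCode() * 31 + i * 0x9E3779B9;
        return Math.floorMod(h, size);
    }

    // Called while building the filter from a segment's record keys.
    public void add(String key) {
        for (int i = 0; i < hashes; i++) bits.set(index(key, i));
    }

    // No false negatives: a key that was added always returns true.
    public boolean mightContain(String key) {
        for (int i = 0; i < hashes; i++) if (!bits.get(index(key, i))) return false;
        return true;
    }

    // Skip the remote download only when no key in the offset map can
    // possibly appear in this segment.
    public boolean canSkipDownload(Set<String> offsetMapKeys) {
        for (String k : offsetMapKeys) if (mightContain(k)) return false;
        return true;
    }
}
```

Because bloom filters have no false negatives, skipping a segment when canSkipDownload returns true is always safe; a false positive merely causes an unnecessary download.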
Swap Old/New Log Segment Files
On the local filesystem, the swap consists of several steps:
- Rename the new .cleaned file into .swap file
- Rename the old log segment file to .deleted
- Rename .swap file to the old log segment file
- Asynchronously delete the .deleted file after a delay
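The rename sequence and its crash recovery can be illustrated with a toy in-memory filesystem. This is a sketch of the protocol only, not Kafka's actual file handling; the Map stands in for the directory and the strings for file contents.

```java
import java.util.*;

// Toy in-memory "filesystem" illustrating the swap protocol and why a
// leftover '.swap' file is recoverable. Not Kafka's actual file handling.
public class SegmentSwap {
    public static void swap(Map<String, String> fs, String segment) {
        fs.put(segment + ".swap", fs.remove(segment + ".cleaned"));  // 1: .cleaned -> .swap
        fs.put(segment + ".deleted", fs.remove(segment));            // 2: old segment -> .deleted
        fs.put(segment, fs.remove(segment + ".swap"));               // 3: .swap -> original name
        fs.remove(segment + ".deleted");                             // 4: delete (async in Kafka)
    }

    // LogLoader-style recovery: a '.swap' file always holds complete cleaned
    // data, so recovery just finishes steps 2-4.
    public static void recover(Map<String, String> fs, String segment) {
        if (fs.containsKey(segment + ".swap")) {
            if (fs.containsKey(segment)) fs.put(segment + ".deleted", fs.remove(segment));
            fs.put(segment, fs.remove(segment + ".swap"));
            fs.remove(segment + ".deleted");
        }
    }
}
```

The key property is that after step 1 the cleaned data exists under a durable name, so a crash at any later step leaves enough state for recovery to complete the swap rather than roll it back.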
On the remote storage, the swap is relatively simpler since we don’t directly access remote log segment files. Instead, we query RemoteLogMetadataManager for segment locations based on offset and leader epoch. The swap sequence is:
- Upload the newly compacted segment to remote storage with a new RemoteLogSegmentId
- Publish RemoteLogSegmentMetadata with updated cleanerOffsets and leader epoch information
- RemoteLogMetadataManager updates its cache to route reads to the new segment
- The old segment becomes unreferenced (equivalent to .deleted state in local storage)
- Unreferenced segments are asynchronously deleted by the retention thread after validation (similar to local storage as well)
As long as we include the correct leader epoch and offset information in the RLMetadata when we upload the newly rewritten log segment file, the RemoteLogMetadataManager will update its cache to route the retrieval request to the latest segment file; the old log segment file essentially becomes an orphan node.
Although we could delete the old log segment file immediately after uploading the new segment, we defer deletion to the log retention thread. This ensures deletions are validated against the current leader epoch lineage, preventing zombie brokers from deleting segments that are still valid for the new leader. Orphaned segments from invalid compactions are filtered out by the same epoch validation mechanism used for checkpoint fencing (See the later discussion on fencing old broker on cleaner-offset-checkpoint file).
Processing multiple Log segment files in a group
Same as existing code
- Note in this case we will read multiple log segments (some of which live only on remote tiered storage)
- Note in this case we remove multiple old log segment files (some of which live only on remote tiered storage) during the swap stage
Message Deletion
Same as existing code. Tombstone records (value=null) are processed during compaction.
Handling transactional messages and idempotent producer
Same as existing code. All existing transaction handling logic is preserved
Errors/Crashes during log compaction and later file swapping
- When there are errors during log compaction, the operation will be aborted and retried later; this is the existing behavior;
- If the server crashes during the file swapping stage, we will have the new log segment file created in ‘.swap’ file suffix. LogLoader during the next server startup will find those swap files and finish the renaming process. This is the existing behavior. In the tiered storage setup, we will do cascade deletion and creation in remote tiered storage if the file also exists in remote tiered storage.
Design Choices
We will then discuss some of the design considerations.
Dirty Ratio Calculation
For tiered storage topics, we will modify LogToClean to include remote segment sizes in the totalBytes calculation.
The method to determine the sum of remote segment sizes will:
- Query RemoteLogSegmentMetadata for all remote segments of the partition
- Sum their segmentSizeInBytes values
- Cache this sum in LogCleanerManager to avoid repeated remote metadata lookups
- Update the cache incrementally as segments are uploaded/deleted
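The cache bookkeeping above can be sketched as follows; the class and method names are illustrative assumptions, not the proposed LogCleanerManager API.

```java
import java.util.*;

// Sketch of incrementally maintained remote-size totals per partition.
public class RemoteSizeCache {
    private final Map<String, Long> segmentSizes = new HashMap<>(); // segmentId -> bytes
    private long totalRemoteBytes = 0;

    // Called when a segment upload is recorded in remote log metadata.
    public void onSegmentUploaded(String segmentId, long sizeInBytes) {
        Long prev = segmentSizes.put(segmentId, sizeInBytes);
        totalRemoteBytes += sizeInBytes - (prev == null ? 0 : prev);
    }

    // Called when a remote segment deletion is recorded.
    public void onSegmentDeleted(String segmentId) {
        Long prev = segmentSizes.remove(segmentId);
        if (prev != null) totalRemoteBytes -= prev;
    }

    // Used in the dirty-ratio denominator without any remote storage access.
    public long totalRemoteBytes() { return totalRemoteBytes; }
}
```

Maintaining the sum incrementally means the dirty-ratio calculation never has to re-enumerate remote segment metadata on each cleaner run.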
Compaction on the follower broker
For classic topics, log cleaning/compaction happens independently on the leader and follower brokers at the same time. If we used the same approach for tiered storage topics, the follower broker would need to download all the old log segments from remote storage on each cleaning cycle, which is not desirable. Instead, the follower broker will only do log cleaning/compaction on its local log segment files.
When leadership switches to the follower broker, the follower needs to adjust its cleaner offset checkpoint to reflect the position from the old leader, since this checkpoint marks that all messages before it have been cleaned. For this reason, the leader broker needs to persist its cleaner-offset-checkpoint both on local disk and in remote tiered storage, so this state can be transferred to the follower on the leadership switch event.
Persisting cleaner-offset-checkpoint to remote storage and Fencing off zombie broker
When the leadership switches from broker A to broker B, broker B needs to figure out the current state of log compaction on the remote storage and what is the cleaner-offset-checkpoint on the overall log for the given topic partition. For this reason, broker A needs to persist its cleaner-offset-checkpoint to remote storage at the end of each compaction cycle. However we cannot simply upload the checkpoint file directly onto remote storage since broker A might lose the leadership by force because it became an unresponsive zombie. After the controller moves the leadership to broker B and while broker B is working on log compaction and uploading its new cleaner-offset-checkpoint file, broker A can become live again and try to finish uploading the old checkpoint file and thus overwrite the new checkpoint file.
To fence off the rogue leader, we will extend the cleaner-offset-checkpoint in remote storage to be a vector of entries, each a tuple of (leader-epoch, cleaner-offset-checkpoint-for-the-epoch). These entries establish the lineage of the cleaner-offset-checkpoint with regard to leader epoch changes. This design models the leader-epoch checkpoint file introduced in KIP-101.
The vector of cleaner-offset-checkpoint in remote storage will be persisted as part of the RemoteLogSegmentMetadata (very similar to how we include segmentLeaderEpochs in the LogSegmentMetadata):
public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
                                ...
                                Map<Integer, Long> segmentLeaderEpochs,
                                Map<Integer, Long> cleanerOffsets) {
The leader broker will publish this list of cleanerOffsets for each log segment it uploads to remote storage. The cleaner checkpoint value corresponds to each compaction cycle and increases as compaction cycles continue.
The RemoteStorageMetadataManager will have a cache of leaderEpoch/cleanerOffset as it tracks the RemoteLogMetadata publishing, much like how it builds the cache of leaderEpoch/highestOffset. The cache tracks the max valid cleaner offset for each leader epoch. Each uploaded segment contains the current cleanerOffset value for all leader epochs present in that segment, so the latest segment provides the checkpoint values (without needing to scan across multiple segments).
When the follower broker becomes the leader, it needs to find the current cleaner-offset-checkpoint in remote storage. It will look at the previous leaderEpoch and ask the RemoteStorageMetadataManager for the cleanerOffset of that epoch; this is very similar to how it currently resolves the highest offset for a leader epoch during a leadership switch.
The follower also needs to remove its local log segments before the cleaner-offset-checkpoint to make its local content consistent with the remote storage.
When the follower proceeds with compaction and publishes its new cleaner-offset, it will use its new leader epoch, essentially making any publication from the zombie old broker invalid.
The following table shows the content of remote log metadata when the leadership switches from broker A to broker B. In the table, CO stands for cleaner-offset, LE stands for leader-epoch (broker A has LE-0 while broker B has LE-1), the cleaner-offset-map content of {LE-0 -> CO-100, LE-1 -> CO-155} means cleaner-offset for leader-epoch-0 ends at offset-100, cleaner-offset for leader-epoch-1 ends at offset-155.
Event | Broker A (Old Leader) | Broker B (New Leader) | RL Metadata | Note
--- | --- | --- | --- | ---
A finishes one compaction | Initial cleaner-offset map: {LE-0 ->}; publish Seg-0 with CO at 100 | | Seg-0: {LE-0 -> CO-100} |
A becomes unresponsive and B becomes leader | | Resolve cleaner-offset map as: {LE-0 -> CO-100, LE-1 ->} | |
B finishes one compaction | | Upload Seg-2 with CO at 155 | Seg-2: {LE-0 -> CO-100, LE-1 -> CO-155} |
A becomes live and uploads its pending compaction | Upload Seg-1 with CO at 123 | | Seg-1: {LE-0 -> CO-123} | Seg-1 and Seg-2 can arrive in any order. Regardless of arrival order, Seg-2’s cleaner-offset map ({LE-0 -> CO-100, LE-1 -> CO-155}) establishes that LE-0 cleaned up to offset 100 and LE-1 up to offset 155. This means LE-0’s valid range ended at offset 100, invalidating Seg-1’s claim that LE-0 reached offset 123. The cache validation rejects Seg-1 regardless of arrival order.
Future reads from RemoteStorage | | | Reads resolve to Seg-2: {LE-0 -> CO-100, LE-1 -> CO-155}; the rejected Seg-1 is never read |
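The validation behavior in the table can be sketched as follows. The cache shape and the accept method are illustrative assumptions for this example, not the proposed RemoteLogMetadataManager API; the invariant shown is that once a newer epoch exists, no older epoch may advance its cleaner offset past the boundary already recorded for it.

```java
import java.util.*;

// Sketch of the epoch-based fencing check for published cleaner-offset maps.
public class CleanerOffsetFencing {
    // cache: leaderEpoch -> max valid cleaner offset seen so far (sorted by epoch).
    // Returns true and folds the published map in, or false to reject it.
    public static boolean accept(SortedMap<Integer, Long> cache, Map<Integer, Long> published) {
        for (Map.Entry<Integer, Long> e : published.entrySet()) {
            Long known = cache.get(e.getKey());
            boolean newerEpochExists = !cache.tailMap(e.getKey() + 1).isEmpty();
            // A zombie claiming progress for a fenced (older) epoch beyond its
            // established boundary is rejected.
            if (newerEpochExists && known != null && e.getValue() > known) return false;
        }
        for (Map.Entry<Integer, Long> e : published.entrySet())
            cache.merge(e.getKey(), e.getValue(), Math::max);
        return true;
    }
}
```

Replaying the table: after Seg-0 the cache holds {LE-0 -> 100}; Seg-2's map {LE-0 -> 100, LE-1 -> 155} is accepted; Seg-1's map {LE-0 -> 123} is then rejected because epoch 1 already exists and 123 exceeds LE-0's recorded boundary of 100.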
Similar fencing protection also needs to apply to remote log segment uploads from the old broker, as well as to deletion of old remote log segments. For remote log segment deletion, we need to perform a tombstone operation instead of outright file deletion, since the deletion request can come from a zombie broker A while the active broker B still needs to read data from that log segment. The metadata manager will mark the remote log segment as to-be-deleted (RemoteLogState.DELETE_SEGMENT_STARTED), then validate it using isRemoteSegmentWithinLeaderEpochs() before actual deletion. This prevents zombie brokers from deleting segments still valid under the current leader.
Performance overhead of working with remote files in tiered storage
Besides the normal two-pass processing to compact log segments in classic topics, there is extra overhead when working with remote tiered storage:
- The log segment file needs to be uploaded to remote tiered storage before it is cleaned. In most cases, the just-closed log segment file is not yet cleanable, either because the dirty ratio is not satisfied or because the transaction boundary (LSO pointer) is not at the end of the segment file. Later on, the content of the segment file will go through compaction and be re-uploaded;
- The log segment file might need to be fetched locally to build the offset index map for the first pass of compaction. To avoid this download, we will use the local segment file if it is still present. We encourage users to set a higher local.retention.ms/local.retention.bytes to extend the lifespan of local segment files.
- There is a risk of running out of disk space when we download many log segment files from remote for two-pass compaction. The current two-pass scan builds the big offset map for all the messages in the cleanable offset range in the first pass and then goes through groups of segments one by one in the second pass. To make sure we operate within the disk space capacity, we will implement chunked downloading. Instead of downloading all dirty segments at once, the cleaner will download a bounded set of remote segments (fitting within the chunk size limit), process them into the standard OffsetMap, and release the local disk space before downloading the next chunk. This allows compacting large remote logs with constant disk usage.
- During the swap stage, the old log segment file in remote tiered storage will be removed and the new log segment file will be uploaded onto remote tiered storage. This is the second upload for the same log segment file.
- Another performance issue with remote files arises when we need to sum segment file byte sizes to calculate the dirty ratio (cleanable-bytes/total-bytes). The file size of a local segment is immediately available; the byte size of a remote segment is not. We can retrieve the segment byte size from RemoteLogMetadata for each remote segment and keep the sum in LogCleanerManager, so we don’t have to refetch them for the next dirty ratio calculation.
Deletion of local log segment file and indexes on local.retention.ms/local.retention.bytes trigger
cleanup.policy=compact,delete
When the cleanup policy is set to compact,delete: the local log segment file can still be removed by the LogRetention thread when the condition is met. This is the same as for append-only logs
cleanup.policy=compact
When the retention policy is set to compact: we will remove local log segment file/indexes when all the messages in the file passed local.retention.ms and all messages in the file are cleaned (offset < cleaner-checkpoint).
- The LogRetention thread will be updated to check the cleaner-checkpoint.
- We will modify UnifiedLog.deleteOldSegments() to verify that a segment’s largest offset is less than the cleaner-checkpoint
- If this condition is met and the file exceeds local.retention.ms, the local file is deleted. The remote segment remains intact.
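The deletion predicate described here can be sketched as follows; the class and parameter names are illustrative, not actual Kafka method signatures, and the segment-level timestamp/offset inputs are assumptions made for the example.

```java
// Sketch of the local-deletion check for cleanup.policy=compact with tiering:
// a segment is deletable locally only when it is fully behind the cleaner
// checkpoint AND past local retention. Illustrative names only.
public class LocalSegmentRetention {
    public static boolean eligibleForLocalDeletion(long segmentLargestOffset,
                                                   long cleanerCheckpointOffset,
                                                   long segmentLargestTimestampMs,
                                                   long nowMs,
                                                   long localRetentionMs) {
        // All messages in the segment have been cleaned.
        boolean fullyCleaned = segmentLargestOffset < cleanerCheckpointOffset;
        // All messages in the segment have passed local.retention.ms.
        boolean pastRetention = nowMs - segmentLargestTimestampMs > localRetentionMs;
        return fullyCleaned && pastRetention;
    }
}
```

A segment straddling the cleaner checkpoint, or one whose newest message is still within local.retention.ms, stays on local disk; the remote copy is untouched either way.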
Deletion of remote log segment file and indexes on total retention.ms trigger
If the cleanup policy of the topic is compact,delete, the remote log segment file can also be removed when the condition is met. This is the same as for append-only logs;
Compatibility, Deprecation, and Migration Plan
Backward Compatibility
- The change is fully backward compatible. Existing topics using cleanup.policy=delete in tiered storage will continue to function as before
Migration
- Users can enable this feature on existing compacted topics by setting remote.storage.enable=true (previously prevented with an exception)
- Users can enable compaction on existing tiered topics by changing cleanup.policy to compact (or compact,delete)
Performance Impact
- The “Download-Clean-Upload” cycle introduces network overhead for the broker. However, since log compaction is a background process, this overhead is expected to be manageable;
- We have explored ways to reduce the chance we need to download the remote log segment file (see discussion above)
Test Plan
Unit tests will verify that LogCleaner correctly identifies and fetches remote segments, and that deletion and upload of compacted log segments occur.
Integration tests will verify the creation of remote storage topics with compaction enabled, produce data with duplicate keys to force compaction, and verify data on remote storage is compacted, and that consumer reads only receive the messages post-compaction.
System tests will be run with `compact,delete` to verify that both policies when enabled simultaneously are stable under load.
Rejected Alternatives
Delay the tiered storage file uploading until the file is compacted
An alternative design is to delay uploading a log segment file to remote tiered storage until the file finishes compaction/cleaning. This would reduce the need to download the file from remote, compact it, and re-upload it. However, this design deeply couples LogRotation/Uploading with LogCleaning/LogRetention: when log cleaning is throttled, errored out, or blocked for other reasons, we cannot upload the file to remote, which affects the durability of the log segment and risks running out of disk space. Similarly, any slowdown or errors during log retention processing (removing log segments) would also block log file uploading and risk running out of disk space.
Compaction directly on RemoteStorage (Serverless/compute-separated):
- Idea: Spin up a separate worker or use a cloud function to read remote objects, compact them and write them back without involving the Kafka broker’s local disk
- Reason for rejection: This would require a completely new architecture for the Log Cleaner and significant changes to the RemoteStorageManager interface. It introduces distributed system complexities (locking, fencing) that are currently handled inherently by the broker’s leadership model. The proposed changes can leverage and reuse existing stable logic, unlike this alternative.
