Status
Current state: Voting
Discussion thread: here and here (the mailing-list archive cannot display all messages in a single thread view, so two links are provided)
Vote thread: here
JIRA: KAFKA-19893
PR: https://github.com/apache/kafka/pull/20913
Motivation
This KIP proposes an optional feature that can reduce remote storage costs in suitable scenarios.
Currently, Kafka's tiered storage implementation uploads all non-active local log segments to remote storage immediately, even when they are still within the local retention period.
This results in redundant storage of the same data in both local and remote tiers.
The following picture shows one real example of this redundancy (only one segment is highlighted).
When there is no requirement for real-time analytics reading remote storage directly, this redundancy has the following drawbacks:
1. Wastes storage capacity and costs: The same data is stored twice during the local retention window
2. No immediate benefit: During the local retention period, reads prioritize local data, making the remote copy unnecessary
Example scenario and this KIP's goal:
Consider a topic with remote storage enabled:
- Local retention: 1 day (24 hours)
- Remote retention: 3 days (72 hours)
Data is stored in both tiers during the first day, resulting in about 16 hours of redundant remote storage per segment. This leads to wasted cost.
By removing this redundancy, we can save about 25% of the remote storage cost in this case.
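The savings arithmetic from this example can be sketched as follows (a minimal illustration only; the class and method names are hypothetical, and the 64-hour/48-hour figures are the approximate average remote lifetimes of a segment before and after the change):

```java
// Minimal sketch of the savings arithmetic in the example above.
// Numbers and names are illustrative, not part of the KIP.
public class RedundancySavings {
    // Fraction of the remote storage bill saved when the average remote
    // lifetime of a segment drops from currentHours to delayedHours.
    static double savingPercent(double currentHours, double delayedHours) {
        return (currentHours - delayedHours) / currentHours * 100.0;
    }

    public static void main(String[] args) {
        // 1 day local + 3 days remote: a remote segment lives ~64h today;
        // delaying the upload by the local retention cuts that to ~48h.
        System.out.println(savingPercent(64, 48)); // 25.0
    }
}
```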
We can use an AWS S3 bill (the last 3 months) to show the cost saving in detail:
AWS S3 has two main cost items for Kafka usage (normally the Kafka brokers and the remote storage/S3 bucket are placed in the same region, so network transfer cost can be ignored).
The TimedStorage-ByteHrs item covers the storage part, whose cost is based on each stored object's lifetime.
In the case above, each remote segment lives for about 64 hours; with the new optional feature, each would live for about 48 hours, resulting in approximately 25% cost savings.
In this billing example, the cost would decrease by about 67K per quarter. If the topic is instead configured as 1 day local + 7 days remote, the saving is about 10% (27K).
So with 1 day local + 3-7 days remote, the TimedStorage-ByteHrs saving ranges from 25% down to 10%.
Note: you can also refer to Amazon S3 Cost for more information.
However, this optimization is offered as an optional topic-level configuration rather than the default behavior, for the following reasons:
(1) Some users/topics run real-time analytics directly on remote storage and need the latest data available as soon as possible. (Strictly speaking, remote storage can only stay as up-to-date as possible: it can never include the very latest data, because the active segment is never uploaded.)
(2) Some topics may set a very high remote-to-local retention ratio. The absolute cost savings would then be small (mainly avoiding a little waste), so users may decide the feature is not worth enabling for those topics.
Still, considering the latency and risk of remote storage, the local retention period is usually not set very short. For example, in our production environment we keep one day of data locally alongside 3-7 days in remote storage. One day of local data gives Kafka users enough time to handle unexpected consumption issues while maintaining good latency, and it reduces the dependency on remote storage; so there is still one day of redundancy in our case. Moreover, even if you configure a very short local retention, you may need to extend it when certain issues occur. The longer the local retention you keep, the larger the cost savings.
(3) A Kafka admin may want to reduce the number of local disk expansions when remote storage is down for a long time. After all, when the feature is not enabled, already-uploaded segments are eligible for local deletion, freeing disk space during an outage.
The following picture (configuration: 1 day local + 3 days remote, with the delay set to its maximum value, i.e. the local retention time/size) illustrates the logic:
(a) If the remote storage outage is short, it makes no difference whether the feature is enabled. (If the delay is not set to the maximum value, you have even more headroom before an expansion is needed.)
(b) If the outage is medium-length:
You will need one extra expansion, whose maximum size equals your saved redundancy. In other words, you pay back the saved redundancy through the expansion.
(c) If the outage is long (see the 48-hour outage example in the picture):
You must keep expanding regardless of whether the feature is enabled, because remote uploads keep failing.
Public Interfaces
This KIP introduces:
(1) Two new topic configuration items: remote.copy.lag.ms and remote.copy.lag.bytes
Note: the topic-level remote storage feature already has other configuration items such as remote.log.delete.on.disable, remote.log.copy.disable, etc.
Following that style, the names would be remote.log.copy.lag.ms/bytes, which would make the broker-level names log.remote.log.copy.lag.ms/bytes, with an awkward duplicated "log". We therefore use the names without the extra "log".
public static final String REMOTE_COPY_LAG_MS_CONFIG = "remote.copy.lag.ms";
public static final String REMOTE_COPY_LAG_MS_DOC = "Controls how long to delay uploading segments to remote storage. " +
"When set to 0 (default), segments are uploaded as soon as they are eligible (no delay check). " +
"When set to a positive value (ms), a segment cannot become eligible for upload until the time since the latest record in the segment reaches the value. " +
"The value should not exceed the real local retention ms, unless the latter retains forever. " +
"When set to -1, resolves to the real local retention ms (maximum delay, or no delay check when the retention retains forever). " +
"For how the real local retention time is computed, see <code>local.retention.ms</code>.";
public static final String REMOTE_COPY_LAG_BYTES_CONFIG = "remote.copy.lag.bytes";
public static final String REMOTE_COPY_LAG_BYTES_DOC = "Controls size-based delay for uploading segments to remote storage. " +
"When set to 0 (default), segments are uploaded as soon as they are eligible (no delay check). " +
"When set to a positive value (bytes), a segment cannot become eligible for upload until the total bytes of log data after the segment reaches the value. " +
"The value should not exceed the real local retention bytes, unless the latter retains forever. " +
"When set to -1, resolves to the real local retention bytes (maximum delay, or no delay check when the retention retains forever). " +
"For how the real local retention size is computed, see <code>local.retention.bytes</code>.";
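To make the doc strings above concrete, here is a minimal sketch of the delay check (hypothetical names, not the actual RemoteLogManager code). It follows the doc strings' wording literally, treating each positive lag as a necessary condition, and assumes -1 values have already been resolved to the real local retention:

```java
// A minimal sketch of the delay check described by the doc strings above.
// Names are hypothetical; 0 disables the corresponding check, and -1 is
// assumed to be resolved to the real local retention beforehand.
public class CopyLagCheck {
    static boolean uploadEligible(long nowMs, long latestRecordTimestampMs,
                                  long bytesAfterSegment, long lagMs, long lagBytes) {
        // Per the doc strings, each positive lag keeps the segment ineligible
        // until its threshold is reached.
        boolean timeReached = lagMs == 0 || nowMs - latestRecordTimestampMs >= lagMs;
        boolean sizeReached = lagBytes == 0 || bytesAfterSegment >= lagBytes;
        return timeReached && sizeReached;
    }
}
```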
(2) Two new server configuration items: log.remote.copy.lag.ms and log.remote.copy.lag.bytes
If a user wants to enable the lazy-copy behavior for all topics (including ones created later), they can set these broker-level configs. Otherwise,
it would be hard for the user to create new topics with these configs already set when remote storage is enabled.
public static final String LOG_REMOTE_COPY_LAG_MS_PROP = "log.remote.copy.lag.ms";
public static final String LOG_REMOTE_COPY_LAG_BYTES_PROP = "log.remote.copy.lag.bytes";
The default values are 0, so the whole remote storage module keeps its original behavior.
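As a usage sketch (assuming the config names above are adopted as proposed), a broker-wide default could look like this in server.properties, with individual topics overriding it:

```properties
# server.properties: delay uploads by up to the local retention for all topics
log.remote.copy.lag.ms=-1
log.remote.copy.lag.bytes=-1
```

A topic that needs immediate uploads (e.g. for remote-storage analytics) could then override the delay back to 0 via its topic-level configs, for example with kafka-configs.sh --bootstrap-server <broker> --alter --entity-type topics --entity-name <topic> --add-config remote.copy.lag.ms=0,remote.copy.lag.bytes=0.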
Proposed Changes
You can refer to https://github.com/apache/kafka/pull/20913 for the detailed changes.
We change the logic in RemoteLogManager.RLMCopyTask#candidateLogSegments that decides whether a segment needs to be uploaded to remote storage:
The left part shows the old logic; the right part shows the new logic, with the extra judgement added (highlighted in yellow).
Uploading is delayed whenever the configuration items are set to non-default values, and after the change, the remote tiered storage redundancy is reduced by the delayed upload.
You can refer to the test case and result: https://github.com/apache/kafka/pull/20913#issuecomment-3547156286
Storage comparison in my test: Partition 2 (hosted on the machine with the delayed-upload feature) uses about 50% of the storage of Partition 0 or Partition 1.
By the way, here are some additional thoughts/considerations.
- Local files won't be deleted until they have been uploaded to remote storage, so this change is very safe: you don't need to worry about files being cleaned up before they are uploaded.
- What is the behavior when local and remote retention are set to the same value and the delay lag is set to the local retention?
This is a corner case, so we keep the existing behavior: the segment is uploaded to remote storage and then becomes eligible for local deletion. Note: this is a valid configuration, because the current code only forbids local retention being greater than remote retention. When they are equal and the lag is also set to its maximum (the local retention value), it would be better to skip the upload, since the segment would be deleted from remote storage immediately after being uploaded. However, if we do not upload it, the local segment is never deleted, because deletion waits for the highest offset in remote storage to advance after an upload. And if we skip the upload but directly advance the highest offset in remote storage, it becomes ambiguous whether the segment was actually uploaded or not. Therefore, we could skip the upload and update the LogStartOffset instead. The demo PR is https://github.com/apache/kafka/pull/21361. Since this is a corner case, and that solution also helps address another issue (if the remote storage service is unavailable for a long time, local segments may never be deleted even after exceeding their retention), it is not specific to this KIP; I just list the thought here.
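For illustration, the corner-case condition discussed above could be sketched like this (purely hypothetical names; the real change lives in the demo PR, and this KIP itself keeps the existing upload-then-delete behavior):

```java
// Hypothetical sketch of the corner case discussed above, NOT this KIP's
// behavior: when local retention equals remote retention and the lag is at
// its maximum, an uploaded segment would be deleted from remote almost
// immediately, so the copy task could skip the upload (and advance the
// log start offset instead, as in the demo PR).
public class EqualRetentionCornerCase {
    static boolean shouldSkipUpload(long localRetentionMs, long remoteRetentionMs,
                                    long lagMs) {
        return localRetentionMs == remoteRetentionMs && lagMs == localRetentionMs;
    }
}
```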
Compatibility, Deprecation, and Migration Plan
Backward Compatibility
This change is fully compatible:
- Backward compatible: the default configuration values maintain current behavior
- Forward compatible: older clients unaware of these configs will ignore them
Deprecation
N/A
Migration for Existing Deployments
N/A. The feature is optional.
Test Plan
We can use the following tests to cover the change:
local retention time | local retention size | lag time | lag size | Expected result |
-1 | -1 | 0 | 0 | No delay check |
-1 | -1 | -1 | -1 | No delay check |
-1 | -1 | 1 day | 1 GB | lag: 1 day + 1 GB |
3 days | -1 | 0 | 0 | No delay check |
3 days | -1 | -1 | -1 | lag: 3 days + no size check |
3 days | -1 | 1 day | 1 GB | lag: 1 day + 1 GB |
-1 | 3 GB | 0 | 0 | No delay check |
-1 | 3 GB | -1 | -1 | lag: no time check + 3 GB |
-1 | 3 GB | 1 day | 1 GB | lag: 1 day + 1 GB |
3 days | 3 GB | 0 | 0 | No delay check |
3 days | 3 GB | -1 | -1 | lag: 3 days + 3 GB |
3 days | 3 GB | 1 day | 1 GB | lag: 1 day + 1 GB |
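The -1 resolution rule that the table exercises can be sketched as follows (hypothetical helper, not the actual implementation; it encodes the rule that a lag of -1 resolves to the real local retention, and that an infinite retention of -1 degrades to no delay check):

```java
// Sketch of how a lag value of -1 resolves against the local retention,
// matching the expectations in the table above. Names are illustrative.
public class LagResolution {
    static final long NO_CHECK = 0;   // 0 disables the delay check
    static final long MAX_DELAY = -1; // -1 resolves to the local retention

    static long resolveLag(long lag, long localRetention) {
        if (lag != MAX_DELAY) return lag;
        // Retention of -1 means "retain forever": no delay check in that case.
        return localRetention == -1 ? NO_CHECK : localRetention;
    }
}
```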
Unit Tests:
- Test upload-eligibility logic with different configuration values
- Test topic configuration validation
Integration Tests:
- Verify segments are uploaded before local segments are deleted
- Verify that remote storage usage is reduced after a topic enables the feature
Rejected Alternatives
Alternative 1: Make this the default behavior
Reason for rejection: some users require real-time remote analytics and need data uploaded as soon as possible; breaking their use case would be unacceptable. Besides, immediate upload is the default behavior before this change.
Alternative 2: Use a single boolean flag to choose between no delay and the maximum delay.
Reason for rejection: the current configuration already covers the boolean case (0: no delay / -1: maximum delay) and provides additional flexibility, allowing the total size or time of local segments to stay within local limits for a much longer period when remote storage is unavailable.







