This KIP adds new settings to the broker configuration file; see the Configuration chapter for details.
There are two implementation options: one on the broker side and one on the client side.
Option 1: Broker Side
The changes are concentrated in the Log Manager. The workflow is as follows:
- Each time the cleanupLogs task checks log retention, it runs a consumed log retention pass on every partition before the forced log retention.
- Find the candidate log segments: those that have not been modified for longer than the consumed log retention time.
- Find the minimum committed offset for that partition across all consumer groups.
- Decide, based on that minimum offset, whether each candidate log segment can be deleted.
- After the consumed log retention procedure finishes, continue with the forced log retention procedure for the remaining log segments.
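The consumed log retention pass described above can be sketched as follows. This is a minimal illustration, not the broker's actual code: `SegmentInfo`, `ConsumedLogRetention`, and `deletableSegments` are hypothetical names. The sketch also assumes that a committed offset is the next offset a group will read, that segments are ordered from oldest to newest, and that only a contiguous prefix of the log may be deleted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified view of a log segment; not Kafka's own segment class.
final class SegmentInfo {
    final long baseOffset;      // first offset in the segment
    final long lastOffset;      // last offset in the segment
    final long lastModifiedMs;  // last modified time, in epoch milliseconds

    SegmentInfo(long baseOffset, long lastOffset, long lastModifiedMs) {
        this.baseOffset = baseOffset;
        this.lastOffset = lastOffset;
        this.lastModifiedMs = lastModifiedMs;
    }
}

final class ConsumedLogRetention {
    /**
     * Returns the segments that consumed log retention may delete.
     * Segments must be ordered from oldest to newest. A negative
     * minCommitOffset means "unknown", so nothing is deleted.
     */
    static List<SegmentInfo> deletableSegments(List<SegmentInfo> segments,
                                               long nowMs,
                                               long consumedRetentionMs,
                                               long minCommitOffset) {
        List<SegmentInfo> deletable = new ArrayList<>();
        if (minCommitOffset < 0) {
            return deletable; // leave everything to forced log retention
        }
        for (SegmentInfo segment : segments) {
            // Candidate: not modified for longer than the consumed retention time.
            boolean oldEnough = nowMs - segment.lastModifiedMs > consumedRetentionMs;
            // Assumption: the committed offset is the next offset to read, so the
            // segment is fully consumed when its last offset is below the minimum.
            boolean fullyConsumed = segment.lastOffset < minCommitOffset;
            if (!(oldEnough && fullyConsumed)) {
                break; // assumption: delete only a contiguous prefix of the log
            }
            deletable.add(segment);
        }
        return deletable;
    }
}
```

Segments not returned here stay in place and are handled by the forced log retention procedure that runs afterwards.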
Option 2: Client Side
The client-side implementation finds the min commit offset in the same way. The difference is that, after calculating the min commit offset, it uses KIP-47's trim request to trim the log up to that offset. This approach keeps the broker simple, but it requires an admin tool to be running alongside Kafka at all times, which may not always be possible.
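A rough sketch of the client-side flow might look like the following. The wire format of KIP-47's trim request is not described here, so `TrimRequestSender` is a purely hypothetical stand-in for it, and `ClientSideTrimmer` and `trimIfSafe` are illustrative names. The sketch assumes a negative min commit offset means the offset could not be determined.

```java
// Hypothetical stand-in for sending a KIP-47 trim request; not a real Kafka API.
interface TrimRequestSender {
    void trim(String topic, int partition, long beforeOffset);
}

final class ClientSideTrimmer {
    private final TrimRequestSender sender;

    ClientSideTrimmer(TrimRequestSender sender) {
        this.sender = sender;
    }

    /**
     * Sends a trim request for the partition unless the min commit offset is
     * unknown (negative). Returns true if a request was sent.
     */
    boolean trimIfSafe(String topic, int partition, long minCommitOffset) {
        if (minCommitOffset < 0) {
            return false; // unknown offset: do not trim anything
        }
        sender.trim(topic, partition, minCommitOffset);
        return true;
    }
}
```

An admin tool built this way would have to call `trimIfSafe` periodically for every partition, which is the operational cost noted above.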
Min Commit Offset
The min commit offset of a partition is the smallest committed offset for that partition across all consumer groups. We can use the simple consumer’s API to query the committed offset of every consumer group that has subscribed to the topic containing the log segment to be deleted. A single function can handle both old and new consumers when querying the committed offset, as the consumer-offset-checker tool does, and can handle potential failures such as a leader change or the coordinator being down. To avoid deleting log segments that have not actually been consumed, we set the min commit offset to -1 whenever querying a commit offset throws an exception.
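The calculation, including the fallback to -1 on failure, could be sketched as below. This is an illustration under stated assumptions, not the proposed implementation: `MinCommitOffset` and `compute` are hypothetical names, and the `fetchCommittedOffset` function stands in for whatever query mechanism is used for old and new consumers. Returning -1 when no consumer group is subscribed, or when a group reports a negative offset, is an assumption of this sketch and is not specified above.

```java
import java.util.List;
import java.util.function.ToLongFunction;

final class MinCommitOffset {
    static final long UNKNOWN = -1L;

    /**
     * Returns the smallest committed offset across the given consumer groups
     * for one partition, or -1 if any query fails.
     *
     * fetchCommittedOffset is a hypothetical lookup from group id to committed
     * offset; it may throw on failures such as a leader change or the
     * coordinator being down.
     */
    static long compute(List<String> groupIds, ToLongFunction<String> fetchCommittedOffset) {
        if (groupIds.isEmpty()) {
            return UNKNOWN; // assumption: with no groups, delete nothing early
        }
        long min = Long.MAX_VALUE;
        for (String groupId : groupIds) {
            try {
                long committed = fetchCommittedOffset.applyAsLong(groupId);
                if (committed < 0) {
                    return UNKNOWN; // assumption: a negative offset means none committed
                }
                min = Math.min(min, committed);
            } catch (RuntimeException e) {
                return UNKNOWN; // any query failure: fall back to -1
            }
        }
        return min;
    }
}
```

Because a single failed query yields -1 for the whole partition, a transient error delays deletion until a later cleanup run rather than risking the loss of unconsumed data.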