...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
def calculateRemoteTierSize() { // Find the leader epochs from leader epoch cache. val validLeaderEpochs = fromLeaderEpochCacheToEpochs(log) // For each leader epoch in current lineage, calculate size of log val remoteLogSizeBytes = validLeaderEpochs.map(epoch => rlmm.getRemoteLogSize(epoch)).sum remoteLogSizeBytes }// the new API would be used for size based retention as: val totalLogSize = remoteLogSizeBytes + log.localOnlyLogSegmentsSizevar remainingSize = if (shouldDeleteBySize) totalLogSize - retentionSize else 0val0 val segmentsIterator = remoteLogMetadataManager.listRemoteLogSegmentwhilelistRemoteLogSegment while (remainingSize > 0 && segmentsIterator.hasNext) { // delete segments segments } |
Code changes
- Add the new API to
RemoteLogMetadataManager
- Implement the new API at
TopicBasedRemoteLogMetadataManager
(with unit tests) Add the new metric when code for RemoteLogManager has been merged.
...