Status
Current state: "Under discussion"
Discussion thread: here
JIRA: KAFKA-19970
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Apache Kafka's tiered storage feature, introduced in KIP-405, enables offloading log segments to remote storage systems (e.g., S3, HDFS, Azure Blob Storage), significantly reducing local disk requirements for large-scale deployments. To efficiently serve fetch requests from remote storage, Kafka brokers maintain a local cache of remote log segment indexes (offset, time, and transaction indexes) in the RemoteIndexCache.
This KIP addresses a critical bug in the remote index cache implementation. The RemoteIndexCache currently holds expired and stale indexes indefinitely, leading to memory waste and suboptimal cache utilization.
In Kafka's tiered storage architecture, only the leader of a partition deletes remote log segments and clears any associated cached index files. This design creates a fundamental problem:
1. After Leadership Changes: When a broker loses leadership of a partition, it retains all previously cached index files in memory. The new leader may eventually delete old remote log segments, but the previous leader has no mechanism to learn about these deletions and continues holding stale indexes indefinitely.
2. With Read-From-Replica Enabled: When consumers read from follower replicas using the read-from-replica feature, follower brokers cache index files for the partitions they serve. These follower brokers never receive deletion notifications when the leader removes old segments, causing stale indexes to accumulate on follower replicas.
Without time-based eviction, these stale index files remain in the cache until either:
- The broker restarts (clearing all cache)
- The cache size limit is reached and size-based eviction removes them (which may never happen if they are small enough)
The current size-based eviction (`remote.log.index.file.cache.total.size.bytes`) is insufficient because it only triggers when the cache is full, and doesn't distinguish between active and stale entries. During backfill workloads, this causes "cache thrashing" where small stale indexes remain while large active indexes are repeatedly evicted and re-fetched.
Impact
The lack of time-based eviction causes operational problems:
- Memory Waste: Stale indexes accumulate indefinitely, consuming heap memory that cannot be reclaimed. In the leadership change and read-from-replica scenarios described above, brokers retain indexes for partitions they no longer serve, leading to:
  - Increased heap utilization
  - GC pressure
- Reduced Cache Effectiveness: When stale entries occupy cache space, active indexes are evicted prematurely, reducing cache hit rates and increasing remote storage API calls.
Proposed Changes
To address the unbounded memory growth outlined above, we propose adding time-based eviction (TTL) alongside the existing size-based eviction. This KIP proposes to expose remote.log.index.file.cache.ttl.ms as a user-configurable broker configuration parameter, enabling operators to tune the index cache time-based eviction behavior to address the stale index bug and optimize cache performance for their deployment requirements. The implementation leverages Caffeine cache's `expireAfterAccess()` mechanism, which evicts cache entries after a specified duration since their last access (read or write). This provides automatic cleanup of stale indexes and complements the existing size-based eviction strategy.
The implementation combines two independent eviction mechanisms:
1. Time-Based Eviction (TTL):
- Tracks last access time for each entry
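- Expired entries are removed during Caffeine's routine maintenance, which runs alongside cache reads and writes (or promptly, if a Scheduler is configured); an expired entry is not returned to readers even before it is physically removed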
- Eviction happens asynchronously without blocking reads
- Access resets the TTL timer (entries are "promoted")
2. Size-Based Eviction (Existing):
- Enforces remote.log.index.file.cache.total.size.bytes limit
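- Uses Caffeine's Window TinyLFU policy, which weighs both recency and access frequency (an approximation of LRU/LFU rather than strict LRU)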
- Triggered immediately when cache size exceeds limit
- Takes precedence over TTL (hard limit)
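To make the interaction between the two mechanisms concrete, the following stdlib-only sketch combines expire-after-access with a byte-weighted size bound. It is an illustration under stated assumptions, not the RemoteIndexCache code: `TtlSizeCache` is a hypothetical class, it evicts in strict LRU order (Caffeine uses Window TinyLFU), and it checks expiry lazily on reads, writes, and an explicit `cleanUp()` call instead of relying on Caffeine's maintenance cycle. The clock is injected so that time can be advanced deterministically.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.function.LongSupplier;

// Hypothetical sketch: strict LRU order instead of Caffeine's Window TinyLFU,
// and expiry checked lazily (reads, writes, cleanUp()) instead of by Caffeine's
// maintenance cycle.
final class TtlSizeCache<K, V> {
    private static final class Node<T> {
        final T value;
        final long weight;
        long lastAccess;

        Node(T value, long weight, long now) {
            this.value = value;
            this.weight = weight;
            this.lastAccess = now;
        }
    }

    // accessOrder = true: iteration runs from least to most recently accessed.
    private final LinkedHashMap<K, Node<V>> map = new LinkedHashMap<>(16, 0.75f, true);
    private final long maxWeight;     // e.g. total index bytes allowed
    private final long ttl;           // idle time before expiry; <= 0 disables TTL
    private final LongSupplier clock; // injectable time source, like a Ticker
    private long totalWeight;

    TtlSizeCache(long maxWeight, long ttl, LongSupplier clock) {
        this.maxWeight = maxWeight;
        this.ttl = ttl;
        this.clock = clock;
    }

    V get(K key) {
        Node<V> node = map.get(key); // also moves the key to the most recently used end
        if (node == null) {
            return null;
        }
        long now = clock.getAsLong();
        if (isExpired(node, now)) {
            map.remove(key);
            totalWeight -= node.weight;
            return null;
        }
        node.lastAccess = now; // access resets the TTL timer
        return node.value;
    }

    void put(K key, V value, long weight) {
        long now = clock.getAsLong();
        Node<V> previous = map.put(key, new Node<>(value, weight, now));
        if (previous != null) {
            totalWeight -= previous.weight;
        }
        totalWeight += weight;
        expire(now);
        // The size bound is a hard limit: evict from the least recently used end.
        Iterator<Node<V>> it = map.values().iterator();
        while (totalWeight > maxWeight && it.hasNext()) {
            totalWeight -= it.next().weight;
            it.remove();
        }
    }

    // Similar in spirit to Caffeine's Cache.cleanUp(): drop entries idle for >= ttl.
    void cleanUp() {
        expire(clock.getAsLong());
    }

    long weight() {
        return totalWeight;
    }

    private void expire(long now) {
        Iterator<Node<V>> it = map.values().iterator();
        while (it.hasNext()) {
            Node<V> node = it.next();
            if (!isExpired(node, now)) {
                break; // later entries were accessed more recently
            }
            totalWeight -= node.weight;
            it.remove();
        }
    }

    private boolean isExpired(Node<V> node, long now) {
        return ttl > 0 && now - node.lastAccess >= ttl;
    }
}
```

Because both policies read the same access order here, an idle stale index is also the first candidate for size-based eviction; Caffeine's frequency-aware admission policy may make different choices under size pressure.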
How It Solves the Stale Index Bug:
Time-based eviction provides the missing cleanup mechanism:
- After leadership loss: Index files cached by the previous leader are automatically evicted after the TTL expires (default: 15 minutes), freeing memory without requiring coordination with the new leader
- On follower replicas: Index files cached by followers (when read-from-replica is enabled) are automatically cleaned up after inactivity, preventing unbounded accumulation
- No coordination required: Each broker independently manages its cache TTL, eliminating the need for cross-broker deletion notifications
Combined Eviction Strategy:
- Size-based eviction: Continues to enforce the remote.log.index.file.cache.total.size.bytes limit
- Time-based eviction: New TTL-based eviction ensures entries are removed after remote.log.index.file.cache.ttl.ms milliseconds of inactivity
When an index entry is evicted (either due to size or TTL constraints), it is simply removed from the in-memory cache. Subsequent fetch requests requiring the evicted index will transparently re-fetch it from remote storage.
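The evict-then-re-fetch behavior can be illustrated with a small stdlib-only sketch. `ReloadingIndexCache` and its `remoteFetcher` function are hypothetical stand-ins (the latter for downloading an index from remote storage), not Kafka's RemoteIndexCache API; the clock is injected so that expiry can be shown deterministically. The point it demonstrates is that an expired entry is handled exactly like a miss.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical sketch: a miss, including one caused by TTL expiry, is served by
// calling remoteFetcher, a stand-in for reading the index from remote storage.
final class ReloadingIndexCache {
    private static final class Cached {
        final String index;
        long lastAccessMs;

        Cached(String index, long lastAccessMs) {
            this.index = index;
            this.lastAccessMs = lastAccessMs;
        }
    }

    private final Map<String, Cached> entries = new HashMap<>();
    private final long ttlMs;
    private final LongSupplier clockMs;
    private final Function<String, String> remoteFetcher;
    private int remoteFetches;

    ReloadingIndexCache(long ttlMs, LongSupplier clockMs, Function<String, String> remoteFetcher) {
        this.ttlMs = ttlMs;
        this.clockMs = clockMs;
        this.remoteFetcher = remoteFetcher;
    }

    String getIndex(String segmentId) {
        long now = clockMs.getAsLong();
        Cached cached = entries.get(segmentId);
        if (cached != null && ttlMs > 0 && now - cached.lastAccessMs >= ttlMs) {
            entries.remove(segmentId); // expired: behaves exactly like a miss
            cached = null;
        }
        if (cached == null) {
            remoteFetches++;
            cached = new Cached(remoteFetcher.apply(segmentId), now);
            entries.put(segmentId, cached);
        }
        cached.lastAccessMs = now; // every access resets the TTL timer
        return cached.index;
    }

    int remoteFetches() {
        return remoteFetches;
    }
}
```

The caller never sees the eviction; it only pays one extra remote fetch the next time the index is needed.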
Monitoring
The implementation supports optional cache statistics recording via Caffeine's recordStats() method. Statistics recording is disabled by default to avoid bookkeeping performance overhead on cache operations.
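As a rough illustration of what opt-in statistics involve, the sketch below keeps hit, miss, and eviction counters that are only updated when recording is enabled. `IndexCacheStats` is a hypothetical class loosely modeled on Caffeine's `CacheStats` (including its convention of reporting a hit rate of 1.0 when there have been no lookups); it is not the API used by the implementation.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical opt-in statistics holder, loosely modeled on Caffeine's CacheStats.
final class IndexCacheStats {
    private final boolean enabled;
    private final LongAdder hits = new LongAdder();
    private final LongAdder misses = new LongAdder();
    private final LongAdder evictions = new LongAdder();

    IndexCacheStats(boolean enabled) {
        this.enabled = enabled;
    }

    // When disabled, recording is a no-op, so cache operations pay no bookkeeping cost.
    void recordHit() { if (enabled) hits.increment(); }
    void recordMiss() { if (enabled) misses.increment(); }
    void recordEviction() { if (enabled) evictions.increment(); }

    long hitCount() { return hits.sum(); }
    long missCount() { return misses.sum(); }
    long evictionCount() { return evictions.sum(); }

    // Fraction of lookups served from the cache; 1.0 when there were no lookups.
    double hitRate() {
        long requests = hitCount() + missCount();
        return requests == 0 ? 1.0 : (double) hitCount() / requests;
    }
}
```

A falling hit rate combined with a rising eviction count after lowering the TTL would suggest the TTL is shorter than the typical gap between reads of the same segment.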
While this KIP does not propose new metrics, operators can monitor the effectiveness of TTL configuration and detect stale index issues through existing broker metrics:
- RemoteBytesInPerSec - Higher values may indicate excessive index re-fetching
- Fetch request latency metrics - Increased P99 latency may indicate cache thrashing
Public Interfaces
Broker Configuration
New Public Configuration Parameter:
CONFIGURATION
public static final String REMOTE_LOG_INDEX_FILE_CACHE_TTL_MS_PROP = "remote.log.index.file.cache.ttl.ms";
public static final String REMOTE_LOG_INDEX_FILE_CACHE_TTL_MS_DOC = "The maximum time in milliseconds an index file entry can remain in the cache after its last access. After this duration, the entry will be evicted even if there is available space. This helps prevent stale index files from remaining in cache indefinitely, particularly when a broker is no longer the leader for a partition or when read-from-replica is enabled. Evicted index files are automatically re-fetched from remote storage when needed. Default is 15 minutes (900000 ms), which provides sufficient time for clients to read a partition/segment while ensuring stale entries don't accumulate. Set to -1 to disable time-based eviction and use only size-based eviction.";
public static final long DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TTL_MS = 900000L; // 15 minutes
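The documented semantics (a positive value is the idle TTL, -1 disables time-based eviction) can be captured in a small resolution step before the cache is built. This is a sketch under assumptions: `RemoteIndexCacheTtlConfig` and `resolveTtl` are hypothetical names, and rejecting 0 and other negative values is an assumed validation rule that the proposal does not specify.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical helper; the validation rules (only -1 or a positive value) are assumptions.
final class RemoteIndexCacheTtlConfig {
    static final String REMOTE_LOG_INDEX_FILE_CACHE_TTL_MS_PROP = "remote.log.index.file.cache.ttl.ms";
    static final long DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TTL_MS = 900_000L; // 15 minutes
    static final long TTL_DISABLED = -1L;

    // Empty result means "size-based eviction only".
    static Optional<Duration> resolveTtl(long ttlMs) {
        if (ttlMs == TTL_DISABLED) {
            return Optional.empty();
        }
        if (ttlMs <= 0) {
            throw new IllegalArgumentException(REMOTE_LOG_INDEX_FILE_CACHE_TTL_MS_PROP
                + " must be -1 or a positive number of milliseconds, got " + ttlMs);
        }
        return Optional.of(Duration.ofMillis(ttlMs));
    }
}
```

With this shape, the cache builder only applies `expireAfterAccess` when the resolved value is present, which avoids passing -1 to Caffeine (it rejects negative durations).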
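The implementation uses Caffeine's expireAfterAccess() API alongside the existing byte-weighted size bound:
Caffeine<Object, Object> builder = Caffeine.newBuilder()
    .maximumWeight(maxCacheSizeBytes)             // remote.log.index.file.cache.total.size.bytes
    .weigher((key, entry) -> entryWeight(entry))  // entry size in bytes; entryWeight is illustrative
    .removalListener(this::handleRemoval);
if (ttlMs != -1L) {                               // -1 disables time-based eviction
    builder.expireAfterAccess(Duration.ofMillis(ttlMs)); // remote.log.index.file.cache.ttl.ms
}
Cache<Object, Object> cache = builder.build();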
No API changes. This configuration affects only internal broker behavior and does not modify any client-facing APIs, protocols, or semantics. Evicted indexes (including stale ones) are transparently re-fetched from remote storage if needed again.
Compatibility, Deprecation, and Migration Plan
This change is fully backward compatible.
- No configuration changes are required during upgrade
- No client-side changes required
- No deprecations are introduced by this KIP.
Test Plan
The implementation includes comprehensive test coverage in RemoteIndexCacheTest.java through unit and integration tests.
Unit tests
- testCacheTtlEviction
- Validates that cache entries are evicted after the configured TTL expires
- Uses a mocked Ticker to advance time deterministically
- Verifies that accessing an entry before TTL expiry resets the expiration timer
- testCacheEntryExpiry - Ensures cache respects remote.log.index.file.cache.total.size.bytes limit
- Combined Eviction Strategy Test - Validates that size-based and TTL-based eviction work together
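The shape of testCacheTtlEviction can be sketched without Caffeine by pairing a manually advanced clock with a minimal expire-after-access map. `FakeTicker`, `ExpireAfterAccessMap`, and `TtlEvictionTestSketch` are hypothetical helpers written for this illustration; the actual test in RemoteIndexCacheTest.java may be structured differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Manually advanced clock, in the spirit of a mocked Ticker (hypothetical helper).
final class FakeTicker {
    private long nanos;
    long read() { return nanos; }
    void advance(long amount, TimeUnit unit) { nanos += unit.toNanos(amount); }
}

// Minimal expire-after-access map used only for this test sketch.
final class ExpireAfterAccessMap<K, V> {
    private final Map<K, V> values = new HashMap<>();
    private final Map<K, Long> lastAccess = new HashMap<>();
    private final long ttlNanos;
    private final FakeTicker ticker;

    ExpireAfterAccessMap(long ttl, TimeUnit unit, FakeTicker ticker) {
        this.ttlNanos = unit.toNanos(ttl);
        this.ticker = ticker;
    }

    void put(K key, V value) {
        values.put(key, value);
        lastAccess.put(key, ticker.read());
    }

    V getIfPresent(K key) {
        Long last = lastAccess.get(key);
        if (last == null) {
            return null;
        }
        long now = ticker.read();
        if (now - last >= ttlNanos) { // idle for at least the TTL: evict
            values.remove(key);
            lastAccess.remove(key);
            return null;
        }
        lastAccess.put(key, now); // a read resets the expiration timer
        return values.get(key);
    }
}

final class TtlEvictionTestSketch {
    static void testCacheTtlEviction() {
        FakeTicker ticker = new FakeTicker();
        ExpireAfterAccessMap<String, String> cache = new ExpireAfterAccessMap<>(15, TimeUnit.MINUTES, ticker);
        cache.put("segment-1", "offset-index");
        cache.put("segment-2", "offset-index");

        ticker.advance(10, TimeUnit.MINUTES);
        check(cache.getIfPresent("segment-1") != null, "segment-1 read before TTL expiry");

        ticker.advance(10, TimeUnit.MINUTES); // segment-1 idle 10 min, segment-2 idle 20 min
        check(cache.getIfPresent("segment-1") != null, "read at 10 min should reset the timer");
        check(cache.getIfPresent("segment-2") == null, "segment-2 idle past TTL should be evicted");
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }
}
```

Driving time through an injected ticker keeps the test fast and deterministic, since no real 15-minute wait is needed.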
Integration tests
Existing RemoteLogManager integration tests cover scenarios where evicted indexes (including stale ones) are re-fetched
Rejected Alternatives
New leader notifies old leaders to evict partition indexes: rejected because it requires a new inter-broker protocol and does not address the read-from-replica scenario.