Status
Current state: Under Discussion
Discussion thread:
JIRA: KAFKA-20595
Motivation
During broker startup, LogManager parallelizes log loading across partitions using a per-data-directory thread pool controlled by num.recovery.threads.per.data.dir. However, within each partition, LogLoader.loadSegmentFiles() processes segment files strictly sequentially: each LogSegment is opened and sanity-checked one at a time before proceeding to the next segment. This design becomes a startup bottleneck for brokers with a large number of segments per partition.
For such brokers, the sequential per-partition loading phase dominates startup cost. Increasing num.recovery.threads.per.data.dir only increases the number of partitions loaded concurrently, while segment loading within each partition remains serialized. As a result, broker startup time scales approximately linearly with the maximum segment count of any partition, regardless of the number of available CPU cores or recovery threads.
This KIP proposes parallelizing the per-partition segment loading phase. A new broker configuration, num.segment.loading.threads.per.data.dir, controls the thread pool size used to concurrently open and sanity-check segment files within a partition.
Public Interfaces
New Broker Configuration
num.segment.loading.threads.per.data.dir
| Property | Value |
|---|---|
| Type | Integer |
| Default | 1 |
| Validator | atLeast(1) |
The number of threads per data directory to be used for loading log segments in parallel during broker startup. A value of 1 disables parallel loading and processes segments sequentially (the default behavior). Values greater than 1 enable parallel segment loading, which can speed up startup when partitions have many segments.
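As an illustration, a broker that opts in to parallel segment loading might carry a server.properties fragment like the one below. The value 4 is an arbitrary example, not a recommendation from this KIP.

```properties
# Existing setting: partitions loaded concurrently per data directory
num.recovery.threads.per.data.dir=4

# Proposed setting: segments loaded concurrently per data directory
# (1, the default, keeps the sequential behavior)
num.segment.loading.threads.per.data.dir=4
```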
New Metrics
A new gauge metric remainingSegmentsToLoad is added, providing visibility into parallel segment loading progress during broker startup.
| Metric | MBean | Tags |
|---|---|---|
| remainingSegmentsToLoad | kafka.log:type=LogManager,name=RemainingSegmentsToLoad | dir, threadNum |
This gauge reports the number of segments still pending parallel loading for a given data directory and loading thread. It is registered only when num.segment.loading.threads.per.data.dir > 1 and is removed after startup completes.
Proposed Changes
LogLoader.loadSegmentFiles() is extended to support parallel segment loading when a dedicated ExecutorService is supplied. LogManager creates this executor per data directory using the new num.segment.loading.threads.per.data.dir configuration. When the configuration is 1 (the default), no executor is created and the existing sequential code path is exercised unchanged.
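The conditional executor creation described above could look roughly like the following. This is a minimal sketch, not the actual LogManager code: the class name SegmentLoadingExecutors, the method forDataDir, and the thread naming scheme are all hypothetical.

```java
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; the real LogManager wiring may differ.
final class SegmentLoadingExecutors {

    // Returns an executor only when parallel loading is enabled (threads > 1).
    static Optional<ExecutorService> forDataDir(String dataDir, int numSegmentLoadingThreads) {
        if (numSegmentLoadingThreads < 1) {
            // Mirrors the atLeast(1) validator on the configuration.
            throw new IllegalArgumentException(
                "num.segment.loading.threads.per.data.dir must be at least 1");
        }
        if (numSegmentLoadingThreads == 1) {
            // Default: no executor, so the caller takes the sequential path.
            return Optional.empty();
        }
        AtomicInteger threadId = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(numSegmentLoadingThreads, runnable -> {
            // Assumed naming scheme, chosen only for readability in this sketch.
            Thread t = new Thread(runnable,
                "segment-loading-" + dataDir + "-" + threadId.getAndIncrement());
            t.setDaemon(true);
            return t;
        });
        return Optional.of(pool);
    }
}
```

Returning an empty Optional for a value of 1 keeps the sequential path free of any thread-pool overhead, which matches the stated goal of leaving default behavior unchanged.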
Two-Phase Loading in LogLoader
Phase 1 — Parallel (open + sanity check)
A CompletableFuture is submitted to the segment loading executor for each file in the log directory. Each task:
- Index file: checks whether the corresponding .log file exists. If not, the index file is recorded as orphaned.
- Log file: calls LogSegment.open() followed by segment.sanityCheck().
  - On success, the segment is added to LogSegments directly. This is safe because LogSegments is backed by a ConcurrentSkipListMap.
  - On CorruptIndexException or NoSuchFileException, the segment is recorded for sequential recovery.
Phase 2 — Sequential (recovery)
After all parallel tasks complete:
- Orphaned index files are logged and deleted.
- Segments that encountered errors are recovered via the existing recoverSegment() method, then added to LogSegments.
recoverSegment() is kept sequential because it mutates shared state (the leader epoch cache).
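The two phases can be sketched in a self-contained form as follows. This is a simplified model under stated assumptions, not the LogLoader implementation: file names stand in for real files, Segment and CorruptIndex are hypothetical stand-ins for LogSegment and CorruptIndexException, and "recovery" just marks the segment as recovered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class TwoPhaseLoadSketch {
    // Hypothetical stand-in for a loaded LogSegment.
    static final class Segment {
        final long baseOffset;
        final boolean recovered;
        Segment(long baseOffset, boolean recovered) {
            this.baseOffset = baseOffset;
            this.recovered = recovered;
        }
    }

    // Hypothetical stand-in for CorruptIndexException / NoSuchFileException.
    static final class CorruptIndex extends RuntimeException {}

    // Safe for concurrent inserts, like the ConcurrentSkipListMap behind LogSegments.
    final ConcurrentNavigableMap<Long, Segment> segments = new ConcurrentSkipListMap<>();
    final Queue<String> orphanedIndexes = new ConcurrentLinkedQueue<>();
    final Queue<Long> needsRecovery = new ConcurrentLinkedQueue<>();

    static long baseOffset(String fileName) {
        return Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
    }

    // Stand-in for LogSegment.open() followed by sanityCheck().
    static Segment openAndSanityCheck(long baseOffset, Set<Long> corruptOffsets) {
        if (corruptOffsets.contains(baseOffset)) throw new CorruptIndex();
        return new Segment(baseOffset, false);
    }

    void load(List<String> fileNames, Set<Long> corruptOffsets, ExecutorService executor) {
        Set<String> present = Set.copyOf(fileNames);
        List<CompletableFuture<Void>> tasks = new ArrayList<>();

        // Phase 1: one task per file, run on the segment loading executor.
        for (String name : fileNames) {
            tasks.add(CompletableFuture.runAsync(() -> {
                if (name.endsWith(".index")) {
                    String logName = name.substring(0, name.length() - ".index".length()) + ".log";
                    if (!present.contains(logName)) orphanedIndexes.add(name);
                } else if (name.endsWith(".log")) {
                    long base = baseOffset(name);
                    try {
                        segments.put(base, openAndSanityCheck(base, corruptOffsets));
                    } catch (CorruptIndex e) {
                        needsRecovery.add(base); // defer to the sequential phase
                    }
                }
            }, executor));
        }
        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();

        // Phase 2: runs on the calling thread only, after all tasks complete.
        for (String orphan : orphanedIndexes) {
            System.out.println("Deleting orphaned index file " + orphan);
        }
        for (long base : needsRecovery) {
            segments.put(base, new Segment(base, true)); // stand-in for recoverSegment()
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            TwoPhaseLoadSketch loader = new TwoPhaseLoadSketch();
            loader.load(List.of("0.log", "0.index", "100.log", "100.index", "200.index"),
                        Set.of(100L), executor);
            System.out.println("Loaded base offsets: " + loader.segments.keySet());
        } finally {
            executor.shutdown();
        }
    }
}
```

Joining on all futures before Phase 2 means recovery never overlaps with parallel opens, so state that is not thread-safe is only touched from a single thread.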
Progress Metric
When the segment loading executor is active, LogLoader.loadSegmentFilesInParallel() tracks the number of remaining segments per thread by writing to a ConcurrentMap<String, Integer> keyed by Thread.currentThread().getName(). The metric is deregistered in removeLogRecoveryMetrics() after startup completes.
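One way the per-thread bookkeeping might work is sketched below. The exact semantics of "remaining segments per thread" are not spelled out above, so this sketch assumes each loading thread records the shared remaining count it observed after finishing a task. The class RemainingSegmentsSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class RemainingSegmentsSketch {
    // Thread name -> remaining count last observed by that thread.
    // A gauge per (dir, threadNum) would read its value from this map.
    final ConcurrentMap<String, Integer> remainingByThread = new ConcurrentHashMap<>();

    void loadAll(int segmentCount, ExecutorService executor) {
        AtomicInteger remaining = new AtomicInteger(segmentCount);
        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < segmentCount; i++) {
            tasks.add(CompletableFuture.runAsync(() -> {
                // ... open and sanity-check one segment here ...
                int left = remaining.decrementAndGet();
                // Each thread writes only under its own name, so writers never contend on a key.
                remainingByThread.put(Thread.currentThread().getName(), left);
            }, executor));
        }
        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
    }

    // Assumed analogue of deregistering the metric after startup completes.
    void removeMetrics() {
        remainingByThread.clear();
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            RemainingSegmentsSketch sketch = new RemainingSegmentsSketch();
            sketch.loadAll(10, executor);
            System.out.println(sketch.remainingByThread);
            sketch.removeMetrics();
        } finally {
            executor.shutdown();
        }
    }
}
```

Under this assumption, only the thread that finishes the last segment is guaranteed to report 0. Other threads keep the last value they observed until the metric is removed.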
Compatibility, Deprecation, and Migration Plan
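The new configuration defaults to 1, which leaves the existing sequential code path unchanged, so there are no compatibility concerns. Parallel segment loading takes effect only when the value is explicitly set above 1.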
Test Plan
- All existing tests should continue to pass, and we will add tests covering the new behavior of num.segment.loading.threads.per.data.dir.
Rejected Alternatives
n/a