Authors: Thomas Thornton, Henry Cai
Status
Current state: "Under Discussion"
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-1023 (Follower fetch from tiered offset) introduced a significant optimization for topics with tiered storage enabled. When enabled via the `follower.fetch.last.tiered.offset.enable` broker configuration, an empty follower can skip replicating data that has already been uploaded to remote storage and instead begin fetching from the earliest pending upload offset (the next offset after the last tiered offset). This dramatically reduces the time required for a new follower to join the ISR. In practice, the follower only needs to replicate 10-15% of the data it had to replicate before.
However, as acknowledged in KIP-1023, this optimization introduces a risk factor:
“A drawback of using the last-tiered-offset is that this new follower would possess only a limited number of locally stored segments. Should it ascend to the role of leader, there is a risk of needing to fetch these segments from the remote storage, potentially impacting broker performance.”
When a follower bootstrapped via the tiered offset becomes leader, it has significantly fewer local log segments than a follower that replicated all data from the leader. If consumers request offsets that exist only in remote storage, the new leader must fetch data from remote storage to serve these requests. This can cause:
- Increased latency for consumer fetch requests that hit remote storage
- Potential broker performance degradation under high request volumes
This KIP proposes to mitigate this risk by deprioritizing followers with limited local log segments during leader election, ensuring that followers with more local data (a longer local log time span or a larger local log size) are preferred as leaders when available.
Public Interfaces
Broker Configuration Options
A new broker configuration will be added to control the leader election behavior for tiered storage followers:
leader.election.eligible.local.log.ms
Property | Value |
Type | Long |
Default | -1 (disabled) |
Dynamic Update | cluster-wide |
Description | The minimum time span (in milliseconds) that a replica's local log must cover for the replica to be considered eligible for leader election preference. A replica whose local log spans at least this duration (i.e., currentTime - localLogStartTimestamp >= threshold) is considered to have sufficient local data for leadership. Set to -1 to disable time-based eligibility. |
leader.election.eligible.local.log.bytes
Property | Value |
Type | Long |
Default | -1 (disabled) |
Dynamic Update | cluster-wide |
Description | The minimum size (in bytes) of a replica's local log for the replica to be considered eligible for leader election preference. A replica whose local log size is at least this value is considered to have sufficient local data for leadership. Set to -1 to disable bytes-based eligibility. |
NOTE: Setting either config to a value greater than 0 enables the feature. When both are set, a replica becomes eligible when either condition is met (whichever occurs first). No separate boolean toggle is needed.
Topic Configuration Overrides
Both broker configs above have topic-level overrides, following the standard Kafka pattern of a topic-level config overriding a broker-level default (e.g., topic-level `retention.ms` and broker-level `log.retention.ms`). The topic-level config takes precedence when set; otherwise the broker-level default applies. Unlike that example, the topic-level config names here are the same as the broker-level names.
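The precedence rule above can be illustrated with a minimal sketch. The class and method names (`EligibilityConfigResolver`, `resolveThreshold`, `featureEnabled`) are hypothetical and are not part of this KIP or of Kafka; the sketch only assumes that an unset topic-level value is represented as an empty `OptionalLong`.

```java
import java.util.OptionalLong;

// Hypothetical helper, for illustration only.
final class EligibilityConfigResolver {
    // -1 is the "disabled" default described in the config tables.
    static final long DISABLED = -1L;

    // The topic-level value takes precedence when set;
    // otherwise the broker-level default applies.
    static long resolveThreshold(OptionalLong topicOverride, long brokerDefault) {
        return topicOverride.isPresent() ? topicOverride.getAsLong() : brokerDefault;
    }

    // The feature is on when either resolved threshold is greater than 0.
    static boolean featureEnabled(long thresholdMs, long thresholdBytes) {
        return thresholdMs > 0 || thresholdBytes > 0;
    }
}
```

For example, a topic that sets `leader.election.eligible.local.log.ms` to 600000 on a cluster whose broker-level value is -1 would resolve to 600000 for that topic only.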
Protocol Changes
Fetch Request (new fields)
The existing FetchRequest has a LogStartOffset field, but it does not include information about the time span or size of a follower's local log. To support the controller's eligibility determination, this KIP adds two new fields so followers can report their local log coverage to the leader:
FetchPartition => ... existing fields ...
LocalLogStartTimestamp => INT64 // NEW - timestamp of earliest message in follower's local storage
LocalLogSizeBytes => INT64 // NEW - total size of follower's local log in bytes
AlterPartitionRequest (new fields)
The controller needs each replica's local log coverage (time span and size) for leader election eligibility determination. This KIP extends AlterPartition to include this for all ISR members:
AlterPartitionRequest => TopicId [Partitions]
TopicId => UUID
Partitions => PartitionIndex LeaderEpoch PartitionEpoch [NewIsrWithEpoch] LeaderRecoveryState
PartitionIndex => INT32
LeaderEpoch => INT32
PartitionEpoch => INT32
NewIsrWithEpoch => BrokerId BrokerEpoch LocalLogStartTimestamp LocalLogSizeBytes
BrokerId => INT32
BrokerEpoch => INT64
LocalLogStartTimestamp => INT64 // NEW FIELD
LocalLogSizeBytes => INT64 // NEW FIELD
LeaderRecoveryState => INT8
- LocalLogStartTimestamp: The timestamp of the earliest record in the replica's local log storage. For replicas bootstrapped via the tiered offset, this will be a recent timestamp. For established replicas, this will be approximately currentTime - local.retention.ms.
- LocalLogSizeBytes: The total size in bytes of the replica's local log segments.
Proposed Changes
High-Level Design
The core idea is to incorporate local log coverage (measured by time span and/or size) as a factor in leader election decisions. When multiple replicas are eligible to become leader (i.e., they are in the ISR or ELR), the controller should prefer replicas that have accumulated sufficient local data, as these replicas can serve consumer requests without accessing remote storage.
Unlike an offset-based comparison between replicas, this design uses an absolute threshold: each replica is independently evaluated for eligibility based on how much local data it has. This is throughput-independent and gives operators direct control over the maximum duration of rack imbalance after a new replica bootstraps.
Understanding the Current KIP-1023 Implementation
The implementation determines whether to fetch from the last tiered offset in `ReplicaFetcherThread.shouldFetchFromLastTieredOffset()`. This returns true only when: (1) the feature is enabled, (2) remote storage is enabled for the topic, (3) the topic is not compacted, (4) the replica is empty (offset 0), and (5) the leader has data.
Leader Election Algorithm Changes
The current leader election algorithm in `PartitionChangeBuilder.electLeader()` has two paths:
- Preferred leader election: `electPreferredLeader()` prefers the first replica in the assignment
- Any leader election: `electAnyLeader()` selects any valid leader from ISR/ELR
Both paths will be modified to consider local log eligibility when the feature is enabled (either config set > 0).
When the feature is enabled, the key change is to stably sort targetReplicas such that eligible replicas (those meeting the local log threshold) are preferred over non-eligible replicas. Among eligible replicas, the original assignment order is preserved. Among non-eligible replicas, the original assignment order is also preserved.
This ensures:
- Replicas with sufficient local data are considered first for leadership
- No unnecessary leader churn among replicas that all have adequate local data
- Rack-balanced preferred replicas regain leadership once they become eligible
Replicas with unknown local log state (e.g., brokers that haven't reported yet) are treated as eligible. This is a conservative default. We assume unknown replicas have complete local data until proven otherwise.
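The stable ordering described above can be sketched as follows. This is an illustration under stated assumptions, not the actual `PartitionChangeBuilder` change: the class `EligibilityOrdering`, the method `orderByEligibility`, and the representation of ineligible replicas as a set of broker IDs are all hypothetical. Replicas with unknown local log state are simply absent from the set, so they are treated as eligible.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

// Hypothetical helper, for illustration only.
final class EligibilityOrdering {
    // Returns a copy of targetReplicas with eligible replicas first.
    // knownNonEligible holds only replicas that reported local log state
    // below the threshold; unknown replicas are not in it.
    static List<Integer> orderByEligibility(List<Integer> targetReplicas,
                                            Set<Integer> knownNonEligible) {
        List<Integer> ordered = new ArrayList<>(targetReplicas);
        // List.sort is stable, so the assignment order is preserved within
        // each group. Boolean ordering places false (eligible) before true.
        ordered.sort(Comparator.comparing((Integer id) -> knownNonEligible.contains(id)));
        return ordered;
    }
}
```

Because the sort only reorders and never filters, a partition whose replicas are all non-eligible keeps its original order and can still elect a leader.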
Eligibility Criteria
A replica is considered eligible for leader election preference when at least one of the following conditions is met:
- Time-based: leader.election.eligible.local.log.ms is set (> 0) AND currentTimeMs - localLogStartTimestamp >= leader.election.eligible.local.log.ms
- Bytes-based: leader.election.eligible.local.log.bytes is set (> 0) AND localLogSizeBytes >= leader.election.eligible.local.log.bytes
If neither config is enabled (both are -1), the feature is off and all replicas are treated equally (current behavior).
Example with leader.election.eligible.local.log.ms = 600000 (10 minutes):
- Broker 1: localLogStartTimestamp = 1 hour ago → eligible (1 hour >= 10 min)
- Broker 2: localLogStartTimestamp = 30 min ago → eligible (30 min >= 10 min)
- Broker 3: localLogStartTimestamp = 3 min ago → NOT eligible (3 min < 10 min)
Result: Brokers 1 and 2 are preferred equally (original order preserved), Broker 3 is deprioritized. Once Broker 3 accumulates 10 minutes of local data, it becomes eligible and the auto leader rebalancer restores rack-balanced leadership.
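The two criteria can be expressed as a small predicate. This is a minimal sketch, assuming the thresholds have already been resolved from configuration; the class `LocalLogEligibility` and the method `isEligible` are hypothetical names, not part of the proposal.

```java
// Hypothetical helper, for illustration only.
final class LocalLogEligibility {
    // A threshold of -1 (or any value <= 0) means that criterion is disabled.
    static boolean isEligible(long currentTimeMs,
                              long localLogStartTimestamp,
                              long localLogSizeBytes,
                              long thresholdMs,
                              long thresholdBytes) {
        boolean timeEnabled = thresholdMs > 0;
        boolean bytesEnabled = thresholdBytes > 0;
        // Feature off: all replicas are treated equally (current behavior).
        if (!timeEnabled && !bytesEnabled) {
            return true;
        }
        boolean timeOk = timeEnabled
            && currentTimeMs - localLogStartTimestamp >= thresholdMs;
        boolean bytesOk = bytesEnabled
            && localLogSizeBytes >= thresholdBytes;
        // Either condition is sufficient.
        return timeOk || bytesOk;
    }
}
```

With a 600000 ms threshold and the bytes criterion disabled, a replica whose earliest local record is 3 minutes old (180000 ms) evaluates to not eligible, while one whose earliest local record is 30 minutes old evaluates to eligible, matching the example above.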
Threshold Tradeoffs
Choosing a threshold value involves a tradeoff: a higher value means more confidence that the new leader will have sufficient local data to serve consumers without remote fetches, but increases the duration of rack imbalance while the new replica builds up local data. A lower value restores rack balance faster but accepts a small window where the new leader may need to serve some requests from remote storage. In environments where consumers and followers are mostly caught up with real-time data, a low value (e.g., 10 minutes) is sufficient since those consumers will not request data older than what the new leader has locally.
Tracking Local Log State
Currently, only the partition leader can send AlterPartition requests to the controller (see AlterPartitionManager). The leader sends these requests when ISR changes occur via Partition.maybeExpandIsr and Partition.maybeShrinkIsr.
To track local log coverage (localLogStartTimestamp and localLogSizeBytes) for all replicas, this KIP proposes:
- Followers report to leader: Extend the FetchRequest to include the follower's localLogStartTimestamp and localLogSizeBytes. The leader already tracks follower state (e.g., logEndOffset) in the Replica class.
- Leader aggregates and reports: When sending AlterPartition requests, the leader includes localLogStartTimestamp and localLogSizeBytes for all ISR members (not just itself).
- Controller stores and uses: The controller stores these in partition state and uses them during leader election to determine replica eligibility.
Handling Log Retention Changes
The localLogStartTimestamp represents the timestamp of the earliest record in the replica's local log. In most cases this value is stable once set, since new data is appended at the end and does not affect the earliest record. However, it can advance if log retention deletes the oldest local segments. The controller computes the time span as currentTime - localLogStartTimestamp, which grows passively over time as long as the earliest local segment is retained.
localLogSizeBytes changes as new data is written to the local log (grows) or as log retention deletes old local segments (shrinks).
When either value changes, the follower will report the new values in its next FetchRequest. The leader will detect the change and send an AlterPartition request to update the controller, even if the ISR membership hasn't changed. This requires that the leader tracks each follower's last reported localLogStartTimestamp and localLogSizeBytes, so that it can determine if the values have changed and send the AlterPartition request. This leverages the existing FetchRequest & AlterPartition protocol, and avoids followers updating state directly with the controller.
Staleness consideration: If localLogStartTimestamp advances due to retention but hasn't been reported yet, the controller will have a stale older timestamp. This makes the replica appear more eligible than it actually is (the time span looks larger), which is benign. For localLogSizeBytes, a stale lower value during initial growth may delay eligibility by one reporting cycle, which is acceptable.
Broker Side Changes
Brokers need to report their local log state when sending AlterPartition requests. The ReplicaManager will include these values when constructing AlterPartition requests.
Follower changes:
- Include localLogStartTimestamp and localLogSizeBytes in every FetchRequest sent to the leader
- localLogStartTimestamp is derived from the time index for the local log start offset. localLogSizeBytes is the sum of local log segment sizes.
Leader changes:
- Store each follower's localLogStartTimestamp and localLogSizeBytes in ReplicaState (new fields).
- Detect when a follower's localLogStartTimestamp or localLogSizeBytes changes.
- New trigger for AlterPartition: Send AlterPartition to controller when:
  - ISR membership changes (existing behavior), OR
  - Any ISR member's localLogStartTimestamp or localLogSizeBytes changes (new behavior).
- Include localLogStartTimestamp and localLogSizeBytes for all ISR members in AlterPartition requests.
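The leader-side change detection can be sketched as follows. This is an illustration only: the KIP proposes storing these values in ReplicaState, whereas the standalone class `FollowerLocalLogTracker`, its nested record `LocalLogState`, and the method `update` are hypothetical names used here to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class FollowerLocalLogTracker {
    // Last values a follower reported in a FetchRequest.
    record LocalLogState(long localLogStartTimestamp, long localLogSizeBytes) {}

    private final Map<Integer, LocalLogState> lastReported = new HashMap<>();

    // Records the values from a follower's fetch and returns true when they
    // differ from the previously recorded values (or when this is the first
    // report), i.e. when the leader would need to send AlterPartition.
    boolean update(int brokerId, long localLogStartTimestamp, long localLogSizeBytes) {
        LocalLogState next = new LocalLogState(localLogStartTimestamp, localLogSizeBytes);
        LocalLogState previous = lastReported.put(brokerId, next);
        return !next.equals(previous);
    }
}
```

A repeated report with identical values returns false, so an unchanged follower does not trigger an extra AlterPartition request in this sketch.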
Controller changes:
- Store localLogStartTimestamp and localLogSizeBytes for each replica in partition metadata.
- Use these values to determine replica eligibility during leader election when the feature is enabled.
Interaction with Existing Features
Eligible Leader Replicas (ELR) - KIP-966
This KIP is compatible with ELR. When ISR is empty and ELR is used for leader election, the same local log eligibility preference will apply to ELR members.
Preferred Leader Election
When the feature is enabled, the preferred leader election will sort replicas by eligibility before selecting the preferred replica. Eligible replicas are preferred over non-eligible replicas. Among eligible replicas, the original assignment order determines preference.
KIP-1023 Follower Fetch from Tiered Offset
This KIP directly addresses the risk factor identified in KIP-1023.
When both follower.fetch.last.tiered.offset.enable and the leader election eligibility configs are enabled:
- New followers can quickly join ISR by fetching from the tiered offset.
- These followers will be deprioritized for leader election until they have accumulated sufficient local data (as defined by the time/bytes threshold).
No special handling is required for this case; the existing tiered storage infrastructure handles remote fetches transparently.
Compatibility, Deprecation, and Migration Plan
Upgrade Path
- Rolling upgrade: The new configurations default to -1 (disabled), so existing behavior is preserved during upgrade.
- Enable feature: After all brokers are upgraded, set leader.election.eligible.local.log.ms and/or leader.election.eligible.local.log.bytes to desired values cluster-wide.
- Gradual rollout: The feature can be enabled on a per-cluster basis, allowing operators to test in non-production environments first.
Backward Compatibility
- The new FetchRequest version is backward compatible; older followers will not send the LocalLogStartTimestamp or LocalLogSizeBytes fields.
- The new AlterPartition request version is backward compatible; older brokers will not send the new fields.
- Controllers will handle missing local log state by treating those replicas as eligible (conservative default).
- The feature is opt-in via configuration, preserving existing behavior by default.
Downgrade/Rollback Path
- Set leader.election.eligible.local.log.ms and leader.election.eligible.local.log.bytes to -1 cluster-wide.
- Perform rolling downgrade.
- The controller will stop using local log eligibility in election decisions.
Test Plan
- Unit tests for PartitionChangeBuilder to verify leader election prefers eligible replicas over non-eligible replicas.
- Unit tests for eligibility computation (time-based, bytes-based, combined).
- Unit tests for FetchRequest serialization of LocalLogStartTimestamp and LocalLogSizeBytes fields.
- Unit tests for AlterPartition request/response serialization of new fields.
- Integration tests with tiered storage to verify new brokers are deprioritized for leader election until they accumulate sufficient local data.
- Integration tests to verify leader election succeeds when only non-eligible replicas are available (no hard block).
- Integration tests to verify eligible replicas regain preferred leadership via auto leader rebalancer.
Rejected Alternatives
- Strict exclusion: Completely exclude tiered-storage followers from leader election until they have sufficient local data. Rejected because this could cause unavailability if all ISR members are tiered-storage followers.
- Wall-clock time-based exclusion: Exclude followers for a configurable time period after joining ISR. Rejected because wall-clock time since ISR join does not reflect actual data replication progress. A replica could be in ISR for 10 minutes but have experienced slow replication and still have minimal local data.
- Percentage threshold: Only elect replicas with local log covering X% of total log range. Rejected because it adds complexity; absolute thresholds (time span, bytes) are simpler and more intuitive for operators.
- Offset-based comparison between replicas: Compare replicas' localLogStartOffset values against each other, deprioritizing replicas whose offset is significantly higher than peers. Rejected because convergence time is coupled to topic throughput: for high-throughput topics, the offset gap between existing and newly-bootstrapped replicas can be millions of messages, and the gap only closes as the old replica's local retention truncates segments. This means rack imbalance can persist for the full duration of local.retention.ms (potentially hours), even when all consumers are caught up. The time/bytes-based absolute threshold approach decouples eligibility from topic throughput and gives operators direct control over the maximum duration of rack imbalance.