Authors: Thomas Thornton, Henry Cai
Status
Current state: "Under Discussion"
Discussion thread: here
JIRA: here [Change the link from KAFKA-1 to your own ticket]
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-1023 (Follower fetch from tiered offset) introduced a significant optimization for tiered storage-enabled topics. When enabled via the `follower.fetch.last.tiered.offset.enable` broker configuration, an empty follower can skip replicating data that has already been uploaded to remote storage and instead begin fetching from the earliest pending upload offset (the next offset after the last tiered offset). This dramatically reduces the time required for a new follower to join the ISR. In practice, the follower needs to replicate only 10-15% of the data it would otherwise copy from the leader.
However, as acknowledged in KIP-1023, this optimization introduces a risk factor:
“A drawback of using the last-tiered-offset is that this new follower would possess only a limited number of locally stored segments. Should it ascend to the role of leader, there is a risk of needing to fetch these segments from the remote storage, potentially impacting broker performance.”
When a follower bootstrapped via the tiered offset becomes leader, it has significantly fewer local log segments than a follower that replicated all data from the leader. If consumers request offsets that exist only in remote storage, the new leader must fetch data from remote storage to serve these requests. This can cause:
- Increased latency for consumer fetch requests that hit remote storage
- Potential broker performance degradation under high request volumes
This KIP proposes to mitigate this risk by deprioritizing followers with limited local log segments during leader election, ensuring that followers with more local data (earlier local-log-start-offset) are preferred as leaders when available.
Public Interfaces
Broker Configuration Options
A new broker configuration will be added to control the leader election behavior for tiered storage followers:
leader.election.prefer.early.local.log.start.offset
Property | Value |
Type | Boolean |
Default | false |
Dynamic Update | cluster-wide |
Description | When enabled, the leader election algorithm will prefer replicas with earlier (lower) local-log-start-offset values. Specifically, replicas whose local-log-start-offset is closer to the partition's log-start-offset will be preferred over replicas that bootstrapped from the tiered offset and have limited local segments. This setting only affects leader election for tiered storage-enabled topics. |
leader.election.local.log.start.offset.threshold
Property | Value |
Type | Long |
Default | 10,000 |
Dynamic Update | cluster-wide |
Description | The minimum difference in local-log-start-offset (in number of messages) required to consider two replicas as having different amounts of local data. Replicas whose local-log-start-offsets differ by less than this threshold are considered equivalent, and the original assignment order is preserved. This prevents unnecessary leader elections due to minor differences caused by log retention running at slightly different times across brokers. Only applies when leader.election.prefer.early.local.log.start.offset is enabled. |
Topic Configuration Overrides
Both broker configs above have topic-level overrides, following the standard Kafka pattern (e.g., the topic-level `retention.ms` overriding the broker-level `log.retention.ms`). The topic-level config takes precedence when set; otherwise the broker-level default applies. Unlike that example, the topic-level config names here are the same as the broker-level names.
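The precedence rule above can be sketched as follows. This is a minimal illustration, not Kafka code: the class `ElectionConfigResolver` and its method names are hypothetical, and an unset topic-level override is modeled as an empty `Optional`.

```java
import java.util.Optional;

// Hypothetical helper (not part of Kafka): resolves the effective value of the
// two proposed configs for a single topic.
final class ElectionConfigResolver {

    // The topic-level override wins when it is set; otherwise the broker default applies.
    static boolean preferEarlyLocalLogStartOffset(Optional<Boolean> topicOverride,
                                                  boolean brokerDefault) {
        return topicOverride.orElse(brokerDefault);
    }

    // Same precedence rule for the equivalence threshold.
    static long localLogStartOffsetThreshold(Optional<Long> topicOverride,
                                             long brokerDefault) {
        return topicOverride.orElse(brokerDefault);
    }
}
```

For example, a topic with no override on a broker whose default threshold is 10,000 resolves to 10,000, while a topic that sets 500 resolves to 500.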
Protocol Changes
Fetch Request (new field)
The existing FetchRequest has a LogStartOffset field that followers send to the leader. However, this field contains the partition's `logStartOffset`, not the `localLogStartOffset`. For tiered storage followers, these differ:
- logStartOffset: partition’s earliest offset (including remote storage, determined by retention)
- localLogStartOffset: earliest offset in local storage (much higher than logStartOffset for tiered-storage followers)
This KIP adds a new field:
FetchPartition => ... existing fields ... LocalLogStartOffset
LocalLogStartOffset => INT64 // NEW FIELD - earliest offset in follower's local storage
AlterPartitionRequest (new field)
The controller needs each replica's `localLogStartOffset` for leader election. This KIP extends `AlterPartition` to include this for all ISR members.
AlterPartitionRequest => TopicId [Partitions]
TopicId => UUID
Partitions => PartitionIndex LeaderEpoch PartitionEpoch [NewIsrWithEpoch] LeaderRecoveryState
PartitionIndex => INT32
LeaderEpoch => INT32
PartitionEpoch => INT32
NewIsrWithEpoch => BrokerId BrokerEpoch LocalLogStartOffset
BrokerId => INT32
BrokerEpoch => INT64
LocalLogStartOffset => INT64 // NEW FIELD
LeaderRecoveryState => INT8
The `LocalLogStartOffset` field represents the earliest offset available in the replica's local log storage. For replicas that bootstrapped via the tiered offset, this value will be significantly higher than the partition's log-start-offset.
Proposed Changes
High-Level Design
The core idea is to incorporate local-log-start-offset as a factor in leader election decisions. When multiple replicas are eligible to become a leader (i.e., they are in the ISR or ELR), the controller should prefer replicas with lower local-log-start-offsets, as these replicas have more data available locally and can serve consumer requests without accessing remote storage.
Understanding the Current KIP-1023 Implementation
The implementation determines whether to fetch from the last tiered offset in `ReplicaFetcherThread.shouldFetchFromLastTieredOffset()`. This returns true only when: (1) the feature is enabled, (2) remote storage is enabled for the topic, (3) the topic is not compacted, (4) the replica is empty (offset 0), and (5) the leader has data.
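The five conditions can be read as a single conjunction. The sketch below is illustrative only and does not reproduce the actual `ReplicaFetcherThread` code: the class name, the parameter names, and the reading of "the leader has data" as a leader log end offset greater than 0 are all assumptions.

```java
// Hypothetical stand-in for the decision described above; the parameters are
// illustrative and do not mirror the real method signature.
final class TieredFetchDecision {

    static boolean shouldFetchFromLastTieredOffset(boolean featureEnabled,
                                                   boolean remoteStorageEnabled,
                                                   boolean topicCompacted,
                                                   long replicaLogEndOffset,
                                                   long leaderLogEndOffset) {
        return featureEnabled                 // (1) the feature is enabled
            && remoteStorageEnabled           // (2) remote storage is enabled for the topic
            && !topicCompacted                // (3) the topic is not compacted
            && replicaLogEndOffset == 0L      // (4) the replica is empty (offset 0)
            && leaderLogEndOffset > 0L;       // (5) the leader has data (assumed: LEO > 0)
    }
}
```

Any single failing condition sends the follower down the regular replication path, which is why only empty followers end up with the limited local log that this KIP is concerned with.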
Leader Election Algorithm Changes
The current leader election algorithm in `PartitionChangeBuilder.electLeader()` has two paths:
- Preferred Leader: electPreferredLeader() - prefers the first replica in the assignment
- Any leader election: electAnyLeader() - selects any valid leader from ISR/ELR
Both paths will be modified to sort candidates by local-log-start-offset when `leader.election.prefer.early.local.log.start.offset` is enabled.
When `leader.election.prefer.early.local.log.start.offset` is enabled, the key change is to stably sort `targetReplicas` by local-log-start-offset (ascending) before selecting a leader. Replicas whose local-log-start-offsets are within the configured threshold compare as equal, so the stable sort preserves their original assignment order. This ensures replicas with significantly more local data are considered first, while equivalent replicas retain their original preference order.
Replicas with an unknown local-log-start-offset are treated as having 0 (prioritized). This is a conservative default: unknown replicas are assumed to have complete local data until proven otherwise.
Threshold for Equivalence
To maintain stability and avoid unnecessary leader elections due to minor differences in log retention timing across brokers, replicas are considered equivalent if their localLogStartOffset values differ by less than leader.election.local.log.start.offset.threshold (default: 10,000 messages).
When replicas are equivalent:
- The original assignment order is preserved (first replica in assignment list is preferred)
- This maintains the deterministic behavior of current preferred leader election
- Only replicas with significantly less local data (difference >= threshold) are deprioritized
Example with threshold = 10,000:
- Broker 1: localLogStartOffset = 1,000,000
- Broker 2: localLogStartOffset = 1,000,050 (equivalent to Broker 1, difference < threshold)
- Broker 3: localLogStartOffset = 5,000,000 (not equivalent, difference >= threshold)
As a result, brokers 1 and 2 are preferred equally (their original assignment order is preserved), and broker 3 is deprioritized.
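One way to realize this ordering is sketched below. It is an illustration under stated assumptions, not the proposed `PartitionChangeBuilder` change: the class and method names are hypothetical, and because "differ by less than the threshold" is not transitive when applied pairwise, the sketch groups replicas relative to the smallest offset in each group. That grouping rule is an interpretation, not something the KIP specifies.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Kafka): orders election candidates by
// local-log-start-offset while keeping "equivalent" replicas in assignment order.
final class LocalLogStartOffsetOrdering {

    // Unknown (missing or negative) offsets are treated as 0, i.e. prioritized.
    static long effectiveOffset(Map<Integer, Long> offsets, int brokerId) {
        Long reported = offsets.get(brokerId);
        return (reported == null || reported < 0) ? 0L : reported;
    }

    // assignment: candidate broker ids in original assignment order.
    // offsets:    last known local-log-start-offset per broker id.
    static List<Integer> order(List<Integer> assignment,
                               Map<Integer, Long> offsets,
                               long threshold) {
        long minGap = Math.max(threshold, 1L); // guard: a threshold <= 0 groups only equal offsets

        // Step 1: stable sort by effective offset, ascending.
        List<Integer> byOffset = new ArrayList<>(assignment);
        byOffset.sort(Comparator.comparingLong((Integer id) -> effectiveOffset(offsets, id)));

        // Step 2: walk the sorted list, grouping replicas whose offset is within
        // the threshold of the group's smallest offset (assumed grouping rule).
        List<Integer> result = new ArrayList<>();
        int i = 0;
        while (i < byOffset.size()) {
            long groupStart = effectiveOffset(offsets, byOffset.get(i));
            List<Integer> group = new ArrayList<>();
            while (i < byOffset.size()
                    && effectiveOffset(offsets, byOffset.get(i)) - groupStart < minGap) {
                group.add(byOffset.get(i));
                i++;
            }
            // Step 3: within a group, restore the original assignment order.
            group.sort(Comparator.comparingInt((Integer id) -> assignment.indexOf(id)));
            result.addAll(group);
        }
        return result;
    }
}
```

With the offsets from the example and a threshold of 10,000, an assignment of [1, 2, 3] stays [1, 2, 3], and an assignment of [3, 2, 1] becomes [2, 1, 3]: brokers 2 and 1 are equivalent and keep their relative order, while broker 3 moves to the back.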
Tracking Local Log Start Offset
Currently, only the partition leader can send AlterPartition requests to the controller (see AlterPartitionManager). The leader sends these requests when ISR changes occur via Partition.maybeExpandIsr and Partition.maybeShrinkIsr.
To track local-log-start-offset for all replicas, this KIP proposes:
- Followers report to leader: Extend the FetchRequest to include the follower’s localLogStartOffset. The leader already tracks follower state (e.g., logEndOffset) in the Replica class
- Leader aggregates and reports: When sending AlterPartition requests, the leader includes localLogStartOffset for all ISR members (not just itself)
- Controller stores and uses: The controller stores this in partition state and uses it during leader election to sort replicas
Handling Log Retention Changes
The local-log-start-offset can also increase when log retention deletes local log segments (see UnifiedLog.deleteSegments calling maybeIncrementLocalLogStartOffset). This creates a challenge: the leader only sends AlterPartition when ISR changes, but localLogStartOffset can change independently.
When a follower's localLogStartOffset changes due to log retention, it will report the new value in its next FetchRequest. The leader will detect the change and send an AlterPartition request to update the controller, even if the ISR membership hasn't changed. This requires the leader to track each follower's last reported localLogStartOffset so that it can tell when the value has changed. This approach leverages the existing FetchRequest and AlterPartition protocols and avoids followers updating state directly with the controller.
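The leader-side bookkeeping described above might look like the following sketch. The class `FollowerLocalLogStartOffsetTracker` and its methods are hypothetical and are not existing Kafka APIs. Treating a follower's first report as a change is also an assumption, made so that the controller learns the initial value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical leader-side tracker (not part of Kafka) for the last
// localLogStartOffset each follower reported in a FetchRequest.
final class FollowerLocalLogStartOffsetTracker {

    private final Map<Integer, Long> lastReported = new HashMap<>();

    // Records the value carried by a follower's fetch and returns true when it
    // differs from the previously recorded value (or when there was none).
    boolean update(int followerId, long localLogStartOffset) {
        Long previous = lastReported.put(followerId, localLogStartOffset);
        return previous == null || previous.longValue() != localLogStartOffset;
    }

    // AlterPartition is sent on an ISR change (existing behavior) or when any
    // ISR member's localLogStartOffset changed (new behavior in this KIP).
    static boolean shouldSendAlterPartition(boolean isrChanged, boolean anyOffsetChanged) {
        return isrChanged || anyOffsetChanged;
    }

    // Copy of the tracked values, e.g. for populating an AlterPartition request.
    Map<Integer, Long> snapshot() {
        return new HashMap<>(lastReported);
    }
}
```

A repeated fetch that carries the same value returns false from `update`, so steady-state fetch traffic would not by itself trigger additional AlterPartition requests.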
Staleness consideration: If a follower’s localLogStartOffset increases due to log retention, but the leader hasn’t yet reported this to the controller, the controller may have a stale (lower) value. The low value may make the follower appear more preferred than it actually is. The worst case is electing a replica that has less local data than expected, which is acceptable since tiered storage handles this case transparently.
Broker Side Changes
Brokers need to report their local-log-start-offset when sending `AlterPartition` requests. The local-log-start-offset is already tracked in `UnifiedLog`. The `ReplicaManager` will include this value when constructing `AlterPartition` requests for ISR changes.
Follower changes:
- Include localLogStartOffset in every FetchRequest sent to the leader
- The value is already tracked in UnifiedLog.localLogStartOffset()
Leader changes:
- Store each follower's localLogStartOffset in ReplicaState (new field)
- Detect when a follower's localLogStartOffset changes
- New trigger for AlterPartition: send AlterPartition to the controller when either:
  - ISR membership changes (existing behavior), or
  - any ISR member's localLogStartOffset changes (new behavior)
- Include localLogStartOffset for all ISR members in AlterPartition requests
Controller changes:
- Store localLogStartOffset for each replica in partition metadata
- Use localLogStartOffset to sort candidates during leader election when leader.election.prefer.early.local.log.start.offset is enabled
Interaction with Existing Features
Eligible Leader Replicas (ELR) - KIP-966
This KIP is compatible with ELR. When ISR is empty and ELR is used for leader election, the same local-log-start-offset preference will apply to ELR members.
Preferred Leader Election
When leader.election.prefer.early.local.log.start.offset is true, preferred leader election sorts replicas by local-log-start-offset before selecting the "preferred" replica. The replica with the earliest local-log-start-offset becomes the preferred leader; replicas within the equivalence threshold of one another keep their original assignment order.
KIP-1023 Follower Fetch from Tiered Offset
This KIP directly addresses the risk factor identified in KIP-1023. When both `follower.fetch.last.tiered.offset.enable` and `leader.election.prefer.early.local.log.start.offset` are enabled:
- New followers can quickly join ISR by fetching from the tiered offset
- These followers will be deprioritized for leader election until they have accumulated more local data
- The cluster benefits from fast ISR recovery while minimizing the risk of electing a leader with limited local data
No special handling is required for this case: the existing tiered storage infrastructure handles remote fetches transparently.
Compatibility, Deprecation, and Migration Plan
Upgrade Path
- Rolling upgrade: The new configuration defaults to `false`, so existing behavior is preserved during upgrade
- Enable feature: After all brokers are upgraded, enable `leader.election.prefer.early.local.log.start.offset` cluster-wide
- Gradual rollout: The feature can be enabled on a per-cluster basis, allowing operators to test in non-production environments first
Backward Compatibility
- The new `FetchRequest` version is backward compatible; older followers will not send the `LocalLogStartOffset` field
- The new `AlterPartition` request version is backward compatible; older brokers will not send the `LocalLogStartOffset` field
- Controllers will handle missing `LocalLogStartOffset` by treating those replicas as having complete local logs (conservative default)
- The feature is opt-in via configuration, preserving existing behavior by default
Downgrade/Rollback Path
- Disable `leader.election.prefer.early.local.log.start.offset` cluster-wide
- Perform rolling downgrade
- The controller will stop using local log completeness in election decisions
Test Plan
- Unit tests for PartitionChangeBuilder to verify leader election prefers replicas with lower local-log-start-offset
- Unit tests for `FetchRequest` serialization of `LocalLogStartOffset` field
- Unit tests for AlterPartition request/response serialization of LocalLogStartOffset field
- Integration tests with tiered storage to verify new brokers are deprioritized for leader election
- Integration tests to verify leader election succeeds when only limited-local-log brokers are available
Rejected Alternatives
- Strict exclusion: Completely exclude tiered-storage followers from leader election until they have sufficient local data. Rejected because this could cause unavailability if all ISR members are tiered-storage followers.
- Time-based exclusion: Exclude followers for a configurable time period after joining ISR. Rejected because time doesn't reflect actual data replication progress.
- Percentage threshold: Only elect replicas with local log covering X% of total log range. Rejected because it adds complexity; simple offset comparison achieves the same goal.