Status
Current state: Under Discussion
Discussion thread: here [https://lists.apache.org/thread/qrphsj5vtzqlbd1qhg897fp723m20xm1]
JIRA: KAFKA-19048
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka clusters typically require rebalancing of topic replicas after horizontal scaling to evenly distribute the load across new and existing brokers. The current rebalancing approach does not consider the existing replica distribution, often resulting in excessive and unnecessary replica movements. These unnecessary movements increase rebalance duration, consume significant bandwidth and CPU resources, and potentially disrupt ongoing production and consumption operations. Thus, a replica rebalancing strategy that minimizes movements while achieving an even distribution of replicas is necessary.
Public Interfaces
A new "--rebalance" command is added to the kafka-reassign-partitions.sh script.
Goals
The new replica rebalancing strategy aims to achieve the following objectives:
- Minimal Movement: Minimize the number of replica relocations during rebalancing.
- Replica Balancing: Ensure that replicas are evenly distributed across brokers.
- Anti-Affinity Support: Support rack-aware allocation when enabled.
- Leader Balancing: Distribute leader replicas evenly across brokers.
- ISR Order Optimization: Optimize adjacency relationships to prevent failover traffic concentration in case of broker failures.
- Leader Stability: Where possible, retain existing leader assignments to reduce leadership churn, provided this does not compromise the first five objectives.
Proposed Changes
Rack-Level Replica Distribution
The following rules ensure balanced replica allocation at the rack level:
- When rackCount = replicationFactor:
  - Each rack receives exactly partitionCount replicas.
- When rackCount > replicationFactor:
  - If weighted allocation (rackBrokers/totalBrokers × totalReplicas) ≥ partitionCount: each rack receives exactly partitionCount replicas.
  - If weighted allocation < partitionCount: distribute remaining replicas using a weighted remainder allocation.
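The rack-level rules above can be sketched roughly as follows. This is an illustration, not the KIP's implementation: the function name rack_replica_targets is hypothetical, totalReplicas is assumed to be partitionCount × replicationFactor, "weighted remainder allocation" is interpreted here as a largest-remainder split among the uncapped racks, and the cap is re-checked after each round because capping one rack shifts the shares of the others.

```python
import math
from fractions import Fraction


def rack_replica_targets(rack_brokers, partition_count, replication_factor):
    """rack_brokers maps rack -> broker count; returns rack -> target replica count."""
    if len(rack_brokers) < replication_factor:
        raise ValueError("need at least replication_factor racks")
    remaining = partition_count * replication_factor  # assumed meaning of totalReplicas
    open_racks = dict(rack_brokers)
    targets = {}
    # Cap every rack whose weighted share reaches partition_count, then
    # re-evaluate the remaining racks against the remaining replicas.
    while open_racks:
        total_brokers = sum(open_racks.values())
        capped = [r for r, n in open_racks.items()
                  if Fraction(n * remaining, total_brokers) >= partition_count]
        if not capped:
            break
        for r in capped:
            targets[r] = partition_count
            remaining -= partition_count
            del open_racks[r]
    # Assumed "weighted remainder allocation": floor each share, then hand the
    # leftover replicas to the racks with the largest fractional parts.
    if open_racks:
        total_brokers = sum(open_racks.values())
        shares = {r: Fraction(n * remaining, total_brokers)
                  for r, n in open_racks.items()}
        for r, share in shares.items():
            targets[r] = math.floor(share)
        leftover = remaining - sum(targets[r] for r in open_racks)
        by_fraction = sorted(open_racks,
                             key=lambda r: (shares[r] - targets[r], r),
                             reverse=True)
        for r in by_fraction[:leftover]:
            targets[r] += 1
    return targets
```

With rackCount = replicationFactor every rack is capped at partitionCount, which matches the first rule; exact fractions are used only to avoid floating-point ties in the comparison.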
Node-Level Replica Distribution
- If the number of replicas assigned to a rack is not a multiple of the number of nodes in that rack, some nodes will host one additional replica compared to others.
- When rackCount = replicationFactor:
  - If all racks have an equal number of nodes, each node will host an equal number of replicas.
  - If rack sizes vary, nodes in larger racks will host fewer replicas on average.
- When rackCount > replicationFactor:
  - If no rack has a significantly higher node weight, replicas will be evenly distributed.
  - If a rack has disproportionately high node weight, those nodes will receive fewer replicas.
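As a rough illustration of the first rule, a rack's replica target can be split across its nodes with a quotient and remainder. The function name node_replica_targets is hypothetical, and the choice of which nodes receive the extra replica is an assumption (here, the nodes that already hold the most replicas, on the theory that this limits movement); the KIP text does not specify it.

```python
def node_replica_targets(rack_target, current_counts):
    """current_counts maps broker id -> replicas it currently hosts in this rack."""
    base, extra = divmod(rack_target, len(current_counts))
    # Assumption: brokers already holding the most replicas keep the extra one.
    ordered = sorted(current_counts, key=lambda b: (-current_counts[b], b))
    return {b: base + (1 if i < extra else 0) for i, b in enumerate(ordered)}
```

For example, a rack target of 7 replicas over three nodes gives a base of 2 per node, with one node hosting 3.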
Anti-Affinity Support
When anti-affinity is enabled, the rebalance algorithm ensures that replicas of the same partition do not colocate on the same rack. Brokers without rack configuration are excluded from anti-affinity checks.
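A minimal sketch of the anti-affinity check described above, assuming an assignment is a mapping from partition to an ordered list of broker ids and that a broker without rack configuration maps to None (both representations, and the function name, are illustrative choices rather than part of the proposal):

```python
def anti_affinity_violations(assignment, broker_rack):
    """Return partition -> replicas whose rack is already used by an earlier replica."""
    violations = {}
    for partition, replicas in assignment.items():
        seen_racks = set()
        duplicates = []
        for broker in replicas:
            rack = broker_rack.get(broker)
            if rack is None:
                continue  # brokers without rack configuration are excluded
            if rack in seen_racks:
                duplicates.append(broker)
            else:
                seen_racks.add(rack)
        if duplicates:
            violations[partition] = duplicates
    return violations
```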
Replica Balancing Algorithm
Through the above steps, we can calculate the ideal replica count for each node and rack.
Based on the initial replica distribution of topics, we obtain the current replica partition allocation across nodes and racks, allowing us to identify which nodes violate anti-affinity rules.
We iterate through nodes with the following priority:
- First process nodes that violate anti-affinity rules
- Then process nodes whose current replica count exceeds the desired replica count (prioritizing those with the largest discrepancy)
For these identified nodes, we relocate their replicas to target nodes that:
- Satisfy all anti-affinity constraints
- Have a current replica count below their ideal allocation
This process continues iteratively until:
- No nodes violate anti-affinity rules
- All nodes' current replica counts match their desired replica counts
Upon satisfying these conditions, we achieve balanced replica distribution across nodes.
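The iteration above could be sketched as a two-pass greedy loop. This is a simplified illustration under stated assumptions, not the proposed implementation: the name rebalance_replicas is hypothetical, every broker is assumed to have a rack and an entry in target, and a replica that has no eligible destination is simply left in place.

```python
from collections import Counter


def rebalance_replicas(assignment, broker_rack, target):
    """Return (new_assignment, moves); moves are (partition, source, destination)."""
    new = {p: list(replicas) for p, replicas in assignment.items()}
    count = Counter(b for replicas in new.values() for b in replicas)
    moves = []

    def try_move(p, i):
        # Destination must be below its target and must not share a rack
        # with any other replica of the same partition.
        other_racks = {broker_rack[b] for j, b in enumerate(new[p]) if j != i}
        candidates = [b for b in target
                      if count[b] < target[b]
                      and b not in new[p]
                      and broker_rack[b] not in other_racks]
        if not candidates:
            return False
        dst = min(candidates, key=lambda b: (count[b] - target[b], b))
        src = new[p][i]
        new[p][i] = dst
        count[src] -= 1
        count[dst] += 1
        moves.append((p, src, dst))
        return True

    # Pass 1: replicas that violate anti-affinity.
    for p in sorted(new):
        for i in range(1, len(new[p])):
            earlier_racks = {broker_rack[b] for b in new[p][:i]}
            if broker_rack[new[p][i]] in earlier_racks:
                try_move(p, i)

    # Pass 2: overloaded brokers, largest excess first, one move at a time.
    moved = True
    while moved:
        moved = False
        over = sorted((b for b in target if count[b] > target[b]),
                      key=lambda b: (target[b] - count[b], b))
        for src in over:
            for p in sorted(new):
                if src in new[p] and try_move(p, new[p].index(src)):
                    moved = True
                    break
            if moved:
                break
    return new, moves
```

Each successful move takes one replica from a broker above its target to a broker below it, so the second pass always terminates. Replicas on brokers that are already at or below their target are never touched, which is what keeps the movement count low.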
Leader Balancing Algorithm
Target Leader Calculation:
- Compute the baseline average: leader_avg = total_partitions / total_nodes
- Identify brokers where replica_count ≤ leader_avg:
  - Designate all replicas on these brokers as leaders
  - Subtract the allocated leaders: remaining_partitions -= assigned_leaders
  - Exclude these nodes: remaining_brokers -= processed_brokers
- Iteratively recalculate leader_avg until the remaining nodes with the fewest replicas satisfy replica_count ≥ leader_avg
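The target calculation above might look like the following sketch. The function name leader_targets is hypothetical. Two details are assumptions: the light-broker test uses a strict comparison so that the loop stops exactly when the stated termination condition holds (a broker with replica_count equal to leader_avg gets the same target either way), and because leader_avg is generally not an integer, the remaining brokers receive its floor, with the leftover leaders going to the brokers that host the most replicas.

```python
def leader_targets(replica_count, total_partitions):
    """replica_count maps broker -> replicas hosted; returns broker -> target leaders."""
    remaining_brokers = dict(replica_count)
    remaining_partitions = total_partitions
    targets = {}
    while remaining_brokers:
        leader_avg = remaining_partitions / len(remaining_brokers)
        light = [b for b, c in remaining_brokers.items() if c < leader_avg]
        if not light:
            break  # every remaining broker satisfies replica_count >= leader_avg
        for b in light:
            # A light broker leads every replica it hosts.
            targets[b] = remaining_brokers.pop(b)
            remaining_partitions -= targets[b]
    if remaining_brokers:
        # Assumed integer rounding of leader_avg for the normal brokers.
        base, extra = divmod(remaining_partitions, len(remaining_brokers))
        ordered = sorted(remaining_brokers,
                         key=lambda b: (-remaining_brokers[b], b))
        for i, b in enumerate(ordered):
            targets[b] = base + (1 if i < extra else 0)
    return targets
```

For instance, with 6 partitions and replica counts of 1, 6, 6 and 5, the first broker is light (1 < 1.5) and leads its single replica; the other three share the remaining 5 leaders.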
Leader Assignment Constraints:
- Final targets:
  - Light brokers: target_leaders = replica_count
  - Normal brokers: target_leaders = leader_avg
- For each partition, select the broker with the largest difference between its target_leaders and current leader count to become that partition's leader. Upon completing this traversal, we achieve uniform leader distribution across all brokers.
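The per-partition selection can be illustrated with a short greedy traversal. The name assign_leaders is hypothetical, and the sketch makes two assumptions the text leaves open: the "current leader count" starts at zero and grows as the traversal assigns leaders, and ties are broken in favor of the earliest replica in the list, which retains the existing leader whenever it ties for the largest gap.

```python
def assign_leaders(assignment, target_leaders):
    """Return partition -> replica list with the chosen leader moved to the front."""
    leader_count = {b: 0 for b in target_leaders}
    result = {}
    for p in sorted(assignment):
        replicas = assignment[p]
        # max() returns the first maximal element, so ties keep the earliest replica.
        leader = max(replicas, key=lambda b: target_leaders[b] - leader_count[b])
        leader_count[leader] += 1
        result[p] = [leader] + [b for b in replicas if b != leader]
    return result
```

A single greedy pass like this is not guaranteed to hit every target exactly for arbitrary replica layouts; it is meant only to show the selection rule.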
Optimizing ISR Order
Leader rebalancing fixes the leader of each partition; ISR order optimization does not change it.
Tracking Node Pair Frequencies:
- Iterate through all partitions and record the first replica (which is the leader).
- Track the occurrences of broker pairs formed by the first and second replicas of each partition.
Optimized Selection of Subsequent Replicas:
- For each partition, when selecting the second replica, choose a broker that forms the least frequent node pair with the first replica.
- Continue this process iteratively for all replicas in the partition.
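One way to picture the pair-frequency selection is the sketch below. It rests on several assumptions: the name optimize_replica_order is hypothetical, pairs are treated as ordered (preceding replica, following replica), each later replica is paired with the one immediately before it, and the frequencies are accumulated from zero as partitions are processed rather than seeded from the existing assignment.

```python
from collections import Counter


def optimize_replica_order(assignment):
    """Reorder the followers of each partition; the leader (index 0) is untouched."""
    pair_freq = Counter()
    result = {}
    for p in sorted(assignment):
        replicas = assignment[p]
        ordered = [replicas[0]]  # leader stays first
        rest = list(replicas[1:])
        while rest:
            prev = ordered[-1]
            # Pick the follower that has followed `prev` least often so far.
            nxt = min(rest, key=lambda b: pair_freq[(prev, b)])
            pair_freq[(prev, nxt)] += 1
            ordered.append(nxt)
            rest.remove(nxt)
        result[p] = ordered
    return result
```

When several partitions share the same leader, this alternates which broker sits in second position, so a leader failure spreads the resulting failover across different brokers instead of concentrating it on one.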
Compatibility, Deprecation, and Migration Plan
This will modify the replica generation strategy in the kafka-reassign-partitions.sh script.
- A new "--rebalance" command, which invokes the new rebalance algorithm, is added.
Test Plan
The kafka-reassign-partitions.sh script can be used to test the overall functionality end to end, while the correctness of the algorithm implementation is verified through unit tests in MinimalMovementReplicaBalancerTest.java.
Other Alternatives
KAFKA-2106, KAFKA-1792, and KIP-6 describe other alternatives.