Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread/qrphsj5vtzqlbd1qhg897fp723m20xm1

JIRA: KAFKA-19048

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka clusters typically require rebalancing of topic replicas after horizontal scaling to evenly distribute the load across new and existing brokers. The current rebalancing approach does not consider the existing replica distribution, often resulting in excessive and unnecessary replica movements. These unnecessary movements increase rebalance duration, consume significant bandwidth and CPU resources, and potentially disrupt ongoing production and consumption operations. Thus, a replica rebalancing strategy that minimizes movements while achieving an even distribution of replicas is necessary.

Public Interfaces

  1. A new "--rebalance" option is added to the kafka-reassign-partitions.sh script.

Goals

The new replica rebalancing strategy aims to achieve the following objectives:

  1. Minimal Movement: Minimize the number of replica relocations during rebalancing.
  2. Replica Balancing: Ensure that replicas are evenly distributed across brokers.
  3. Anti-Affinity Support: Support rack-aware allocation when enabled.
  4. Leader Balancing: Distribute leader replicas evenly across brokers.
  5. ISR Order Optimization: Optimize adjacency relationships to prevent failover traffic concentration in case of broker failures.
  6. Leader Stability: Where possible, retain existing leader assignments to reduce leadership churn, provided this does not compromise the first five objectives.

Proposed Changes

Rack-Level Replica Distribution

The following rules ensure balanced replica allocation at the rack level:

  1. When rackCount = replicationFactor:
    • Each rack receives exactly partitionCount replicas.
  2. When rackCount > replicationFactor:
    • If a rack's weighted allocation ((rackBrokers / totalBrokers) × totalReplicas) ≥ partitionCount: that rack receives exactly partitionCount replicas (a rack cannot hold more than one replica of any partition).
    • If the weighted allocation < partitionCount: distribute the remaining replicas using a weighted remainder allocation.
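The rack-level rules above can be sketched as follows. This is a minimal illustration, not the KIP's implementation; the function name and data shapes are assumptions for the example.

```python
def rack_quotas(rack_brokers, partition_count, replication_factor):
    """Sketch of rack-level replica allocation.
    rack_brokers: {rack: broker count}. Returns {rack: replica quota}."""
    total_brokers = sum(rack_brokers.values())
    total_replicas = partition_count * replication_factor
    # Weighted share, capped at partition_count per rack (anti-affinity:
    # a rack holds at most one replica of each partition).
    exact = {r: min(n / total_brokers * total_replicas, partition_count)
             for r, n in rack_brokers.items()}
    quotas = {r: int(e) for r, e in exact.items()}
    # Weighted remainder: hand out leftover replicas by largest fractional
    # part, cycling until everything is placed or all racks are at their cap.
    remainder = total_replicas - sum(quotas.values())
    by_fraction = sorted(exact, key=lambda r: exact[r] - quotas[r], reverse=True)
    while remainder > 0:
        progressed = False
        for r in by_fraction:
            if remainder == 0:
                break
            if quotas[r] < partition_count:
                quotas[r] += 1
                remainder -= 1
                progressed = True
        if not progressed:
            break  # no rack has capacity left
    return quotas
```

With rackCount = replicationFactor (e.g. 3 racks of 2 brokers, 6 partitions, RF = 3), every rack lands exactly at partitionCount replicas, matching rule 1.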

Node-Level Replica Distribution

  1. If the number of replicas assigned to a rack is not a multiple of the number of nodes in that rack, some nodes host one additional replica compared to others.
  2. When rackCount = replicationFactor:
    • If all racks have an equal number of nodes, each node hosts an equal number of replicas.
    • If rack sizes vary, nodes in larger racks host fewer replicas on average.
  3. When rackCount > replicationFactor:
    • If no rack has a significantly higher node weight, replicas are evenly distributed.
    • If a rack has a disproportionately high node weight, its nodes receive fewer replicas.
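Splitting a rack's quota across its nodes reduces to integer division with a remainder, as rule 1 describes. A minimal sketch (illustrative names, not the KIP's code):

```python
def node_quotas(rack_quota, nodes):
    """Split a rack's replica quota across its nodes. When the quota is not
    a multiple of the node count, the first (quota % node count) nodes each
    host one extra replica."""
    base, extra = divmod(rack_quota, len(nodes))
    return {n: base + (1 if i < extra else 0) for i, n in enumerate(nodes)}
```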

Anti-Affinity Support

When anti-affinity is enabled, the rebalance algorithm ensures that replicas of the same partition do not colocate on the same rack. Brokers without rack configuration are excluded from anti-affinity checks.
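The anti-affinity check described above amounts to detecting duplicate racks within a partition's replica list, skipping rack-less brokers. A sketch under those assumptions:

```python
def anti_affinity_violations(assignment, broker_rack):
    """assignment: {partition: [broker, ...]}; broker_rack: {broker: rack or None}.
    Returns the partitions that place two replicas on the same rack.
    Brokers without a rack (None) are excluded from the check."""
    violations = []
    for partition, replicas in assignment.items():
        racks = [broker_rack.get(b) for b in replicas
                 if broker_rack.get(b) is not None]
        if len(racks) != len(set(racks)):
            violations.append(partition)
    return violations
```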

Replica Balancing Algorithm

Through the above steps, we can calculate the ideal replica count for each node and rack.
Based on the initial replica distribution of topics, we obtain the current replica partition allocation across nodes and racks, allowing us to identify which nodes violate anti-affinity rules.

We iterate through nodes with the following priority:

  1. First process nodes that violate anti-affinity rules
  2. Then process nodes whose current replica count exceeds the desired replica count (prioritizing those with the largest discrepancy)

For these identified nodes, we relocate their replicas to target nodes that:

  • Satisfy all anti-affinity constraints

  • Have a current replica count below their ideal allocation

This process continues iteratively until both of the following hold:

  • No nodes violate anti-affinity rules

  • All nodes' current replica counts match their desired replica counts

Upon satisfying these conditions, we achieve balanced replica distribution across nodes.
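The relocation loop above can be sketched as follows. This sketch covers only the surplus-relocation step (step 2) with the anti-affinity filter on targets; names and data shapes are illustrative, not the KIP's implementation.

```python
def rebalance(assignment, ideal, broker_rack):
    """Relocate replicas from over-loaded brokers to under-loaded ones without
    violating anti-affinity. assignment: {partition: [brokers]} (mutated in
    place); ideal: {broker: target replica count}; broker_rack: {broker: rack}.
    Returns the number of replica movements performed."""
    current = {b: 0 for b in ideal}
    for replicas in assignment.values():
        for b in replicas:
            current[b] += 1

    moved = 0
    # Process brokers with the largest surplus (current - ideal) first.
    for source in sorted(ideal, key=lambda b: current[b] - ideal[b], reverse=True):
        while current[source] > ideal[source]:
            move = None
            for partition, replicas in assignment.items():
                if source not in replicas:
                    continue
                # Racks already used by this partition's other replicas;
                # rack-less brokers are exempt from the constraint.
                other_racks = {broker_rack.get(b) for b in replicas if b != source}
                other_racks.discard(None)
                # Prefer the most under-loaded legal target.
                for target in sorted(ideal, key=lambda b: current[b] - ideal[b]):
                    if (current[target] < ideal[target] and target not in replicas
                            and broker_rack.get(target) not in other_racks):
                        move = (partition, target)
                        break
                if move:
                    break
            if move is None:
                break  # no legal move left for this broker
            partition, target = move
            replicas = assignment[partition]
            replicas[replicas.index(source)] = target
            current[source] -= 1
            current[target] += 1
            moved += 1
    return moved
```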

Leader Balancing Algorithm

Target Leader Calculation:

Compute baseline average: leader_avg = total_partitions / total_nodes

Identify brokers whose replica_count ≤ leader_avg:

  • Designate all replicas as leaders on these brokers

  • Subtract allocated leaders: remaining_partitions -= assigned_leaders

  • Exclude nodes: remaining_brokers -= processed_brokers

Iteratively recalculate leader_avg over the remaining brokers until every remaining broker satisfies replica_count ≥ leader_avg.

Leader Assignment Constraints:

Final targets:

  • Light brokers: target_leaders = replica_count

  • Normal brokers: target_leaders = leader_avg

For each partition, select the broker with the largest difference between its target_leaders and its current leader count to become that partition's leader. Upon completing this traversal, we achieve uniform leader distribution across all brokers.
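The iterative target calculation can be sketched as below: brokers with fewer replicas than the running average lead all of their replicas and are removed, and the average is recomputed over what remains. A minimal sketch with illustrative names, not the KIP's implementation:

```python
def leader_targets(replica_counts, total_partitions):
    """replica_counts: {broker: replica count}. Returns {broker: target leaders}."""
    targets = {}
    remaining = dict(replica_counts)
    remaining_partitions = total_partitions
    while remaining:
        leader_avg = remaining_partitions / len(remaining)
        light = [b for b, c in remaining.items() if c <= leader_avg]
        if not light:
            # Remaining ("normal") brokers share the average evenly.
            base, extra = divmod(remaining_partitions, len(remaining))
            for i, b in enumerate(sorted(remaining)):
                targets[b] = base + (1 if i < extra else 0)
            break
        for b in light:
            targets[b] = remaining[b]           # light broker leads all replicas
            remaining_partitions -= remaining[b]
            del remaining[b]
    return targets
```

For example, with replica counts {1: 2, 2: 4, 3: 6} and 6 partitions, broker 1 is light (2 ≤ 6/3) and leads both of its replicas; the remaining 4 leaders are split evenly between brokers 2 and 3.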

Optimizing ISR Order

During Leader Rebalancing, the leader of each partition is fixed and does not change.

Tracking Node Pair Frequencies:

  • Iterate through all partitions and record the first replica (which is the leader).

  • Track the occurrences of the broker pairs formed by the first and second replicas of each partition.

Optimized Selection of Subsequent Replicas:

  • For each partition, when selecting the second replica, choose a broker that forms the least frequent node pair with the first replica.

  • Continue this process iteratively for all replicas in the partition.
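The pair-frequency selection above can be sketched with a counter over adjacent broker pairs. This is an illustrative sketch, not the KIP's implementation:

```python
from collections import Counter

def order_replicas(assignment):
    """Reorder each partition's followers (the leader stays first) so that
    adjacent broker pairs recur as rarely as possible across partitions,
    spreading failover traffic if a broker goes down."""
    pair_counts = Counter()
    for replicas in assignment.values():
        reordered = [replicas[0]]                 # leader is fixed
        followers = set(replicas[1:])
        while followers:
            prev = reordered[-1]
            # Pick the follower forming the rarest (prev, follower) pair so far.
            nxt = min(sorted(followers), key=lambda b: pair_counts[(prev, b)])
            pair_counts[(prev, nxt)] += 1
            reordered.append(nxt)
            followers.remove(nxt)
        replicas[:] = reordered
    return assignment
```

For two partitions both assigned [1, 2, 3], the second partition is reordered to [1, 3, 2] because the pair (1, 2) has already been used once.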


Compatibility, Deprecation, and Migration Plan

  • This will modify the replica generation strategy in the kafka-reassign-partitions.sh script.

  • A new "--rebalance" option, which invokes the new rebalance algorithm, is added.

Test Plan

The kafka-reassign-partitions.sh script can be used to test the overall functionality; correctness of the algorithm itself is verified through unit tests in MinimalMovementReplicaBalancerTest.java.

Other Alternatives

KAFKA-2106, KAFKA-1792, and KIP-6 describe other alternatives.



