Comment: Clarify that StopReplica requests are sent and fix some inconsistencies

...

LeaderAndIsrRequest

We will add two new fields to LeaderAndIsrRequest named AddingReplicas and RemovingReplicas. These fields will, respectively, contain the replicas which are being added to and removed from a partition's assignment.

These fields allow the controller to propagate reassignment information to the leader and follower brokers, paving the way for some of the improvements outlined in the Future Work section of this KIP. This KIP itself will not make use of these fields.

```diff
diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
index b8988351c..d45727078 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
@@ -20,6 +20,8 @@
   // Version 1 adds IsNew.
   //
   // Version 2 adds broker epoch and reorganizes the partitions by topic.
-  "validVersions": "0-2",
+  //
+  // Version 3 adds AddingReplicas and RemovingReplicas.
+  "validVersions": "0-3",
   "fields": [
     { "name": "ControllerId", "type": "int32", "versions": "0+",
@@ -48,6 +50,10 @@
           "about": "The ZooKeeper version." },
         { "name": "Replicas", "type": "[]int32", "versions": "0+",
           "about": "The replica IDs." },
+        { "name": "AddingReplicas", "type": "[]int32", "versions": "3+",
+          "about": "The replica IDs that we are adding this partition to." },
+        { "name": "RemovingReplicas", "type": "[]int32", "versions": "3+",
+          "about": "The replica IDs that we are removing this partition from." },
         { "name": "IsNew", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
           "about": "Whether the replica should have existed on the broker or not." }
       ]}
```
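To illustrate what the `"versions"` tags in the schema diff above mean for a single partition entry, here is a minimal sketch of a hypothetical serializer helper. `PartitionState` and `fields_for_version` are illustrative names, not Kafka classes. The choice to reject non-empty AddingReplicas/RemovingReplicas below version 3 (because these fields are not marked `"ignorable"`, unlike IsNew) is an assumption of this sketch.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class PartitionState:
    """Hypothetical in-memory view of one partition entry in LeaderAndIsrRequest."""
    replicas: List[int]
    adding_replicas: List[int] = field(default_factory=list)
    removing_replicas: List[int] = field(default_factory=list)
    is_new: bool = False


def fields_for_version(state: PartitionState, version: int) -> Dict[str, Any]:
    """Return the partition fields a request of `version` would carry.

    Follows the schema's "versions" tags: Replicas is 0+, IsNew is 1+,
    AddingReplicas and RemovingReplicas are 3+.
    """
    out: Dict[str, Any] = {"Replicas": list(state.replicas)}
    if version >= 1:
        # IsNew is "ignorable", so version 0 silently omits it.
        out["IsNew"] = state.is_new
    if version >= 3:
        out["AddingReplicas"] = list(state.adding_replicas)
        out["RemovingReplicas"] = list(state.removing_replicas)
    elif state.adding_replicas or state.removing_replicas:
        # Not ignorable: refuse to drop non-empty values for an older broker.
        raise ValueError(f"AddingReplicas/RemovingReplicas need version >= 3, got {version}")
    return out


# Usage: the in-flight reassignment from the Algorithm section.
state = PartitionState(replicas=[1, 4, 3, 2], adding_replicas=[4], removing_replicas=[1])
print(fields_for_version(state, 3))
```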

...

/brokers/topics/[topic] znode content (the addingReplicas and removingReplicas fields are new):

```json
{
  "version": 2,
  "partitions": {"0": [1, 4, 2, 3]},
  "addingReplicas": {"0": [4]},
  "removingReplicas": {"0": [1]}
}
```
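Because "partitions" holds the full replica set, both the original and the target assignment can be recovered from the znode alone: subtracting addingReplicas gives the original replicas, and subtracting removingReplicas gives the target. The sketch below shows this with Python's standard `json` module; `origin_and_target` is a hypothetical helper, and treating missing addingReplicas/removingReplicas keys as empty (as in a znode written in the old format) is an assumption.

```python
import json
from typing import Dict, List, Tuple

# Example content in the new format (the "version" field is omitted here).
EXAMPLE = """
{
  "partitions": {"0": [1, 4, 2, 3]},
  "addingReplicas": {"0": [4]},
  "removingReplicas": {"0": [1]}
}
"""


def origin_and_target(znode_json: str) -> Dict[str, Tuple[List[int], List[int]]]:
    """Map each partition id to (original replicas, target replicas)."""
    doc = json.loads(znode_json)
    adding = doc.get("addingReplicas", {})
    removing = doc.get("removingReplicas", {})
    result: Dict[str, Tuple[List[int], List[int]]] = {}
    for partition, replicas in doc["partitions"].items():
        add = set(adding.get(partition, []))
        rem = set(removing.get(partition, []))
        # Original assignment: everything except the replicas being added.
        original = [r for r in replicas if r not in add]
        # Target assignment: everything except the replicas being removed.
        target = [r for r in replicas if r not in rem]
        result[partition] = (original, target)
    return result


print(origin_and_target(EXAMPLE))  # {'0': ([1, 2, 3], [4, 2, 3])}
```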

...

The controller will handle an AlterPartitionAssignmentsRequest by modifying the /brokers/topics/[topic] znode. Once the znode is modified, the controller will send out LeaderAndIsr requests to start all the movements.
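A minimal sketch of the per-partition znode fields the controller could write when a reassignment starts, assuming the current replicas and the requested target replicas are known. `start_reassignment` is a hypothetical name. Placing the replicas being removed first, followed by the target list, is an assumption chosen because it reproduces this KIP's examples (`[1, 2, 3]` to `[4, 3, 2]` gives `[1, 4, 3, 2]`, and `[1, 2, 3]` to `[4, 2, 3]` gives `[1, 4, 2, 3]`).

```python
from typing import Dict, List


def start_reassignment(current: List[int], target: List[int]) -> Dict[str, List[int]]:
    """Znode fields for one partition when a reassignment to `target` begins."""
    adding = [r for r in target if r not in current]      # AR: new replicas
    removing = [r for r in current if r not in target]    # RR: replicas to retire
    return {
        # Assumed ordering: replicas being removed first, then the target list.
        "replicas": removing + list(target),
        "addingReplicas": adding,
        "removingReplicas": removing,
    }


print(start_reassignment([1, 2, 3], [4, 3, 2]))
# {'replicas': [1, 4, 3, 2], 'addingReplicas': [4], 'removingReplicas': [1]}
```

Because the full set is the union of old and new replicas, brokers that only read "replicas" still see every replica involved in the move.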

Once the controller is notified that a new replica has entered the ISR for a particular partition, it will remove that replica from the "addingReplicas" field in `/brokers/topics/[topic]`. If "addingReplicas" becomes empty, the controller will send a new wave of LeaderAndIsr requests to retire all the old replicas and, if applicable, elect new leaders.
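The ISR-driven step can be sketched as follows, assuming the controller keeps R, the ISR, AR and RR per partition. `PartitionReassignment` and `on_isr_change` are hypothetical names; the function only decides whether the controller may move on to RR and does not send any requests.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class PartitionReassignment:
    replicas: List[int]   # R
    isr: List[int]        # I
    adding: List[int]     # AR
    removing: List[int]   # RR


def on_isr_change(state: PartitionReassignment, new_isr: List[int]) -> bool:
    """Apply an ISR update; return True when AR is empty and RR can be retired."""
    state.isr = list(new_isr)
    # A replica from AR that has entered the ISR is no longer "adding".
    state.adding = [r for r in state.adding if r not in new_isr]
    return not state.adding


p = PartitionReassignment(replicas=[1, 4, 3, 2], isr=[1, 2, 3], adding=[4], removing=[1])
print(on_isr_change(p, [1, 2, 3]))     # False: replica 4 is still catching up
print(on_isr_change(p, [1, 2, 3, 4]))  # True: time for the LeaderAndIsr wave that retires RR
```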

Algorithm

We will always add all the replicas in the "addingReplicas" field before starting to act on the "removingReplicas" field.

```
# Illustrating the reassignment of a single partition.
# R is the current replicas, I is the ISR list, AR is addingReplicas and RR is removingReplicas
R: [1, 2, 3], I: [1, 2, 3], AR: [], RR: []
# Reassignment called via API with targetReplicas=[4,3,2]

R: [1, 4, 3, 2], I: [1, 2, 3], AR: [4], RR: [1] # Controller sends LeaderAndIsr requests
# (We ignore RR until AR is empty)
R: [1, 4, 3, 2], I: [1, 2, 3, 4], AR: [4], RR: [1] # Leader 1 adds 4 to ISR
# The controller realizes that all the replicas in AR have been added and starts work on RR.
# It removes all of RR from R and from the ISR, and sends StopReplica/LeaderAndIsr requests
R: [4, 3, 2], I: [2, 3, 4], AR: [], RR: [] # at this point the reassignment is considered complete
```


When all of the replicas in AR have been added, we remove RR from R and empty out AR/RR in one step. 
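The completion step can be sketched as a single function over plain lists; `complete_reassignment` is a hypothetical name, and raising an error when some AR replica is still outside the ISR is an assumption of the sketch. The replicas in RR are the ones that would receive StopReplica requests.

```python
from typing import List, Tuple


def complete_reassignment(
    replicas: List[int], isr: List[int], adding: List[int], removing: List[int]
) -> Tuple[List[int], List[int], List[int], List[int]]:
    """Retire RR in one step once every replica in AR is in the ISR.

    Returns the new (R, ISR, AR, RR).
    """
    missing = [r for r in adding if r not in isr]
    if missing:
        raise ValueError(f"replicas {missing} have not joined the ISR yet")
    retired = set(removing)
    new_replicas = [r for r in replicas if r not in retired]
    new_isr = [r for r in isr if r not in retired]
    return new_replicas, new_isr, [], []


# Reproduces the last step of the trace above.
print(complete_reassignment([1, 4, 3, 2], [1, 2, 3, 4], [4], [1]))
# ([4, 3, 2], [2, 3, 4], [], [])
```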

If a new reassignment is issued during an ongoing one, we cancel the current one by emptying out both AR and RR, send StopReplica requests to the replicas that were part of the old reassignment but not part of the new one, construct AR/RR from R (as updated by the previous reassignment) and the new target replicas (TR), and start anew.
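A sketch of replacing an in-flight reassignment. It assumes "the updated R" means R after the cancellation, that is, R minus the old AR, so the new AR/RR are computed against the pre-reassignment replicas. That reading is an interpretation, not something the text states outright. `reassign_during_ongoing` is a hypothetical name, and it reuses the ordering assumption (RR first, then TR) from the earlier examples.

```python
from typing import Dict, List


def reassign_during_ongoing(
    replicas: List[int], adding: List[int], target: List[int]
) -> Dict[str, List[int]]:
    """Replace an in-flight reassignment with a new one towards `target` (TR)."""
    # Cancel: R - AR gives back the replicas the partition had before.
    original = [r for r in replicas if r not in adding]
    # Old adding replicas that the new target does not want get StopReplica.
    stop = [r for r in adding if r not in target]
    # Start anew from the restored R and TR.
    new_adding = [r for r in target if r not in original]
    new_removing = [r for r in original if r not in target]
    return {
        "stopReplica": stop,
        "replicas": new_removing + list(target),
        "addingReplicas": new_adding,
        "removingReplicas": new_removing,
    }


# Mid-way through [1, 2, 3] -> [4, 3, 2], the target changes to [5, 3, 2].
print(reassign_during_ongoing([1, 4, 3, 2], [4], [5, 3, 2]))
```

Replicas from the old AR that are still wanted (for example 4 when the new target is [4, 5, 3]) stay in AR and are not stopped, so any data they already copied is kept.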

In general, this algorithm is consistent with current Kafka behavior: other brokers still receive the full replica set, consisting of both the original replicas and the new ones.

...

Essentially, when a reassignment is cancelled, we subtract AR from R, empty out both AR and RR, and send StopReplica/LeaderAndIsr requests to cancel the replica movements that have not yet completed.
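The cancellation rule reduces to a few list operations. In this sketch, `cancel_reassignment` is a hypothetical name, and also removing the AR replicas from the ISR is an assumption (a replica that is sent StopReplica can no longer stay in sync). The text above does not spell that part out.

```python
from typing import Dict, List


def cancel_reassignment(replicas: List[int], isr: List[int], adding: List[int]) -> Dict[str, List[int]]:
    """Revert one partition to its pre-reassignment replica set."""
    cancelled = set(adding)
    return {
        "replicas": [r for r in replicas if r not in cancelled],  # R - AR
        "isr": [r for r in isr if r not in cancelled],            # assumption, see above
        "addingReplicas": [],
        "removingReplicas": [],
        "stopReplica": list(adding),  # movements that had not completed
    }


print(cancel_reassignment([1, 4, 3, 2], [1, 2, 3], [4]))
```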

...