Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-6359
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
The current kafka-reassign-partitions.sh tool imposes the limitation that only a single batch of partition reassignments can be in-flight, and it is not possible to cancel a reassignment that is in-flight. This has a number of consequences:
This change would enable
To illustrate the last bullet, consider an AdminClient API for partition reassignment that returns a KafkaFuture providing access to some ReassignmentPartitionsResult, which (implicitly) includes the identity of each partition's Reassignment. Further AdminClient APIs could then be added to operate on those Reassignment identities, for example to cancel an in-flight Reassignment or to list the current Reassignments.
These things are hard to do currently because individual partition reassignments don't have a stable identity.
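A minimal sketch of the kind of API shapes the above is describing, purely for illustration — none of these types exist in the AdminClient today, and the names are assumptions:

    import org.apache.kafka.common.{KafkaFuture, TopicPartition}

    // Hypothetical result of a partition-reassignment AdminClient call. Each
    // affected partition gets a Reassignment with a stable identity that later
    // APIs (cancel, list, status) could accept as an argument.
    trait ReassignmentPartitionsResult {
      def reassignments: java.util.Map[TopicPartition, Reassignment]
    }

    // Hypothetical stable identity for a single partition's reassignment.
    trait Reassignment {
      def topicPartition: TopicPartition
    }

    // Hypothetical AdminClient-style entry point returning a KafkaFuture that
    // provides access to the result described above.
    trait PartitionReassignmentAdmin {
      def reassignPartitions(
          target: java.util.Map[TopicPartition, java.util.List[Integer]]
      ): KafkaFuture[ReassignmentPartitionsResult]
    }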
Strictly speaking this is not a change that affects any public interfaces (ZooKeeper is not considered a public interface, and the change can be made in a backward-compatible way). However, since some users are known to operate on the /admin/reassign_partitions znode directly, I felt it was worthwhile using the KIP process for this change.
The main idea is to move away from using the single /admin/reassign_partitions znode to control reassignment. Instead, a protocol similar to that used for config changes will be used. To start a reassignment (of a single partition, or a collection of partitions) a persistent sequential znode will be created in /admin/reassignment_requests. The controller will be informed of this new request via a ZooKeeper watch. It will initiate the reassignment, create a znode /admin/reassignments/$topic/$partition, and then delete the /admin/reassignment_requests/request_NNNN znode.
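As a rough illustration of the client side of this protocol, the sketch below creates such a request znode with the plain ZooKeeper client. The connection string, the assumption that /admin/reassignment_requests already exists, and the omission of error handling are all illustrative simplifications; the payload follows the request JSON format described later in this KIP.

    import java.nio.charset.StandardCharsets
    import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooDefs, ZooKeeper}

    object SubmitReassignmentRequest {
      def main(args: Array[String]): Unit = {
        // Illustrative connection; real code would also handle session events.
        val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
          override def process(event: WatchedEvent): Unit = () // no-op default watcher
        })

        // Request to reassign my-topic partition 0 to brokers 1, 2 and 3,
        // using the request znode format described below.
        val json = """{"version":1,"assignments":{"my-topic":{"0":[1,2,3]}}}"""

        // Persistent sequential child of /admin/reassignment_requests; ZooKeeper
        // appends the sequence number, producing e.g.
        // /admin/reassignment_requests/request_0000000042.
        val path = zk.create(
          "/admin/reassignment_requests/request_",
          json.getBytes(StandardCharsets.UTF_8),
          ZooDefs.Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT_SEQUENTIAL)

        println(s"Created reassignment request at $path")
        zk.close()
      }
    }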
A client can subsequently create more requests involving the same partitions (by creating a /admin/reassignment_requests/request_MMMM znode), and the controller would re-initiate reassignment as the assignment changes. This means it is possible to "cancel" the reassignment of a single partition by changing the reassignment to the old assigned replicas. Note, however, that the controller itself doesn't remember the old assigned replicas – it is up to the client to record the old state prior to starting a reassignment if that client needs to support cancellation.
When the ISR includes all the newly assigned replicas (given as the contents of the reassignment znode), the controller would remove the relevant child of /admin/reassignments (i.e. at the same time the controller currently updates/removes the /admin/reassign_partitions znode).
In this way the child nodes of /admin/reassignments correspond precisely to the partitions currently being reassigned. Moreover, the ZooKeeper czxid of a child node of /admin/reassignments identifies a reassignment uniquely. It is then possible to determine whether that exact reassignment is still on-going, or (by comparing the czxid with the mzxid) whether it has been changed.
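For example, a tool that recorded the czxid when it created a reassignment could later check whether exactly that reassignment is still in progress. A sketch follows; the method name and error handling are illustrative, while exists() returning the znode's Stat is standard ZooKeeper behaviour:

    import org.apache.zookeeper.ZooKeeper

    // True iff the reassignment identified by expectedCzxid still exists and
    // has not been replaced (different czxid) or modified (mzxid != czxid).
    def isSameReassignmentInProgress(zk: ZooKeeper,
                                     topic: String,
                                     partition: Int,
                                     expectedCzxid: Long): Boolean = {
      val stat = zk.exists(s"/admin/reassignments/$topic/$partition", false)
      if (stat == null) false                         // completed: znode was removed
      else if (stat.getCzxid != expectedCzxid) false  // a different reassignment now exists
      else stat.getMzxid == stat.getCzxid             // unchanged since it was created
    }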
The following sections describe, in detail, the algorithms used to manage the various znodes.
Algorithm for the legacy znode (/admin/reassign_partitions):

1. Some client creates /admin/reassign_partitions with the same JSON content as currently
2. ZK triggers watch on /admin/reassign_partitions, controller notified
3. The controller creates /admin/reassignment_requests/request_xxx (for xxx the znode sequence number)

The JSON in step 3 would look like:
{ "version":1, "assignments":{ "my-topic": {"0":[1,2,3]} "your-topic": {"3":[4,5,6]} } "legacy":true } |
The "legacy" key is used to track that this request originates from /admin/reassign_partitions
.
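Step 3 above amounts to the controller copying the legacy request into the new protocol. A sketch of that translation is shown below; buildRequestJson is an illustrative stand-in for converting the legacy JSON layout into the request layout shown above (including adding "legacy": true), and error handling and watch re-registration are omitted:

    import org.apache.zookeeper.{CreateMode, ZooDefs, ZooKeeper}

    // Sketch: turn the content of the legacy znode into a request znode.
    def translateLegacyReassignment(zk: ZooKeeper,
                                    buildRequestJson: Array[Byte] => Array[Byte]): String = {
      val legacyData = zk.getData("/admin/reassign_partitions", false, null)
      zk.create("/admin/reassignment_requests/request_",
        buildRequestJson(legacyData),
        ZooDefs.Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT_SEQUENTIAL)
    }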
Algorithm for /admin/reassignment_requests:

1. Some client creates /admin/reassignment_requests/request_xxx (without the "legacy" key mentioned above, unless it was created via the legacy protocol described above)
2. ZK triggers watch on /admin/reassignment_requests/request_xxx, controller notified
3. For each /admin/reassignment_requests/request_xxx the controller:
   a. Reads the znode to find the partitions being reassigned and the newly assigned brokers.
   b. Checks whether each of those partitions is already being reassigned:
      i. If it is, and the existing reassignment does not carry the same "legacy" flag, the request for that partition is ignored.
   c. Otherwise:
      i. The controller initiates reassignment
      ii. The controller creates or updates the /admin/reassignments/$topic/$partition znodes corresponding to those partitions
   d. The controller deletes /admin/reassignment_requests/request_xxx

The check in 3.b.i prevents the same partition being reassigned via different routes.
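A simplified sketch of step 3 for a single request znode follows. JSON parsing, the legacy-flag check from 3.b, creation of the intermediate /admin/reassignments/$topic node and the integration with the real controller logic are all elided; the parse and onReassignment parameters are illustrative stand-ins, and the exact contents written to the reassignment znode (here just the replica list) are an assumption.

    import org.apache.zookeeper.{CreateMode, ZooDefs, ZooKeeper}

    def handleReassignmentRequest(
        zk: ZooKeeper,
        requestPath: String,
        parse: Array[Byte] => Map[(String, Int), Seq[Int]],  // payload -> (topic, partition) -> new replicas
        onReassignment: ((String, Int), Seq[Int]) => Unit    // stand-in for the controller's reassignment logic
    ): Unit = {
      // 3.a read the requested assignments
      val assignments = parse(zk.getData(requestPath, false, null))

      assignments.foreach { case ((topic, partition), newReplicas) =>
        // (the legacy-flag check of 3.b is omitted in this sketch)
        val path = s"/admin/reassignments/$topic/$partition"
        val data = newReplicas.mkString("[", ",", "]").getBytes("UTF-8")

        // 3.c.i initiate (or re-initiate) the reassignment ...
        onReassignment((topic, partition), newReplicas)

        // 3.c.ii ... and record it under /admin/reassignments
        if (zk.exists(path, false) == null)
          zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
        else
          zk.setData(path, data, -1)
      }

      // 3.d the request has been absorbed into /admin/reassignments
      zk.delete(requestPath, -1)
    }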
On completion of the reassignment of a partition (i.e. when the ISR includes all the newly assigned replicas):

1. If the reassignment was made via /admin/reassign_partitions, the partition is removed from /admin/reassign_partitions, or, if this is the last partition being reassigned by a change to /admin/reassign_partitions, then /admin/reassign_partitions is deleted.
2. The /admin/reassignments/$topic/$partition znode is deleted.
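A sketch of the per-partition completion check is given below; the handling of the legacy /admin/reassign_partitions znode and of the controller's in-memory state is omitted, and the names are illustrative.

    import org.apache.zookeeper.ZooKeeper

    // Once the ISR contains all the newly assigned replicas, the reassignment
    // is complete and its znode under /admin/reassignments can be removed.
    def maybeCompleteReassignment(zk: ZooKeeper,
                                  topic: String,
                                  partition: Int,
                                  isr: Set[Int],
                                  newReplicas: Set[Int]): Boolean = {
      if (newReplicas.subsetOf(isr)) {
        zk.delete(s"/admin/reassignments/$topic/$partition", -1)
        true
      } else false
    }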
When failing over, the new controller needs to cope with three cases:

1. Changes made to /admin/reassign_partitions while there was no active controller
2. Requests created in /admin/reassignment_requests while there was no active controller
3. Reassignments recorded in /admin/reassignments that were in flight when the previous controller stopped
The controller context's partitionsBeingReassigned map can be updated as follows:
1. Add a context for each child znode of /admin/reassignments
2. Add a context for each partition in each request under /admin/reassignment_requests, but only if the context being added has the same legacy flag as any current context for the partition.
3. Add a context for each partition in /admin/reassign_partitions, but only if the context being added has the same legacy flag as any current context for the partition.
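A sketch of that precedence is shown below, with illustrative types standing in for the controller's real data structures; ReassignmentContext and the three input maps are assumptions, not existing controller code.

    // A context describes one partition's in-flight reassignment and whether
    // it originated from the legacy /admin/reassign_partitions znode.
    case class ReassignmentContext(newReplicas: Seq[Int], legacy: Boolean)

    def rebuildPartitionsBeingReassigned(
        fromReassignments: Map[(String, Int), ReassignmentContext],  // /admin/reassignments children
        fromRequests: Map[(String, Int), ReassignmentContext],       // /admin/reassignment_requests
        fromLegacy: Map[(String, Int), ReassignmentContext]          // /admin/reassign_partitions
    ): Map[(String, Int), ReassignmentContext] = {

      // Add candidate contexts, but only where any existing context for the
      // partition carries the same legacy flag (rules 2 and 3 above).
      def addIfCompatible(current: Map[(String, Int), ReassignmentContext],
                          candidates: Map[(String, Int), ReassignmentContext]) =
        candidates.foldLeft(current) { case (acc, (tp, ctx)) =>
          acc.get(tp) match {
            case Some(existing) if existing.legacy != ctx.legacy => acc  // conflicting route: ignore
            case _ => acc + (tp -> ctx)
          }
        }

      addIfCompatible(addIfCompatible(fromReassignments, fromRequests), fromLegacy)
    }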
The KafkaController.onPartitionReassignment() algorithm

The existing algorithm in onPartitionReassignment()
is suboptimal for changing reassignments in-flight. As an example, consider an initial assignment of [1,2], reassigned to [2,3] and then changed to [2,4]. With the current algorithm broker 3 will remain in the assigned replicas until broker 4 is in sync, even though 3 wasn't one of the original assigned replicas nor one of the new assigned replicas after the second reassignment. Broker 3 will try to get and stay in-sync during this time, wasting network bandwidth and CPU on both broker 3 and the leader.
More complex scenarios exist where such "intermediate replicas" become the leader.
To solve this problem it is necessary to compute a set of replicas to be removed from the assigned replicas when a reassignment changes.
The set drop of replicas to be removed will be computed as follows:
At the point a new reassignment is created we compute the value top = len(original_assignment) and store this in ZK. We also check top >= min.insync.replicas, logging a warning and ignoring the reassignment if the check fails.
Then, on each change to the reassignment (i.e. within KafkaController.onPartitionReassignment()):

1. osr = assigned -- isr
2. existing = List(leader) ++ isr ++ osr
3. Remove duplicates from existing (keeping the first occurrence in the list)
4. drop = assigned -- existing[0:top-1]
Notes:

- Since top > 0 and the leader is the first element of existing, it is impossible to drop the leader, and thus it's impossible that a leader election will be needed at this point.
- By the assumption top >= min.insync.replicas it's impossible to shrink the ISR to a point where producers are blocked, assuming the ISR was already sufficient.
In order to actually drop those replicas:

1. Make the transitions in the RSM: Offline -> ReplicaDeletionStarted -> ReplicaDeletionSuccessful -> NonExistentReplica
2. Update the assigned replicas in memory and in ZK
As described above, compatibility with /admin/reassign_partitions is maintained, so existing software will continue working. The only difference a client operating on /admin/reassign_partitions would observe is a slight increase in latency due to the round trips needed to create the new znodes.
This compatibility behaviour could be dropped in some future version of Kafka, if that was desirable.
Using /admin/reassignments without the /admin/reassignment_requests znode was initially considered, but that required a ZK watch per reassignment, which would not scale well. This proposal requires only one more watch than the current version of the broker.