Authors: George Li, Tom Bentley
Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-6359
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The current kafka-reassign-partitions.sh
tool imposes the limitation that only a single batch of partition reassignments can be in-flight, and it is not possible to cancel an in-flight reassignment cleanly, safely and in a timely fashion (e.g. as reported in KAFKA-6304, the current way of cancelling a reassignment requires a lot of manual steps). This has a number of consequences:
- Reassignments, especially for large topics/partitions, are costly. In some cases the performance of the Kafka cluster can be severely impacted when reassignments are kicked off. There should be a fast, clean, safe way to cancel and roll back pending reassignments. e.g. with original replicas [1,2,3] and new replicas [4,5,6] causing a performance impact on leader 1, the reassignment should be able to be cancelled immediately and reverted back to the original replicas [1,2,3], dropping the new replicas.
- Each batch of reassignments takes as long as the slowest partition; this slowest partition prevents other reassignments from happening. This can happen even when the reassignments are submitted by grouping topics/partitions of similar size into each batch. How to optimally group reassignments into one batch for faster execution and less impact to the cluster is beyond the scope of this KIP.
- The ZooKeeper-imposed limit of 1MB on znode size places an upper limit on the number of reassignments that can be done at a given time. Note that in a real production environment it is better to do reassignments in batches, with a reasonable number of reassignments in each batch; a large number of reassignments tends to cause higher producer latency. Between batches, proper staggering and throttling are recommended.
This change would enable:
- Cancelling all pending reassignments currently in /admin/reassign_partitions and reverting them back to their original replicas.
- Adding more partition reassignments while some are still in-flight. Even though in the original design of the reassign tool the intent was for the znode (/admin/reassign_partitions) not to be updated by the tool unless it was empty, there are user requests to support such a feature, e.g. KAFKA-7854.
- Development of an AdminClient API which supports the above features.
Public Interfaces
Strictly speaking this is not a change that would affect any public interfaces (since ZooKeeper is not considered a public interface, and it can be made in a backward-compatible way); however, since some users are known to operate on the /admin/reassign_partitions
znode directly, this could break in future versions of Kafka (e.g. as reported in KAFKA-7854).
A new znode /admin/cancel_reassignment_in_progress
is used to signal the controller to cancel the current pending reassignments in /admin/reassign_partitions.
For the existing /admin/reassign_partitions
znode, an "original_replicas"
field is added to support rolling back a topic partition's assigned replicas to their original state. How "original_replicas"
gets populated will be discussed in detail later.
{"version":1, "partitions":[{"topic": "foo1", "partition": 0, "replicas": [1,2,4], "original_replicas": [1,2,3] }, {"topic": "foo2", "partition": 1, "replicas": [7,9,8], "original_replicas": [5,6,8] }] }
For submitting new reassignments while some are still pending, the new /admin/reassign_partitions_queue
znode is added. The JSON format is the same as /admin/reassign_partitions
(without the "original_replicas"
field added above), e.g.:
{"version":1, "partitions":[{"topic": "foo1", "partition": 0, "replicas": [1,2,5] }, {"topic": "foo2", "partition": 1, "replicas": [7,9,10] }] }
Proposed Changes
Reassignment Cancellation
The main idea is to support clean, safe cancellation of the pending reassignments in the /admin/reassign_partitions
znode in a timely fashion, and to support submitting more reassignments while some reassignments are still in-flight.
When a client submits reassignments, it only needs to submit the "replicas" (new replica assignment) of each topic/partition. Before writing to /admin/reassign_partitions
, the currently assigned replicas (original replicas) are read from ZooKeeper and added as the "original_replicas"
of that topic/partition's reassignment. This "original_replicas"
field will be used to roll back the topic/partition's replica assignment during cancellation.
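As an illustration of how the tool could populate this field, here is a minimal sketch in Scala. The currentReplicas parameter is a hypothetical caller-supplied lookup standing in for reading the partition's current assignment from ZooKeeper; the real tool's code will differ.

case class PartitionMove(topic: String, partition: Int, newReplicas: Seq[Int])

// Build the /admin/reassign_partitions JSON, attaching "original_replicas" read
// from the cluster's current state before the reassignment is submitted.
def withOriginalReplicas(moves: Seq[PartitionMove],
                         currentReplicas: (String, Int) => Seq[Int]): String = {
  val entries = moves.map { m =>
    val original = currentReplicas(m.topic, m.partition) // current assignment from ZooKeeper
    s"""{"topic": "${m.topic}", "partition": ${m.partition}, """ +
      s""""replicas": [${m.newReplicas.mkString(",")}], """ +
      s""""original_replicas": [${original.mkString(",")}]}"""
  }
  s"""{"version":1, "partitions":[${entries.mkString(", ")}]}"""
}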
To trigger the reassignment cancellation, a new znode /admin/cancel_reassignment_in_progress
is created, and the controller is informed of the cancellation via a ZooKeeper watch on it. The controller will read the current pending reassignments in /admin/reassign_partitions
and re-populate ControllerContext.partitionsBeingReassigned
. For each pending topic/partition reassignment, the cancellation/rollback works as below. It is essentially the opposite of doing a reassignment; since we have the "original_replicas"
of each topic/partition reassignment in /admin/reassign_partitions
& ControllerContext.partitionsBeingReassigned
, it is much easier to roll back.
RAR = Reassigned replicas
OAR = Original list of replicas for partition
AR = current assigned replicas
1. Set AR to OAR in memory.
2. If the leader is not in OAR, elect a new leader from OAR. If a new leader needs to be elected from OAR, a LeaderAndIsr
request will be sent. If not, then the leader epoch will be incremented in ZooKeeper and a LeaderAndIsr request will be sent.
In any case, the LeaderAndIsr request will have AR = OAR. This will prevent the leader from adding any replica in
RAR - OAR back in the isr.
3. Move all replicas in RAR - OAR to OfflineReplica state. As part of OfflineReplica state change, we shrink the
isr to remove RAR - OAR in zookeeper and send a LeaderAndIsr ONLY to the Leader to notify it of the shrunk isr.
After that, we send a StopReplica (delete = false) to the replicas in RAR - OAR.
4. Move all replicas in RAR - OAR to NonExistentReplica state. This will send a StopReplica (delete = true) to
the replicas in RAR - OAR to physically delete the replicas on disk.
5. Update AR in ZK with OAR.
6. Update the /admin/reassign_partitions path in ZK to remove this partition.
7. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker.
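The sketch below captures the shape of this per-partition rollback. The Ops trait and its method names are hypothetical stand-ins for the relevant KafkaController operations, used only to make the ordering of the steps above concrete; it is not the actual controller code.

trait Ops {
  def currentLeader(tp: (String, Int)): Int
  def electLeaderFrom(tp: (String, Int), candidates: Seq[Int]): Unit
  def bumpLeaderEpochAndSendLeaderAndIsr(tp: (String, Int), ar: Seq[Int]): Unit
  def stopAndDeleteReplicas(tp: (String, Int), replicas: Seq[Int]): Unit
  def updateAssignedReplicasInZk(tp: (String, Int), ar: Seq[Int]): Unit
  def removeFromReassignPartitionsZnode(tp: (String, Int)): Unit
  def sendUpdateMetadataToAllBrokers(): Unit
}

// rar = reassigned replicas (RAR), oar = original replicas (OAR, i.e. "original_replicas")
def rollbackOnePartition(tp: (String, Int), rar: Seq[Int], oar: Seq[Int], ops: Ops): Unit = {
  // Steps 1-2: set AR back to OAR and make sure the leader is a member of OAR.
  if (!oar.contains(ops.currentLeader(tp))) ops.electLeaderFrom(tp, oar)
  else ops.bumpLeaderEpochAndSendLeaderAndIsr(tp, oar)
  // Steps 3-4: stop and physically delete the replicas that exist only because of the reassignment.
  ops.stopAndDeleteReplicas(tp, rar.diff(oar))
  // Steps 5-6: persist OAR as the assignment and drop the partition from /admin/reassign_partitions.
  ops.updateAssignedReplicasInZk(tp, oar)
  ops.removeFromReassignPartitionsZnode(tp)
  // Step 7: propagate the new metadata to every broker.
  ops.sendUpdateMetadataToAllBrokers()
}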
The proposed new option --cancel
will be added to the kafka-reassign-partitions.sh CLI to submit a reassignment cancellation, e.g.:
$ zkcli -h kafka-zk-host1 ls /kafka-cluster/admin/
[u'delete_topics']

$ /usr/lib/kafka/bin/kafka-reassign-partitions.sh --zookeeper kafka-zk-host1/kafka-cluster --cancel
Rolling back the current pending reassignments Map(test_topic-25 -> Map(replicas -> Buffer(1, 2, 4), original_replicas -> Buffer(1, 2, 3)))
Successfully submitted cancellation of reassignments.

$ zkcli -h kafka-zk-host1 ls /kafka-cluster/admin/
[u'cancel_reassignment_in_progress', u'reassign_partitions', u'delete_topics']

# After reassignment cancellation is complete, the ZK nodes /admin/cancel_reassignment_in_progress & /admin/reassign_partitions are gone.
$ zkcli -h kafka-zk-host1 ls /kafka-cluster/admin/
[u'delete_topics']
New reassignments while existing reassignments are in-flight
In order to support submitting more reassignments while existing reassignments are still in-flight, an extra znode /admin/reassign_partitions_queue
, which has the same JSON format as /admin/reassign_partitions
, is added. Three more options, --generate-queue, --verify-queue and --execute-queue,
will be added to kafka-reassign-partitions.sh.
The controller will be informed of the queued reassignments via a ZooKeeper watch. It will get all topic/partitions from /admin/reassign_partitions_queue
, add them to /admin/reassign_partitions
, then trigger the reassignments via onPartitionReassignment()
for those topic/partitions.
In case there are topic/partitions inside /admin/reassign_partitions_queue
which also exist in /admin/reassign_partitions
(pending reassignments), the conflict resolution for those duplicate topic/partitions is to first cancel/roll back the reassignments of those topic/partitions in
/admin/reassign_partitions
, then submit the new reassignments from /admin/reassign_partitions_queue
to /admin/reassign_partitions
. This approach is simpler than the algorithm proposed by Tom previously to infer the final replica assignments for those duplicate topic/partitions. After a topic/partition is put in /admin/reassign_partitions
, it is removed from /admin/reassign_partitions_queue
, and when /admin/reassign_partitions_queue
is empty, the znode is deleted.
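A minimal sketch of this conflict-resolution rule, with hypothetical names (maps keyed by topic/partition and a rollback callback standing in for the cancellation described earlier):

// Partitions present in both znodes are first rolled back, then the queued
// assignment becomes the new target; everything else is simply merged.
def mergeQueueIntoPending(pending: Map[(String, Int), Seq[Int]],
                          queued: Map[(String, Int), Seq[Int]],
                          rollback: ((String, Int)) => Unit): Map[(String, Int), Seq[Int]] = {
  val duplicates = pending.keySet.intersect(queued.keySet)
  duplicates.foreach(rollback)        // cancel/roll back the in-flight reassignment first
  pending -- duplicates ++ queued     // then take the queued assignment as the new target
}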
Instead, a protocol similar to that used for config changes will be used. To start a reassignment (of a single partition, or a collection of partitions) a persistent sequenced znode will be created in /admin/reassignment_requests
. The controller will be informed of this new request via a ZooKeeper watch. It will initiate the reassignment, create a znode /admin/reassignments/$topic/$partition
and then delete the /admin/reassignment_requests/request_NNNN
znode.
A client can subsequently create more requests involving the same partitions (by creating a /admin/reassignment_requests/request_MMMM
znode), and the controller would re-initiate reassignment as the assignment changes. This means it is possible to "cancel" the reassignment of a single partition by changing the reassignment to the old assigned replicas. Note, however, that the controller itself doesn't remember the old assigned replicas – it is up to the client to record the old state prior to starting a reassignment if that client needs to support cancellation.
When the ISR includes all the newly assigned replicas (given as the contents of the reassignment znode), the controller would remove the relevant child of /admin/reassignments
(i.e. at the same time the controller currently updates/removes the /admin/reassign_partitions
znode).
In this way the child nodes of /admin/reassignments
correspond precisely to the replicas currently being reassigned. Moreover, the ZooKeeper czxid of a child node of /admin/reassignments
identifies a reassignment uniquely. It is then possible to determine whether that exact reassignment is still on-going, or (by comparing the czxid with the mzxid) has been changed.
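For example, a client holding the czxid it observed when it started a reassignment could distinguish these cases roughly as follows (a sketch against the plain ZooKeeper API; the exact client code in the tool may differ):

import org.apache.zookeeper.ZooKeeper
import org.apache.zookeeper.data.Stat

// Throws KeeperException.NoNodeException once the reassignment has completed and
// the controller has removed the child of /admin/reassignments.
def describeReassignment(zk: ZooKeeper, topic: String, partition: Int, czxidAtStart: Long): String = {
  val stat = new Stat()
  zk.getData(s"/admin/reassignments/$topic/$partition", false, stat)
  if (stat.getCzxid != czxidAtStart) "replaced by a different reassignment"
  else if (stat.getMzxid != stat.getCzxid) "same reassignment, but its target assignment has since changed"
  else "the original reassignment is still in progress"
}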
The following sections describe, in detail, the algorithms used to manage the various znodes.
Supporting the legacy znode protocol (/admin/reassign_partitions)
1. Some ZK client creates /admin/reassign_partitions with the same JSON content as currently
2. ZK triggers watch on /admin/reassign_partitions, controller notified
3. Controller creates /admin/reassignment_requests/request_xxx (for xxx the znode sequence number)
The JSON in step 3 would look like:
{ "version":1, "assignments":{ "my-topic": {"0":[1,2,3]} "your-topic": {"3":[4,5,6]} } "legacy":true }
The "legacy" key is used to track that this request originates from /admin/reassign_partitions
.
The new znode protocol
1. Some ZK client creates /admin/reassignment_requests/request_xxx (without the "legacy" key mentioned above, unless it was created via the legacy protocol described above)
2. ZK triggers watch on /admin/reassignment_requests/request_xxx, controller notified
3. For each /admin/reassignment_requests/request_xxx the controller:
   a. Reads the znode to find the partitions being reassigned and the newly assigned brokers.
   b. Checks whether a reassignment for the same partition exists:
      i. If it does, and one is a legacy request and the other is not, then it logs a warning that the request is being ignored.
      ii. Otherwise:
         - The controller initiates reassignment
         - The controller creates or updates /admin/reassignments/$topic/$partition corresponding to those partitions
   c. The controller deletes /admin/reassignment_requests/request_xxx
The check in 3.b.i prevents the same partition being reassigned via different routes.
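As an illustration, the check in 3.b.i amounts to something like the following (names such as Request and inFlightLegacy are hypothetical; the real controller works against its ReassignedPartitionsContext state):

// A request is accepted only if every partition it touches either has no
// in-flight reassignment, or has one with the same "legacy" flag.
case class Request(partitions: Map[(String, Int), Seq[Int]], legacy: Boolean)

def accept(request: Request, inFlightLegacy: Map[(String, Int), Boolean]): Boolean =
  request.partitions.keys.forall { tp =>
    inFlightLegacy.get(tp).forall(_ == request.legacy) // absent, or same legacy flag
  }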
Revised protocol when a partition "catches up"
1. If the reassignment was via the legacy path:
   a. The controller removes the partition from /admin/reassign_partitions, or, if this is the last partition being reassigned by a change to /admin/reassign_partitions, then /admin/reassign_partitions is deleted.
2. The controller removes /admin/reassignments/$topic/$partition
On controller failover
When failing over, the new controller needs to cope with three cases:
1. Creation of /admin/reassign_partitions while there was no active controller
2. Creation of children of /admin/reassignment_requests while there was no active controller
3. Restarting reassignment of all the in-flight reassignments in /admin/reassignments
The controller context's partitionsBeingReassigned map can be updated as follows:
1. Add a ReassignedPartitionsContext for each reassignment in /admin/reassignments
2. Add ReassignedPartitionsContexts for each child of /admin/reassignment_requests, but only if the context being added has the same legacy flag as any current context for the partition.
3. Add ReassignedPartitionsContexts for each partition in /admin/reassign_partitions, but only if the context being added has the same legacy flag as any current context for the partition.
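A rough sketch of this rebuild, under the assumption that a candidate context with a conflicting legacy flag is simply skipped (Ctx is a hypothetical stand-in for ReassignedPartitionsContext; the real field and method names differ):

case class Ctx(newReplicas: Seq[Int], legacy: Boolean)

// Rebuild partitionsBeingReassigned in the three steps above; a candidate is
// only added when no existing context disagrees on the legacy flag.
def rebuildPartitionsBeingReassigned(fromReassignments: Map[(String, Int), Ctx],
                                     fromRequests: Map[(String, Int), Ctx],
                                     fromLegacyZnode: Map[(String, Int), Ctx]): Map[(String, Int), Ctx] = {
  def addCompatible(acc: Map[(String, Int), Ctx], candidates: Map[(String, Int), Ctx]) =
    candidates.foldLeft(acc) { case (m, (tp, ctx)) =>
      if (m.get(tp).forall(_.legacy == ctx.legacy)) m + (tp -> ctx) else m
    }
  addCompatible(addCompatible(fromReassignments, fromRequests), fromLegacyZnode)
}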
Change to the KafkaController.onPartitionReassignment() algorithm
The existing algorithm in onPartitionReassignment()
is suboptimal for changing reassignments in-flight. As an example, consider an initial assignment of [1,2], reassigned to [2,3] and then changed to [2,4]. With the current algorithm broker 3 will remain in the assigned replicas until broker 4 is in sync, even though 3 wasn't one of the original assigned replicas nor one of the new assigned replicas after the second reassignment. Broker 3 will try to get and stay in-sync during this time, wasting network bandwidth and CPU on both broker 3 and the leader.
More complex scenarios exist where such "intermediate replicas" become the leader.
To solve this problem it is necessary to compute a set of replicas to be removed from the assigned replicas when a reassignment changes.
The set drop
of replicas to be removed will be computed as follows:
1. At the point a new reassignment is created we compute the value top = len(original_assignment)
   a. We check top >= min.insync.replicas, logging a warning and ignoring the reassignment if the check fails.
   b. Otherwise we store this in the /admin/reassignments/$topic/$partition znode.
2. Then, initially, and each time the reassignment changes (i.e. in KafkaController.onPartitionReassignment()):
   a. Let assigned be the currently assigned replicas, i.e. assigned = ControllerContext.partitionReplicaAssignment(tp)
   b. Define a sorted list of out-of-sync replicas: osr = assigned -- isr
   c. Create a list of current replicas: existing = List(leader) ++ isr ++ osr
   d. Remove duplicates from existing (keeping the first occurrence in the list)
   e. Then the set of replicas which can be dropped is: drop = assigned -- existing[0:top-1]
Notes:
- Since top > 0 it is impossible to drop the leader, and thus it is impossible that a leader election will be needed at this point.
- By the assumption top >= min.insync.replicas it is impossible to shrink the ISR to a point where producers are blocked, assuming the ISR was already sufficient.
- If the controller knew how far behind the ISR each of the OSR replicas was, OSR could be sorted, which would result in dropping the furthest-behind replicas. But the controller doesn't know this.
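A minimal Scala sketch of step 2 above (leader, isr and assigned would come from the controller context in the real code; the ordering of the OSR replicas is left as-is, per the note that the controller cannot sort them by lag):

// Returns the replicas that can be dropped from the assignment: everything that
// is not among the first `top` entries of existing = leader, then ISR, then OSR.
def replicasToDrop(leader: Int, isr: Seq[Int], assigned: Seq[Int], top: Int): Set[Int] = {
  val osr = assigned.filterNot(isr.contains)            // out-of-sync replicas
  val existing = (List(leader) ++ isr ++ osr).distinct  // keep the first occurrence of each replica
  assigned.toSet -- existing.take(top)
}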
In order to actually drop those replicas:
1. Make the transitions in the RSM: -> OfflineReplica -> ReplicaDeletionStarted -> ReplicaDeletionSuccessful -> NonExistentReplica
2. Update the assigned replicas in memory and in ZK
Compatibility, Deprecation, and Migration Plan
As described above, compatibility with /admin/reassign_partitions
is maintained, so existing software will continue working, and the only difference that a client operating on /admin/reassign_partitions
would observe is a slight increase in latency due to the round trips needed to create the new znode (/admin/reassign_partitions_queue
) and possible conflict resolution. The newly introduced znode /admin/cancel_reassignment_in_progress is used solely for cancelling/rolling back the current reassignments still in-flight in /admin/reassign_partitions
.
This compatibility behavior could be dropped in some future version of Kafka, if that was desirable.
Rejected Alternatives
- A similar protocol based on just
/admin/reassignments
without the/admin/reassignment_requests
was initially considered, but that required a ZK watch per reassignment, which would not scale well. This proposal requires only 1 more watch than the current version of the broker.