Authors: George Li, Tom Bentley
Status
Current state: Under Discussion
...
- Reassignment, especially of a large topic/partition, is costly. In some cases the performance of the Kafka cluster can be severely impacted when reassignments are kicked off. There should be a fast, clean, safe way to cancel and roll back pending reassignments. E.g., with original replicas [1,2,3] and new replicas [4,5,6] causing performance impact on leader 1, the reassignment should be able to be cancelled immediately and reverted back to the original replicas [1,2,3], dropping the new replicas.
- Each batch of reassignments takes as long as its slowest partition; this slowest partition prevents other reassignments from happening. This can happen even when the reassignments are submitted by grouping topic/partitions of similar size into each batch. How to optimally group reassignments into one batch for faster execution and less impact to the cluster is beyond the scope of this KIP.
The ZooKeeper-imposed limit of 1MB on znode size places an upper limit on the number of reassignments that can be performed at a given time. Note that in a real production environment, it is better to perform reassignments in batches, with a reasonable number of reassignments in each batch. A large number of reassignments tends to cause higher producer latency. Between batches, proper staggering and throttling are recommended.
...
- Cancel all pending reassignments currently in /admin/reassign_partitions and revert them back to their original replicas.
- Disable reassignments of the Kafka cluster while the cancel znode (/admin/cancel_reassignment_in_progress) is present. This is helpful for production clusters (e.g. those with min.insync.replicas > 1) that are sensitive to reassignments, as it prevents accidentally starting a reassignment on the Kafka cluster.
- Add more partition reassignments while some are still in-flight. Even though in the original design of the reassign tool the intent was for the znode (/admin/reassign_partitions) not to be updated by the tool unless it was empty, there are user requests to support such a feature, e.g. KAFKA-7854.
- Development of an AdminClient API which supported the above features.
...
A new znode /admin/cancel_reassignment_in_progress is used to signal the Controller to cancel the current pending reassignments in /admin/reassign_partitions.
For the existing
...
Public Interfaces
Strictly speaking this is not a change that would affect any public interfaces (since ZooKeeper is not considered a public interface, and it can be made in a backward compatible way), however since some users are known to operate on the /admin/reassign_partitions
znode directly, this could break in future versions of Kafka (e.g. as reported in KAFKA-7854). For the existing /admin/reassign_partitions znode, an "original_replicas" field is added to support rollback to the original state of the topic/partition's assigned replicas. How "original_replicas" gets populated will be discussed in detail later.
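As an illustration (topic name, partition and broker ids are taken from the CLI example later in this document; the exact field layout is a sketch, not the final format), the znode content with the added "original_replicas" field might look like:

```json
{
  "version": 1,
  "partitions": [
    {"topic": "test_topic",
     "partition": 25,
     "replicas": [8321, 6792, 6804],
     "original_replicas": [8321, 6792, 6813]}
  ]
}
```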
...
For submitting new reassignments while some are still pending, the new /admin/reassign_partitions_queue znode is added. Its JSON format is similar to /admin/reassign_partitions.
...
To trigger the reassignment cancellation, a new znode /admin/cancel_reassignment_in_progress is created; the controller will be informed of the reassignment cancellation via a ZooKeeper watch on it. The controller will read the current pending reassignments in /admin/reassign_partitions and re-populate ControllerContext.partitionReplicaAssignment. For each pending topic/partition reassignment, the cancellation/rollback works as described below. It is essentially the opposite of performing a reassignment: since we have the "original_replicas" of each topic/partition reassignment in /admin/reassign_partitions and ControllerContext.partitionReplicaAssignment, it is much easier to roll back.
...
1. Set AR to OAR in memory.
2. If the leader is not in OAR, elect a new leader from OAR. If a new leader needs to be elected from OAR, a LeaderAndIsr request will be sent. If not, then the leader epoch will be incremented in ZooKeeper and a LeaderAndIsr request will be sent.
In any case, the LeaderAndIsr request will have AR = OAR. This will prevent the leader from adding any replica in
RAR - OAR back into the ISR.
3. Move all replicas in RAR - OAR to the OfflineReplica state. As part of the OfflineReplica state change, we shrink the
ISR to remove RAR - OAR in ZooKeeper and send a LeaderAndIsr request ONLY to the leader to notify it of the shrunk ISR.
After that, we send a StopReplica (delete = false) request to the replicas in RAR - OAR.
4. Move all replicas in RAR - OAR to the NonExistentReplica state. This will send a StopReplica (delete = true) request to
the replicas in RAR - OAR to physically delete the replicas on disk.
5. Update AR in ZooKeeper with OAR.
6. Update the /admin/reassign_partitions path in ZooKeeper to remove this partition.
7. After electing the leader, the replicas and ISR information changes, so resend the UpdateMetadata request to every broker.
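The replica-set arithmetic behind these steps can be sketched as follows. This is a minimal illustration, not the actual controller code; the class and method names are hypothetical.

```java
import java.util.*;

// Minimal sketch of the rollback replica-set arithmetic: AR is restored to
// OAR, and the replicas added by the cancelled reassignment (RAR - OAR) are
// taken offline and then deleted. Not actual Kafka controller code.
public class ReassignmentRollback {

    // Replicas in RAR but not in OAR: these get StopReplica (delete = false)
    // in step 3 and StopReplica (delete = true) in step 4.
    public static List<Integer> replicasToDrop(List<Integer> oar, List<Integer> rar) {
        List<Integer> drop = new ArrayList<>(rar);
        drop.removeAll(oar);
        return drop;
    }

    public static void main(String[] args) {
        List<Integer> oar = Arrays.asList(1, 2, 3); // original_replicas
        List<Integer> rar = Arrays.asList(4, 5, 6); // in-flight target replicas
        System.out.println("restore AR to " + oar);                    // steps 1 and 5
        System.out.println("stop/delete " + replicasToDrop(oar, rar)); // steps 3 and 4
    }
}
```

If the reassignment only partially overlapped the original replicas (e.g. OAR = [1,2,3], RAR = [1,2,4]), only the genuinely new replica (4) is stopped and deleted.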
Note that /admin/cancel_reassignment_in_progress is deleted automatically after the cancellation is completed (see the example at the end of this section).

AdminClient CLI options

The proposed new option --cancel of kafka-reassign-partitions.sh will be added to submit the reassignment cancellation, e.g.:

$ zkcli -h kafka-zk-host1 ls /kafka-cluster/admin/
[u'delete_topics']
$ /usr/lib/kafka/bin/kafka-reassign-partitions.sh --zookeeper kafka-zk-host1/kafka-cluster --cancel
Rolling back the current pending reassignments Map(test_topic-25 -> Map(replicas -> Buffer(8321, 6792, 6804), original_replicas -> Buffer(8321, 6792, 6813)))
Successfully submitted cancellation of reassignments.
$ zkcli -h kafka-zk-host1 ls /kafka-cluster/admin/
[u'cancel_reassignment_in_progress', u'reassign_partitions', u'delete_topics']
While the znode /admin/cancel_reassignment_in_progress is present, no new reassignments can be submitted. This is enforced before writing to /admin/reassign_partitions and at onPartitionReassignment():
private def onPartitionReassignment(topicPartition: TopicPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
  if (zkClient.reassignCancelInPlace()) return // if the ReassignCancelZNode exists, skip reassignment
...
# After the reassignment cancellation is complete, the ZK nodes /admin/cancel_reassignment_in_progress & /admin/reassign_partitions are gone.
$ zkcli -h kafka-zk-host1 ls /kafka-cluster/admin/
[u'delete_topics']
New reassignments while existing reassignments in-flight
To support submitting extra reassignments while existing reassignments are still in-flight, an extra znode /admin/reassign_partitions_queue, which has the same JSON format as /admin/reassign_partitions, is added. Three more options, --generate-queue, --verify-queue and --execute-queue, will be added to kafka-reassign-partitions.sh.
The controller will be informed of the extra reassignments via a ZooKeeper watch. It will get all topic/partitions from /admin/reassign_partitions_queue, add them to /admin/reassign_partitions, then trigger the reassignment onPartitionReassignment()
of those topic/partitions.
In case the /admin/reassign_partitions_queue znode contains topic/partitions which already have pending reassignments in /admin/reassign_partitions, the conflict resolution for those duplicate topic/partitions is to first cancel/roll back their reassignments in /admin/reassign_partitions, then submit the new reassignments from /admin/reassign_partitions_queue. This approach is simpler than the algorithm proposed by Tom previously to infer the final replicas for those duplicate topic/partitions. After a topic/partition is put in /admin/reassign_partitions, it is removed from /admin/reassign_partitions_queue, and when /admin/reassign_partitions_queue is empty, the znode is deleted.
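The conflict-resolution rule can be sketched as follows. This is an illustrative sketch with hypothetical names, not the actual controller code: it only identifies which queued topic/partitions collide with in-flight reassignments and must be cancelled/rolled back first.

```java
import java.util.*;

// Illustrative sketch of the queue conflict resolution: a topic/partition
// that appears both in the pending reassignments (/admin/reassign_partitions)
// and in the queue znode must first be cancelled (rolled back to its
// original_replicas) before its queued target replicas are submitted.
public class QueueConflictResolution {

    // pending: topic-partition -> in-flight target replicas
    // queued:  topic-partition -> newly submitted target replicas
    // Returns the duplicate topic/partitions to cancel/roll back first.
    public static Set<String> duplicatesToCancel(Map<String, List<Integer>> pending,
                                                 Map<String, List<Integer>> queued) {
        Set<String> dup = new TreeSet<>(pending.keySet());
        dup.retainAll(queued.keySet());
        return dup;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> pending = new HashMap<>();
        pending.put("test_topic-25", Arrays.asList(4, 5, 6));
        Map<String, List<Integer>> queued = new HashMap<>();
        queued.put("test_topic-25", Arrays.asList(7, 8, 9)); // duplicate: roll back first
        queued.put("test_topic-26", Arrays.asList(1, 2, 3)); // no conflict: submit directly
        System.out.println("cancel first: " + duplicatesToCancel(pending, queued));
    }
}
```

Non-conflicting entries in the queue are simply moved into /admin/reassign_partitions and reassignment is triggered for them.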
...
As described above, compatibility with /admin/reassign_partitions is maintained, so existing software will continue working, and the only difference a client operating on /admin/reassign_partitions would observe is a slight increase in latency due to the round trips needed to create the new znode (/admin/reassign_partitions_queue) and possible conflict resolution. The newly introduced znode /admin/cancel_reassignment_in_progress is used solely for cancelling/rolling back the current reassignments still in-flight in /admin/reassign_partitions.
...