...

  1. Reassignments, especially for large topics/partitions, are costly. In some cases, the performance of the Kafka cluster can be severely impacted when reassignments are kicked off. There should be a fast, clean, safe way to cancel and roll back the pending reassignments. For example, take original replicas [1,2,3] and new replicas [4,5,6]. If the reassignment causes a performance impact on leader 1, it should be possible to cancel it immediately, revert to the original replicas [1,2,3], and drop the new replicas.
  2. Each batch of reassignments takes as long as the slowest partition, and this slowest partition prevents other reassignments from happening. This can happen even when the reassignments are submitted by grouping similarly sized topic/partitions into each batch. How to optimally group reassignments into one batch for faster execution and less impact to the cluster is beyond the scope of this KIP. The ZooKeeper-imposed limit of 1MB on znode size places an upper limit on the number of reassignments that can be done at a given time. Note that in practice, in a real production environment, it is better to do reassignments in batches with a reasonable number of reassignments in each batch. A large number of reassignments tends to cause higher producer latency. Between batches, proper staggering and throttling are recommended. This is addressed in the Planned Future Changes section and may be implemented in another KIP.
  3. Currently, reassignment operations still communicate directly with ZooKeeper. Other admin operations, such as create/delete topics, are moving to the RPC-based KIP-4 wire protocol. Moving from interacting directly with ZooKeeper to RPC offers users the recommended path and discourages directly modifying ZooKeeper nodes. This will pave the way to lock down ZooKeeper security with ACLs, so that only brokers need to communicate with ZK.


This change would enable:

  • Cancelling all pending reassignments currently in /admin/reassign_partitions and reverting them to their original replicas.

  • Adding more partition reassignments while some are still in-flight. In the original design of the reassign tool, the intent was for the tool not to update the znode (/admin/reassign_partitions) unless it was empty. However, users have requested support for such a feature, e.g. KAFKA-7854. This is listed in the "Planned Future Changes" section and may be implemented in another KIP.
  • Development of an AdminClient API that supports the above features, and changing the current administrative APIs to go through RPC instead of ZooKeeper.

Public Interfaces

Strictly speaking, this is not a change that would affect any public interfaces (ZooKeeper is not considered a public interface, and the change can be made in a backward-compatible way). However, some users are known to operate on the /admin/reassign_partitions znode directly. Such operations could break in future versions of Kafka (e.g. as reported in KAFKA-7854), so they should be discouraged.


A new znode, /admin/cancel_reassignment_in_progress, is used to signal the Controller to cancel the current pending reassignments in /admin/reassign_partitions. Note that we can only cancel the pending reassignments of the current batch. Some reassignments complete almost instantly when the replica set is not changed (the replicas are already in the ISR) and only the ordering changes. For example, (1,2,3) => (2,3,1) only changes the preferred leader. To roll back all the reassignments in the current batch (not just the pending ones, but also those already completed in the same batch), the client that submitted the reassignment should keep a "rollback" version. It should then submit that version as a reassignment after /admin/reassign_partitions is empty and deleted.
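Keeping a "rollback" version amounts to capturing the current replica list of every partition in a plan before submitting it. The Python sketch below shows one way a client could do that. The function name `build_rollback_plan` and the `current_assignment` map are hypothetical and are not part of kafka-reassign-partitions.sh. Replica order is preserved because the first replica is the preferred leader.

```python
import json


def build_rollback_plan(plan_json: str, current_assignment: dict) -> str:
    """Return reassignment JSON that restores the pre-reassignment replicas.

    plan_json: the JSON about to be written to /admin/reassign_partitions.
    current_assignment: hypothetical map (topic, partition) -> replica list,
    captured *before* the plan is submitted."""
    plan = json.loads(plan_json)
    rollback = {"version": 1, "partitions": []}
    for p in plan["partitions"]:
        key = (p["topic"], p["partition"])
        rollback["partitions"].append({
            "topic": p["topic"],
            "partition": p["partition"],
            # Keep the exact order: the first replica is the preferred leader.
            "replicas": list(current_assignment[key]),
        })
    return json.dumps(rollback, separators=(",", ":"))
```

Once /admin/reassign_partitions is empty and deleted, the returned JSON would be submitted as an ordinary reassignment.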

...

$ zkcli -h kafka-zk-host1 ls /kafka-cluster/admin/
[u'reassign_partitions',
 u'delete_topics']

# Current pending reassignment(s)
$ zkcli -h kafka-zk-host1 get /kafka-cluster/admin/reassign_partitions
('{"version":1,"partitions":[{"topic":"test_topic","partition":25,"replicas":[1,2,4],"original_replicas":[1,2,3]}]}', ZnodeStat(czxid=17180484637, mzxid=17180484641, ctime=1549498790668, mtime=1549498790680, version=1, cversion=0, aversion=0, ephemeralOwner=0, dataLength=148, numChildren=0, pzxid=17180484637))

# Cancel the pending reassignments and remove the throttle as well.
$ /usr/lib/kafka/bin/kafka-reassign-partitions.sh  --zookeeper kafka-zk-host1/kafka-cluster --cancel
Rolling back the current pending reassignments Map(test_topic-25 -> Map(replicas -> Buffer(1, 2, 4), original_replicas -> Buffer(1, 2, 3)))
Successfully submitted cancellation of reassignments.

The cancelled pending reassignments throttle was removed.
Please run --verify to have the previous reassignments (not just the cancelled reassignments in progress) throttle removed.

# This is just for illustration purposes. In reality, the cancellation of reassignments should be pretty quick.
# The listing of /admin below might not even show cancel_reassignment_in_progress and reassign_partitions.
$ zkcli -h kafka-zk-host1 ls /kafka-cluster/admin/
[u'cancel_reassignment_in_progress',
 u'reassign_partitions',
 u'delete_topics']

# After the reassignment cancellation is complete, the ZK nodes /admin/cancel_reassignment_in_progress and /admin/reassign_partitions are gone.
$ zkcli -h kafka-zk-host1 ls /kafka-cluster/admin/
[u'delete_topics']


Instead, a protocol similar to the one used for config changes will be used. To start a reassignment (of a single partition or a collection of partitions), a persistent sequential znode will be created in /admin/reassignment_requests. The controller will be informed of this new request via a ZooKeeper watch. It will initiate the reassignment, create a znode /admin/reassignments/$topic/$partition, and then delete the /admin/reassignment_requests/request_NNNN znode.

A client can subsequently create more requests involving the same partitions (by creating a /admin/reassignment_requests/request_MMMM znode), and the controller would re-initiate reassignment as the assignment changes. This means it is possible to "cancel" the reassignment of a single partition by changing the reassignment to the old assigned replicas. Note, however, that the controller itself doesn't remember the old assigned replicas – it is up to the client to record the old state prior to starting a reassignment if that client needs to support cancellation.

When the ISR includes all the newly assigned replicas (given as the contents of the reassignment znode), the controller would remove the relevant child of /admin/reassignments (i.e. at the same time the controller currently updates/removes the /admin/reassign_partitions znode).

In this way, the child nodes of /admin/reassignments correspond precisely to the partitions currently being reassigned. Moreover, the ZooKeeper czxid of a child node of /admin/reassignments identifies a reassignment uniquely. It is then possible to determine whether that exact reassignment is still ongoing, or (by comparing the czxid with the mzxid) whether it has been changed.
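The following is a client-side Python sketch of that check. The `ZnodeStat` tuple mirrors the two ZooKeeper Stat fields used here; the zkcli output earlier shows the same field names. The function name and the status strings are hypothetical.

```python
from typing import NamedTuple, Optional


class ZnodeStat(NamedTuple):
    # Only the two Stat fields this check needs; a real ZooKeeper Stat has more.
    czxid: int  # zxid of the transaction that created the znode
    mzxid: int  # zxid of the transaction that last modified the znode


def reassignment_status(stat: Optional[ZnodeStat], remembered_czxid: int) -> str:
    """Classify a reassignment previously started by this client.

    stat: Stat of /admin/reassignments/$topic/$partition, or None if the znode is gone.
    remembered_czxid: the czxid observed right after the reassignment was started."""
    if stat is None:
        return "finished"     # child removed: the reassignment is no longer in flight
    if stat.czxid != remembered_czxid:
        return "replaced"     # znode deleted and re-created: a different reassignment
    if stat.mzxid != stat.czxid:
        return "changed"      # same reassignment, but its target was updated since creation
    return "in-progress"      # exactly the reassignment that was started
```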

The following sections describe, in detail, the algorithms used to manage the various znodes.

Supporting the legacy znode protocol (/admin/reassign_partitions)

  1. Some ZK client creates /admin/reassign_partitions with the same JSON content as currently
  2. ZK triggers watch on /admin/reassign_partitions, controller notified
  3. Controller creates /admin/reassignment_requests/request_xxx (for xxx the znode sequence number)

The JSON in step 3 would look like:

{ "version":1, "assignments":{ "my-topic": {"0":[1,2,3]}, "your-topic": {"3":[4,5,6]} }, "legacy":true }

The "legacy" key is used to track that this request originates from /admin/reassign_partitions.
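Step 3 is essentially a reshaping of the legacy JSON. The following is a minimal Python sketch. It assumes the legacy znode uses the {"version":1,"partitions":[...]} format shown elsewhere in this KIP. The function name `legacy_to_request` is hypothetical.

```python
import json


def legacy_to_request(legacy_json: str) -> dict:
    """Convert /admin/reassign_partitions content into a request body."""
    legacy = json.loads(legacy_json)
    assignments = {}
    for p in legacy["partitions"]:
        # Group by topic; partition ids become string keys, as in {"0":[1,2,3]}.
        assignments.setdefault(p["topic"], {})[str(p["partition"])] = p["replicas"]
    # Mark the request so the controller knows it came from the legacy path.
    return {"version": 1, "assignments": assignments, "legacy": True}
```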

The new znode protocol

  1. Some ZK client creates /admin/reassignment_requests/request_xxx (without the "legacy" key mentioned above, unless it was created via the legacy protocol described above)
  2. ZK triggers watch on /admin/reassignment_requests/request_xxx, controller notified

  3. For each /admin/reassignment_requests/request_xxx the controller:

    1. Reads the znode to find the partitions being reassigned and the newly assigned brokers.

    2. Checks whether a reassignment for the same partition exists:
      1. If it does, and one is a legacy request and the other is not, then it logs a warning that the request is being ignored.
      2. Otherwise:

        1. The controller initiates reassignment

        2. The controller creates or updates  /admin/reassignments/$topic/$partition corresponding to those partitions
    3. The controller deletes /admin/reassignment_requests/request_xxx

The check in 3.b.i prevents the same partition being reassigned via different routes.
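The per-request loop in step 3, including the legacy/non-legacy conflict check, can be sketched in Python. In-memory dictionaries stand in for the znodes. `ControllerState`, `process_requests` and the dictionary layouts are hypothetical. Initiating the actual reassignment is left as a comment.

```python
from dataclasses import dataclass, field


@dataclass
class ControllerState:
    # Hypothetical in-memory stand-ins for the znodes.
    # (topic, partition) -> {"replicas": [...], "legacy": bool}; models /admin/reassignments
    reassignments: dict = field(default_factory=dict)
    # "request_NNNNNNNNNN" -> request JSON; models /admin/reassignment_requests
    requests: dict = field(default_factory=dict)
    warnings: list = field(default_factory=list)


def process_requests(state: ControllerState) -> None:
    # Zero-padded sequential znode names sort in creation order.
    for name in sorted(state.requests):
        request = state.requests[name]
        legacy = request.get("legacy", False)
        for topic, partitions in request["assignments"].items():
            for partition, replicas in partitions.items():
                tp = (topic, int(partition))
                existing = state.reassignments.get(tp)
                if existing is not None and existing["legacy"] != legacy:
                    # One side is legacy and the other is not: ignore this partition.
                    state.warnings.append(f"ignoring reassignment of {tp}: legacy flag mismatch")
                    continue
                # Here the controller would initiate the reassignment, then
                # create or update /admin/reassignments/$topic/$partition.
                state.reassignments[tp] = {"replicas": list(replicas), "legacy": legacy}
        # Finally, delete /admin/reassignment_requests/request_xxx.
        del state.requests[name]
```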

Revised protocol when a partition "catches up"

  1. If the reassignment was via the legacy path:
    1. The controller removes partition from /admin/reassign_partitions, or if this is the last partition being reassigned by a change to /admin/reassign_partitions, then /admin/reassign_partitions is deleted.
  2. The controller removes /admin/reassignments/$topic/$partition
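The following Python sketch implements this step. It assumes a partition has caught up once every newly assigned replica is in the ISR, the condition stated earlier in this section. The function name and data layouts are hypothetical. Returning None models deleting /admin/reassign_partitions.

```python
from typing import Optional


def on_partition_caught_up(tp: tuple, isr: list, reassignments: dict,
                           legacy_znode: Optional[dict]) -> Optional[dict]:
    """reassignments: (topic, partition) -> {"replicas": [...], "legacy": bool}.
    legacy_znode: parsed /admin/reassign_partitions JSON, or None if it does not exist.
    Returns the new legacy znode content; None means the znode is deleted (or absent)."""
    entry = reassignments.get(tp)
    if entry is None or not set(entry["replicas"]) <= set(isr):
        return legacy_znode  # not caught up yet: nothing changes
    if entry["legacy"] and legacy_znode is not None:
        # Step 1: remove the partition from the legacy znode, deleting it if now empty.
        remaining = [p for p in legacy_znode["partitions"]
                     if (p["topic"], p["partition"]) != tp]
        legacy_znode = {**legacy_znode, "partitions": remaining} if remaining else None
    # Step 2: remove /admin/reassignments/$topic/$partition.
    del reassignments[tp]
    return legacy_znode
```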

On controller failover

When failing over, the new controller needs to cope with three cases:

  1. Creation of /admin/reassign_partitions while there was no active controller
  2. Creation of children of /admin/reassignment_requests while there was no active controller
  3. Restarting reassignment of all the in-flight reassignments in /admin/reassignments

The controller context's partitionsBeingReassigned map can be updated as follows:

  1. Add a ReassignedPartitionsContext for each reassignment in /admin/reassignments
  2. Add ReassignedPartitionsContexts for each child of /admin/reassignment_requests, but only if the context being added has the same legacy flag as any current context for the partition.
  3. Add ReassignedPartitionsContexts for each partition in /admin/reassign_partitions, but only if the context being added has the same legacy flag as any current context for the partition.
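The three steps above can be sketched in Python as follows. The dictionaries are hypothetical stand-ins for the znode contents and for ReassignedPartitionsContext. A later source overrides an earlier one only when their legacy flags agree.

```python
def rebuild_partitions_being_reassigned(reassignments: dict, requests: dict,
                                        legacy_znode) -> dict:
    """reassignments: (topic, partition) -> {"replicas": [...], "legacy": bool}
    requests: "request_NNNNNNNNNN" -> request JSON (children of /admin/reassignment_requests)
    legacy_znode: parsed /admin/reassign_partitions JSON, or None."""
    # Step 1: every in-flight reassignment in /admin/reassignments.
    ctx = {tp: dict(v) for tp, v in reassignments.items()}

    def add(tp, replicas, legacy):
        current = ctx.get(tp)
        # Steps 2 and 3: only add when there is no context yet or the legacy flags match.
        if current is None or current["legacy"] == legacy:
            ctx[tp] = {"replicas": list(replicas), "legacy": legacy}

    # Step 2: pending requests, in creation order.
    for name in sorted(requests):
        request = requests[name]
        for topic, partitions in request["assignments"].items():
            for partition, replicas in partitions.items():
                add((topic, int(partition)), replicas, request.get("legacy", False))

    # Step 3: the legacy znode, if present.
    if legacy_znode is not None:
        for p in legacy_znode["partitions"]:
            add((p["topic"], p["partition"]), p["replicas"], True)
    return ctx
```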

Change to the KafkaController.onPartitionReassignment() algorithm

The existing algorithm in onPartitionReassignment() is suboptimal for changing reassignments in-flight. As an example, consider an initial assignment of [1,2], reassigned to [2,3] and then changed to [2,4]. With the current algorithm broker 3 will remain in the assigned replicas until broker 4 is in sync, even though 3 wasn't one of the original assigned replicas nor one of the new assigned replicas after the second reassignment. Broker 3 will try to get and stay in-sync during this time, wasting network bandwidth and CPU on both broker 3 and the leader.

More complex scenarios exist where such "intermediate replicas" become the leader.

To solve this problem it is necessary to compute a set of replicas to be removed from the assigned replicas when a reassignment changes.

The set of replicas to be removed, drop, will be computed as follows:

  1. At the point a new reassignment is created, we compute the value top = len(original_assignment).

    1. We check that top >= min.insync.replicas, logging a warning and ignoring the reassignment if the check fails.

    2. Otherwise we store this in the /admin/reassignments/$topic/$partition znode.
  2. Then, initially, and each time the reassignment changes, (i.e. in KafkaController.onPartitionReassignment()):
    1. Let assigned be the currently assigned replicas, i.e. assigned = ControllerContext.partitionReplicaAssignment(tp)
    2. Define a sorted list of out-of-sync replicas: osr = assigned -- isr
    3. Create a list of current replicas existing = List(leader) ++ isr ++ osr
    4. Remove duplicates from existing (keeping the first occurrence in the list)

    5. Then, the set of replicas which can be dropped is: drop = assigned -- existing[0:top-1]

Notes:

  • Since top > 0 it is impossible to drop the leader, and thus it's impossible that a leader election will be needed at this point.
  • By the assumption top >= min.insync.replicas, it is impossible to shrink the ISR to a point where producers are blocked, assuming the ISR was already sufficient.

  • If the controller knew how far behind the ISR each of the OSR replicas was, OSR could be sorted, which would result in dropping the furthest-behind replicas. But the controller doesn't know this.
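The computation of drop in step 2 can be sketched in Python. This sketch reads existing[0:top-1] as an inclusive range, i.e. the first top entries. That is the reading under which the note "since top > 0 it is impossible to drop the leader" holds. The function name `compute_drop` is hypothetical. The sketch also assumes the min.insync.replicas check of step 1 was done when the reassignment was created.

```python
def compute_drop(assigned: list, isr: list, leader: int, top: int) -> list:
    """Return the replicas that can be dropped from `assigned`.

    top is len(original_assignment), already checked against
    min.insync.replicas when the reassignment was created (step 1)."""
    # Step 2.2: sorted out-of-sync replicas, osr = assigned -- isr.
    osr = sorted(r for r in assigned if r not in isr)
    # Steps 2.3 and 2.4: existing = List(leader) ++ isr ++ osr, first occurrence kept.
    existing = []
    for r in [leader, *isr, *osr]:
        if r not in existing:
            existing.append(r)
    # Step 2.5: keep the first `top` entries, drop every other assigned replica.
    keep = existing[:top]
    return [r for r in assigned if r not in keep]
```

Consider the example at the start of this section, where [1,2] is reassigned to [2,3]. Assume assigned = [1,2,3], leader 1, ISR [1,2] and top = 2. Under those assumptions, `compute_drop([1, 2, 3], [1, 2], 1, 2)` returns `[3]`, so broker 3 is the replica dropped.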

 In order to actually drop those replicas:

...

Make the transitions in the replica state machine (RSM): -> OfflineReplica -> ReplicaDeletionStarted -> ReplicaDeletionSuccessful -> NonExistentReplica
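The following Python sketch walks a dropped replica through these transitions and rejects illegal ones. The table of valid previous states is an assumption, modelled on the controller's replica state machine and simplified for illustration. `drop_replica` is a hypothetical name.

```python
# Assumed, simplified table of valid previous states for each target state,
# modelled on the controller's replica state machine.
VALID_PREVIOUS_STATES = {
    "OfflineReplica": {"NewReplica", "OnlineReplica", "OfflineReplica",
                       "ReplicaDeletionIneligible"},
    "ReplicaDeletionStarted": {"OfflineReplica"},
    "ReplicaDeletionSuccessful": {"ReplicaDeletionStarted"},
    "NonExistentReplica": {"ReplicaDeletionSuccessful"},
}

# The transitions listed above for a replica that is being dropped.
DROP_PATH = ["OfflineReplica", "ReplicaDeletionStarted",
             "ReplicaDeletionSuccessful", "NonExistentReplica"]


def drop_replica(state: str) -> list:
    """Walk a replica from `state` to NonExistentReplica; return the visited states."""
    history = [state]
    for target in DROP_PATH:
        if state not in VALID_PREVIOUS_STATES[target]:
            raise ValueError(f"illegal transition {state} -> {target}")
        state = target
        history.append(state)
    return history
```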


If the pending reassignments have a throttle, the throttle will be removed after the reassignments are cancelled. However, for reassignments that have already completed, the user needs to remove their throttle by running kafka-reassign-partitions.sh --verify.

Skip Reassignment Cancellation Scenarios

There are a couple of scenarios in which the pending reassignments in /admin/reassign_partitions cannot be cancelled / rolled back.

  1. The "original_replicas" field is missing for the topic/partition in /admin/reassign_partitions. In this case, the cancellation of that pending reassignment will be skipped, because there is no way to reset to the original replicas. This can happen if:
    1. the user/client is tampering with /admin/reassign_partitions directly and does not provide the "original_replicas" for the topic, or
    2. the user/client is using an incorrect version of the admin client to submit reassignments. The Kafka software should be upgraded not just on all the brokers in the cluster, but also on the host that is used to submit reassignments.

  2. None of the "original_replicas" brokers are in the ISR, and some brokers in the "new_replicas" are online for the topic/partition in the pending reassignments. In this case, it is better to skip this topic's pending reassignment cancellation/rollback; otherwise, the partition will become offline. However, all the brokers in "original_replicas" may be offline AND all the brokers in "new_replicas" may also be offline for this topic/partition. In that case the cluster is in such a bad state that the topic/partition is offline anyway, and its pending reassignment will be cancelled/rolled back to the "original_replicas".
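The two skip rules can be expressed as a single predicate. In this Python sketch, the function name and parameters are hypothetical. "new_replicas" is taken to be the "replicas" field of the /admin/reassign_partitions entry, as in the zkcli example above. A broker counts as online if it is in `live_brokers`.

```python
def should_cancel(entry: dict, isr: set, live_brokers: set) -> bool:
    """entry: one partition entry from /admin/reassign_partitions, e.g.
    {"topic": "t", "partition": 0, "replicas": [1, 2, 4], "original_replicas": [1, 2, 3]}."""
    original = entry.get("original_replicas")
    if not original:
        # Scenario 1: nothing to roll back to, so skip this partition.
        return False
    new = entry["replicas"]  # treated as "new_replicas"
    original_in_isr = any(b in isr for b in original)
    new_online = any(b in live_brokers for b in new)
    if not original_in_isr and new_online:
        # Scenario 2: rolling back would take a currently available partition offline.
        return False
    # Either some original replica is in the ISR, or the partition is offline anyway.
    return True
```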

Planned Future Changes 

New reassignments while existing reassignments in-flight  

The Reassignment Cancellation described above is more straightforward. However, submitting new reassignments while existing reassignments are still in-flight needs more discussion and consensus, and it might be worth doing in another KIP. It is therefore listed under Planned Future Changes. If consensus can be reached on this design, this feature can be delivered in this KIP as well.

To support submitting more reassignments while existing reassignments are still in-flight, an extra znode, /admin/reassign_partitions_queue, will be added. It has the same JSON format as /admin/reassign_partitions. Three more options, --generate-queue, --verify-queue and --execute-queue, will be added to kafka-reassign-partitions.sh. The controller will be informed of the queued reassignments via a ZooKeeper watch. It will take all topic/partitions from /admin/reassign_partitions_queue and add them to /admin/reassign_partitions. It will then trigger the reassignments of those topic/partitions via onPartitionReassignment().

The new /admin/reassign_partitions_queue znode uses the same JSON format as /admin/reassign_partitions, e.g.:

{"version":1,
 "partitions":[{"topic": "foo1",
                "partition": 0,
                "replicas": [1,2,5]
               },
               {"topic": "foo2",
                "partition": 1,
                "replicas": [7,9,10]
               }]
}


If the /admin/reassign_partitions_queue znode already exists, new queued reassignments will be blocked from being written to /admin/reassign_partitions_queue.

/admin/reassign_partitions_queue may contain topic/partitions that also exist in /admin/reassign_partitions (pending reassignments). The conflict resolution for those duplicate topic/partitions is to first cancel / roll back their pending reassignments in /admin/reassign_partitions. Then the new reassignments are submitted from /admin/reassign_partitions_queue to /admin/reassign_partitions. This approach is simpler than the algorithm previously proposed by Tom Bentley to infer the final replica assignments for those duplicate topic/partitions. To trigger the reassignment, the topic/partition is put in /admin/reassign_partitions and ControllerContext.partitionsBeingReassigned. After that, it will be removed from /admin/reassign_partitions_queue. When /admin/reassign_partitions_queue is empty, the znode will be deleted.
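The conflict-resolution rule can be sketched in Python as a pure function over in-memory copies of the two znodes. The function name and data layouts are hypothetical. The sketch assumes that a rolled-back partition is back on its "original_replicas", so those become the original replicas of the queued reassignment. It also assumes the caller supplies the current assignment of every other queued partition.

```python
def merge_queue(pending: dict, queue: list, current_assignment: dict):
    """pending: (topic, partition) -> {"replicas": [...], "original_replicas": [...]}
    queue: entries from /admin/reassign_partitions_queue ({"topic", "partition", "replicas"})
    current_assignment: (topic, partition) -> replicas, for partitions not being reassigned.
    Returns (rolled_back, new_pending); the queue znode would then be emptied and deleted."""
    new_pending = dict(pending)
    rolled_back = []
    for q in queue:
        tp = (q["topic"], q["partition"])
        if tp in new_pending:
            # Conflict: cancel / roll back the in-flight reassignment first.
            original = new_pending.pop(tp)["original_replicas"]
            rolled_back.append(tp)
        else:
            original = current_assignment[tp]
        # Submit the queued reassignment into /admin/reassign_partitions.
        new_pending[tp] = {"replicas": list(q["replicas"]), "original_replicas": list(original)}
    return rolled_back, new_pending
```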

...


Compatibility, Deprecation, and Migration Plan

...