You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 7 Next »

Status

Current state: in review

Discussion thread: here

JIRA KAFKA-8345 - Getting issue details... STATUS

Release:

Motivation

Currently, users initiate replica reassignment by writing directly to a ZooKeeper node named /admin/reassign_partitions.

As explained in KIP-4, ZooKeeper based APIs have many problems.  For example, there is no way to return a helpful error code if an invalid reassignment is proposed.  Adding new features over time is difficult when external tools can access and overwrite Kafka's internal metadata data structures.  Also, ZooKeeper-based APIs inherently lack security and auditability.

In addition to all the general problems of ZK-based APIs, the current reassignment interface has some problems specific to its particular structure.  There is no mechanism provided for aborting a reassignment operation once it has been initiated.  Furthermore, the API assumes that reassignment operations are launched as a batch – there is no way to incrementally add a new reassignment operation once the batch has been initiated.

We would like to provide a well-supported AdminClient API that does not suffer from these problems.  This API can be used as a foundation on which to build future improvements.

Public Interfaces

AdminClient APIs

We will add two new APIs: alterPartitionAssignments, and listPartitionReassignments.  As the names imply, the alter API modifies partition reassignments, and the list API lists the ones which are ongoing.

Unlike the current ZooKeeper-based API, alterPartitionAssignments can add or remove partition reassignments without interrupting unrelated assignments that are in progress.  Partition reassignments can be added or removed by the API on a per-partition basis-- there are no global "before" or "after" snapshots.


/**
 * Change the reassignments for one or more partitions.
 *
 * @param reassignments   The reassignments to add, modify, or remove.
 * @param options         The options to use.
 * @return                The result.
 */
public AlterPartitionReassignmentsResult alterPartitionReassignments(
         Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
         AlterPartitionReassignmentsOptions options);

/**
 * A new partition reassignment, which can be applied via {@link AdminClient#alterPartitionReassignments(Map)}.
 */
public class NewPartitionReassignment {
    private final List<Integer> targetBrokers;
    ...
}

class AlterPartitionAssignmentsResult {
  Map<TopicPartition, Future<Void>> futures; // maps partitions to the results for each partition (success / failure)
  Future<Void> all(); // Throws an exception if any reassignment was rejected
}

class AlterPartitionAssignmentsOptions extends AbstractOptions<> {
  // contains timeoutMs
}

ListPartitionReassignmentsResult listPartitionReassignments(ListPartitionReassignmentsOptions options);

class ListPartitionReassignmentsOptions extends AbstractOptions<> {
  // contains timeoutMs
}

class ListPartitionReassignmentsResult {
  private final KafkaFuture<Map<TopicPartition, PartitionReassignment>> reassignments;
}

/**
 * A partition reassignment, which has been listed via {@link AdminClient#listPartitionReassignments()}.
 */
public class PartitionReassignment {
    /**
     * The brokers which we want this partition to reside on.
     */
    private final List<Integer> targetBrokers;

    /**
     * The brokers which this partition currently resides on.
     */
    private final List<Integer> currentBrokers;
...
}

New RPCs

AlterPartitionAssignmentsRequest

This is used to create or cancel a set of partition reassignments.  It must be sent to the controller.

This operation requires ALTER on CLUSTER.

{
  "apiKey": 45,
  "type": "request",
  "name": "AlterPartitionAssignmentsRequest",
  "validVersions": "0",
  "fields": [
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "The time in ms to wait for the request to complete." },
    { "name": "Topics", "type": "[]ReassignableTopic", "versions": "0+",
      "about": "The topics to reassign.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]ReassignablePartition", "versions": "0+",
        "about": "The partitions to reassign.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "Replicas", "type": "[]int32", "versions": "0+", "nullableVersions": "0+",
          "about": "The replicas to place the partitions on, or null to cancel a pending reassignment." }
      ]}
    ]}
  ]
}

AlterPartitionAssignmentsResponse

This is the response from AlterPartitionsRequest.

Possible errors:

  • REQUEST_TIMED_OUT: if the request timed out.
  • NOT_CONTROLLER: if the node we sent the request to was not the controller.
  • INVALID_REPLICA_ASSIGNMENT: if the specified replica assignment was not valid-- for example, if it included negative numbers, repeated numbers, or specified a broker ID that the controller was not aware of.
  • CLUSTER_AUTHORIZATION_FAILED


{
  "apiKey": 45,
  "type": "response",
  "name": "AlterPartitionAssignmentsResponse",
  "validVersions": "0",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]ReassignablePartitionResponse", "versions": "0+",
      "about": "The responses to partitions to reassign.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code." },
      { "name": "ErrorString", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error string, or null if there was no error." }
      ]
    }
  ]
}

ListPartitionReassignmentsRequest

This RPC lists the currently active partition reassignments.  It must be sent to the controller.

It requires DESCRIBE on CLUSTER.

{
  "apiKey": 46,
  "type": "request",
  "name": "ListPartitionReassignmentsRequest",
  "validVersions": "0",
  "fields": [
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "The time in ms to wait for the request to complete." }
  ]
}

ListPartitionReassignmentsResponse

Possible errors:

  • REQUEST_TIMED_OUT: if the request timed out.
  • NOT_CONTROLLER: if the node we sent the request to is not the controller.
  • CLUSTER_AUTHORIZATION_FAILED

If the top-level error code is set, no responses will be provided.


{
  "apiKey": 46,
  "type": "response",
  "name": "ListPartitionReassignmentsResponse",
  "validVersions": "0",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 on success." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The top-level error message, or null on success." },
    { "name": "Topics", "type": "[]OngoingTopicReassignment", "versions": "0+",
      "about": "The ongoing reassignments for each topic.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]OngoingPartitionReassignment", "versions": "0+",
          "about": "The ongoing reassignments for each partition.", "fields": [
          { "name": "PartitionId", "type": "int32", "versions": "0+",
            "about": "The partition ID." },
          { "name": "CurrentBrokers", "type": "[]int32", "versions": "0+",
            "about": "The broker IDs which the partition is currently assigned to." },
          { "name": "TargetBrokers", "type": "[]int32", "versions": "0+",
            "about": "The broker IDs which the partition is being reassigned to." }
      ]}
    ]}
  ]
}

Modified RPCs

LeaderAndIsrRequest

We will add a new field to LeaderAndIsrRequest named PendingReplicas.  This field will contain the replicas which will exist once the partition reassignment is complete.

diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
index b8988351c..d45727078 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
@@ -20,6 +20,8 @@
   // Version 1 adds IsNew.
   //
   // Version 2 adds broker epoch and reorganizes the partitions by topic.
+  //
+  // Version 3 adds PendingReplicas.
   "validVersions": "0-3",
   "fields": [
     { "name": "ControllerId", "type": "int32", "versions": "0+",
@@ -48,6 +50,8 @@
           "about": "The ZooKeeper version." },
         { "name": "Replicas", "type": "[]int32", "versions": "0+",
           "about": "The replica IDs." },
+        { "name": "PendingReplicas", "type": "[]int32", "versions": "3+", "nullableVersions": "0+",
+          "about": "The replica IDs that we are reassigning the partition to, or null if the partition is not being reassigned." },
         { "name": "IsNew", "type": "bool", "versions": "1+", "default": "false", "ignorable": true, 
           "about": "Whether the replica should have existed on the broker or not." }
       ]}

Implementation

ZooKeeper Changes

/brokers/topics/[topic]/partitions/[partitionId]/state

This znode will now store an array of ints, named targetReplicas.  If this array is not present or null, there is no ongoing reassignment for this topic.  If it is non-null, it corresponds to the target replicas for this partition.  Replicas will be removed from targetReplicas as soon as they join the ISR.

Controller Changes

When the controller starts up, it will load all of the currently active reassignments from the partition state znodes.  This will not impose any additional startup overhead, because the controller needs to read these znodes anyway to start up.

The controller will handle ListPartitionReassignments by listing the current active reassignments.

The controller will handle the AlterPartitionAssignmentsResponse RPC by modifying the specified partition states in /brokers/topics/[topic]/partitions/[partitionId]/state.  Once these nodes are modified, the controller will send out a LeaderAndIsrRequest as appropriate.

Tool Changes

The kafka-reassign-partitions.sh tool will use the new AdminClient APIs to submit the reassignment plan.  It will not use the old ZooKeeper APIs any more.  In order to contact the admin APIs, the tool will accept a --bootstrap-server argument.

When changing the throttle, we will use IncrementalAlterConfigs rather than directly writing to Zookeeper.

Compatibility, Deprecation, and Migration Plan

/admin/reassign_partitions znode

For compatibility purposes, we will continue to allow assignments to be submitted through the /admin/reassign_partitions node.  Just as with the current code, this will only be possible if there are no current assignments.  In other words, the znode has two states: empty and waiting for a write, and non-empty because there are assignments in progress. Once the znode is non-empty, further writes to it will be ignored.

Applications using the old API will not be able to cancel in-progress assignments.  They will also not be able to monitor the status of in-progress assignments, except by polling the znode to see when it becomes empty, which indicates that no assignments are ongoing.  To get these benefits, applications will have to be upgraded to the AdminClient API.

The deprecated ZooKeeper-based API will be removed in a future release.

Rejected Alternatives

Store the pending replicas in a single ZNode rather than in a per-partition ZNode

We could store the pending replicas in a single ZNode per cluster, rather than putting them beside the ISR in /brokers/topics/[topic]/partitions/[partitionId]/state.  However, this ZNode might become very large, which could lead to us running into maximum znode size problems.  This is an existing scalability limitation which we would like to solve.

Another reason for storing the pendingReplicas state in the ISR state ZNode is that this state belongs in the LeaderAndIsrRequest.  By making it clear which replicas are pending and which are existing, we pave the way for future improvements in reassignment.  For example, we will be able to distinguish between reassignment traffic and non-reassignment traffic for quota purposes, and provide better metrics.

Future Work

Reassignment Quotas that only throttle reassignment traffic

Currently, the quotas used for reassignment also apply to all non-ISR traffic.  This is undesirable and can lead to problems if nodes fall out of the ISR.

Since this KIP makes brokers aware of which replicas are being used for reassignment, it will be possible to implement quotas that only hit reassignment traffic, and not all non-ISR traffic.  This will also obviate the need to manually specify the list of reassigning partitions when setting the per-broker quotas.

Add reassignment metrics

We would like to provide metrics for reassignment.  For example, we would like to know how many partitions are being reassigned on a given broker, what write and read bandwidth is being used, etc.  Since this KIP clearly separates replicas that are being reassigned from those that are not, and makes this information available on each broker through the LeaderAndIsrRequest, this will be possible in the future.

Improved API for Reassignment Quotas

We would like a better API for managing reassignment quotas-- perhaps a new AdminClient function and RPC.  This is somewhat of a minor improvement, since we already have a way to change the configuration via admin client.  However, it would still be helpful.

Internal Batching

It would be nice if the controller could internally batch reassignment operations.  The controller has more information with which to make decisions than external systems.  If this were implemented, we would want to enrich the List API with more information, such as which reassignments were currently being worked on, and which the controller had decided to defer until later.

This could be added in a compatible way by making the default batch size infinite, so that users that didn't enable controller-side batching would get the current behavior of all reassignments starting immediately.

  • No labels