Versions Compared

Comment: specify RPCs explicitly and rename CLI flag to "disallow"

...

Code Block
languagescala
titleReassignPartitionsCommand.scala
    val disallowReplicationFactorChange = parser.accepts("disallow-replication-factor-change", "Denies the ability to change a partition's replication factor as part of this reassignment through adding validation against it.")

...

Code Block
/bin/kafka-reassign-partitions.sh --reassignment-json-file ./reassign.json --execute --bootstrap-server localhost:9092 --disallow-replication-factor-change

...

A new option will be added to the request object, and the associated request/response versions will be bumped to 1.

AlterPartitionReassignmentsRequest

Code Block
languagejs
titleAlterPartitionReassignmentsRequest.json
// Version 1 adds the ability to allow/disallow changing the replication factor as part of the request.
{
  "apiKey": 45,
  "type": "request",
  "listeners": ["broker", "controller", "zkBroker"],
  "name": "AlterPartitionReassignmentsRequest",
  "validVersions": "0-1", // <-------NEW - version 1
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "The time in ms to wait for the request to complete." },
    { "name": "AllowReplicationFactorChange", "type": "boolean", "versions": "1+", "default": "true", // <--------- NEW
      "about": "The option indicating whether changing the replication factor of any given partition as part of this request is a valid move." }, // <--------- NEW
    { "name": "Topics", "type": "[]ReassignableTopic", "versions": "0+",
      "about": "The topics to reassign.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]ReassignablePartition", "versions": "0+",
        "about": "The partitions to reassign.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "Replicas", "type": "[]int32", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "brokerId",
          "about": "The replicas to place the partitions on, or null to cancel a pending reassignment for this partition." }
      ]}
    ]}
  ]
}

AlterPartitionReassignmentsResponse

Code Block
languagejs
// Version 1 adds the ability to allow/disallow changing the replication factor as part of the request.
{
  "apiKey": 45,
  "type": "response",
  "name": "AlterPartitionReassignmentsResponse",
  "validVersions": "0-1", // <-------NEW - version 1 
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "AllowReplicationFactorChange", "type": "boolean", "versions": "1+", "default": "true", "ignorable": true, // <--------- NEW
      "about": "The option indicating whether changing the replication factor of any given partition as part of the request was allowed." }, // <--------- NEW 
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "Responses", "type": "[]ReassignableTopicResponse", "versions": "0+",
      "about": "The responses to topics to reassign.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name" },
      { "name": "Partitions", "type": "[]ReassignablePartitionResponse", "versions": "0+",
        "about": "The responses to partitions to reassign", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code for this partition, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
          "about": "The error message for this partition, or null if there was no error." }
      ]}
    ]}
  ]
}


Validation

If AllowReplicationFactorChange is false and the request changes the replication factor of a partition, an ApiError(Errors.INVALID_REPLICATION_FACTOR) will be returned for each such partition. Consistent with the current behavior of the API, valid (unaffected) partition reassignments in the same request will still succeed; only the invalid ones will have an error returned in the response.
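
The per-partition behavior described above can be sketched in a few lines of Java. This is a minimal illustration, not the controller's implementation: the class name, the PartitionError enum (a stand-in for the relevant values of Kafka's Errors enum), the string partition keys, and the assumption that every requested partition already has a known replica list are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReplicationFactorValidationSketch {

    /** Hypothetical stand-in for the two values of Kafka's Errors enum that matter here. */
    public enum PartitionError { NONE, INVALID_REPLICATION_FACTOR }

    /**
     * Validates each requested reassignment independently.
     * Assumes every key in targetReplicas is also present in currentReplicas.
     */
    public static Map<String, PartitionError> validate(
            boolean allowReplicationFactorChange,
            Map<String, List<Integer>> currentReplicas,
            Map<String, List<Integer>> targetReplicas) {
        Map<String, PartitionError> results = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> entry : targetReplicas.entrySet()) {
            int currentSize = currentReplicas.get(entry.getKey()).size();
            int targetSize = entry.getValue().size();
            // A different replica count means the replication factor would change.
            boolean changesReplicationFactor = currentSize != targetSize;
            // Only the offending partition is rejected; the others are unaffected.
            results.put(entry.getKey(),
                !allowReplicationFactorChange && changesReplicationFactor
                    ? PartitionError.INVALID_REPLICATION_FACTOR
                    : PartitionError.NONE);
        }
        return results;
    }
}
```

With the flag set to false, a request that moves one partition from brokers [1, 2, 3] to [4, 5, 6] and shrinks another from [1, 2, 3] to [1, 2] would report no error for the first and INVALID_REPLICATION_FACTOR for the second.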

...

If a client that supports this option tries to set it against a Kafka broker running an older version that does not support it, we want to throw an exception.
To enforce this, we will add client-side validation in the admin client's request builder that requires a minimum request version of 1. This results in the client throwing an UnsupportedVersionException("The broker does not support ALTER_PARTITION_REASSIGNMENTS with version in range [1, 1]. The supported range is [0, 0].").
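
As a rough sketch of this version check, the snippet below models how requiring a minimum version could lead to the exception. It does not use the real admin client classes: the class name, the method names, and the nested UnsupportedVersionException (a stand-in for org.apache.kafka.common.errors.UnsupportedVersionException) are hypothetical, and it assumes that "setting the option" means passing a non-default value of false.

```java
public class MinimumVersionCheckSketch {

    /** Hypothetical stand-in for Kafka's UnsupportedVersionException. */
    public static class UnsupportedVersionException extends RuntimeException {
        public UnsupportedVersionException(String message) {
            super(message);
        }
    }

    /** Latest request version this sketch's client knows about. */
    static final int LATEST_VERSION = 1;

    /** Version 1 is required only when the option deviates from its default (true). */
    public static int oldestAllowedVersion(boolean allowReplicationFactorChange) {
        return allowReplicationFactorChange ? 0 : 1;
    }

    /** Picks the highest version both sides support, or throws if there is none. */
    public static int negotiateVersion(boolean allowReplicationFactorChange,
                                       int brokerMin, int brokerMax) {
        int clientMin = oldestAllowedVersion(allowReplicationFactorChange);
        int usable = Math.min(LATEST_VERSION, brokerMax);
        if (usable < Math.max(clientMin, brokerMin)) {
            throw new UnsupportedVersionException(String.format(
                "The broker does not support ALTER_PARTITION_REASSIGNMENTS with version "
                    + "in range [%d, %d]. The supported range is [%d, %d].",
                clientMin, LATEST_VERSION, brokerMin, brokerMax));
        }
        return usable;
    }
}
```

Under these assumptions, a client that leaves the option at its default can still talk to an old broker using version 0, while a client that disallows replication factor changes fails fast instead of having the option silently ignored.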

...