...
Code Block | ||||
---|---|---|---|---|
| ||||
val disableReplicationFactorChangedisallowReplicationFactorChange = parser.accepts("disabledisallow-replication-factor-change", "DisablesDenies the ability to change a partition's replication factor as part of this reassignment through adding validation against it.") |
...
Code Block |
---|
/bin/kafka-reassign-partitions.sh --reassignment-json-file ./reassign.json --execute --bootstrap-server localhost:9092 --disabledisallow-replication-factor-change |
...
A new option will be added to the request object and its associated request/response version bumped to 1.
AlterPartitionReassignmentsRequest
Code Block | |||||
---|---|---|---|---|---|
| |||||
// Version 1 adds the ability to allow/disallow changing the replication factor as part of the request.
{
"apiKey": 45,
"type": "request",
"listeners": ["broker", "controller", "zkBroker"],
"name": "AlterPartitionReassignmentsRequest",
"validVersions": "0-1", // <-------NEW - version 1
"flexibleVersions": "0+",
"fields": [
{ "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
"about": "The time in ms to wait for the request to complete." },
{ "name": "AllowReplicationFactorChange", "type": "boolean", "versions": "1+", "default": "true", // <--------- NEW
"about": "The option indicating whether changing the replication factor of any given partition as part of this request is a valid move." }, // <--------- NEW
{ "name": "Topics", "type": "[]ReassignableTopic", "versions": "0+",
"about": "The topics to reassign.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]ReassignablePartition", "versions": "0+",
"about": "The partitions to reassign.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "Replicas", "type": "[]int32", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "brokerId",
"about": "The replicas to place the partitions on, or null to cancel a pending reassignment for this partition." }
]}
]}
]
} |
AlterPartitionReassignmentsResponse
Code Block | ||
---|---|---|
| ||
// Version 1 adds the ability to allow/disallow changing the replication factor as part of the request. { "apiKey": 45, "type": "response", "name": "AlterPartitionReassignmentsResponse", "validVersions": "0-1", // <-------NEW - version 1 "flexibleVersions": "0+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "AllowReplicationFactorChange", "type": "boolean", "versions": "1+", "default": "true", "ignorable": true, // <--------- NEW "about": "The option indicating whether changing the replication factor of any given partition as part of the request was allowed." }, // <--------- NEW { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The top-level error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The top-level error message, or null if there was no error." }, { "name": "Responses", "type": "[]ReassignableTopicResponse", "versions": "0+", "about": "The responses to topics to reassign.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name" }, { "name": "Partitions", "type": "[]ReassignablePartitionResponse", "versions": "0+", "about": "The responses to partitions to reassign", "fields": [ diff --git a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json index a50fcb92b5..a37904c092 100644 --- a/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json +++ b/clients/src/main/resources/common/message/AlterPartitionReassignmentsRequest.json @@ -18,11 +18,13 @@ "type": "request", "listeners": ["broker", "controller", "zkBroker"], "name": "AlterPartitionReassignmentsRequest", - "validVersions": "0", + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { "name": "TimeoutMsPartitionIndex", "type": "int32", "versions": "0+", "default": "60000+", "about": "The time in ms to wait for the request to completepartition index." }, + { "name": "AllowReplicationFactorChangeErrorCode", "type": "booleanint16", "versions": "10+", "default": "true", + "about": "The option indicating whether changing the replication factor of any given partition as part of this request is a valid moveerror code for this partition, or 0 if there was no error." }, { "name": "TopicsErrorMessage", "type": "[]ReassignableTopicstring", "versions": "0+", "nullableVersions": "0+", "about": "The topics to reassign.", "fields": [": "The error message for this partition, or null if there was no error." } ]} { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", ]} ] } |
Validation
If AllowReplicationFactorChange is false, and the request does change the replication factor of a partition, a new ApiError(Errors.INVALID_REPLICATION_FACTOR)
will be returned for the associated partitions. Consistent with the current behavior of the API, valid (unaffected) partition reassignments will succeed as part of the same request - and only the invalid ones will have an error returned as part of the response.
...
If a new client version that supports this option tries to set it on a Kafka broker with an old version that doesn't support this option, we want to throw an exception.
To enforce this, we will add a client-side validation of a minimum request version of (1)
in the request builder in the admin client. Ths This results in the client throwing an UnsupportedVersionException("The broker does not support ALTER_PARTITION_REASSIGNMENTS with version in range [1, 1]. The supported range is [0, 0].")
.
...