...
Note this was initially erroneously assigned as KIP-178, which was already taken, and has been reassigned KIP-179.
Status
Current state: Under Discussion
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
...
Secondly, ReassignPartitionsCommand currently has no proper facility to report the progress of a reassignment; --verify can only be run periodically to check whether the requested assignments have been achieved. It would be useful if the tool could report progress better.
Public Interfaces
...
Two new network protocol APIs will be added:
The AdminClient API will have two new methods added (plus overloads for options):
The options accepted by the kafka-reassign-partitions.sh command will change:
- --zookeeper will be deprecated, with a warning message
- a new --bootstrap-server option will be added
- a new --progress action option will be added
Proposed Changes
kafka-reassign-partitions.sh and ReassignPartitionsCommand
The --zookeeper option will be retained and will:
...
In all other respects, the public API of ReassignPartitionsCommand will not be changed.
AdminClient: alterTopics()
AdminClient to support the ability to reassign partitions:...

    public class AlteredTopic {
        public AlteredTopic(String name, int numPartitions, int replicationFactor, Map<Integer,List<Integer>> replicasAssignment) {
            // ...
        }
        /** The name of the topic to alter. */
        public String name();
        /** The new number of partitions, or -1 if the number of partitions should not be changed. */
        public int numPartitions();
        /** The new replication factor, or -1 if the replication factor should not be changed. */
        public short replicationFactor();
        /**
         * The new assignments of partition to brokers, or the empty map
         * if the broker should assign replicas automatically.
         */
        Map<Integer,List<Integer>> replicasAssignment();
    }

    public class AlterTopicsOptions {
        public AlterTopicsOptions validateOnly(boolean validateOnly);
        public boolean validateOnly();
        public AlterTopicsOptions timeoutMs(long timeoutMs);
        public long timeoutMs();
    }

    public class AlterTopicsResult {
        // package-access constructor
        /** A mapping of the name of a requested topic to the error for that topic. */
        Map<String, KafkaFuture<Void>> values();
        /** Return a future which succeeds if all the topic alterations were accepted. */
        KafkaFuture<Void> all();
    }

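To illustrate the "-1 means unchanged" and "empty map means automatic assignment" conventions described in the Javadoc above, here is a minimal sketch. The class name `AlteredTopicSketch`, the constant `NO_CHANGE`, and the three predicate methods are assumptions made for this example only; they are not part of the proposed API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the proposed AlteredTopic; not the real Kafka class. */
final class AlteredTopicSketch {
    /** Sentinel from the proposal: -1 means "do not change this property". */
    static final int NO_CHANGE = -1;

    private final String name;
    private final int numPartitions;
    private final short replicationFactor;
    private final Map<Integer, List<Integer>> replicasAssignment;

    AlteredTopicSketch(String name, int numPartitions, short replicationFactor,
                       Map<Integer, List<Integer>> replicasAssignment) {
        this.name = name;
        this.numPartitions = numPartitions;
        this.replicationFactor = replicationFactor;
        // Defensive copy so later changes to the caller's map do not leak in.
        this.replicasAssignment =
                Collections.unmodifiableMap(new HashMap<>(replicasAssignment));
    }

    String name() { return name; }

    /** True if the caller asked for a different partition count. */
    boolean changesPartitionCount() { return numPartitions != NO_CHANGE; }

    /** True if the caller asked for a different replication factor. */
    boolean changesReplicationFactor() { return replicationFactor != NO_CHANGE; }

    /** An empty map means the broker should choose the replica assignment. */
    boolean hasManualAssignment() { return !replicasAssignment.isEmpty(); }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        assignment.put(0, Arrays.asList(1, 2, 3)); // partition 0 -> brokers 1, 2, 3
        AlteredTopicSketch t = new AlteredTopicSketch(
                "my-topic", NO_CHANGE, (short) NO_CHANGE, assignment);
        System.out.println(t.name() + " manual assignment: " + t.hasManualAssignment());
    }
}
```

A pure reassignment, as in `main`, leaves both numeric fields at -1 and supplies only the partition-to-broker map.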
AdminClient: replicaStatus()
AdminClient to support the progress reporting functionality:...

    public class ReplicaStatusOptions {
    }

    public class ReplicaStatusResult {
        public KafkaFuture<Map<TopicPartition, List<ReplicaStatus>>> all()
    }

    /**
     * Represents the replication status of a partition
     * on a particular broker.
     */
    public class ReplicaStatus {
        /** The topic about which this is the status of */
        String topic()
        /** The partition about which this is the status of */
        int partition()
        /** The broker about which this is the status of */
        int broker()
        /**
         * The time (as milliseconds since the epoch) that
         * this status data was collected. In general this may
         * be some time before the replicaStatus() request time.
         */
        public long statusTime()
        /**
         * The number of messages that the replica on this broker is behind
         * the leader.
         */
        public long lag()
    }

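One way a caller might turn the per-replica `lag()` values into a progress figure is sketched below. This is an assumption about how the data could be used, not something this section specifies: the class `ReassignmentProgressSketch` and both of its methods are hypothetical, and a lag of zero is treated only as a rough proxy for "caught up".

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper; the KIP does not define how progress is computed. */
final class ReassignmentProgressSketch {

    /** Sum of the message lag reported for each replica being moved. */
    static long totalLag(List<Long> lags) {
        long total = 0;
        for (long lag : lags) {
            total += lag;
        }
        return total;
    }

    /**
     * Fraction (0.0 to 1.0) of the lag observed at the start of the
     * reassignment that has since been caught up.
     */
    static double fractionCaughtUp(long initialTotalLag, long currentTotalLag) {
        if (initialTotalLag <= 0) {
            return 1.0; // nothing to catch up
        }
        // Clamp so a leader that keeps receiving writes cannot push us below 0.
        long remaining = Math.max(0, Math.min(currentTotalLag, initialTotalLag));
        return 1.0 - (double) remaining / initialTotalLag;
    }

    public static void main(String[] args) {
        long now = totalLag(Arrays.asList(100L, 50L, 0L));
        System.out.println("caught up: " + fractionCaughtUp(600L, now));
    }
}
```

Because the leader may keep accepting new messages while replicas catch up, any such figure is an estimate rather than an exact completion percentage.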
Authorization
With broker-mediated reassignment it becomes possible to limit the authority to perform reassignment to something finer-grained than "anyone with access to zookeeper".
The reasons for reassignment are usually operational: for example, migrating partitions to new brokers when expanding the cluster, or attempting to find a more balanced assignment (according to some notion of balance). These are cluster-wide considerations, so authority should be granted for the reassign operation being performed on the cluster. Therefore alterTopics() will require ClusterAction on the CLUSTER.
replicaStatus() will require Describe on the CLUSTER.
Network Protocol: AlterTopicsRequest and AlterTopicsResponse
AlterTopicsRequest will initiate the process of topic alteration/partition reassignment...
As is currently the case, it will not be possible to have multiple reassignments running concurrently, hence the addition of the PARTITION_REASSIGNMENT_IN_PROGRESS error code.
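The "one reassignment at a time" rule can be pictured with the following sketch. It is illustrative only: `ReassignmentGuardSketch`, its `Outcome` enum, and the in-memory flag are assumptions for the example, and an actual broker would track reassignment state in cluster metadata rather than in a local variable.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical guard that rejects a second, concurrent reassignment. */
final class ReassignmentGuardSketch {

    /** Only the two outcomes relevant to this example. */
    enum Outcome { NONE, PARTITION_REASSIGNMENT_IN_PROGRESS }

    // Stand-in for real cluster state; true while a reassignment is running.
    private final AtomicBoolean inProgress = new AtomicBoolean(false);

    /** Atomically claims the single reassignment slot, or reports the error. */
    Outcome tryStart() {
        return inProgress.compareAndSet(false, true)
                ? Outcome.NONE
                : Outcome.PARTITION_REASSIGNMENT_IN_PROGRESS;
    }

    /** Releases the slot once the running reassignment has completed. */
    void finish() {
        inProgress.set(false);
    }

    public static void main(String[] args) {
        ReassignmentGuardSketch guard = new ReassignmentGuardSketch();
        System.out.println(guard.tryStart()); // first request is accepted
        System.out.println(guard.tryStart()); // second request is rejected
    }
}
```

A client receiving PARTITION_REASSIGNMENT_IN_PROGRESS would need to wait for the running reassignment to finish before retrying.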
Policy
The existing CreateTopicPolicy can be used to apply a cluster-wide policy on topic configuration at the point of creation via the create.topic.policy.class.name config property. To avoid an obvious loophole, it is necessary to also be able to apply a policy to topic alteration. Maintaining two separate policies in sync is a burden both in terms of class implementation and configuring the policy. It seems unlikely that many use cases would require a different policy for alteration than creation. On the other hand, just applying the CreateTopicPolicy to alterations is undesirable because:
...
The existing create.topic.policy.class.name config would continue to work, and would continue to name an implementation of CreateTopicPolicy. That policy would be applied to alterations automatically. The topic's config would be presented to the validate() method (via the RequestMetadata) even though it's not actually part of the AlterTopicsRequest. The documentation for the interface and config property would be updated.
Network Protocol: ReplicaStatusRequest and ReplicaStatusResponse
ReplicaStatusRequest requests information about the progress of a number of replicas....
- CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed. (or the TOPIC?)
- INVALID_TOPIC_EXCEPTION (17) The topic is not known.
- INVALID_PARTITIONS (37) The partition_id of the given topic is not valid.
- UNKNOWN_MEMBER_ID (25) The given broker id is not known.
- UNKNOWN_TOPIC_OR_PARTITION (3) The given broker is not a follower for the partition identified by topic, partition.
- NONE (0) The status request completed normally.
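The error codes listed above could be handled on the client side with a small lookup like the one below. The enum name `ReplicaStatusErrorSketch` and its `forCode` method are hypothetical and exist only for this example; the names and numeric values are the ones given in the list, and `int` is used for brevity.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical enum covering only the codes named for ReplicaStatusResponse. */
enum ReplicaStatusErrorSketch {
    NONE(0),
    UNKNOWN_TOPIC_OR_PARTITION(3),
    INVALID_TOPIC_EXCEPTION(17),
    UNKNOWN_MEMBER_ID(25),
    CLUSTER_AUTHORIZATION_FAILED(31),
    INVALID_PARTITIONS(37);

    // Reverse index from numeric code to enum constant.
    private static final Map<Integer, ReplicaStatusErrorSketch> BY_CODE = new HashMap<>();

    static {
        for (ReplicaStatusErrorSketch error : values()) {
            BY_CODE.put(error.code, error);
        }
    }

    private final int code;

    ReplicaStatusErrorSketch(int code) {
        this.code = code;
    }

    int code() {
        return code;
    }

    /** Looks up a constant by numeric code, rejecting codes not in the list. */
    static ReplicaStatusErrorSketch forCode(int code) {
        ReplicaStatusErrorSketch error = BY_CODE.get(code);
        if (error == null) {
            throw new IllegalArgumentException("Unknown error code: " + code);
        }
        return error;
    }
}
```

For example, `ReplicaStatusErrorSketch.forCode(37)` resolves to `INVALID_PARTITIONS`, while a code outside the list raises `IllegalArgumentException`.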
Implementation
AdminClient.replicaStatus() will send the underlying ReplicaStatusRequest to the leader for the given partition. This avoids the need for every broker (because any broker could be the --bootstrap-server) to have knowledge of the replication status of every replica, which would be inefficient in network IO and/or memory use.
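Sending each request to the partition leader implies that the client first groups the partitions of interest by their current leader, then issues one request per leader. A minimal sketch of that grouping step follows; the class `LeaderRoutingSketch`, the method `groupByLeader`, and the use of plain strings such as "a-0" to identify partitions are all assumptions for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper showing how requests could be batched per leader. */
final class LeaderRoutingSketch {

    /**
     * Inverts a partition-to-leader map into leader-to-partitions, so that
     * one status request can be sent to each leader broker.
     */
    static Map<Integer, List<String>> groupByLeader(Map<String, Integer> leaderByPartition) {
        Map<Integer, List<String>> byLeader = new TreeMap<>(); // sorted by broker id
        for (Map.Entry<String, Integer> entry : leaderByPartition.entrySet()) {
            byLeader.computeIfAbsent(entry.getValue(), id -> new ArrayList<>())
                    .add(entry.getKey());
        }
        return byLeader;
    }

    public static void main(String[] args) {
        Map<String, Integer> leaders = new LinkedHashMap<>();
        leaders.put("a-0", 1); // partition 0 of topic "a" is led by broker 1
        leaders.put("a-1", 2);
        leaders.put("b-0", 1);
        System.out.println(groupByLeader(leaders));
    }
}
```

With this grouping the number of requests scales with the number of leaders involved, not with the number of partitions.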
Compatibility, Deprecation, and Migration Plan
Existing users of kafka-reassign-partitions.sh will receive a deprecation warning when they use the --zookeeper option. The option will be removed in a future version of Kafka. If this KIP is introduced in version 1.0.0, the removal could happen in 2.0.0.
Rejected Alternatives
...