Status
Current state: "Voting"
Discussion thread: https://lists.apache.org/thread/9l6m1scv765ohbwjkwq5sotjcdjzjp4q
Vote thread: https://lists.apache.org/thread/mlzvnnfx9jwv0v0lql1hocno4xxk7cr8
JIRA: KAFKA-18239
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In ClientQuotaCallback#updateClusterMetadata, we pass a Cluster object to this method. The Cluster is an immutable object that holds a lot of information, and a new Cluster is created every time there is a change in metadata. For large clusters with many partitions, this can result in significant memory pressure on Kafka.
Additionally, some information in the Cluster object is unnecessary or confusing. For instance, Cluster#controller refers to a random broker node in a KRaft cluster, which is retained for backward compatibility as it represents the ZK controller. Furthermore, in ZK mode, we parse the listener using the listener name from the request. However, in KRaft mode, there is no listener name in the updateClusterMetadata path. As a result, the callback may receive multiple partition info entries for each listener name. (See KAFKA-19122).
To address these issues, this KIP proposes deprecating the current method ClientQuotaCallback#updateClusterMetadata(Cluster) and replacing it with a more efficient method that provides clearer and more relevant cluster information.
Public Interfaces
This KIP introduces a new interface, ClientQuotaCallbackHandler, which copies all methods from ClientQuotaCallback except the one we want to deprecate:
boolean updateClusterMetadata(Cluster cluster);
It adds a new method to replace the old one. The ClusterMetadata parameter of the new method is a new interface, which is described below.
boolean updateClusterMetadata(ClusterMetadata clusterMetadata);
Here is the whole content of the new interface:
public interface ClientQuotaCallbackHandler {
/**
* Quota callback invoked to determine the quota metric tags to be applied for a request.
* Quota limits are associated with quota metrics and all clients which use the same
* metric tags share the quota limit.
*
* @param quotaType Type of quota requested
* @param principal The user principal of the connection for which quota is requested
* @param clientId The client id associated with the request
* @return quota metric tags that indicate which other clients share this quota
*/
Map<String, String> quotaMetricTags(ClientQuotaType quotaType, KafkaPrincipal principal, String clientId);
/**
* Returns the quota limit associated with the provided metric tags. These tags were returned from
* a previous call to {@link #quotaMetricTags(ClientQuotaType, KafkaPrincipal, String)}. This method is
* invoked by quota managers to obtain the current quota limit applied to a metric when the first request
* using these tags is processed. It is also invoked after a quota update or cluster metadata change.
* If the tags are no longer in use after the update, (e.g. this is a {user, client-id} quota metric
* and the quota now in use is a {user} quota), null is returned.
*
* @param quotaType Type of quota requested
* @param metricTags Metric tags for a quota metric of type `quotaType`
* @return the quota limit for the provided metric tags or null if the metric tags are no longer in use
*/
Double quotaLimit(ClientQuotaType quotaType, Map<String, String> metricTags);
/**
* Quota configuration update callback that is invoked when quota configuration for an entity is
* updated in the quorum. This is useful to track configured quotas if built-in quota configuration
* tools are used for quota management.
*
* @param quotaType Type of quota being updated
* @param quotaEntity The quota entity for which quota is being updated
* @param newValue The new quota value
*/
void updateQuota(ClientQuotaType quotaType, ClientQuotaEntity quotaEntity, double newValue);
/**
* Quota configuration removal callback that is invoked when quota configuration for an entity is
* removed in the quorum. This is useful to track configured quotas if built-in quota configuration
* tools are used for quota management.
*
* @param quotaType Type of quota being updated
* @param quotaEntity The quota entity for which quota is being updated
*/
void removeQuota(ClientQuotaType quotaType, ClientQuotaEntity quotaEntity);
/**
* Returns true if any of the existing quota configs may have been updated since the last call
* to this method for the provided quota type. Quota updates as a result of calls to
* {@link #updateClusterMetadata(ClusterMetadata)}, {@link #updateQuota(ClientQuotaType, ClientQuotaEntity, double)}
* and {@link #removeQuota(ClientQuotaType, ClientQuotaEntity)} are automatically processed.
* So callbacks that rely only on built-in quota configuration tools always return false. Quota callbacks
* with external quota configuration or custom reconfigurable quota configs that affect quota limits must
* return true if existing metric configs may need to be updated. This method is invoked on every request
* and hence is expected to be handled by callbacks as a simple flag that is updated when quotas change.
*
* @param quotaType Type of quota
*/
boolean quotaResetRequired(ClientQuotaType quotaType);
/**
* This callback is invoked whenever there are changes in the cluster metadata, such as
* brokers being added or removed, topics being created or deleted, or partition leadership updates.
* This is useful if quota computation takes partitions into account.
* Topics that are being deleted will not be included.
*
* @param clusterMetadata Cluster metadata including partitions and their leaders if known
* @return true if quotas have changed and metric configs may need to be updated
*/
boolean updateClusterMetadata(ClusterMetadata clusterMetadata);
/**
* Closes this instance.
*/
void close();
}
For backward compatibility, the old interface ClientQuotaCallback will extend the new interface ClientQuotaCallbackHandler. In the old interface, we'll add a default implementation of updateClusterMetadata(ClusterMetadata) that delegates to the old method.
The toCluster method is a static method that converts the ClusterMetadata object to a Cluster object.
/**
* @deprecated please use {@link ClientQuotaCallbackHandler} instead
*/
@Deprecated
public interface ClientQuotaCallback extends ClientQuotaCallbackHandler {
/**
* This callback is invoked whenever there are changes in the cluster metadata, such as
* brokers being added or removed, topics being created or deleted, or partition leadership updates.
* This is useful if quota computation takes partitions into account.
* Topics that are being deleted will not be included in `cluster`.
*
* @deprecated please use {@link ClientQuotaCallbackHandler#updateClusterMetadata(ClusterMetadata)} instead
* @param cluster Cluster metadata including partitions and their leaders if known
* @return true if quotas have changed and metric configs may need to be updated
*/
@Deprecated
boolean updateClusterMetadata(Cluster cluster);
default boolean updateClusterMetadata(ClusterMetadata clusterMetadata) {
return updateClusterMetadata(toCluster(clusterMetadata));
}
}
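The delegation described above can be sketched in isolation. This is a minimal illustration, not the KIP's implementation: the types below are hypothetical, heavily reduced stand-ins (in particular, ClusterMetadata here exposes only an assumed clusterId() method, and the placement and body of toCluster are assumptions). It shows that a legacy plugin implementing only the Cluster-based method still gets called when the caller uses the new ClusterMetadata-based method.

```java
// Hypothetical, heavily reduced stand-ins for the Kafka types; only the bridging logic matters.
final class DelegationSketch {
    // Simplified assumption: the KIP's ClusterMetadata exposes far more than a cluster id.
    interface ClusterMetadata {
        String clusterId();
    }

    // Stand-in for org.apache.kafka.common.Cluster.
    static final class Cluster {
        final String clusterId;

        Cluster(String clusterId) {
            this.clusterId = clusterId;
        }
    }

    interface ClientQuotaCallbackHandler {
        boolean updateClusterMetadata(ClusterMetadata clusterMetadata);
    }

    // Deprecated in the KIP; the annotation is omitted here to keep the sketch short.
    interface ClientQuotaCallback extends ClientQuotaCallbackHandler {
        boolean updateClusterMetadata(Cluster cluster);

        // Bridge: callers use the new method, legacy plugins still receive a Cluster.
        @Override
        default boolean updateClusterMetadata(ClusterMetadata clusterMetadata) {
            return updateClusterMetadata(toCluster(clusterMetadata));
        }

        // Assumed location and body of the conversion helper.
        static Cluster toCluster(ClusterMetadata clusterMetadata) {
            return new Cluster(clusterMetadata.clusterId());
        }
    }

    // A legacy plugin that implements only the old Cluster-based method.
    static final class LegacyCallback implements ClientQuotaCallback {
        String lastSeenClusterId;

        @Override
        public boolean updateClusterMetadata(Cluster cluster) {
            lastSeenClusterId = cluster.clusterId;
            return true;
        }
    }

    // Calls the plugin through the new interface and reports what the legacy method saw.
    static String demo() {
        LegacyCallback legacy = new LegacyCallback();
        ClientQuotaCallbackHandler handler = legacy;
        handler.updateClusterMetadata(() -> "cluster-1");
        return legacy.lastSeenClusterId;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that the bridge still builds a Cluster on every update, so legacy plugins keep the old allocation cost; only plugins that implement the new method directly avoid it.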
ClusterMetadata is also a new interface. Its goals are:
- Eliminate the Cluster object creation, allowing us to avoid the overhead of creating Cluster instances.
- Provide key information from the Cluster object, such as brokers and partition data, along with helper methods that replicate the functionality of the original Cluster class while omitting unnecessary information (e.g., controller(), unauthorizedTopics(), isBootstrapConfigured()).
public interface ClusterMetadata {
/**
* Get a list of Kafka broker node IDs in the cluster.
*/
List<Integer> brokerIds();
/**
* Get specific broker information.
*/
Optional<BrokerMetadata> broker(int nodeId);
/**
* Get specific partition information.
*/
Optional<PartitionMetadata> partition(TopicPartition topicPartition);
/**
* Get partition information of specific topic.
* The key is the partition id, and the value is the metadata
* associated with that partition.
* Return an empty map if topic does not exist.
*/
Map<Integer, PartitionMetadata> partitions(String topic);
/**
* Get partition information of specific topic and specific node.
* The key is the partition id, and the value is the metadata
* associated with that partition.
* Return an empty map if the topic or node id does not exist.
*/
Map<Integer, PartitionMetadata> partitions(String topic, int nodeId);
/**
* Get a map of topics.
* The key is the topic id, and the value is the topic name.
*/
Map<Uuid, String> topics();
/**
* Get ClusterResource that includes cluster id
*/
ClusterResource clusterResource();
/**
* Given a partition metadata, return the subset of the replicas that are offline.
* Return an empty list if there are no offline replicas.
*/
List<Integer> offlineReplicas(PartitionMetadata partitionMetadata);
}
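As an illustration of how a quota callback might consume this interface, the sketch below computes the share of a topic's partitions led by a given broker, which is the kind of input a partition-aware quota could use. It is only a sketch under stated assumptions: the interfaces are reduced stand-ins that keep just the methods the example needs, and leaderShare and sampleMetadata are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

final class LeaderShareSketch {
    // Reduced stand-ins: only the methods this example needs.
    interface PartitionMetadata {
        int leader();
    }

    interface ClusterMetadata {
        Map<Integer, PartitionMetadata> partitions(String topic);
    }

    // Fraction of the topic's partitions led by brokerId; 0.0 for an unknown or empty topic.
    static double leaderShare(ClusterMetadata metadata, String topic, int brokerId) {
        Map<Integer, PartitionMetadata> partitions = metadata.partitions(topic);
        if (partitions.isEmpty()) {
            return 0.0;
        }
        long led = partitions.values().stream()
                .filter(partition -> partition.leader() == brokerId)
                .count();
        return (double) led / partitions.size();
    }

    // Hypothetical fixture: topic "orders" with four partitions, one of them leaderless.
    static ClusterMetadata sampleMetadata() {
        Map<Integer, PartitionMetadata> orders = new HashMap<>();
        orders.put(0, () -> 1);
        orders.put(1, () -> 1);
        orders.put(2, () -> 2);
        orders.put(3, () -> -1);
        return topic -> topic.equals("orders") ? orders : Map.<Integer, PartitionMetadata>of();
    }

    public static void main(String[] args) {
        System.out.println(leaderShare(sampleMetadata(), "orders", 1));
        System.out.println(leaderShare(sampleMetadata(), "missing", 1));
    }
}
```

Because partitions(String) is specified to return an empty map for an unknown topic, the caller needs no null check.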
Unlike Cluster#brokers, which uses org.apache.kafka.common.Node to represent broker information, ClusterMetadata uses the new BrokerMetadata interface. The reason is that ClientQuotaCallback#updateClusterMetadata does not filter by listener name as the old ZK-based path did, so the existing Node is not appropriate: a broker node may now have multiple listeners. Additionally, DynamicTopicClusterQuotaPublisher can access BrokerRegistration, which already includes the necessary information, so BrokerRegistration can simply implement the new interface, making the ClusterMetadata implementation much more efficient.
public interface BrokerMetadata {
/**
* The broker node id
*/
int id();
/**
* The listeners of the broker node.
*/
Map<String, Endpoint> listeners();
/**
* Whether this node is fenced
*/
boolean fenced();
/**
* The rack for this node
*/
Optional<String> rack();
}
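Since a broker can expose several listeners, a callback that needs an address has to pick one by name. The following sketch shows one way to do that. The Endpoint class is a reduced stand-in with only host and port, the address and sampleBroker helpers are hypothetical, and skipping fenced brokers is an illustrative choice rather than something the KIP prescribes.

```java
import java.util.Map;
import java.util.Optional;

final class ListenerLookupSketch {
    // Reduced stand-in for the Kafka Endpoint type: host and port only.
    static final class Endpoint {
        final String host;
        final int port;

        Endpoint(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Same shape as the BrokerMetadata interface above, using the stand-in Endpoint.
    interface BrokerMetadata {
        int id();
        Map<String, Endpoint> listeners();
        boolean fenced();
        Optional<String> rack();
    }

    // Resolves host:port for one listener; empty if the broker is fenced or lacks that listener.
    static Optional<String> address(BrokerMetadata broker, String listenerName) {
        if (broker.fenced()) {
            return Optional.empty();
        }
        return Optional.ofNullable(broker.listeners().get(listenerName))
                .map(endpoint -> endpoint.host + ":" + endpoint.port);
    }

    // Hypothetical fixture: broker 1 with a PLAINTEXT and an SSL listener.
    static BrokerMetadata sampleBroker(boolean fenced) {
        Map<String, Endpoint> listeners = Map.of(
                "PLAINTEXT", new Endpoint("broker-1.example", 9092),
                "SSL", new Endpoint("broker-1.example", 9093));
        return new BrokerMetadata() {
            @Override public int id() { return 1; }
            @Override public Map<String, Endpoint> listeners() { return listeners; }
            @Override public boolean fenced() { return fenced; }
            @Override public Optional<String> rack() { return Optional.empty(); }
        };
    }

    public static void main(String[] args) {
        System.out.println(address(sampleBroker(false), "SSL"));
        System.out.println(address(sampleBroker(false), "EXTERNAL"));
    }
}
```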
PartitionMetadata is similar to BrokerMetadata. We can leverage the existing PartitionRegistration object to create a new interface that provides partition metadata efficiently, avoiding the need to convert PartitionRegistration into PartitionInfo.
The main difference is isAvailable(), as PartitionRegistration does not include this information. However, since Cluster#availablePartitionsForTopic allows users to query available partitions, we provide a default method here for convenience.
public interface PartitionMetadata {
/**
* The node id of the node currently acting as a leader for this partition or -1 if there is no leader
*/
int leader();
/**
* The complete set of replicas for this partition regardless of whether they are alive or up-to-date
*/
List<Integer> replicas();
/**
* The subset of the replicas that are in sync, that is caught-up to the leader and ready to take over as leader if
* the leader should fail. Note that there is no guarantee the first element is leader, please use leader()
* to get the leader of partition.
*/
List<Integer> inSyncReplicas();
/**
* Check if the partition is available
*/
default boolean isAvailable() {
return leader() != Node.noNode().id();
}
}
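With isAvailable() in place, a callback can rebuild the equivalent of Cluster#availablePartitionsForTopic from a partition map. The sketch below is one possible way to do it, using a reduced stand-in for PartitionMetadata and a local NO_LEADER constant in place of Node.noNode().id(); availablePartitions and samplePartitions are hypothetical helper names.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class AvailablePartitionsSketch {
    // Same value as Node.noNode().id() in the Kafka clients library.
    static final int NO_LEADER = -1;

    // Reduced stand-in: leader() plus the default availability check.
    interface PartitionMetadata {
        int leader();

        default boolean isAvailable() {
            return leader() != NO_LEADER;
        }
    }

    // Returns the ids of partitions that currently have a leader, in ascending order.
    static List<Integer> availablePartitions(Map<Integer, PartitionMetadata> partitions) {
        return partitions.entrySet().stream()
                .filter(entry -> entry.getValue().isAvailable())
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    // Hypothetical fixture: partition 1 has no leader.
    static Map<Integer, PartitionMetadata> samplePartitions() {
        Map<Integer, PartitionMetadata> partitions = new TreeMap<>();
        partitions.put(0, () -> 1);
        partitions.put(1, () -> NO_LEADER);
        partitions.put(2, () -> 3);
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(availablePartitions(samplePartitions()));
    }
}
```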
Proposed Changes
In DynamicTopicClusterQuotaPublisher#onMetadataUpdate, we invoke the new method introduced in this KIP. Since we are now using the new interface, all ClientQuotaCallback usages in the current code base (except the tests) should be replaced with ClientQuotaCallbackHandler.
@@ -52,8 +51,8 @@ class DynamicTopicClusterQuotaPublisher (
try {
quotaManagers.clientQuotaCallbackPlugin().ifPresent(plugin => {
if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
- val cluster = MetadataCache.toCluster(clusterId, newImage)
- if (plugin.get().updateClusterMetadata(cluster)) {
+ val clusterMetadata = new DefaultClusterMetadata(clusterId, newImage)
+ if (plugin.get().updateClusterMetadata(clusterMetadata)) {
quotaManagers.fetch.updateQuotaMetricConfigs()
quotaManagers.produce.updateQuotaMetricConfigs()
quotaManagers.request.updateQuotaMetricConfigs()
Make PartitionRegistration implement PartitionMetadata and BrokerRegistration implement BrokerMetadata. In DefaultClusterMetadata (the implementation of ClusterMetadata), we can use BrokerRegistration and PartitionRegistration to return the necessary information without duplicating data. This avoids the overhead of creating new objects.
For example, the implementation of ClusterMetadata#partitions(String) simply wraps the map returned by TopicImage.partitions().
metadataImage.topics().getTopic(topic).partitions().entrySet().stream()
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
The implementation for ClusterMetadata#partition(TopicPartition topicPartition) is similarly efficient.
metadataImage.topics().getTopic(topicPartition.topic()).partitions().get(topicPartition.partition());
Other methods in ClusterMetadata will follow a similar pattern.
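The no-conversion idea can be tried out with stand-in types. The sketch below is not the actual DefaultClusterMetadata: PartitionRegistration, TopicImage and the topic lookup are hypothetical simplifications, partition takes a topic name and partition id instead of a TopicPartition, and the code assumes the topic lookup returns null for an unknown topic. It shows that the values handed to the callback are the registration objects themselves, and that an unknown topic can be mapped to an empty result as the interface contract requires.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

final class ZeroCopyViewSketch {
    interface PartitionMetadata {
        int leader();
    }

    // Stand-in for PartitionRegistration once it implements PartitionMetadata.
    static final class PartitionRegistration implements PartitionMetadata {
        private final int leader;

        PartitionRegistration(int leader) {
            this.leader = leader;
        }

        @Override
        public int leader() {
            return leader;
        }
    }

    // Stand-in for the per-topic slice of the metadata image.
    static final class TopicImage {
        private final Map<Integer, PartitionRegistration> partitions;

        TopicImage(Map<Integer, PartitionRegistration> partitions) {
            this.partitions = partitions;
        }

        Map<Integer, PartitionRegistration> partitions() {
            return partitions;
        }
    }

    static final class DefaultClusterMetadata {
        private final Map<String, TopicImage> topicsByName;

        DefaultClusterMetadata(Map<String, TopicImage> topicsByName) {
            this.topicsByName = topicsByName;
        }

        // Read-only view over the image's own map; no per-partition objects are created.
        Map<Integer, PartitionMetadata> partitions(String topic) {
            TopicImage image = topicsByName.get(topic);
            if (image == null) {
                return Collections.emptyMap();
            }
            return Collections.unmodifiableMap(image.partitions());
        }

        // Empty if either the topic or the partition is unknown.
        Optional<PartitionMetadata> partition(String topic, int partition) {
            return Optional.ofNullable(topicsByName.get(topic))
                    .map(image -> image.partitions().get(partition));
        }
    }

    // True if lookups return the original registration object and handle unknown topics.
    static boolean demo() {
        PartitionRegistration registration = new PartitionRegistration(1);
        DefaultClusterMetadata metadata = new DefaultClusterMetadata(
                Map.of("orders", new TopicImage(Map.of(0, registration))));
        return metadata.partitions("orders").get(0) == registration
                && metadata.partition("orders", 0).get() == registration
                && metadata.partitions("missing").isEmpty()
                && !metadata.partition("orders", 7).isPresent();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```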
Compatibility, Deprecation, and Migration Plan
This KIP should be backward-compatible, as the deprecated method updateClusterMetadata(Cluster) is still supported and is invoked by the default implementation of the new method.
However, users are expected to implement the new interface, ClientQuotaCallbackHandler. The old interface, ClientQuotaCallback, will be removed in a future major release.
Test Plan
We will rely on the existing CustomQuotaCallbackTest.java to ensure the deprecated method still works.
Additionally, we will add new tests to this test class to ensure the new method works as expected.
Rejected Alternatives
Add the new method updateClusterMetadata(ClusterMetadata) to the old interface ClientQuotaCallback. The downside is that once we remove the deprecated method, compilation will break for anyone who already implements it.
public interface ClientQuotaCallback extends Configurable {
// ... skip ...
@Deprecated
boolean updateClusterMetadata(Cluster cluster);
default boolean updateClusterMetadata(ClusterMetadata clusterMetadata) {
return updateClusterMetadata(toCluster(clusterMetadata));
}
}