Status
Current state: Accepted
Discussion thread: https://lists.apache.org/thread/c3q584qtrodys2mkzdg0qxrktzbcjzmp
Vote thread: https://lists.apache.org/thread/bmr5l38gfnqrws3qfpxqfp7y9r7lzdrc, https://lists.apache.org/thread/nnnfxftg5954xdvkwg3okhrlxw32wk1l
JIRA: KAFKA-18904
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In the current design of kafka-configs.sh, users can modify configurations for non-existent consumer groups and retrieve their dynamic configurations when describing a specific non-existent group. However, there is a significant limitation: users cannot view the dynamic configurations of all groups. This inconsistency creates challenges for administrators trying to manage dynamic configurations effectively. Without visibility into which non-existent groups have dynamic configurations, administrators are forced to rely on memory or external tracking, increasing the risk of oversight and inefficiency.
For example, consider the non-existent group "G1." Administrators can successfully add a dynamic configuration to this group and retrieve it by specifying the group name. However, when describing all groups, the dynamic configuration for "G1" does not appear, leaving a gap in the visibility of such configurations. This scenario makes it cumbersome to maintain and manage configurations, especially in large-scale environments with numerous consumer groups.
> ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --group G1 --add-config consumer.heartbeat.interval.ms=10000
Completed updating config for group G1.
> ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type groups
> ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type groups --entity-name G1
Dynamic configs for group G1 are:
consumer.heartbeat.interval.ms=10000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.heartbeat.interval.ms=10000}
To address this issue, this KIP proposes using a ListConfigResources API to retrieve all resources that have dynamic configurations. With this enhancement, administrators gain visibility into every group that has dynamic configurations, whether or not the group exists.
Public Interfaces
ListConfigResourcesRequest.json
Rename ListClientMetricsResourcesRequest to ListConfigResourcesRequest. Add a new field, ResourceTypes, that users can set to filter the returned resources.
{
"apiKey": 74,
"type": "request",
"listeners": ["broker"],
"name": "ListConfigResourcesRequest",
// Version 0 is used as ListClientMetricsResourcesRequest which only lists client metrics resources.
// Version 1 adds ResourceTypes field (KIP-1142). If there is no specified ResourceTypes, it should return all configuration resources.
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "ResourceTypes", "type": "[]int8", "versions": "1+",
"about": "The list of resource types." }
]
}
ListConfigResourcesResponse.json
Rename ListClientMetricsResourcesResponse to ListConfigResourcesResponse. Rename the field ClientMetricsResources to ConfigResources and its Name field to ResourceName. Add a new field, ResourceType, to ConfigResources.
{
"apiKey": 74,
"type": "response",
"name": "ListConfigResourcesResponse",
// Version 0 is used as ListClientMetricsResourcesResponse which returns all client metrics resources.
// Version 1 adds ResourceType to ConfigResources (KIP-1142).
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ConfigResources", "type": "[]ConfigResource", "versions": "0+",
"about": "Each config resource in the response.", "fields": [
{ "name": "ResourceName", "type": "string", "versions": "0+",
"about": "The resource name." },
{ "name": "ResourceType", "type": "int8", "versions": "1+",
"about": "The resource type." }
]}
]
}
ListClientMetricsResourcesResult.java and ClientMetricsResourceListing.java
Deprecate these classes in 4.1 and remove them in 5.0.
ApiKeys.java
Change LIST_CLIENT_METRICS_RESOURCES to LIST_CONFIG_RESOURCES.
public enum ApiKeys {
// ...
LIST_CLIENT_METRICS_RESOURCES(ApiMessageType.LIST_CLIENT_METRICS_RESOURCES), <-- this is removed
LIST_CONFIG_RESOURCES(ApiMessageType.LIST_CONFIG_RESOURCES), <-- this is added
// ...
}
Admin
Add new function listConfigResources and deprecate listClientMetricsResources.
public interface Admin extends AutoCloseable {
/**
* List the configuration resources available in the cluster that match the given config resource types.
* If no config resource types are specified, all configuration resources will be listed.
*
* @param configResourceTypes The config resource types to list; an empty set lists all types.
* @param options The options to use when listing the configuration resources.
* @return The ListConfigResourcesResult.
*/
ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> configResourceTypes, ListConfigResourcesOptions options);
/**
* List all configuration resources available in the cluster with the default options.
* <p>
* This is a convenience method for {@link #listConfigResources(Set, ListConfigResourcesOptions)}
* with default options. See the overload for more details.
*
* @return The ListConfigResourcesResult.
*/
default ListConfigResourcesResult listConfigResources() {
return listConfigResources(Set.of(), new ListConfigResourcesOptions());
}
/**
* List the client metrics configuration resources available in the cluster.
*
* @param options The options to use when listing the client metrics resources.
* @return The ListClientMetricsResourcesResult.
* @deprecated Since 4.1. Use {@link #listConfigResources(Set, ListConfigResourcesOptions)} instead.
*/
@Deprecated
ListClientMetricsResourcesResult listClientMetricsResources(ListClientMetricsResourcesOptions options);
/**
* List the client metrics configuration resources available in the cluster with the default options.
* <p>
* This is a convenience method for {@link #listClientMetricsResources(ListClientMetricsResourcesOptions)}
* with default options. See the overload for more details.
*
* @return The ListClientMetricsResourcesResult.
* @deprecated Since 4.1. Use {@link #listConfigResources()} instead.
*/
@Deprecated
default ListClientMetricsResourcesResult listClientMetricsResources() {
return listClientMetricsResources(new ListClientMetricsResourcesOptions());
}
}
ListConfigResourcesOptions.java / ListClientMetricsResourcesOptions.java
Add new ListConfigResourcesOptions and deprecate ListClientMetricsResourcesOptions.
/**
* Options for {@link Admin#listClientMetricsResources()}.
* @deprecated Since 4.1. Use {@link ListConfigResourcesOptions} instead.
*/
@Deprecated
public class ListClientMetricsResourcesOptions extends AbstractOptions<ListClientMetricsResourcesOptions> {
}
/**
* Options for {@link Admin#listConfigResources()}.
*/
public class ListConfigResourcesOptions extends AbstractOptions<ListConfigResourcesOptions> {
}
ListConfigResourcesResult.java
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import java.util.Collection;
/**
* The result of the {@link Admin#listConfigResources()} call.
* <p>
*/
public class ListConfigResourcesResult {
private final KafkaFuture<Collection<ConfigResource>> future;
ListConfigResourcesResult(KafkaFuture<Collection<ConfigResource>> future) {
this.future = future;
}
/**
* Returns a future that yields either an exception, or the full set of config resources.
*
* In the event of a failure, the future yields nothing but the first exception which
* occurred.
*/
public KafkaFuture<Collection<ConfigResource>> all() {
final KafkaFutureImpl<Collection<ConfigResource>> result = new KafkaFutureImpl<>();
future.whenComplete((resources, throwable) -> {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
result.complete(resources);
}
});
return result;
}
}
ListConfigResourcesRequest.java
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.ListConfigResourcesRequestData;
import org.apache.kafka.common.message.ListConfigResourcesResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
public class ListConfigResourcesRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ListConfigResourcesRequest> {
public final ListConfigResourcesRequestData data;
public Builder(ListConfigResourcesRequestData data) {
super(ApiKeys.LIST_CONFIG_RESOURCES);
this.data = data;
}
@Override
public ListConfigResourcesRequest build(short version) {
return new ListConfigResourcesRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final ListConfigResourcesRequestData data;
private ListConfigResourcesRequest(ListConfigResourcesRequestData data, short version) {
super(ApiKeys.LIST_CONFIG_RESOURCES, version);
this.data = data;
}
public ListConfigResourcesRequestData data() {
return data;
}
@Override
public ListConfigResourcesResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
ListConfigResourcesResponseData response = new ListConfigResourcesResponseData()
.setErrorCode(error.code())
.setThrottleTimeMs(throttleTimeMs);
return new ListConfigResourcesResponse(response);
}
public static ListConfigResourcesRequest parse(Readable readable, short version) {
return new ListConfigResourcesRequest(new ListConfigResourcesRequestData(
readable, version), version);
}
@Override
public String toString(boolean verbose) {
return data.toString();
}
}
ListConfigResourcesResponse.java
package org.apache.kafka.common.requests;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.ListConfigResourcesResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;
public class ListConfigResourcesResponse extends AbstractResponse {
private final ListConfigResourcesResponseData data;
public ListConfigResourcesResponse(ListConfigResourcesResponseData data) {
super(ApiKeys.LIST_CONFIG_RESOURCES);
this.data = data;
}
public ListConfigResourcesResponseData data() {
return data;
}
public ApiError error() {
return new ApiError(Errors.forCode(data.errorCode()));
}
@Override
public Map<Errors, Integer> errorCounts() {
return errorCounts(Errors.forCode(data.errorCode()));
}
public static ListConfigResourcesResponse parse(ByteBuffer buffer, short version) {
return new ListConfigResourcesResponse(new ListConfigResourcesResponseData(
new ByteBufferAccessor(buffer), version));
}
@Override
public String toString() {
return data.toString();
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}
public Collection<ConfigResource> configResources() {
return data.configResources()
.stream()
.map(entry -> new ConfigResource(ConfigResource.Type.forId(entry.resourceType()), entry.resourceName()))
.collect(Collectors.toList());
}
}
Proposed Changes
kafka-configs.sh
--describe --entity-type groups
Previously, this command did not output non-existent groups that have dynamic config. With this KIP, it outputs them as well. Assume "G1" doesn't exist but has dynamic config. The command then prints:
> ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type groups
Dynamic configs for group G1 are:
consumer.heartbeat.interval.ms=10000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.heartbeat.interval.ms=10000}
--describe --entity-type groups --entity-name <non-existent-group-without-dynamic-config>
Previously, the command printed an empty body for a non-existent group without dynamic config:
> ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type groups --entity-name DOESNOTEXIST
Dynamic configs for group DOESNOTEXIST are:
With this KIP, the command tells users that the group doesn't exist and doesn't have dynamic config:
> ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type groups --entity-name DOESNOTEXIST
The group DOESNOTEXIST doesn't exist and doesn't have dynamic config.
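The before/after behaviour for a single group can be summarised as a small decision. The following is a minimal sketch only, not the tool's actual implementation: the class name, the describe method, and the groupExists flag are hypothetical, and the per-config line is simplified to key=value (the real output also carries sensitive and synonyms details). How an existing group without dynamic config is printed is an assumption here; the KIP text does not spell it out.

```java
import java.util.Map;

// Hypothetical sketch of the proposed output decision for
// "--describe --entity-type groups --entity-name <group>".
class DescribeGroupOutputSketch {

    // Returns the text the command would print for one group.
    // groupExists and dynamicConfigs are assumed to have been looked up already.
    static String describe(String group, boolean groupExists, Map<String, String> dynamicConfigs) {
        // New behaviour: say so explicitly when there is neither a group nor any dynamic config.
        if (!groupExists && dynamicConfigs.isEmpty()) {
            return "The group " + group + " doesn't exist and doesn't have dynamic config.";
        }
        // Otherwise print the header followed by one simplified key=value line per config.
        StringBuilder out = new StringBuilder("Dynamic configs for group " + group + " are:");
        dynamicConfigs.forEach((key, value) -> out.append('\n').append(key).append('=').append(value));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe("DOESNOTEXIST", false, Map.of()));
        System.out.println(describe("G1", false, Map.of("consumer.heartbeat.interval.ms", "10000")));
    }
}
```

The second call in main corresponds to the "G1" case from the Motivation section: the group does not exist, but its dynamic config is still shown.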
--describe --entity-type client-metrics --entity-name <non-existent-client-metrics>
Previously, the command printed an empty body for a non-existent client-metrics resource:
> ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type client-metrics --entity-name DOESNOTEXIST
Dynamic configs for client-metric DOESNOTEXIST are:
With this KIP, the command tells users that the client-metrics resource doesn't exist:
> ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type client-metrics --entity-name DOESNOTEXIST
The client-metric DOESNOTEXIST doesn't exist and doesn't have dynamic config.
kafka-client-metrics.sh
--describe --name <non-existent-client-metrics>
Previously, the command printed the default config for a non-existent client-metrics resource:
> ./bin/kafka-client-metrics.sh --bootstrap-server localhost:9092 --describe --name DOESNOTEXIST
Client metrics configs for DOESNOTEXIST are:
interval.ms=300000
match=
metrics=
With this KIP, the command tells users that the client-metrics resource doesn't exist:
> ./bin/kafka-client-metrics.sh --bootstrap-server localhost:9092 --describe --name DOESNOTEXIST
The client-metric DOESNOTEXIST doesn't exist and doesn't have dynamic config.
Compatibility, Deprecation, and Migration Plan
RPC
The version of ListConfigResourcesRequest is bumped to avoid version incompatibility issues with older brokers.
A v0 ListConfigResourcesRequest is handled as a ListClientMetricsResourcesRequest: it lists all client metrics resources.
A v1 ListConfigResourcesRequest returns all types of configuration resources by default. If resource types are specified, only the matching configuration resources are returned.
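The v1 filtering rule can be illustrated with a small self-contained sketch. It is not the broker's code: the class, the ResourceType enum, and the Resource pair are hypothetical stand-ins for the ResourceTypes request field and the ConfigResources response entries.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for how a v1 ListConfigResourcesRequest filter could be applied.
class ListConfigResourcesFilterSketch {

    // Illustrative resource types; the real ones live in ConfigResource.Type.
    enum ResourceType { TOPIC, BROKER, CLIENT_METRICS, GROUP }

    // Minimal (type, name) pair standing in for one ConfigResources response entry.
    static final class Resource {
        final ResourceType type;
        final String name;

        Resource(ResourceType type, String name) {
            this.type = type;
            this.name = name;
        }

        @Override
        public String toString() {
            return type + ":" + name;
        }
    }

    // An empty filter means "return everything"; otherwise keep only the requested types.
    static List<Resource> filter(List<Resource> all, Set<ResourceType> requestedTypes) {
        if (requestedTypes.isEmpty()) {
            return List.copyOf(all);
        }
        return all.stream()
                .filter(resource -> requestedTypes.contains(resource.type))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Resource> all = List.of(
                new Resource(ResourceType.GROUP, "G1"),
                new Resource(ResourceType.CLIENT_METRICS, "M1"));
        System.out.println(filter(all, Set.of()));                   // both resources
        System.out.println(filter(all, Set.of(ResourceType.GROUP))); // only the group
    }
}
```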
Admin
listClientMetricsResources sends a v0 ListConfigResourcesRequest, and listConfigResources sends a v1 ListConfigResourcesRequest. If a user sends a v1 request to an old broker that supports only v0, the request falls back to v0 when the client metrics resource type is the only type in the request body. If the resource type list is empty or contains any other resource type, UNSUPPORTED_VERSION is returned to the user.
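The fallback rule for listConfigResources against an older broker can be sketched as a three-way decision. This is an illustration under assumptions, not the Admin client's code: the class, both enums, and the brokerMaxVersion parameter are hypothetical names introduced here.

```java
import java.util.Set;

// Hypothetical sketch of the client-side version decision for listConfigResources.
class ListConfigResourcesVersionFallbackSketch {

    // Illustrative resource types; the real ones live in ConfigResource.Type.
    enum ResourceType { TOPIC, BROKER, CLIENT_METRICS, GROUP }

    enum Outcome { SEND_V1, FALL_BACK_TO_V0, UNSUPPORTED_VERSION }

    // brokerMaxVersion is the highest ListConfigResources version the broker advertises.
    static Outcome decide(Set<ResourceType> requestedTypes, int brokerMaxVersion) {
        if (brokerMaxVersion >= 1) {
            return Outcome.SEND_V1;
        }
        // v0 only lists client metrics resources, so it can stand in for v1
        // only when client metrics is exactly what was requested.
        if (requestedTypes.equals(Set.of(ResourceType.CLIENT_METRICS))) {
            return Outcome.FALL_BACK_TO_V0;
        }
        // An empty set means "all types", which v0 cannot answer; the same holds for any other type.
        return Outcome.UNSUPPORTED_VERSION;
    }

    public static void main(String[] args) {
        System.out.println(decide(Set.of(ResourceType.CLIENT_METRICS), 0)); // FALL_BACK_TO_V0
        System.out.println(decide(Set.of(), 0));                            // UNSUPPORTED_VERSION
        System.out.println(decide(Set.of(ResourceType.GROUP), 1));          // SEND_V1
    }
}
```

The empty-set case is rejected rather than downgraded because a v0 response would silently omit every non-client-metrics resource.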
Metrics
Before 4.1, request metrics for this API are recorded under the request name ListClientMetricsResources. After this KIP, that request name no longer exists. To stay compatible with older versions, the metric should be recorded under both ListClientMetricsResources and ListConfigResources when the request version is 0. For version 1 and above, it is recorded only under ListConfigResources.
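The metric naming rule depends only on the request version, which a short sketch makes concrete. The class and method names are hypothetical; only the two request names come from this KIP.

```java
import java.util.List;

// Hypothetical sketch: which request names a ListConfigResources call is recorded under.
class RequestMetricNamesSketch {

    static List<String> metricNames(short version) {
        if (version == 0) {
            // v0 is the old ListClientMetricsResources wire format, so keep the old name too.
            return List.of("ListClientMetricsResources", "ListConfigResources");
        }
        return List.of("ListConfigResources");
    }

    public static void main(String[] args) {
        System.out.println(metricNames((short) 0)); // [ListClientMetricsResources, ListConfigResources]
        System.out.println(metricNames((short) 1)); // [ListConfigResources]
    }
}
```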
Test Plan
ListConfigResourcesRequest integration tests
- Validate that a v0 request returns the same result as ListClientMetricsResourcesRequest.
- Validate that a v1 request returns the expected result for different resource types.
kafka-configs.sh integration tests
- If there is a non-existent group with dynamic config, the command shows it when no entity name is specified.
- Describing a non-existent group without dynamic config shows that the group doesn't exist.
- Describing a non-existent client-metric shows that the client-metric doesn't exist.
kafka-client-metrics.sh integration tests
- Describing a non-existent client-metric shows that the client-metric doesn't exist.
Rejected Alternatives
N/A