Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/c3q584qtrodys2mkzdg0qxrktzbcjzmp

Vote thread: https://lists.apache.org/thread/bmr5l38gfnqrws3qfpxqfp7y9r7lzdrc, https://lists.apache.org/thread/nnnfxftg5954xdvkwg3okhrlxw32wk1l

JIRA: KAFKA-18904

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In the current design of kafka-configs.sh, users can modify configurations for non-existent consumer groups and retrieve their dynamic configurations when describing a specific non-existent group. However, there is a significant limitation: users cannot view the dynamic configurations of all groups. This inconsistency creates challenges for administrators trying to manage dynamic configurations effectively. Without visibility into which non-existent groups have dynamic configurations, administrators are forced to rely on memory or external tracking, increasing the risk of oversight and inefficiency.

For example, consider the non-existent group "G1." Administrators can successfully add a dynamic configuration to this group and retrieve it by specifying the group name. However, when describing all groups, the dynamic configuration for "G1" does not appear, leaving a gap in the visibility of such configurations. This scenario makes it cumbersome to maintain and manage configurations, especially in large-scale environments with numerous consumer groups.

> ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --group G1 --add-config consumer.heartbeat.interval.ms=10000
Completed updating config for group G1.

> ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type groups

> ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type groups --entity-name G1
Dynamic configs for group G1 are:
  consumer.heartbeat.interval.ms=10000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.heartbeat.interval.ms=10000}

To address this issue, this KIP proposes using the ListConfigResources API to retrieve all resources that have dynamic configurations. With this enhancement, administrators gain visibility into the dynamic configurations of all groups, including groups that do not exist.

Public Interfaces

ListConfigResourcesRequest.json

Rename ListClientMetricsResourcesRequest to ListConfigResourcesRequest, and add a new field, ResourceTypes, so that users can filter the returned resources by type.

{
  "apiKey": 74,
  "type": "request",
  "listeners": ["broker"],
  "name": "ListConfigResourcesRequest",
  // Version 0 is used as ListClientMetricsResourcesRequest which only lists client metrics resources.
  // Version 1 adds ResourceTypes field (KIP-1142). If there is no specified ResourceTypes, it should return all configuration resources.
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ResourceTypes", "type": "[]int8", "versions": "1+",
      "about": "The list of resource types." }
  ]
}

ListConfigResourcesResponse.json

Rename ListClientMetricsResourcesResponse to ListConfigResourcesResponse. Rename the field ClientMetricsResources to ConfigResources and the field Name to ResourceName, and add a new field, ResourceType, to ConfigResources.

{
  "apiKey": 74,
  "type": "response",
  "name": "ListConfigResourcesResponse",
  // Version 0 is used as ListClientMetricsResourcesResponse which returns all client metrics resources.
  // Version 1 adds ResourceType to ConfigResources (KIP-1142).
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "ConfigResources", "type": "[]ConfigResource", "versions": "0+",
      "about": "Each config resource in the response.", "fields": [
      { "name": "ResourceName", "type": "string", "versions": "0+",
        "about": "The resource name." },
      { "name": "ResourceType", "type": "int8", "versions": "1+",
        "about": "The resource type." }
    ]}
  ]
}
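Because v0 responses do not carry the ResourceType field, and v0 only ever lists client metrics resources, a client decoding a response has to take the response version into account. The sketch below illustrates that decoding with self-contained stand-ins. `ResourceEntry`, `DecodedResource` and `ResponseDecoding` are hypothetical names, not Kafka classes, and the type id 16 is assumed to mirror `ConfigResource.Type.CLIENT_METRICS`.

```java
import java.util.List;

// Hypothetical stand-in for one ConfigResources entry. In v0 the ResourceType
// field is not on the wire, so resourceType is modelled as null.
record ResourceEntry(String resourceName, Byte resourceType) { }

// Hypothetical decoded form: a config resource type id plus a resource name.
record DecodedResource(byte typeId, String name) { }

final class ResponseDecoding {
    // Assumed to mirror ConfigResource.Type.CLIENT_METRICS.
    static final byte CLIENT_METRICS = 16;

    private ResponseDecoding() { }

    static List<DecodedResource> decode(short version, List<ResourceEntry> entries) {
        return entries.stream()
            .map(e -> new DecodedResource(
                // v0 (formerly ListClientMetricsResources) only returns client
                // metrics resources, so the type is implied by the version.
                version == 0 ? CLIENT_METRICS : e.resourceType(),
                e.resourceName()))
            .toList();
    }
}
```

For example, decoding a v0 entry named `cm1` yields type id 16 even though the entry itself carries no type.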

ListClientMetricsResourcesResult.java and ClientMetricsResourceListing.java

Deprecate these classes in 4.1 and remove them in 5.0.

ApiKeys.java

Rename LIST_CLIENT_METRICS_RESOURCES to LIST_CONFIG_RESOURCES.

public enum ApiKeys {
    // ...
    LIST_CLIENT_METRICS_RESOURCES(ApiMessageType.LIST_CLIENT_METRICS_RESOURCES), // <-- this is removed
    LIST_CONFIG_RESOURCES(ApiMessageType.LIST_CONFIG_RESOURCES), // <-- this is added
    // ...
}

Admin

Add a new method, listConfigResources, and deprecate listClientMetricsResources.

public interface Admin extends AutoCloseable {
    /**
     * List the configuration resources available in the cluster that match the given config resource types.
     * If no config resource types are specified, all configuration resources will be listed.
     *
     * @param configResourceTypes The config resource types to match; an empty set matches all types.
     * @param options The options to use when listing the configuration resources.
     * @return The ListConfigResourcesResult.
     */
    ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> configResourceTypes, ListConfigResourcesOptions options);

    /**
     * List all configuration resources available in the cluster with the default options.
     * <p>
     * This is a convenience method for {@link #listConfigResources(Set, ListConfigResourcesOptions)}
     * with default options. See the overload for more details.
     *
     * @return The ListConfigResourcesResult.
     */
    default ListConfigResourcesResult listConfigResources() {
        return listConfigResources(Set.of(), new ListConfigResourcesOptions());
    }

    /**
     * List the client metrics configuration resources available in the cluster.
     *
     * @param options The options to use when listing the client metrics resources.
     * @return The ListClientMetricsResourcesResult.
     * @deprecated Since 4.1. Use {@link #listConfigResources(Set, ListConfigResourcesOptions)} instead.
     */
    @Deprecated
    ListClientMetricsResourcesResult listClientMetricsResources(ListClientMetricsResourcesOptions options);

    /**
     * List the client metrics configuration resources available in the cluster with the default options.
     * <p>
     * This is a convenience method for {@link #listClientMetricsResources(ListClientMetricsResourcesOptions)}
     * with default options. See the overload for more details.
     *
     * @return The ListClientMetricsResourcesResult.
     * @deprecated Since 4.1. Use {@link #listConfigResources()} instead.
     */
    @Deprecated
    default ListClientMetricsResourcesResult listClientMetricsResources() {
        return listClientMetricsResources(new ListClientMetricsResourcesOptions());
    }
}

ListConfigResourcesOptions.java / ListClientMetricsResourcesOptions.java

Add a new ListConfigResourcesOptions class and deprecate ListClientMetricsResourcesOptions.

/**
 * Options for {@link Admin#listClientMetricsResources()}.
 * @deprecated Since 4.1. Use {@link ListConfigResourcesOptions} instead.
 */
@Deprecated
public class ListClientMetricsResourcesOptions extends AbstractOptions<ListClientMetricsResourcesOptions> {
}
/**
 * Options for {@link Admin#listConfigResources()}.
 */
public class ListConfigResourcesOptions extends AbstractOptions<ListConfigResourcesOptions> {
}

ListConfigResourcesResult.java

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.internals.KafkaFutureImpl;

import java.util.Collection;

/**
 * The result of the {@link Admin#listConfigResources()} call.
 */
public class ListConfigResourcesResult {
    private final KafkaFuture<Collection<ConfigResource>> future;

    ListConfigResourcesResult(KafkaFuture<Collection<ConfigResource>> future) {
        this.future = future;
    }

    /**
     * Returns a future that yields either an exception, or the full set of config resources.
     *
     * In the event of a failure, the future yields nothing but the first exception which
     * occurred.
     */
    public KafkaFuture<Collection<ConfigResource>> all() {
        final KafkaFutureImpl<Collection<ConfigResource>> result = new KafkaFutureImpl<>();
        future.whenComplete((resources, throwable) -> {
            if (throwable != null) {
                result.completeExceptionally(throwable);
            } else {
                result.complete(resources);
            }
        });
        return result;
    }
}
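`all()` returns a fresh future that mirrors the internal one; one effect is that completing the returned future from caller code leaves the internal future untouched. The same relay pattern can be sketched with the JDK's `CompletableFuture` in place of `KafkaFuture`; `Relay` and `mirror` are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class Relay {
    private Relay() { }

    // Returns a new future that completes with the same value or exception as
    // the source, mirroring the structure of ListConfigResourcesResult#all().
    static <T> CompletableFuture<T> mirror(CompletableFuture<T> source) {
        CompletableFuture<T> result = new CompletableFuture<>();
        source.whenComplete((value, throwable) -> {
            if (throwable != null) {
                result.completeExceptionally(throwable);
            } else {
                result.complete(value);
            }
        });
        return result;
    }
}
```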

ListConfigResourcesRequest.java

package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.ListConfigResourcesRequestData;
import org.apache.kafka.common.message.ListConfigResourcesResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;

public class ListConfigResourcesRequest extends AbstractRequest {
    public static class Builder extends AbstractRequest.Builder<ListConfigResourcesRequest> {
        public final ListConfigResourcesRequestData data;

        public Builder(ListConfigResourcesRequestData data) {
            super(ApiKeys.LIST_CONFIG_RESOURCES);
            this.data = data;
        }

        @Override
        public ListConfigResourcesRequest build(short version) {
            return new ListConfigResourcesRequest(data, version);
        }

        @Override
        public String toString() {
            return data.toString();
        }
    }

    private final ListConfigResourcesRequestData data;

    private ListConfigResourcesRequest(ListConfigResourcesRequestData data, short version) {
        super(ApiKeys.LIST_CONFIG_RESOURCES, version);
        this.data = data;
    }

    public ListConfigResourcesRequestData data() {
        return data;
    }

    @Override
    public ListConfigResourcesResponse getErrorResponse(int throttleTimeMs, Throwable e) {
        Errors error = Errors.forException(e);
        ListConfigResourcesResponseData response = new ListConfigResourcesResponseData()
            .setErrorCode(error.code())
            .setThrottleTimeMs(throttleTimeMs);
        return new ListConfigResourcesResponse(response);
    }

    public static ListConfigResourcesRequest parse(Readable readable, short version) {
        return new ListConfigResourcesRequest(new ListConfigResourcesRequestData(
            readable, version), version);
    }

    @Override
    public String toString(boolean verbose) {
        return data.toString();
    }

}

ListConfigResourcesResponse.java

package org.apache.kafka.common.requests;

import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.ListConfigResourcesResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

public class ListConfigResourcesResponse extends AbstractResponse {
    private final ListConfigResourcesResponseData data;

    public ListConfigResourcesResponse(ListConfigResourcesResponseData data) {
        super(ApiKeys.LIST_CONFIG_RESOURCES);
        this.data = data;
    }

    public ListConfigResourcesResponseData data() {
        return data;
    }

    public ApiError error() {
        return new ApiError(Errors.forCode(data.errorCode()));
    }

    @Override
    public Map<Errors, Integer> errorCounts() {
        return errorCounts(Errors.forCode(data.errorCode()));
    }

    public static ListConfigResourcesResponse parse(ByteBuffer buffer, short version) {
        return new ListConfigResourcesResponse(new ListConfigResourcesResponseData(
            new ByteBufferAccessor(buffer), version));
    }

    @Override
    public String toString() {
        return data.toString();
    }

    @Override
    public int throttleTimeMs() {
        return data.throttleTimeMs();
    }

    @Override
    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
        data.setThrottleTimeMs(throttleTimeMs);
    }

    public Collection<ConfigResource> configResources() {
        return data.configResources()
            .stream()
            .map(entry -> new ConfigResource(ConfigResource.Type.forId(entry.resourceType()), entry.resourceName()))
            .collect(Collectors.toList());
    }
}

Proposed Changes

kafka-configs.sh

--describe --entity-type groups

Previously, this command did not output non-existent groups that have dynamic configs. With this KIP, it also outputs such groups. Assuming group "G1" does not exist but has a dynamic config, the command outputs:

> ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type groups
Dynamic configs for group G1 are:
  consumer.heartbeat.interval.ms=10000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.heartbeat.interval.ms=10000}
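One way the tool could assemble the list of groups to describe is to take the union of the groups the cluster reports and the GROUP-type resources returned by listConfigResources. The sketch below models both inputs as plain sets of names; `GroupListing` and `groupsToDescribe` are hypothetical, and the actual kafka-configs.sh logic may differ.

```java
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

final class GroupListing {
    private GroupListing() { }

    // existingGroups: names of groups the cluster currently knows about.
    // groupsWithDynamicConfig: names of GROUP-type config resources, which can
    // include groups that do not exist (such as G1 in the example above).
    static SortedSet<String> groupsToDescribe(Set<String> existingGroups,
                                              Set<String> groupsWithDynamicConfig) {
        SortedSet<String> result = new TreeSet<>(existingGroups);
        result.addAll(groupsWithDynamicConfig);
        return result;
    }
}
```

Using a sorted set keeps the output order stable and removes duplicates when an existing group also has a dynamic config.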

--describe --entity-type groups --entity-name <non-existent-group-without-dynamic-config>

Previously, the command printed an empty body for a non-existent group without dynamic config:

> ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type groups --entity-name DOESNOTEXIST
Dynamic configs for group DOESNOTEXIST are:

With this KIP, the command tells users that the group does not exist and has no dynamic config:

> ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type groups --entity-name DOESNOTEXIST
The group DOESNOTEXIST doesn't exist and doesn't have dynamic config.

--describe --entity-type client-metrics --entity-name <non-existent-client-metrics>

Previously, the command printed an empty body for a non-existent client-metrics resource:

> ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type client-metrics --entity-name DOESNOTEXIST
Dynamic configs for client-metric DOESNOTEXIST are:

With this KIP, the command tells users that the client-metrics resource does not exist:

> ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type client-metrics --entity-name DOESNOTEXIST
The client-metric DOESNOTEXIST doesn't exist and doesn't have dynamic config.
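The new output for a named entity reduces to a simple check: if the entity neither exists nor has a dynamic config, print the "doesn't exist" line instead of an empty body. A minimal sketch, assuming the tool already knows both facts; `DescribeMessages` and `describeHeader` are hypothetical names, and the message strings are copied from the examples above.

```java
final class DescribeMessages {
    private DescribeMessages() { }

    // entityLabel is "group" or "client-metric", as in the kafka-configs.sh output.
    // exists: the entity is known to the cluster.
    // hasDynamicConfig: the entity appears among the listed config resources.
    static String describeHeader(String entityLabel, String name,
                                 boolean exists, boolean hasDynamicConfig) {
        if (!exists && !hasDynamicConfig) {
            return "The " + entityLabel + " " + name
                + " doesn't exist and doesn't have dynamic config.";
        }
        // Otherwise keep the existing header; the configs are printed after it.
        return "Dynamic configs for " + entityLabel + " " + name + " are:";
    }
}
```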

kafka-client-metrics.sh

--describe --name <non-existent-client-metrics>

Previously, the command printed default configs for a non-existent client-metrics resource:

> ./bin/kafka-client-metrics.sh --bootstrap-server localhost:9092 --describe --name DOESNOTEXIST
Client metrics configs for DOESNOTEXIST are:
  interval.ms=300000
  match=
  metrics=

With this KIP, the command tells users that the client-metrics resource does not exist:

> ./bin/kafka-client-metrics.sh --bootstrap-server localhost:9092 --describe --name DOESNOTEXIST
The client-metric DOESNOTEXIST doesn't exist and doesn't have dynamic config.


Compatibility, Deprecation, and Migration Plan

RPC

Bump the ListConfigResourcesRequest version to v1 so that the new behavior does not cause version incompatibilities with older brokers.

A v0 ListConfigResourcesRequest is handled as a ListClientMetricsResourcesRequest: it lists all client metrics resources.

A v1 ListConfigResourcesRequest returns configuration resources of all types by default. If resource types are specified, only the matching configuration resources are returned.
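These version rules can be expressed as a small filter over the broker's config resources. This is a sketch under simplifying assumptions: resources are modelled as a hypothetical `Resource` record holding a type id, the id 16 is assumed to mirror `ConfigResource.Type.CLIENT_METRICS`, and `ListConfigResourcesFilter` is not a Kafka class.

```java
import java.util.List;
import java.util.Set;

// Hypothetical model of one config resource known to the broker.
record Resource(byte typeId, String name) { }

final class ListConfigResourcesFilter {
    // Assumed to mirror ConfigResource.Type.CLIENT_METRICS.
    static final byte CLIENT_METRICS = 16;

    private ListConfigResourcesFilter() { }

    static List<Resource> select(short version, Set<Byte> requestedTypes, List<Resource> all) {
        if (version == 0) {
            // v0 behaves like ListClientMetricsResources: client metrics only.
            return all.stream().filter(r -> r.typeId() == CLIENT_METRICS).toList();
        }
        if (requestedTypes.isEmpty()) {
            // v1 with no ResourceTypes: return every configuration resource.
            return all;
        }
        // v1 with ResourceTypes: return only the matching resources.
        return all.stream().filter(r -> requestedTypes.contains(r.typeId())).toList();
    }
}
```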

Admin

listClientMetricsResources sends a v0 ListConfigResourcesRequest, and listConfigResources sends a v1 ListConfigResourcesRequest. If the broker only supports v0, the request falls back to v0 when client metrics is the only requested resource type. If no resource type is specified, or other resource types are requested, the call fails with UNSUPPORTED_VERSION.
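The fallback rule can be sketched as a version-selection function. Assumptions: `maxBrokerVersion` is the highest ListConfigResources version the broker advertises, the id 16 mirrors `ConfigResource.Type.CLIENT_METRICS`, and `VersionSelection`/`chooseVersion` are hypothetical names; the admin client may wire this into its request-building path differently.

```java
import java.util.Set;

final class VersionSelection {
    // Assumed to mirror ConfigResource.Type.CLIENT_METRICS.
    static final byte CLIENT_METRICS = 16;

    private VersionSelection() { }

    // Returns the request version listConfigResources would use, or throws when
    // the broker cannot serve the requested resource types.
    static short chooseVersion(short maxBrokerVersion, Set<Byte> requestedTypes) {
        if (maxBrokerVersion >= 1) {
            return 1;
        }
        // An old broker only understands v0, which lists client metrics resources.
        if (requestedTypes.equals(Set.of(CLIENT_METRICS))) {
            return 0;
        }
        // Empty (meaning "all types") or other types cannot be answered by v0.
        // Stand-in for Kafka's UnsupportedVersionException (UNSUPPORTED_VERSION).
        throw new UnsupportedOperationException("UNSUPPORTED_VERSION");
    }
}
```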

Metrics

Before 4.1, the request metrics record this API as ListClientMetricsResources. After this KIP, the ListClientMetricsResources API no longer exists. For compatibility with older versions, v0 requests are recorded under both ListClientMetricsResources and ListConfigResources; v1 and later requests are recorded only under ListConfigResources.
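The metric rule amounts to mapping a request version to the request names it is recorded under. A minimal sketch; `RequestMetricNames` and `namesFor` are hypothetical, and how the broker actually registers these names is outside its scope.

```java
import java.util.List;

final class RequestMetricNames {
    private RequestMetricNames() { }

    static List<String> namesFor(short version) {
        // v0 is recorded under both names for compatibility with metrics
        // from older versions.
        if (version == 0) {
            return List.of("ListClientMetricsResources", "ListConfigResources");
        }
        // v1 and above are recorded only under the new name.
        return List.of("ListConfigResources");
    }
}
```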

Test Plan

ListConfigResourcesRequest integration tests

  • Validate that a v0 request returns the same result as a ListClientMetricsResourcesRequest.
  • Validate that a v1 request returns the expected result for different resource types.

kafka-configs.sh integration tests

  • A non-existent group with dynamic config is shown even when no entity name is specified.

  • Describing a non-existent group without dynamic config shows the group doesn’t exist.

  • Describing a non-existent client-metric shows the client-metric doesn’t exist.

kafka-client-metrics.sh integration tests

  • Describing a non-existent client-metric shows the client-metric doesn’t exist.

Rejected Alternatives

N/A
