Status

Current state: Accepted

Discussion thread: here

JIRA: https://issues.apache.org/jira/browse/KAFKA-16891

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The behavior of groups in Apache Kafka is more complicated and subtle than it first appears. To most users of Kafka, groups are synonymous with consumer groups. However, the "classic" consumer group protocol was extensible and there are several well-known extensions in use. For example, distributed workers in Kafka Connect also use groups as a coordination mechanism, and some applications such as schema registries have also built upon the consumer group protocol in interesting ways. These are all groups.

KIP-848 introduced the new consumer group protocol and modern consumer groups use this new protocol. KIP-932 introduces share groups, KIP-1071 introduces streams groups, and there may well be additional types of group in the future.

All of these types of groups share a namespace for group IDs, but the manner in which you administer a group and the operations you can perform upon it depend upon its type.

Here’s an example of the complexity. If you start up a distributed Kafka Connect worker using the default configuration, it creates a group called "connect-cluster". This is a group, but it’s not a consumer group. You can’t see this group in the list of consumer groups with the kafka-consumer-groups.sh tool, but if you try to describe a consumer group called "connect-cluster" or even use this group ID with a consumer, you get an error.

Here's another example. If you use kafka-consumer-groups.sh --describe --group MYSHARE where the group is a share group, the output is Error: Consumer group 'MYSHARE' does not exist. That's probably OK, but it's interesting to see how it gets there.

First, the admin client uses the ConsumerGroupDescribe RPC, which responds with error code GROUP_ID_NOT_FOUND (69), giving no indication that it found a group of the wrong type. Next, the admin client falls back to the pre-KIP-848 DescribeGroups RPC in case it's a classic consumer group. This RPC succeeds and responds with error code NONE (0), returning the group description with a status of Dead. It looks like a dead consumer group. Finally, this dead group is translated into the error message Error: Consumer group 'MYSHARE' does not exist. The output seems acceptable, but the tool actually thinks it's dealing with a dead consumer group. It would be better if the ConsumerGroupDescribe RPC failed in a straightforward way.

This KIP tries to resolve some of these situations and make it easier to work out what’s going on with the groups on a cluster.

Proposed Changes

This KIP introduces a command-line tool called kafka-groups.sh for displaying all of the groups and their types.

In situations where command-line tools are used to administer a group of the wrong type, the error message now indicates when the group type is wrong, rather than saying the group does not exist. This is achieved using a new error message in the Kafka protocol and a slight change in behavior of the admin client.

Listing groups

The KIP introduces a new tool called kafka-groups.sh to show all of the groups in a cluster, their types and the protocols they use. This lets you see consumer groups, share groups, Kafka Connect cluster groups, and any other custom groups all together. It doesn't replace the specific tools for the different types of group, but it does shed light on what's actually going on for administrators. Note that this does not require any changes to the Kafka protocol. The information is already available, but not directly accessible by the administrator.

The ListGroups RPC response returns three pieces of information for each group: group ID, type and protocol. For the common types of group, here is what they mean:

| Type | Protocol | Meaning |
| --- | --- | --- |
| Classic | "consumer" | Consumer group with the "classic" consumer group protocol |
| Classic | "" | "Simple" consumer group that has committed offsets only |
| Consumer | "consumer" | Consumer group with the KIP-848 consumer group protocol |
| Share | "share" | Share group |
| Classic | "connect" | Kafka Connect distributed worker cluster group |
| Classic | Any other string | Other customization of "classic" consumer group protocol, such as a schema registry |

The new kafka-groups.sh tool makes all of this information available.
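As an illustration of the table above, the following minimal sketch maps a (type, protocol) pair to its meaning. The class GroupKindSketch and its describe method are hypothetical names introduced here for illustration; they are not part of Kafka or of this KIP, and the returned strings are simply paraphrases of the table.

```java
import java.util.Locale;

// Hypothetical helper, not part of Kafka: maps the (type, protocol) pair
// reported for a group to the meanings listed in the table.
final class GroupKindSketch {

    static String describe(String type, String protocol) {
        String t = type.toLowerCase(Locale.ROOT);
        if (t.equals("consumer")) {
            return "Consumer group (KIP-848 protocol)";
        }
        if (t.equals("share")) {
            return "Share group";
        }
        if (t.equals("classic")) {
            if (protocol.isEmpty()) {
                return "Simple consumer group (committed offsets only)";
            }
            if (protocol.equals("consumer")) {
                return "Consumer group (classic protocol)";
            }
            if (protocol.equals("connect")) {
                return "Kafka Connect distributed worker cluster group";
            }
            // Any other protocol string, for example a schema registry.
            return "Other customization of the classic protocol";
        }
        return "Unknown group type";
    }

    public static void main(String[] args) {
        System.out.println(describe("Classic", "consumer"));
        System.out.println(describe("Classic", ""));
        System.out.println(describe("Classic", "sr"));
        System.out.println(describe("Share", "share"));
    }
}
```

Note that the type alone is not enough to identify a classic consumer group: the protocol string has to be inspected as well.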

Describing groups using the admin client

The error message of the exception GroupIdNotFoundException is used by Admin.describeConsumerGroups(Collection<String>) and Admin.describeShareGroups(Collection<String>) to indicate that the group being described has the wrong type. This small change enables the error messages from the command-line tools kafka-consumer-groups.sh and kafka-share-groups.sh to indicate when the group ID is found, but it has the wrong type.

Unified group state

The Admin client interfaces to list groups provide the ability to filter by group state. By introducing a method for listing groups of all types, the question arises how to handle group state. Previously, there were separate enums for ConsumerGroupState and ShareGroupState. Actually, ConsumerGroupState contains the set of states for both classic and modern consumer groups, which overlap but are not quite the same. ShareGroupState is a subset of the states in ConsumerGroupState.

This KIP introduces a single enum GroupState which contains all of the states from existing enums. In practice, that means the states are exactly the same as those in ConsumerGroupState.

Then, ConsumerGroupState is deprecated in favour of GroupState, and ShareGroupState is immediately replaced by GroupState because the KIP-932 interfaces have not actually been released yet.

The methods which return ConsumerGroupState will be deprecated and replaced with methods called groupState() which return GroupState instead.

Public Interfaces

Client API changes

Admin

Add the following methods on the org.apache.kafka.clients.admin.Admin interface.

| Method signature | Description |
| --- | --- |
| DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds) | Describe some classic groups in the cluster, with the default options. |
| DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds, DescribeClassicGroupsOptions options) | Describe some classic groups in the cluster. |
| ListGroupsResult listGroups() | List the groups available in the cluster, with the default options. |
| ListGroupsResult listGroups(ListGroupsOptions options) | List the groups available in the cluster. |

It also deprecates the following methods because listGroups with a type filter is preferred for listing groups rather than adding separate methods for each group type over time. The equivalent share groups methods will be removed from KIP-932.

| Method signature | Description |
| --- | --- |
| ListConsumerGroupsResult listConsumerGroups() | List the consumer groups available in the cluster, with the default options. |
| ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) | List the consumer groups in the cluster. |

Also deprecated are the related classes such as ListConsumerGroupsResult and ListConsumerGroupsOptions.

Here are the method signatures of the new methods:

   /**
    * Describe some classic groups in the cluster.
    *
    * @param groupIds The IDs of the groups to describe.
    * @param options  The options to use when describing the groups.
    * @return The DescribeClassicGroupsResult.
    */
   DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds,
                                                     DescribeClassicGroupsOptions options);

   /**
    * Describe some classic groups in the cluster, with the default options.
    * <p>
    * This is a convenience method for {@link #describeClassicGroups(Collection, DescribeClassicGroupsOptions)}
    * with default options. See the overload for more details.
    *
    * @param groupIds The IDs of the groups to describe.
    * @return The DescribeClassicGroupsResult.
    */
   default DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds) {
       return describeClassicGroups(groupIds, new DescribeClassicGroupsOptions());
   }

   /**
    * List the groups available in the cluster with the default options.
    *
    * <p>This is a convenience method for {@link #listGroups(ListGroupsOptions)} with default options.
    * See the overload for more details.
    *
    * @return The ListGroupsResult.
    */
   default ListGroupsResult listGroups() {
       return listGroups(new ListGroupsOptions());
   }
 
   /**
    * List the groups available in the cluster.
    *
    * @param options The options to use when listing the groups.
    * @return The ListGroupsResult.
    */
   ListGroupsResult listGroups(ListGroupsOptions options);

DescribeClassicGroupsOptions

/**
 * Options for {@link Admin#describeClassicGroups(Collection, DescribeClassicGroupsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeClassicGroupsOptions extends AbstractOptions<DescribeClassicGroupsOptions> {
    private boolean includeAuthorizedOperations;

    public DescribeClassicGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations) {
        this.includeAuthorizedOperations = includeAuthorizedOperations;
        return this;
    }

    public boolean includeAuthorizedOperations() {
        return includeAuthorizedOperations;
    }
}

DescribeClassicGroupsResult

package org.apache.kafka.clients.admin;

/**
 * The result of the {@link KafkaAdminClient#describeClassicGroups(Collection, DescribeClassicGroupsOptions)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeClassicGroupsResult {

    public DescribeClassicGroupsResult(final Map<String, KafkaFuture<ClassicGroupDescription>> futures);

    /**
     * Return a map from group id to futures which yield group descriptions.
     */
    public Map<String, KafkaFuture<ClassicGroupDescription>> describedGroups();

    /**
     * Return a future which yields all ClassicGroupDescription objects, if all the describes succeed.
     */
    public KafkaFuture<Map<String, ClassicGroupDescription>> all();
}

ClassicGroupDescription

package org.apache.kafka.clients.admin;

/**
 * A detailed description of a single classic group in the cluster.
 */
@InterfaceStability.Evolving
public class ClassicGroupDescription {
    public ClassicGroupDescription(String groupId,
                                   String protocol,
                                   Collection<MemberDescription> members,
                                   String partitionAssignor,
                                   GroupState groupState,
                                   Node coordinator);

    public ClassicGroupDescription(String groupId,
                                   String protocol,
                                   Collection<MemberDescription> members,
                                   String partitionAssignor,
                                   GroupState groupState,
                                   Node coordinator,
                                   Set<AclOperation> authorizedOperations);

    /**
     * The id of the classic group.
     */
    public String groupId();

    /**
     * The group protocol type.
     */
    public String protocol();

    /**
     * If the group is a simple consumer group or not.
     */
    public boolean isSimpleConsumerGroup();

    /**
     * A list of the members of the classic group.
     */
    public Collection<MemberDescription> members();

    /**
     * The group partition assignor.
     */
    public String partitionAssignor();

    /**
     * The group state, or UNKNOWN if the state is too new for us to parse.
     */
    public GroupState groupState();

    /**
     * The group coordinator, or null if the coordinator is not known.
     */
    public Node coordinator();

    /**
     * authorizedOperations for this group, or null if that information is not known.
     */
    public Set<AclOperation> authorizedOperations();
}

ConsumerGroupDescription

The existing constructors are deprecated and equivalents which use GroupState instead of ConsumerGroupState are added.

The method ConsumerGroupState state() is deprecated and GroupState groupState() is added.

ListGroupsOptions

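package org.apache.kafka.clients.admin;

import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;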
 
/**
 * Options for {@link Admin#listGroups(ListGroupsOptions)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
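public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {

    // Both filters default to empty, meaning "no filtering".
    private Set<GroupState> groupStates = Collections.emptySet();
    private Set<GroupType> types = Collections.emptySet();

    /**
     * If groupStates is set, only groups in these states will be returned by listGroups().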
     * Otherwise, all groups are returned.
     * This operation is supported by brokers with version 2.6.0 or later.
     */
    public ListGroupsOptions inStates(Set<GroupState> groupStates) {
        this.groupStates = (groupStates == null || groupStates.isEmpty()) ? Collections.emptySet() : new HashSet<>(groupStates);
        return this;
    }

    /**
     * If types is set, only groups of these types will be returned by listGroups().
     * Otherwise, all groups are returned.
     */
    public ListGroupsOptions withTypes(Set<GroupType> types) {
        this.types = (types == null || types.isEmpty()) ? Collections.emptySet() : new HashSet<>(types);
        return this;
    }

    /**
     * Returns the list of group states that are requested or empty if no states have been specified.
     */
    public Set<GroupState> groupStates() {
        return groupStates;
    }

    /**
     * Returns the list of group types that are requested or empty if no types have been specified.
     */
    public Set<GroupType> types() {
        return types;
    }
}

ListGroupsResult

package org.apache.kafka.clients.admin;
   
/**
 * The result of the {@link Admin#listGroups(ListGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListGroupsResult {
    ListGroupsResult(KafkaFuture<Collection<Object>> future);

    /**
     * Returns a future that yields either an exception, or the full set of group listings.
     */
    public KafkaFuture<Collection<GroupListing>> all();

    /**
     * Returns a future which yields just the valid listings.
     */
    public KafkaFuture<Collection<GroupListing>> valid();

    /**
     * Returns a future which yields just the errors which occurred.
     */
    public KafkaFuture<Collection<Throwable>> errors();
}

GroupListing

package org.apache.kafka.clients.admin;
  
/**
 * A listing of a group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class GroupListing {
  public GroupListing(String groupId, Optional<GroupType> type, String protocol);

  /**
   * The id of the group.
   */
  public String groupId();
  
  /**
   * The group type.
   */
  public Optional<GroupType> type();
  
  /**
   * The group protocol type.
   */
  public String protocol();

  /**
   * The group state.
   */
  public Optional<GroupState> groupState();

  /**
   * If the group is a simple consumer group or not.
   */
  public boolean isSimpleConsumerGroup();
}

The new GroupState enum contains the states for all types of groups. The following table shows the correspondence between the group states and types. This table will be included in the javadoc.

| State | Classic group | Classic consumer group | Modern consumer group | Share group |
| --- | --- | --- | --- | --- |
| UNKNOWN | Yes | Yes | Yes | Yes |
| PREPARING_REBALANCE | Yes | Yes | | |
| COMPLETING_REBALANCE | Yes | Yes | | |
| STABLE | Yes | Yes | Yes | Yes |
| DEAD | Yes | Yes | Yes | Yes |
| EMPTY | Yes | Yes | Yes | Yes |
| ASSIGNING | | | Yes | |
| RECONCILING | | | Yes | |

package org.apache.kafka.common;

/**
 * The group state.
 */
public enum GroupState {
    UNKNOWN("Unknown"),
    PREPARING_REBALANCE("PreparingRebalance"),
    COMPLETING_REBALANCE("CompletingRebalance"),
    STABLE("Stable"),
    DEAD("Dead"),
    EMPTY("Empty"),
    ASSIGNING("Assigning"),
    RECONCILING("Reconciling");

    GroupState(String name);

    /**
     * Case-insensitive group state lookup by string name.
     */
    public static GroupState parse(String name);

    public String toString();
}
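
The declaration above only gives signatures. As a minimal sketch of how the case-insensitive lookup might behave, the following self-contained enum (named GroupStateSketch to make clear it is not the real class) builds a lower-cased lookup table. Falling back to UNKNOWN for unrecognized or null names is an assumption, chosen to match the "UNKNOWN if the state is too new for us to parse" wording used in the javadoc elsewhere in this KIP.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Illustrative stand-in for the proposed GroupState enum; not the real class.
enum GroupStateSketch {
    UNKNOWN("Unknown"),
    PREPARING_REBALANCE("PreparingRebalance"),
    COMPLETING_REBALANCE("CompletingRebalance"),
    STABLE("Stable"),
    DEAD("Dead"),
    EMPTY("Empty"),
    ASSIGNING("Assigning"),
    RECONCILING("Reconciling");

    // Lookup table keyed by the lower-cased display name.
    private static final Map<String, GroupStateSketch> BY_NAME = new HashMap<>();

    static {
        for (GroupStateSketch state : values()) {
            BY_NAME.put(state.name.toLowerCase(Locale.ROOT), state);
        }
    }

    private final String name;

    GroupStateSketch(String name) {
        this.name = name;
    }

    // Case-insensitive lookup; unrecognized names map to UNKNOWN (assumption).
    static GroupStateSketch parse(String name) {
        if (name == null) {
            return UNKNOWN;
        }
        GroupStateSketch state = BY_NAME.get(name.toLowerCase(Locale.ROOT));
        return state == null ? UNKNOWN : state;
    }

    @Override
    public String toString() {
        return name;
    }
}
```

With this sketch, GroupStateSketch.parse("stable") and GroupStateSketch.parse("STABLE") both return STABLE, and toString() yields the wire-format name "Stable".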

Kafka protocol changes

This KIP introduces a new version for the DescribeGroups API.

DescribeGroups API

The KIP introduces version 6. This changes the error behavior so that if a classic group is described and the group ID either refers to a group of a different type, or the group ID is not found, the error code GROUP_ID_NOT_FOUND is returned. Previously, the error code NONE was used with a group state of DEAD.
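
The version-dependent behavior can be modeled with a short sketch. DescribedGroupSketch is a hypothetical class holding only the three response fields relevant here; the error message wording is an assumption for illustration and is not specified by this KIP. The error code values are those quoted in the Motivation section.

```java
// Hypothetical model of one DescribedGroup entry in a DescribeGroups response.
final class DescribedGroupSketch {
    static final short NONE = 0;
    static final short GROUP_ID_NOT_FOUND = 69;

    final short errorCode;
    final String errorMessage; // always null before version 6
    final String groupState;

    private DescribedGroupSketch(short errorCode, String errorMessage, String groupState) {
        this.errorCode = errorCode;
        this.errorMessage = errorMessage;
        this.groupState = groupState;
    }

    // Builds the entry returned when the requested ID is not a classic group,
    // either because it does not exist or because it has another type.
    static DescribedGroupSketch notAClassicGroup(String groupId, int apiVersion,
                                                 boolean existsWithOtherType) {
        if (apiVersion >= 6) {
            // Message text is illustrative only.
            String message = existsWithOtherType
                ? "Group " + groupId + " is not a classic group."
                : "Group " + groupId + " not found.";
            return new DescribedGroupSketch(GROUP_ID_NOT_FOUND, message, "");
        }
        // Versions 0 to 5: success with a synthetic dead group.
        return new DescribedGroupSketch(NONE, null, "Dead");
    }

    public static void main(String[] args) {
        DescribedGroupSketch v5 = notAClassicGroup("MYSHARE", 5, true);
        DescribedGroupSketch v6 = notAClassicGroup("MYSHARE", 6, true);
        System.out.println("v5: code=" + v5.errorCode + " state=" + v5.groupState);
        System.out.println("v6: code=" + v6.errorCode + " message=" + v6.errorMessage);
    }
}
```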

Request schema

Version 6 is the same as version 5.

{
  "apiKey": 15,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "DescribeGroupsRequest",
  // Versions 1 and 2 are the same as version 0.
  //
  // Starting in version 3, authorized operations can be requested.
  //
  // Starting in version 4, the response will include group.instance.id info for members.
  //
  // Version 5 is the first flexible version.
  //
  // Version 6 returns error code GROUP_ID_NOT_FOUND if the group ID is not found (KIP-1043).
  "validVersions": "0-6",
  "flexibleVersions": "5+",
  "fields": [
    { "name": "Groups", "type": "[]string", "versions": "0+", "entityType": "groupId",
      "about": "The names of the groups to describe" },
    { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "3+",
      "about": "Whether to include authorized operations." }
  ]
}

Response schema

Version 6 adds the ErrorMessage field to DescribedGroup so that the reason for a non-existent group can be understood more clearly.

{
  "apiKey": 15,
  "type": "response",
  "name": "DescribeGroupsResponse",
  // Version 1 added throttle time.
  //
  // Starting in version 2, on quota violation, brokers send out responses before throttling.
  //
  // Starting in version 3, brokers can send authorized operations.
  //
  // Starting in version 4, the response will optionally include group.instance.id info for members.
  //
  // Version 5 is the first flexible version.
  //
  // Version 6 returns error code GROUP_ID_NOT_FOUND if the group ID is not found (KIP-1043).
  "validVersions": "0-6",
  "flexibleVersions": "5+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
      "about": "Each described group.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The describe error, or 0 if there was no error." },
      { "name": "ErrorMessage", "type": "string", "versions": "6+", "nullableVersions": "6+", "default": "null",
        "about": "The describe error message, or null if there was no error." },
      { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
        "about": "The group ID string." },
      { "name": "GroupState", "type": "string", "versions": "0+",
        "about": "The group state string, or the empty string." },
      { "name": "ProtocolType", "type": "string", "versions": "0+",
        "about": "The group protocol type, or the empty string." },
      // ProtocolData is currently only filled in if the group state is in the Stable state.
      { "name": "ProtocolData", "type": "string", "versions": "0+",
        "about": "The group protocol data, or the empty string." },
      // N.B. If the group is in the Dead state, the members array will always be empty.
      { "name": "Members", "type": "[]DescribedGroupMember", "versions": "0+",
        "about": "The group members.", "fields": [
        { "name": "MemberId", "type": "string", "versions": "0+",
          "about": "The member ID assigned by the group coordinator." },
        { "name": "GroupInstanceId", "type": "string", "versions": "4+", "ignorable": true,
          "nullableVersions": "4+", "default": "null",
          "about": "The unique identifier of the consumer instance provided by end user." },
        { "name": "ClientId", "type": "string", "versions": "0+",
          "about": "The client ID used in the member's latest join group request." },
        { "name": "ClientHost", "type": "string", "versions": "0+",
          "about": "The client host." },
        // This is currently only provided if the group is in the Stable state.
        { "name": "MemberMetadata", "type": "bytes", "versions": "0+",
          "about": "The metadata corresponding to the current group protocol in use." },
        // This is currently only provided if the group is in the Stable state.
        { "name": "MemberAssignment", "type": "bytes", "versions": "0+",
          "about": "The current assignment provided by the group leader." }
      ]},
      { "name": "AuthorizedOperations", "type": "int32", "versions": "3+",  "default": "-2147483648",
        "about": "32-bit bitfield to represent authorized operations for this group." }
    ]}
  ]
}

Command-line tools

kafka-groups.sh

A new tool called kafka-groups.sh is introduced for listing groups of any kind. It has the following options:

| Option | Description |
| --- | --- |
| --bootstrap-server <String: server to connect to> | REQUIRED: The server(s) to connect to. |
| --command-config <String: command config property file> | Property file containing configs to be passed to Admin Client. |
| --consumer | Filters the groups to show all kinds of consumer groups, including classic and simple consumer groups. This matches group type 'consumer', and group type 'classic' where the protocol type is 'consumer' or empty. |
| --group-type <String: type> | Filters the groups based on group type. Valid types are: 'classic', 'consumer' and 'share'. |
| --help | Print usage information. |
| --list | List all groups. |
| --protocol <String: protocol> | Filters the groups based on protocol type. |
| --share | Filters the groups to show share groups. |
| --version | Display Kafka version. |

Note that --consumer actually matches all groups whose type is Consumer, groups whose type is Classic and protocol type is "consumer", and also "simple" consumer groups whose type is Classic and protocol type is "". The filtering is done in the kafka-groups.sh tool.
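
The --consumer matching rule can be written as a small predicate. The sketch below is an illustration only: ConsumerOptionSketch and its methods are hypothetical names, and each group is represented as a plain {groupId, type, protocol} array rather than the real GroupListing class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical client-side filter mirroring the --consumer option.
final class ConsumerOptionSketch {

    // True for type Consumer, or type Classic with protocol "consumer" or "".
    static boolean matchesConsumerOption(String type, String protocol) {
        if (type.equalsIgnoreCase("Consumer")) {
            return true;
        }
        return type.equalsIgnoreCase("Classic")
            && (protocol.isEmpty() || protocol.equals("consumer"));
    }

    // Each row is {groupId, type, protocol}; returns the IDs that match.
    static List<String> filterGroupIds(List<String[]> rows) {
        List<String> ids = new ArrayList<>();
        for (String[] row : rows) {
            if (matchesConsumerOption(row[1], row[2])) {
                ids.add(row[0]);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[] {"old-consumer-group", "Classic", "consumer"},
            new String[] {"new-consumer-group", "Consumer", "consumer"},
            new String[] {"connect-cluster", "Classic", "connect"},
            new String[] {"share-group", "Share", "share"},
            new String[] {"schema-registry", "Classic", "sr"},
            new String[] {"simple-consumer-group", "Classic", ""});
        System.out.println(filterGroupIds(rows));
    }
}
```

Because the rule depends on both type and protocol, it cannot be expressed as a types filter alone, which is consistent with the filtering being done in the tool.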

Here are some examples.

To list all of the groups:

$ bin/kafka-groups.sh --bootstrap-server localhost:9092 --list
GROUP                   TYPE          PROTOCOL
old-consumer-group      Classic       consumer
new-consumer-group      Consumer      consumer
connect-cluster         Classic       connect
share-group             Share         share
schema-registry         Classic       sr
simple-consumer-group   Classic

To list all of the consumer groups, silently merging together all of the different kinds of consumer group:

$ bin/kafka-groups.sh --bootstrap-server localhost:9092 --list --consumer
GROUP                   TYPE          PROTOCOL
old-consumer-group      Classic       consumer
new-consumer-group      Consumer      consumer
simple-consumer-group   Classic

To list all of the KIP-848 consumer groups:

$ bin/kafka-groups.sh --bootstrap-server localhost:9092 --list --group-type consumer
GROUP                   TYPE          PROTOCOL
new-consumer-group      Consumer      consumer

To list all of the share groups:

$ bin/kafka-groups.sh --bootstrap-server localhost:9092 --list --share
GROUP                   TYPE          PROTOCOL
share-group             Share         share

kafka-consumer-groups.sh

For all operations which act on a single group, if that group exists but is not a consumer group, the command fails with a message indicating that the group type is incorrect, rather than the existing message that the group does not exist.

For example, if you try to describe a share group, the output will look like this:

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group SG1
Error: Group 'SG1' is not a consumer group.

kafka-share-groups.sh

For all operations which act on a single group, if that group exists but is not a share group, the command fails with a message indicating that the group type is incorrect, rather than the existing message that the group does not exist.

For example, if you try to describe a consumer group, the output will look like this:

$ bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group CG1
Error: Group 'CG1' is not a share group.

Compatibility, Deprecation, and Migration Plan

ListGroups RPC

When writing this KIP, it seemed that perhaps the ListGroups RPC would need to be enhanced to create the Admin.listGroups() method.

ListGroups v5 introduces the TypesFilter in the request and the GroupType in the response. This KIP does not need to introduce a new version of the ListGroups RPC, because:

  • If the broker does not support ListGroups v5, setting a types filter using ListGroupsOptions.withTypes() results in UnsupportedVersionException when the ListGroups request is serialized. (This is KIP-848 behavior.)
  • If the broker receives a group type in TypesFilter that it does not support in a ListGroups v5 (or later) request, the filter is treated as an unknown group type, which thus matches no groups. (This is KIP-848 behavior.)
  • If the client receives a group type that it does not understand in a ListGroups v5 (or later) response, the group type is GroupType.UNKNOWN. This is because the group type in the ListGroups RPC response cannot be parsed into a value of the GroupType enumeration, and the default value of UNKNOWN is used in these situations.
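
The first and third points can be sketched on the client side as follows. This is a simplified model, not the admin client's code: ListGroupsCompatSketch is a hypothetical class, UnsupportedOperationException stands in for Kafka's UnsupportedVersionException, and the nested GroupTypeSketch enum stands in for GroupType.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical model of the client-side compatibility rules for ListGroups.
final class ListGroupsCompatSketch {

    enum GroupTypeSketch { UNKNOWN, CONSUMER, CLASSIC, SHARE }

    // A types filter can only be serialized with ListGroups v5 or later.
    static int chooseVersion(int brokerMaxVersion, Set<String> typesFilter) {
        if (!typesFilter.isEmpty() && brokerMaxVersion < 5) {
            // Stand-in for UnsupportedVersionException.
            throw new UnsupportedOperationException(
                "Types filter requires ListGroups v5, broker supports v" + brokerMaxVersion);
        }
        return brokerMaxVersion;
    }

    // A type name the client does not recognize maps to UNKNOWN.
    static GroupTypeSketch parseType(String name) {
        for (GroupTypeSketch type : GroupTypeSketch.values()) {
            if (type.name().equalsIgnoreCase(name)) {
                return type;
            }
        }
        return GroupTypeSketch.UNKNOWN;
    }

    public static void main(String[] args) {
        // No filter: an older broker is fine.
        System.out.println(chooseVersion(4, Collections.<String>emptySet()));
        // A type added by a newer broker is reported as UNKNOWN.
        System.out.println(parseType("some-future-type"));
    }
}
```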

The existing behavior suffices and there is no compatibility problem.

DescribeConsumerGroups

Prior to this KIP, the behavior of Admin.describeConsumerGroups(Collection<String>) seems a little unusual when used with groups which are not consumer groups. You can describe a collection of group IDs, some of which might exist and others might not. Here's how the response is built:

  1. If the group is a consumer group and the client is authorized to describe the group and there was no error, the group information is returned, along with the authorized operations if requested.
  2. If the group is not a consumer group (either does not exist or wrong type) and the client is authorized to describe the group and there was no error, the group information for a dead group is returned, along with the authorized operations if requested.
  3. If the client is not authorized to describe the group, the group information error code is set to GROUP_AUTHORIZATION_FAILED.
  4. If there was an error describing the group, the group information error code is set.

In cases (1) and (2), the admin client considers the operation a success, and this means the KafkaFuture for this group completes successfully. In cases (3) and (4), the admin client considers the operation unsuccessful, and this means the KafkaFuture for this group completes exceptionally. Case (2) is the tricky one because you can't readily tell what the dead group means. This is why the admin tools use output like Error: Consumer group 'MYSHARE' does not exist, even when the group ID is recognised and it's just the wrong type.

After this KIP, if you use Admin.describeConsumerGroups(Collection<String>) to describe a group which is not a consumer group, case (2) above will result in the group information error code set to GROUP_ID_NOT_FOUND. In the admin client, this means the future for this group completes exceptionally with GroupIdNotFoundException rather than succeeding with a dead consumer group.
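
A minimal sketch of the resulting per-group completion logic is shown below. It uses java.util.concurrent.CompletableFuture in place of KafkaFuture, a locally defined exception in place of GroupIdNotFoundException, and a plain string in place of the group description; the class and method names are hypothetical. The numeric value 30 for GROUP_AUTHORIZATION_FAILED is the Kafka protocol error code.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model of how one group's result future is completed.
final class DescribeOutcomeSketch {
    static final int NONE = 0;
    static final int GROUP_AUTHORIZATION_FAILED = 30;
    static final int GROUP_ID_NOT_FOUND = 69;

    // Stand-in for Kafka's GroupIdNotFoundException.
    static final class GroupIdNotFoundSketchException extends RuntimeException {
        GroupIdNotFoundSketchException(String message) {
            super(message);
        }
    }

    static CompletableFuture<String> complete(String groupId, int errorCode, String errorMessage) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (errorCode == NONE) {
            // Case (1): a real consumer group was described.
            future.complete("description of " + groupId);
        } else if (errorCode == GROUP_ID_NOT_FOUND) {
            // Former case (2): missing group or wrong type now fails.
            future.completeExceptionally(new GroupIdNotFoundSketchException(errorMessage));
        } else {
            // Cases (3) and (4): authorization failure or another error.
            future.completeExceptionally(new IllegalStateException("error code " + errorCode));
        }
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<String> wrongType =
            complete("SG1", GROUP_ID_NOT_FOUND, "Group 'SG1' is not a consumer group.");
        // handle() receives the exception, so the caller can surface its message.
        System.out.println(wrongType.handle((value, error) ->
            error == null ? value : "Error: " + error.getMessage()).join());
    }
}
```

Carrying the reason in the exception message is what lets the command-line tools report a wrong group type instead of a non-existent group.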

Test Plan

The feature will be thoroughly tested with unit and integration tests.

Rejected Alternatives

It would be possible to preserve the current behavior of Admin.describeConsumerGroups(Collection<String>) when used with a group which is not a consumer group and introduce an option to ask it to validate the group type rather than converting any indescribable group into a dead consumer group. This seems like an unnecessary complication with little benefit.

It was proposed to add a new error code INCONSISTENT_GROUP_TYPE to indicate that the group existed but the group type was inconsistent with the operation. Instead, the existing error code GROUP_ID_NOT_FOUND is used and the protocol is enhanced so that an error message is returned along with this error code.
