Status
Current state: Accepted
Discussion thread: here
JIRA: https://issues.apache.org/jira/browse/KAFKA-16891
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The behavior of groups in Apache Kafka is more complicated and subtle than it first appears. To most users of Kafka, groups are synonymous with consumer groups. However, the "classic" consumer group protocol was extensible and there are several well-known extensions in use. For example, distributed workers in Kafka Connect also use groups as a coordination mechanism, and some applications such as schema registries have also built upon the consumer group protocol in interesting ways. These are all groups.
KIP-848 introduced the new consumer group protocol and modern consumer groups use this new protocol. KIP-932 introduces share groups, KIP-1071 introduces streams groups, and there may well be additional types of group in the future.
All of these types of groups share a namespace for group IDs, but the manner in which you administer a group and the operations you can perform upon it depend upon its type.
Here’s an example of the complexity. If you start up a distributed Kafka Connect worker using the default configuration, it creates a group called "connect-cluster". This is a group, but it’s not a consumer group. You can’t see this group in the list of consumer groups with the kafka-consumer-groups.sh tool, but if you try to describe a consumer group called "connect-cluster" or even use this group ID with a consumer, you get an error.
Here's another example. If you use kafka-consumer-groups.sh --describe --group MYSHARE where the group is a share group, the output is Error: Consumer group 'MYSHARE' does not exist. That's probably OK, but it's interesting to see how it gets there.
First, the admin client uses the ConsumerGroupDescribe RPC which responds with error code GROUP_ID_NOT_FOUND (69), giving no indication that it found a group of the wrong type. Next, the admin client falls back to the pre-KIP-848 DescribeGroups RPC in case it's a classic consumer group. This RPC succeeds and responds with error code NONE (0), returning the group description with a status of Dead. It looks like a dead consumer group. Finally, this dead group is translated into the error message Error: Consumer group 'MYSHARE' does not exist. The output seems acceptable, but the tool actually thinks it's dealing with a dead consumer group. It would be better if the ConsumerGroupDescribe RPC failed in a straightforward way.
This KIP tries to resolve some of these situations and make it easier to work out what’s going on with the groups on a cluster.
Proposed Changes
This KIP introduces a command-line tool called kafka-groups.sh for displaying all of the groups and their types.
In situations where command-line tools are used to administer a group of the wrong type, the error message now indicates when the group type is wrong, rather than saying the group does not exist. This is achieved using a new error message in the Kafka protocol and a slight change in behavior of the admin client.
Listing groups
The KIP introduces a new tool called kafka-groups.sh to show all of the groups in a cluster, their types and the protocols they use. This lets you see consumer groups, share groups, Kafka Connect cluster groups, and any other custom groups all together. It doesn't replace the specific tools for the different types of group, but it does show administrators what's actually going on. Note that this does not require any changes to the Kafka protocol. The information is already available, but not directly accessible by the administrator.
The ListGroups RPC response returns three pieces of information for each group: group ID, type and protocol. For the common types of group, here is what they mean:
Type | Protocol | Meaning |
---|---|---|
Classic | "consumer" | Consumer group with the "classic" consumer group protocol |
Classic | "" | "Simple" consumer group that has committed offsets only |
Consumer | "consumer" | Consumer group with the KIP-848 consumer group protocol |
Share | "share" | Share group |
Classic | "connect" | Kafka Connect distributed worker cluster group |
Classic | Any other string | Other customization of "classic" consumer group protocol, such as a schema registry |
The new kafka-groups.sh tool makes all of this information available.
Describing groups using the admin client
The error message of the exception GroupIdNotFoundException is used by Admin.describeConsumerGroups(Collection<String>) and Admin.describeShareGroups(Collection<String>) to indicate that the group being described has the wrong type. This small change enables the error messages from the command-line tools kafka-consumer-groups.sh and kafka-share-groups.sh to indicate when the group ID is found, but it has the wrong type.
Unified group state
The Admin client interfaces to list groups provide the ability to filter by group state. By introducing a method for listing groups of all types, the question arises how to handle group state. Previously, there were separate enums for ConsumerGroupState and ShareGroupState. Actually, ConsumerGroupState contains the set of states for both classic and modern consumer groups, which overlap but are not quite the same. ShareGroupState is a subset of the states in ConsumerGroupState.
This KIP introduces a single enum GroupState which contains all of the states from existing enums. In practice, that means the states are exactly the same as those in ConsumerGroupState.
Then, ConsumerGroupState is deprecated in favour of GroupState, and ShareGroupState is immediately replaced by GroupState because the KIP-932 interfaces have not actually been released yet.
The methods which return ConsumerGroupState will be deprecated and replaced with methods called groupState() which return GroupState instead.
Public Interfaces
Client API changes
Admin
Add the following methods on the org.apache.kafka.clients.admin.Admin interface.
Method signature | Description |
---|---|
DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds) | Describe some classic groups in the cluster, with the default options. |
DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds, DescribeClassicGroupsOptions options) | Describe some classic groups in the cluster. |
ListGroupsResult listGroups() | List the groups available in the cluster. |
ListGroupsResult listGroups(ListGroupsOptions options) | List the groups available in the cluster. |
It also deprecates the following methods because listGroups with a type filter is preferred for listing groups rather than adding separate methods for each group type over time. The equivalent share groups methods will be removed from KIP-932.
Method signature | Description |
---|---|
ListConsumerGroupsResult listConsumerGroups() | List the consumer groups available in the cluster, with the default options. |
ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) | List the consumer groups in the cluster. |
Also deprecated are the related classes such as ListConsumerGroupsResult and ListConsumerGroupsOptions.
Here are the method signatures of the new methods:
    /**
     * Describe some classic groups in the cluster.
     *
     * @param groupIds The IDs of the groups to describe.
     * @param options  The options to use when describing the groups.
     * @return The DescribeClassicGroupsResult.
     */
    DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds, DescribeClassicGroupsOptions options);

    /**
     * Describe some classic groups in the cluster, with the default options.
     * <p>
     * This is a convenience method for {@link #describeClassicGroups(Collection, DescribeClassicGroupsOptions)}
     * with default options. See the overload for more details.
     *
     * @param groupIds The IDs of the groups to describe.
     * @return The DescribeClassicGroupsResult.
     */
    default DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds) {
        return describeClassicGroups(groupIds, new DescribeClassicGroupsOptions());
    }

    /**
     * List the groups available in the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #listGroups(ListGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The ListGroupsResult.
     */
    default ListGroupsResult listGroups() {
        return listGroups(new ListGroupsOptions());
    }

    /**
     * List the groups available in the cluster.
     *
     * @param options The options to use when listing the groups.
     * @return The ListGroupsResult.
     */
    ListGroupsResult listGroups(ListGroupsOptions options);
DescribeClassicGroupsOptions
    /**
     * Options for {@link Admin#describeClassicGroups(Collection, DescribeClassicGroupsOptions)}.
     * <p>
     * The API of this class is evolving, see {@link Admin} for details.
     */
    @InterfaceStability.Evolving
    public class DescribeClassicGroupsOptions extends AbstractOptions<DescribeClassicGroupsOptions> {
        private boolean includeAuthorizedOperations;

        public DescribeClassicGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations) {
            this.includeAuthorizedOperations = includeAuthorizedOperations;
            return this;
        }

        public boolean includeAuthorizedOperations() {
            return includeAuthorizedOperations;
        }
    }
DescribeClassicGroupsResult
    package org.apache.kafka.clients.admin;

    /**
     * The result of the {@link KafkaAdminClient#describeClassicGroups(Collection, DescribeClassicGroupsOptions)} call.
     *
     * The API of this class is evolving, see {@link Admin} for details.
     */
    @InterfaceStability.Evolving
    public class DescribeClassicGroupsResult {
        public DescribeClassicGroupsResult(final Map<String, KafkaFuture<ClassicGroupDescription>> futures);

        /**
         * Return a map from group id to futures which yield group descriptions.
         */
        public Map<String, KafkaFuture<ClassicGroupDescription>> describedGroups();

        /**
         * Return a future which yields all ClassicGroupDescription objects, if all the describes succeed.
         */
        public KafkaFuture<Map<String, ClassicGroupDescription>> all();
    }
ClassicGroupDescription
    package org.apache.kafka.clients.admin;

    /**
     * A detailed description of a single classic group in the cluster.
     */
    @InterfaceStability.Evolving
    public class ClassicGroupDescription {
        public ClassicGroupDescription(String groupId, String protocol, Collection<MemberDescription> members,
                                       String partitionAssignor, GroupState groupState, Node coordinator);

        public ClassicGroupDescription(String groupId, String protocol, Collection<MemberDescription> members,
                                       String partitionAssignor, GroupState groupState, Node coordinator,
                                       Set<AclOperation> authorizedOperations);

        /**
         * The id of the classic group.
         */
        public String groupId();

        /**
         * The group protocol type.
         */
        public String protocol();

        /**
         * If the group is a simple consumer group or not.
         */
        public boolean isSimpleConsumerGroup();

        /**
         * A list of the members of the classic group.
         */
        public Collection<MemberDescription> members();

        /**
         * The group partition assignor.
         */
        public String partitionAssignor();

        /**
         * The group state, or UNKNOWN if the state is too new for us to parse.
         */
        public GroupState groupState();

        /**
         * The group coordinator, or null if the coordinator is not known.
         */
        public Node coordinator();

        /**
         * authorizedOperations for this group, or null if that information is not known.
         */
        public Set<AclOperation> authorizedOperations();
    }
ConsumerGroupDescription
The existing constructors are deprecated and equivalents which use GroupState instead of ConsumerGroupState are added.
The method ConsumerGroupState state() is deprecated and GroupState groupState() is added.
ListGroupsOptions
    package org.apache.kafka.clients.admin;

    import org.apache.kafka.common.GroupState;
    import org.apache.kafka.common.GroupType;

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    /**
     * Options for {@link Admin#listGroups(ListGroupsOptions)}.
     *
     * The API of this class is evolving, see {@link Admin} for details.
     */
    @InterfaceStability.Evolving
    public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {
        // Both filters default to empty, which means "no filtering".
        private Set<GroupState> groupStates = Collections.emptySet();
        private Set<GroupType> types = Collections.emptySet();

        /**
         * If groupStates is set, only groups in these states will be returned by listGroups().
         * Otherwise, all groups are returned.
         * This operation is supported by brokers with version 2.6.0 or later.
         */
        public ListGroupsOptions inStates(Set<GroupState> groupStates) {
            this.groupStates = (groupStates == null || groupStates.isEmpty()) ? Collections.emptySet() : new HashSet<>(groupStates);
            return this;
        }

        /**
         * If types is set, only groups of these types will be returned by listGroups().
         * Otherwise, all groups are returned.
         */
        public ListGroupsOptions withTypes(Set<GroupType> types) {
            this.types = (types == null || types.isEmpty()) ? Collections.emptySet() : new HashSet<>(types);
            return this;
        }

        /**
         * Returns the set of group states that are requested, or empty if no states have been specified.
         */
        public Set<GroupState> groupStates() {
            return groupStates;
        }

        /**
         * Returns the set of group types that are requested, or empty if no types have been specified.
         */
        public Set<GroupType> types() {
            return types;
        }
    }
ListGroupsResult
    package org.apache.kafka.clients.admin;

    /**
     * The result of the {@link Admin#listGroups(ListGroupsOptions)} call.
     * <p>
     * The API of this class is evolving, see {@link Admin} for details.
     */
    @InterfaceStability.Evolving
    public class ListGroupsResult {
        ListGroupsResult(KafkaFuture<Collection<Object>> future);

        /**
         * Returns a future that yields either an exception, or the full set of group listings.
         */
        public KafkaFuture<Collection<GroupListing>> all();

        /**
         * Returns a future which yields just the valid listings.
         */
        public KafkaFuture<Collection<GroupListing>> valid();

        /**
         * Returns a future which yields just the errors which occurred.
         */
        public KafkaFuture<Collection<Throwable>> errors();
    }
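The relationship between all(), valid() and errors() can be illustrated with a small self-contained sketch. It is not the Kafka implementation: ListGroupsResultSketch is a hypothetical name, CompletableFuture stands in for KafkaFuture, and plain String values stand in for GroupListing. It assumes, following the constructor signature above, that the underlying collection mixes listings and Throwable entries.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical stand-in for ListGroupsResult; String replaces GroupListing. */
final class ListGroupsResultSketch {
    private final CompletableFuture<Collection<Object>> future;

    ListGroupsResultSketch(CompletableFuture<Collection<Object>> future) {
        this.future = future;
    }

    /** Fails with the first error encountered; otherwise yields every listing. */
    CompletableFuture<Collection<String>> all() {
        return future.thenApply(results -> {
            Collection<String> listings = new ArrayList<>();
            for (Object result : results) {
                if (result instanceof Throwable) {
                    throw new CompletionException((Throwable) result);
                }
                listings.add((String) result);
            }
            return listings;
        });
    }

    /** Yields only the entries that are listings, skipping errors. */
    CompletableFuture<Collection<String>> valid() {
        return future.thenApply(results -> {
            Collection<String> listings = new ArrayList<>();
            for (Object result : results) {
                if (!(result instanceof Throwable)) {
                    listings.add((String) result);
                }
            }
            return listings;
        });
    }

    /** Yields only the entries that are errors. */
    CompletableFuture<Collection<Throwable>> errors() {
        return future.thenApply(results -> {
            Collection<Throwable> errors = new ArrayList<>();
            for (Object result : results) {
                if (result instanceof Throwable) {
                    errors.add((Throwable) result);
                }
            }
            return errors;
        });
    }
}
```

With this shape, a caller that can tolerate partial results would use valid() and errors(), while a caller that needs a complete view of the cluster would use all().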
GroupListing
    package org.apache.kafka.clients.admin;

    /**
     * A listing of a group in the cluster.
     * <p>
     * The API of this class is evolving, see {@link Admin} for details.
     */
    @InterfaceStability.Evolving
    public class GroupListing {
        public GroupListing(String groupId, Optional<GroupType> type, String protocol);

        /**
         * The id of the group.
         */
        public String groupId();

        /**
         * The group type.
         */
        public Optional<GroupType> type();

        /**
         * The group protocol type.
         */
        public String protocol();

        /**
         * The group state.
         */
        public Optional<GroupState> groupState();

        /**
         * If the group is a simple consumer group or not.
         */
        public boolean isSimpleConsumerGroup();
    }
The new GroupState enum contains the states for all types of groups. The following table shows the correspondence between the group states and types. This table will be included in the javadoc.
State | Classic group | Classic consumer group | Modern consumer group | Share group |
---|---|---|---|---|
UNKNOWN | Yes | Yes | Yes | Yes |
PREPARING_REBALANCE | Yes | Yes | | |
COMPLETING_REBALANCE | Yes | Yes | | |
STABLE | Yes | Yes | Yes | Yes |
DEAD | Yes | Yes | Yes | Yes |
EMPTY | Yes | Yes | Yes | Yes |
ASSIGNING | | | Yes | |
RECONCILING | | | Yes | |
    package org.apache.kafka.common;

    /**
     * The group state.
     */
    public enum GroupState {
        UNKNOWN("Unknown"),
        PREPARING_REBALANCE("PreparingRebalance"),
        COMPLETING_REBALANCE("CompletingRebalance"),
        STABLE("Stable"),
        DEAD("Dead"),
        EMPTY("Empty"),
        ASSIGNING("Assigning"),
        RECONCILING("Reconciling");

        GroupState(String name);

        /**
         * Case-insensitive group state lookup by string name.
         */
        public static GroupState parse(String name);

        public String toString();
    }
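To make the case-insensitive lookup concrete, here is a minimal sketch of how parse could behave. GroupStateSketch is a hypothetical stand-in, not the real org.apache.kafka.common.GroupState. Mapping null or unrecognized names to UNKNOWN is an assumption, chosen to agree with the groupState() javadoc above, which mentions UNKNOWN for states that are too new to parse.

```java
import java.util.Locale;

/** Hypothetical stand-in for GroupState, with the declared methods filled in. */
enum GroupStateSketch {
    UNKNOWN("Unknown"),
    PREPARING_REBALANCE("PreparingRebalance"),
    COMPLETING_REBALANCE("CompletingRebalance"),
    STABLE("Stable"),
    DEAD("Dead"),
    EMPTY("Empty"),
    ASSIGNING("Assigning"),
    RECONCILING("Reconciling");

    private final String name;

    GroupStateSketch(String name) {
        this.name = name;
    }

    /**
     * Case-insensitive lookup by display name.
     * Assumption: null and unrecognized names map to UNKNOWN rather than throwing.
     */
    static GroupStateSketch parse(String name) {
        if (name == null) {
            return UNKNOWN;
        }
        String wanted = name.toLowerCase(Locale.ROOT);
        for (GroupStateSketch state : values()) {
            if (state.name.toLowerCase(Locale.ROOT).equals(wanted)) {
                return state;
            }
        }
        return UNKNOWN;
    }

    @Override
    public String toString() {
        return name;
    }
}
```

Under these assumptions, parse("stable") and parse("STABLE") both yield STABLE, and a state name introduced by a newer broker degrades to UNKNOWN instead of failing the whole listing.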
Kafka protocol changes
This KIP introduces a new version for the DescribeGroups API.
DescribeGroups API
The KIP introduces version 6. This changes the error behavior so that if a classic group is described and the group ID either refers to a group of a different type, or the group ID is not found, the error code GROUP_ID_NOT_FOUND is returned. Previously, the error code NONE was used with a group state of DEAD.
Request schema
Version 6 is the same as version 5.
    {
      "apiKey": 15,
      "type": "request",
      "listeners": ["zkBroker", "broker"],
      "name": "DescribeGroupsRequest",
      // Versions 1 and 2 are the same as version 0.
      //
      // Starting in version 3, authorized operations can be requested.
      //
      // Starting in version 4, the response will include group.instance.id info for members.
      //
      // Version 5 is the first flexible version.
      //
      // Version 6 returns error code GROUP_ID_NOT_FOUND if the group ID is not found (KIP-1043).
      "validVersions": "0-6",
      "flexibleVersions": "5+",
      "fields": [
        { "name": "Groups", "type": "[]string", "versions": "0+", "entityType": "groupId",
          "about": "The names of the groups to describe" },
        { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "3+",
          "about": "Whether to include authorized operations." }
      ]
    }
Response schema
Version 6 adds the ErrorMessage to the DescribedGroup to enable the reasons for a non-existent group to be understood more clearly.
    {
      "apiKey": 15,
      "type": "response",
      "name": "DescribeGroupsResponse",
      // Version 1 added throttle time.
      //
      // Starting in version 2, on quota violation, brokers send out responses before throttling.
      //
      // Starting in version 3, brokers can send authorized operations.
      //
      // Starting in version 4, the response will optionally include group.instance.id info for members.
      //
      // Version 5 is the first flexible version.
      //
      // Version 6 returns error code GROUP_ID_NOT_FOUND if the group ID is not found (KIP-1043).
      "validVersions": "0-6",
      "flexibleVersions": "5+",
      "fields": [
        { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
          "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
        { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
          "about": "Each described group.", "fields": [
          { "name": "ErrorCode", "type": "int16", "versions": "0+",
            "about": "The describe error, or 0 if there was no error." },
          { "name": "ErrorMessage", "type": "string", "versions": "6+", "nullableVersions": "6+", "default": "null",
            "about": "The describe error message, or null if there was no error." },
          { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
            "about": "The group ID string." },
          { "name": "GroupState", "type": "string", "versions": "0+",
            "about": "The group state string, or the empty string." },
          { "name": "ProtocolType", "type": "string", "versions": "0+",
            "about": "The group protocol type, or the empty string." },
          // ProtocolData is currently only filled in if the group state is in the Stable state.
          { "name": "ProtocolData", "type": "string", "versions": "0+",
            "about": "The group protocol data, or the empty string." },
          // N.B. If the group is in the Dead state, the members array will always be empty.
          { "name": "Members", "type": "[]DescribedGroupMember", "versions": "0+",
            "about": "The group members.", "fields": [
            { "name": "MemberId", "type": "string", "versions": "0+",
              "about": "The member ID assigned by the group coordinator." },
            { "name": "GroupInstanceId", "type": "string", "versions": "4+", "ignorable": true,
              "nullableVersions": "4+", "default": "null",
              "about": "The unique identifier of the consumer instance provided by end user." },
            { "name": "ClientId", "type": "string", "versions": "0+",
              "about": "The client ID used in the member's latest join group request." },
            { "name": "ClientHost", "type": "string", "versions": "0+",
              "about": "The client host." },
            // This is currently only provided if the group is in the Stable state.
            { "name": "MemberMetadata", "type": "bytes", "versions": "0+",
              "about": "The metadata corresponding to the current group protocol in use." },
            // This is currently only provided if the group is in the Stable state.
            { "name": "MemberAssignment", "type": "bytes", "versions": "0+",
              "about": "The current assignment provided by the group leader." }
          ]},
          { "name": "AuthorizedOperations", "type": "int32", "versions": "3+", "default": "-2147483648",
            "about": "32-bit bitfield to represent authorized operations for this group." }
        ]}
      ]
    }
Command-line tools
kafka-groups.sh
A new tool called kafka-groups.sh is introduced for listing groups of any kind. It has the following options:
Option | Description |
---|---|
--bootstrap-server <String: server to connect to> | REQUIRED: The server(s) to connect to. |
--command-config <String: command config property file> | Property file containing configs to be passed to Admin Client. |
--consumer | Filters the groups to show all kinds of consumer groups, including classic and simple consumer groups. This matches group type 'consumer', and group type 'classic' where the protocol type is 'consumer' or empty. |
--group-type <String: type> | Filters the groups based on group type. Valid types are: 'classic', 'consumer' and 'share'. |
--help | Print usage information. |
--list | List all groups. |
--protocol <String: protocol> | Filters the groups based on protocol type. |
--share | Filters the groups to show share groups. |
--version | Display Kafka version. |
Note that --consumer actually matches all groups whose type is Consumer, groups whose type is Classic and protocol type is "consumer", and also "simple" consumer groups whose type is Classic and protocol type is "". The filtering is done in the kafka-groups.sh tool.
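The matching rule for --consumer can be written as a small predicate. This is only a sketch of the rule as described above, not the tool's code: ConsumerFilterSketch and matchesConsumer are hypothetical names, and the type and protocol values are compared as the plain strings that appear in the tool's output.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the client-side filter behind the --consumer option. */
final class ConsumerFilterSketch {
    private ConsumerFilterSketch() { }

    /**
     * True for modern consumer groups (type Consumer), classic consumer groups
     * (type Classic, protocol "consumer") and simple consumer groups
     * (type Classic, empty protocol).
     */
    static boolean matchesConsumer(String type, String protocol) {
        if ("Consumer".equals(type)) {
            return true;
        }
        return "Classic".equals(type)
            && ("consumer".equals(protocol) || "".equals(protocol));
    }

    /** Keeps the group IDs of rows {groupId, type, protocol} that pass the filter. */
    static List<String> filter(String[][] rows) {
        List<String> matched = new ArrayList<>();
        for (String[] row : rows) {
            if (matchesConsumer(row[1], row[2])) {
                matched.add(row[0]);
            }
        }
        return matched;
    }
}
```

Because the rule needs both the type and the protocol, it cannot be expressed as a type filter alone, which is consistent with the filtering being done in the tool.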
Here are some examples.
To list all of the groups:
    $ bin/kafka-groups.sh --bootstrap-server localhost:9092 --list
    GROUP                    TYPE      PROTOCOL
    old-consumer-group       Classic   consumer
    new-consumer-group       Consumer  consumer
    connect-cluster          Classic   connect
    share-group              Share     share
    schema-registry          Classic   sr
    simple-consumer-group    Classic
To list all of the consumer groups, silently merging together all of the different kinds of consumer group:
    $ bin/kafka-groups.sh --bootstrap-server localhost:9092 --list --consumer
    GROUP                    TYPE      PROTOCOL
    old-consumer-group       Classic   consumer
    new-consumer-group       Consumer  consumer
    simple-consumer-group    Classic
To list all of the KIP-848 consumer groups:
    $ bin/kafka-groups.sh --bootstrap-server localhost:9092 --list --group-type consumer
    GROUP                    TYPE      PROTOCOL
    new-consumer-group       Consumer  consumer
To list all of the share groups:
    $ bin/kafka-groups.sh --bootstrap-server localhost:9092 --list --share
    GROUP                    TYPE      PROTOCOL
    share-group              Share     share
kafka-consumer-groups.sh
For all operations which act on a single group, if that group exists but is not a consumer group, the command fails with a message indicating that the group type is incorrect, rather than the existing message that the group does not exist.
For example, if you try to describe a share group, the output will look like this:
    $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group SG1
    Error: Group 'SG1' is not a consumer group.
kafka-share-groups.sh
For all operations which act on a single group, if that group exists but is not a share group, the command fails with a message indicating that the group type is incorrect, rather than the existing message that the group does not exist.
For example, if you try to describe a consumer group, the output will look like this:
    $ bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group CG1
    Error: Group 'CG1' is not a share group.
Compatibility, Deprecation, and Migration Plan
ListGroups RPC
When writing this KIP, it seemed that perhaps the ListGroups RPC would need to be enhanced to create the Admin.listGroups() method.
ListGroups v5 introduces the TypesFilter in the request and the GroupType in the response. This KIP does not need to introduce a new version of the ListGroups RPC, because:
- If the broker does not support ListGroups v5, setting a types filter using ListGroupsOptions.withTypes() results in UnsupportedVersionException when the ListGroups request is serialized. (This is KIP-848 behavior.)
- If the broker receives a group type in TypesFilter that it does not support in a ListGroups v5 (or later) request, the filter is treated as an unknown group type, which thus matches no groups. (This is KIP-848 behavior.)
- If the client receives a group type that it does not understand in a ListGroups v5 (or later) response, the group type is GroupType.UNKNOWN. This is because the group type in the ListGroups RPC response cannot be parsed into a value of the GroupType enumeration, and the default value of UNKNOWN is used in these situations.
The existing behavior suffices and there is no compatibility problem.
DescribeConsumerGroups
Prior to this KIP, the behavior of Admin.describeConsumerGroups(Collection<String>) seems a little unusual when used with groups which are not consumer groups. You can describe a collection of group IDs, some of which might exist and others might not. Here's how the response is built:
1. If the group is a consumer group and the client is authorized to describe the group and there was no error, the group information is returned, along with the authorized operations if requested.
2. If the group is not a consumer group (either does not exist or wrong type) and the client is authorized to describe the group and there was no error, the group information for a dead group is returned, along with the authorized operations if requested.
3. If the client is not authorized to describe the group, the group information error code is set to GROUP_AUTHORIZATION_FAILED.
4. If there was an error describing the group, the group information error code is set.
In cases (1) and (2), the admin client considers the operation a success, and this means the KafkaFuture for this group completes successfully. In cases (3) and (4), the admin client considers the operation unsuccessful, and this means the KafkaFuture for this group completes exceptionally. Case (2) is the tricky one because you can't readily tell what the dead group means. This is why the admin tools use output like Error: Consumer group 'MYSHARE' does not exist, even when the group ID is recognised and it's just the wrong type.
After this KIP, if you use Admin.describeConsumerGroups(Collection<String>) to describe a group which is not a consumer group, case (2) above will result in the group information error code set to GROUP_ID_NOT_FOUND. In the admin client, this means the future for this group completes exceptionally with GroupIdNotFoundException rather than succeeding with a dead consumer group.
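The effect on a caller can be modelled with a short self-contained sketch. Everything here is a hypothetical stand-in: CompletableFuture replaces KafkaFuture, the nested GroupIdNotFound class replaces GroupIdNotFoundException, a map of group ID to type replaces the broker, and the message wording is an assumption modelled on the tool output shown earlier. For brevity, only type Consumer is treated as a consumer group.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical model of describing a consumer group after this KIP. */
final class DescribeConsumerGroupSketch {
    private DescribeConsumerGroupSketch() { }

    /** Stand-in for Kafka's GroupIdNotFoundException. */
    static final class GroupIdNotFound extends RuntimeException {
        GroupIdNotFound(String message) {
            super(message);
        }
    }

    /**
     * Completes normally only for a consumer group. A missing group or a group
     * of another type completes exceptionally instead of looking like a dead group.
     */
    static CompletableFuture<String> describe(String groupId, Map<String, String> typesById) {
        CompletableFuture<String> result = new CompletableFuture<>();
        String type = typesById.get(groupId);
        if (type == null) {
            result.completeExceptionally(new GroupIdNotFound("Group '" + groupId + "' not found."));
        } else if (!"Consumer".equals(type)) {
            result.completeExceptionally(
                new GroupIdNotFound("Group '" + groupId + "' is not a consumer group."));
        } else {
            result.complete(groupId);
        }
        return result;
    }

    /** Turns the outcome into one line of tool-style output. */
    static String report(String groupId, Map<String, String> typesById) {
        try {
            return "Described: " + describe(groupId, typesById).join();
        } catch (CompletionException e) {
            // join() wraps the failure, so the original exception is the cause.
            return "Error: " + e.getCause().getMessage();
        }
    }
}
```

The point of the sketch is that the wrong-type case now travels down the exceptional path with a message attached, so a tool can report the real reason without inspecting a dead group description.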
Test Plan
The feature will be thoroughly tested with unit and integration tests.
Rejected Alternatives
It would be possible to preserve the current behavior of Admin.describeConsumerGroups(Collection<String>) when used with a group which is not a consumer group and introduce an option to ask it to validate the group type rather than converting any indescribable group into a dead consumer group. This seems like an unnecessary complication with little benefit.
It was proposed to add a new error code INCONSISTENT_GROUP_TYPE to indicate that the group existed but the group type was inconsistent with the operation. Instead, the existing error code GROUP_ID_NOT_FOUND is used and the protocol is enhanced so that an error message is returned along with this error code.