Status
Current state: Accepted
Discussion thread: here, vote thread
JIRA:
-
KAFKA-17750Getting issue details...
STATUS
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-848: The Next Generation of the Consumer Rebalance Protocol introduces new status in consumer group like group epoch, target assignment epoch, member epoch, and target assignment. Adding this information to kafka-consumer-groups.sh and kafka-share-groups.sh commands can help administrator to have a detailed view of the state of the groups.
Public Interfaces
Client API changes
Admin
ConsumerGroupDescription
Add new fields groupEpoch and targetAssignmentEpoch. The ConsumerGroupDescription can be used for classic and consumer group. For classic group, there is no epoch information, so using Optional for new fields.
public class ConsumerGroupDescription { private final String groupId; private final boolean isSimpleConsumerGroup; private final Collection<MemberDescription> members; private final String partitionAssignor; private final GroupType type; private final ConsumerGroupState state; private final Node coordinator; private final Set<AclOperation> authorizedOperations; private final Optional<Integer> groupEpoch; private final Optional<Integer> targetAssignmentEpoch; // ... /** * The epoch of the consumer group. */ public Optional<Integer> groupEpoch() { return groupEpoch; } /** * The epoch of the target assignment. */ public Optional<Integer> targetAssignmentEpoch() { return targetAssignmentEpoch; } }
ShareGroupDescription
Add new fields groupEpoch and targetAssignmentEpoch.
public class ShareGroupDescription { private final String groupId; private final Collection<MemberDescription> members; private final ShareGroupState state; private final Node coordinator; private final Set<AclOperation> authorizedOperations; private final int groupEpoch; private final int targetAssignmentEpoch; // ... /** * The epoch of the share group. */ public int groupEpoch() { return groupEpoch; } /** * The epoch of the target assignment. */ public int targetAssignmentEpoch() { return targetAssignmentEpoch; } }
MemberDescription
Add new fields memberEpoch and upgraded. For classic group member, there is no member epoch, so using Optional for the new field. For upgraded field, the value is like following:
- Classic Group -> Optional.empty
- Consumer Group -> Optional.empty if unknown
- Consumer Group -> Optional.of(false) if classic
- Consumer Group -> Optional.of(true) if consumer
Deprecated old constructors.
public class MemberDescription { private final String memberId; private final Optional<String> groupInstanceId; private final String clientId; private final String host; private final MemberAssignment assignment; private final Optional<MemberAssignment> targetAssignment; private final Optional<Integer> memberEpoch; private final Optional<Boolean> upgraded; public MemberDescription(String memberId, Optional<String> groupInstanceId, String clientId, String host, MemberAssignment assignment, Optional<MemberAssignment> targetAssignment, Optional<Integer> memberEpoch, Optional<Boolean> upgraded ) { this.memberId = memberId == null ? "" : memberId; this.groupInstanceId = groupInstanceId; this.clientId = clientId == null ? "" : clientId; this.host = host == null ? "" : host; this.assignment = assignment == null ? new MemberAssignment(Collections.emptySet()) : assignment; this.targetAssignment = targetAssignment; this.memberEpoch = memberEpoch; this.upgraded = upgraded; } /** * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, String, String, MemberAssignment, Optional, Optional, Optional)}. */ @Deprecated public MemberDescription(String memberId, Optional<String> groupInstanceId, String clientId, String host, MemberAssignment assignment, Optional<MemberAssignment> targetAssignment ) { this( memberId, groupInstanceId, clientId, host, assignment, targetAssignment, Optional.empty(), Optional.empty() ); } /** * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, String, String, MemberAssignment, Optional, Optional, Optional)}. */ @Deprecated public MemberDescription( String memberId, Optional<String> groupInstanceId, String clientId, String host, MemberAssignment assignment ) { this( memberId, groupInstanceId, clientId, host, assignment, Optional.empty() ); } /** * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, String, String, MemberAssignment, Optional, Optional, Optional)}. */ @Deprecated public MemberDescription(String memberId, String clientId, String host, MemberAssignment assignment) { this(memberId, Optional.empty(), clientId, host, assignment); } // ... /** * The epoch of the group member. */ public Optional<Integer> memberEpoch() { return memberEpoch; } /** * The flag indicating whether a member is classic. */ public Optional<Boolean> upgraded() { return upgraded; } }
RPCs
ConsumerGroupDescribeRequest
Bump validVersions to "0-1".
{ "apiKey": 69, "type": "request", "listeners": ["broker"], "name": "ConsumerGroupDescribeRequest", "validVersions": "0-1", <-- bump to 0-1 "flexibleVersions": "0+", "fields": [ { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId", "about": "The ids of the groups to describe" }, { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+", "about": "Whether to include authorized operations." } ] }
ConsumerGroupDescribeResponse
Bump validVersions to "0-1" and add a new field "MemberType" to "Members". The "MemberType" field is int8. It's -1 as default for unknown, 0 for classic member, and 1 for consumer member.
{ "apiKey": 69, "type": "response", "name": "ConsumerGroupDescribeResponse", "validVersions": "0-1", <-- bump to 0-1 "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - INVALID_REQUEST (version 0+) // - INVALID_GROUP_ID (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+", "about": "Each described group.", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The describe error, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The top-level error message, or null if there was no error." }, { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group ID string." }, { "name": "GroupState", "type": "string", "versions": "0+", "about": "The group state string, or the empty string." }, { "name": "GroupEpoch", "type": "int32", "versions": "0+", "about": "The group epoch." }, { "name": "AssignmentEpoch", "type": "int32", "versions": "0+", "about": "The assignment epoch." }, { "name": "AssignorName", "type": "string", "versions": "0+", "about": "The selected assignor." }, { "name": "Members", "type": "[]Member", "versions": "0+", "about": "The members.", "fields": [ { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member ID." }, { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The member instance ID." }, { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The member rack ID." }, { "name": "MemberEpoch", "type": "int32", "versions": "0+", "about": "The current member epoch." }, { "name": "ClientId", "type": "string", "versions": "0+", "about": "The client ID." }, { "name": "ClientHost", "type": "string", "versions": "0+", "about": "The client host." }, { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "entityType": "topicName", "about": "The subscribed topic names." }, { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "the subscribed topic regex otherwise or null of not provided." }, { "name": "Assignment", "type": "Assignment", "versions": "0+", "about": "The current assignment." }, { "name": "TargetAssignment", "type": "Assignment", "versions": "0+", "about": "The target assignment." }, { "name": "MemberType", "type": "int8", "versions": "1+", "default": "-1", "ignorable": true, "about": "-1 for unknown. 0 for classic member. +1 for consumer member." } <-- new field ]}, { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648", "about": "32-bit bitfield to represent authorized operations for this group." } ] } ], "commonStructs": [ { "name": "TopicPartitions", "versions": "0+", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic ID." }, { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions." } ]}, { "name": "Assignment", "versions": "0+", "fields": [ { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "about": "The assigned topic-partitions to the member." } ]} ] }
Proposed Changes
kafka-consumer-groups.sh
--describe --state --verbose
Show the group level information. Add GROUP-EPOCH and TARGET-ASSIGNMENT-EPOCH. If the value is empty, showing "-" instead.
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state --verbose GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE GROUP-EPOCH TARGET-ASSIGNMENT-EPOCH #MEMBERS my-group localhost:61316 (0) uniform Stable 3 3 2
--describe --members --verbose
Show the member level information. Add CURRENT-EPOCH, TARGET-EPOCH, TARGET-ASSIGNMENT, and UPGRADED information. The CURRENT-EPOCH means member epoch. Change ASSIGNMENT to CURRENT-ASSIGNMENT. For both classic and consumer groups, the ASSIGNMENT value format will change from "(0,1), (0,1)" to "my_topic:0,1;new_topic:0,1". The original format only shows sorted partition numbers for each topic. The new format adds the topic name. The UPGRADED is showed when a group is in migration.
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS CURRENT-EPOCH CURRENT-ASSIGNMENT TARGET-EPOCH TARGET-ASSIGNMENT UPGRADED my-group T4tbGKsvT7CtsxVBH5J2QQ /127.0.0.1 consumer-member 3 3 my_topic:0,1;new_topic:0 3 my_topic:0,1;new_topic:0 true my-group 6mfoIHq4n3BT7n1HCdqihb /127.0.0.1 classic-member 1 2 - 3 new_topic:1 false
Following table lists UPGRADED meaning.
UPGRADED | Meaning |
---|---|
"true" | A consumer member in a consumer group. |
"false" | A classic member in a consumer group. |
--describe --offsets --verbose / --describe --verbose
Show leader epoch which was introduced in KIP-320.
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --offsets --verbose GROUP TOPIC PARTITION LEADER-EPOCH CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID my-group my_topic 0 1 10 10 0 T4tbGKsvT7CtsxVBH5J2QQ /127.0.0.1 consumer-member my-group my_topic 1 1 10 10 0 T4tbGKsvT7CtsxVBH5J2QQ /127.0.0.1 consumer-member my-group new_topic 0 1 10 10 0 T4tbGKsvT7CtsxVBH5J2QQ /127.0.0.1 consumer-member my-group new_topic 1 1 0 10 10 6mfoIHq4n3BT7n1HCdqihb /127.0.0.1 classic-member
kafka-share-groups.sh
Add a new "–verbose" option to ShareGroupCommandOptions.
--describe --state --verbose
Show the group level information. Add GROUP-EPOCH and ASSIGNMENT-EPOCH.
$ bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state --verbose GROUP COORDINATOR (ID) STATE GROUP-EPOCH ASSIGNMENT-EPOCH #MEMBERS my-group localhost:61316 (0) Stable 1 1 1
--describe --members --verbose
Show the member level information. Add MEMBER-EPOCH and ASSIGNMENT information. The ASSIGNMENT value format will change from "my_topic:0,my_topic:1,new_topic:0,new_topic:1" to "my_topic:0,1;new_topic:0,1". The original format shows topic name for each partition. The new format only shows topic name one time for all partitions under the same topic.
$ bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose GROUP CONSUMER-ID HOST CLIENT-ID MEMBER-EPOCH ASSIGNMENT my-group T4tbGKsvT7CtsxVBH5J2QQ /127.0.0.1 consumer-my-group-1 1 my_topic:0,1;new_topic:0,1
Compatibility, Deprecation, and Migration Plan
This proposal adds new optional fields to Admin client and kafka-consumer-groups.sh, so classic consumer groups will be unaffected.
Test Plan
The feature will be thoroughly tested with unit and integration tests.
Rejected Alternatives
N/A