Status

Current state: Accepted

Discussion thread: here, vote thread

JIRA: KAFKA-17750

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-848: The Next Generation of the Consumer Rebalance Protocol introduces new consumer group state, such as the group epoch, target assignment epoch, member epoch, and target assignment. Adding this information to the kafka-consumer-groups.sh and kafka-share-groups.sh commands gives administrators a detailed view of the state of their groups.

Public Interfaces

Client API changes

Admin

ConsumerGroupDescription

Add new fields groupEpoch and targetAssignmentEpoch. ConsumerGroupDescription is used for both classic and consumer groups. A classic group has no epoch information, so the new fields are of type Optional.

ConsumerGroupDescription
public class ConsumerGroupDescription {
    private final String groupId;
    private final boolean isSimpleConsumerGroup;
    private final Collection<MemberDescription> members;
    private final String partitionAssignor;
    private final GroupType type;
    private final ConsumerGroupState state;
    private final Node coordinator;
    private final Set<AclOperation> authorizedOperations;
    private final Optional<Integer> groupEpoch;
    private final Optional<Integer> targetAssignmentEpoch;

    // ...

    /**
     * The epoch of the consumer group.
     */
    public Optional<Integer> groupEpoch() {
        return groupEpoch;
    }

    /**
     * The epoch of the target assignment.
     */
    public Optional<Integer> targetAssignmentEpoch() {
        return targetAssignmentEpoch;
    }
}

ShareGroupDescription

Add new fields groupEpoch and targetAssignmentEpoch.

ShareGroupDescription
public class ShareGroupDescription {
    private final String groupId;
    private final Collection<MemberDescription> members;
    private final ShareGroupState state;
    private final Node coordinator;
    private final Set<AclOperation> authorizedOperations;
    private final int groupEpoch;
    private final int targetAssignmentEpoch;

    // ...

    /**
     * The epoch of the share group.
     */
    public int groupEpoch() {
        return groupEpoch;
    }

    /**
     * The epoch of the target assignment.
     */
    public int targetAssignmentEpoch() {
        return targetAssignmentEpoch;
    }
}


MemberDescription

Add new fields memberEpoch and upgraded. A classic group member has no member epoch, so the new field is an Optional. The upgraded field takes the following values:

  • Classic Group -> Optional.empty
  • Consumer Group -> Optional.empty if unknown
  • Consumer Group -> Optional.of(false) if classic
  • Consumer Group -> Optional.of(true) if consumer

Deprecate the old constructors.

MemberDescription
public class MemberDescription {
    private final String memberId;
    private final Optional<String> groupInstanceId;
    private final String clientId;
    private final String host;
    private final MemberAssignment assignment;
    private final Optional<MemberAssignment> targetAssignment;
    private final Optional<Integer> memberEpoch;
    private final Optional<Boolean> upgraded;

    public MemberDescription(String memberId,
                             Optional<String> groupInstanceId,
                             String clientId,
                             String host,
                             MemberAssignment assignment,
                             Optional<MemberAssignment> targetAssignment,
                             Optional<Integer> memberEpoch,
                             Optional<Boolean> upgraded
    ) {
        this.memberId = memberId == null ? "" : memberId;
        this.groupInstanceId = groupInstanceId;
        this.clientId = clientId == null ? "" : clientId;
        this.host = host == null ? "" : host;
        this.assignment = assignment == null ?
            new MemberAssignment(Collections.emptySet()) : assignment;
        this.targetAssignment = targetAssignment;
        this.memberEpoch = memberEpoch;
        this.upgraded = upgraded;
    }

    /**
     * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, String, String, MemberAssignment, Optional, Optional, Optional)}.
     */
    @Deprecated
    public MemberDescription(String memberId,
                             Optional<String> groupInstanceId,
                             String clientId,
                             String host,
                             MemberAssignment assignment,
                             Optional<MemberAssignment> targetAssignment
    ) {
        this(
            memberId,
            groupInstanceId,
            clientId,
            host,
            assignment,
            targetAssignment,
            Optional.empty(),
            Optional.empty()
        );
    }

    /**
     * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, String, String, MemberAssignment, Optional, Optional, Optional)}.
     */
    @Deprecated
    public MemberDescription(
        String memberId,
        Optional<String> groupInstanceId,
        String clientId,
        String host,
        MemberAssignment assignment
    ) {
        this(
            memberId,
            groupInstanceId,
            clientId,
            host,
            assignment,
            Optional.empty()
        );
    }

    /**
     * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, String, String, MemberAssignment, Optional, Optional, Optional)}.
     */
    @Deprecated
    public MemberDescription(String memberId,
                             String clientId,
                             String host,
                             MemberAssignment assignment) {
        this(memberId, Optional.empty(), clientId, host, assignment);
    }

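    // ...

    /**
     * The epoch of the group member.
     */
    public Optional<Integer> memberEpoch() {
        return memberEpoch;
    }

    /**
     * The flag indicating whether a member in a consumer group is a consumer member (true)
     * or a classic member (false). Empty if unknown or if the group is a classic group.
     */
    public Optional<Boolean> upgraded() {
        return upgraded;
    }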
}

RPCs

ConsumerGroupDescribeRequest

Bump validVersions to "0-1".

ConsumerGroupDescribeRequest
{
  "apiKey": 69,
  "type": "request",
  "listeners": ["broker"],
  "name": "ConsumerGroupDescribeRequest",
  "validVersions": "0-1",  <-- bump to 0-1 
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",
      "about": "The ids of the groups to describe" },
    { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
      "about": "Whether to include authorized operations." }
  ]
}


ConsumerGroupDescribeResponse

Bump validVersions to "0-1" and add a new field "MemberType" to "Members". The "MemberType" field is an int8: -1 (the default) for unknown, 0 for a classic member, and 1 for a consumer member.

ConsumerGroupDescribeResponse
{
  "apiKey": 69,
  "type": "response",
  "name": "ConsumerGroupDescribeResponse",
  "validVersions": "0-1",  <-- bump to 0-1
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - INVALID_GROUP_ID (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
      "about": "Each described group.",
      "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The describe error, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The top-level error message, or null if there was no error." },
        { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
          "about": "The group ID string." },
        { "name": "GroupState", "type": "string", "versions": "0+",
          "about": "The group state string, or the empty string." },
        { "name": "GroupEpoch", "type": "int32", "versions": "0+",
          "about": "The group epoch." },
        { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
          "about": "The assignment epoch." },
        { "name": "AssignorName", "type": "string", "versions": "0+",
          "about": "The selected assignor." },
        { "name": "Members", "type": "[]Member", "versions": "0+",
          "about": "The members.",
          "fields": [
            { "name": "MemberId", "type": "string", "versions": "0+",
              "about": "The member ID." },
            { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The member instance ID." },
            { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The member rack ID." },
            { "name": "MemberEpoch", "type": "int32", "versions": "0+",
              "about": "The current member epoch." },
            { "name": "ClientId", "type": "string", "versions": "0+",
              "about": "The client ID." },
            { "name": "ClientHost", "type": "string", "versions": "0+",
              "about": "The client host." },
            { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "entityType": "topicName",
              "about": "The subscribed topic names." },
            { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The subscribed topic regex, or null if not provided." },
            { "name": "Assignment", "type": "Assignment", "versions": "0+",
              "about": "The current assignment." },
            { "name": "TargetAssignment", "type": "Assignment", "versions": "0+",
              "about": "The target assignment." },
            { "name": "MemberType", "type": "int8", "versions": "1+", "default": "-1", "ignorable": true,
              "about": "-1 for unknown. 0 for classic member. +1 for consumer member." }  <-- new field
          ]},
        { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
          "about": "32-bit bitfield to represent authorized operations for this group." }
      ]
    }
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]},
    { "name": "Assignment", "versions": "0+", "fields": [
      { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
        "about": "The assigned topic-partitions to the member." }
    ]}
  ]
}
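The relationship between the MemberType wire value and the upgraded field of MemberDescription can be sketched as follows. This is a minimal illustration under the value mapping stated above, not the actual Kafka implementation; the class MemberTypeSketch and the method toUpgraded are hypothetical names.

```java
import java.util.Optional;

public class MemberTypeSketch {
    // Hypothetical helper: maps the int8 MemberType value from the response
    // to the Optional<Boolean> "upgraded" flag described in this proposal.
    public static Optional<Boolean> toUpgraded(byte memberType) {
        switch (memberType) {
            case 0:
                return Optional.of(false); // classic member in a consumer group
            case 1:
                return Optional.of(true);  // consumer member in a consumer group
            default:
                return Optional.empty();   // -1 (default) or unrecognized: unknown
        }
    }

    public static void main(String[] args) {
        System.out.println(toUpgraded((byte) 1));
        System.out.println(toUpgraded((byte) 0));
        System.out.println(toUpgraded((byte) -1));
    }
}
```

Treating any unrecognized value as unknown is an assumption of this sketch; it keeps a client tolerant of values it does not understand.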

Proposed Changes

kafka-consumer-groups.sh

--describe --state --verbose

Show the group-level information. Add the GROUP-EPOCH and TARGET-ASSIGNMENT-EPOCH columns. If a value is empty, "-" is shown instead.

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state --verbose
GROUP		COORDINATOR (ID)		ASSIGNMENT-STRATEGY		STATE       GROUP-EPOCH		TARGET-ASSIGNMENT-EPOCH		#MEMBERS
my-group	localhost:61316  (0)	uniform					Stable		3				3							2
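
The "-" fallback for empty epoch values can be sketched as follows. This is an illustrative helper only, not the tool's actual code; EpochColumnSketch and formatEpoch are hypothetical names.

```java
import java.util.Optional;

public class EpochColumnSketch {
    // Hypothetical helper: renders an optional epoch as a table cell,
    // falling back to "-" when no value is available (e.g. a classic group).
    public static String formatEpoch(Optional<Integer> epoch) {
        return epoch.map(e -> Integer.toString(e)).orElse("-");
    }

    public static void main(String[] args) {
        System.out.println(formatEpoch(Optional.of(3)));    // prints "3"
        System.out.println(formatEpoch(Optional.empty()));  // prints "-"
    }
}
```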

--describe --members --verbose

Show the member-level information. Add the CURRENT-EPOCH, TARGET-EPOCH, TARGET-ASSIGNMENT, and UPGRADED columns. CURRENT-EPOCH is the member epoch. ASSIGNMENT is renamed to CURRENT-ASSIGNMENT. For both classic and consumer groups, the assignment value format changes from "(0,1), (0,1)" to "my_topic:0,1;new_topic:0,1". The original format shows only the sorted partition numbers for each topic; the new format adds the topic name. The UPGRADED column is shown when a group is in migration.

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
GROUP			CONSUMER-ID				HOST			CLIENT-ID			#PARTITIONS		CURRENT-EPOCH	CURRENT-ASSIGNMENT			TARGET-EPOCH	TARGET-ASSIGNMENT			UPGRADED
my-group		T4tbGKsvT7CtsxVBH5J2QQ	/127.0.0.1		consumer-member		3				3				my_topic:0,1;new_topic:0	3				my_topic:0,1;new_topic:0	true
my-group		6mfoIHq4n3BT7n1HCdqihb	/127.0.0.1		classic-member		1				2				-							3				new_topic:1					false
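
The assignment format used in the CURRENT-ASSIGNMENT and TARGET-ASSIGNMENT columns can be sketched as follows. This is a minimal illustration, not the tool's actual code: AssignmentFormatSketch and format are hypothetical names, and both the alphabetical topic ordering and the "-" output for an empty assignment are assumptions based on the sample output above.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class AssignmentFormatSketch {
    // Hypothetical helper: renders "topic:p1,p2;topic2:p1" with topics in
    // alphabetical order and partition numbers sorted within each topic.
    public static String format(Map<String, List<Integer>> partitionsByTopic) {
        if (partitionsByTopic.isEmpty()) {
            return "-"; // assumption: an empty assignment is shown as "-"
        }
        return new TreeMap<>(partitionsByTopic).entrySet().stream()
            .map(e -> e.getKey() + ":" + e.getValue().stream()
                .sorted()
                .map(String::valueOf)
                .collect(Collectors.joining(",")))
            .collect(Collectors.joining(";"));
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> assignment =
            Map.of("new_topic", List.of(0), "my_topic", List.of(1, 0));
        System.out.println(format(assignment)); // prints "my_topic:0,1;new_topic:0"
    }
}
```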

The following table lists the meaning of each UPGRADED value.

UPGRADED    Meaning
"true"      A consumer member in a consumer group.
"false"     A classic member in a consumer group.

--describe --offsets --verbose / --describe --verbose

Show the leader epoch, which was introduced in KIP-320.

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --offsets --verbose
GROUP		TOPIC		PARTITION	LEADER-EPOCH	CURRENT-OFFSET	LOG-END-OFFSET	LAG		CONSUMER-ID				HOST			CLIENT-ID
my-group	my_topic	0			1				10				10				0		T4tbGKsvT7CtsxVBH5J2QQ	/127.0.0.1		consumer-member
my-group	my_topic	1			1				10				10				0		T4tbGKsvT7CtsxVBH5J2QQ	/127.0.0.1		consumer-member
my-group	new_topic	0			1				10				10				0		T4tbGKsvT7CtsxVBH5J2QQ	/127.0.0.1		consumer-member
my-group	new_topic	1			1				0				10				10		6mfoIHq4n3BT7n1HCdqihb	/127.0.0.1		classic-member

kafka-share-groups.sh

Add a new "--verbose" option to ShareGroupCommandOptions.

--describe --state --verbose

Show the group level information. Add GROUP-EPOCH and ASSIGNMENT-EPOCH.

$ bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state --verbose
GROUP		COORDINATOR (ID)		STATE	GROUP-EPOCH		ASSIGNMENT-EPOCH		#MEMBERS
my-group	localhost:61316 (0)		Stable	1				1						1

--describe --members --verbose

Show the member-level information. Add the MEMBER-EPOCH and ASSIGNMENT columns. The ASSIGNMENT value format changes from "my_topic:0,my_topic:1,new_topic:0,new_topic:1" to "my_topic:0,1;new_topic:0,1". The original format repeats the topic name for each partition. The new format shows each topic name once, followed by all of its partitions.

$ bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
GROUP		CONSUMER-ID				HOST			CLIENT-ID				MEMBER-EPOCH	ASSIGNMENT
my-group	T4tbGKsvT7CtsxVBH5J2QQ	/127.0.0.1		consumer-my-group-1		1				my_topic:0,1;new_topic:0,1
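
The difference between the old and new share-group ASSIGNMENT formats can be illustrated by regrouping an old-format string into the new one. This is a sketch for illustration only; ShareAssignmentFormatSketch and regroup are hypothetical names, and the tool itself is not expected to convert strings this way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class ShareAssignmentFormatSketch {
    // Hypothetical helper: turns the old flat "topic:partition,..." form
    // into the new grouped "topic:p1,p2;topic2:p1,p2" form.
    public static String regroup(String oldFormat) {
        if (oldFormat.isEmpty()) {
            return "";
        }
        Map<String, List<Integer>> byTopic = new TreeMap<>();
        for (String entry : oldFormat.split(",")) {
            // Split on the last ':' so the partition number is always the suffix.
            int sep = entry.lastIndexOf(':');
            String topic = entry.substring(0, sep);
            int partition = Integer.parseInt(entry.substring(sep + 1));
            byTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(partition);
        }
        return byTopic.entrySet().stream()
            .map(e -> e.getKey() + ":" + e.getValue().stream()
                .sorted()
                .map(String::valueOf)
                .collect(Collectors.joining(",")))
            .collect(Collectors.joining(";"));
    }

    public static void main(String[] args) {
        System.out.println(regroup("my_topic:0,my_topic:1,new_topic:0,new_topic:1"));
        // prints "my_topic:0,1;new_topic:0,1"
    }
}
```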

Compatibility, Deprecation, and Migration Plan

This proposal adds new optional fields to Admin client and kafka-consumer-groups.sh, so classic consumer groups will be unaffected.

Test Plan

The feature will be thoroughly tested with unit and integration tests.

Rejected Alternatives

N/A
