This Confluence has been LDAP enabled, if you are an ASF Committer, please use your LDAP Credentials to login. Any problems file an INFRA jira ticket please.

Child pages
  • KIP-88: OffsetFetch Protocol Update
Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 13 Next »

Status

Current stateDiscussion

Discussion thread: here

JIRA: KAFKA-3853

Released: <Kafka Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KAFKA-3853 asks for an improvement to the new-consumer option of the consumer group command. This command, when passed a consumer group that has no consumer (i.e., when the group state is Empty), currently reports an error indicating that there is no active member:

$ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group group1
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
Error: Consumer group 'group1' has no active members.

The requested improvement is returning offsets within the group (and leaving the consumer column empty) instead of returning the error message above. The error message can still be printed to to stderr as a warning.

When the group is Stable (i.e. when there are active consumers in the group), {{DescribeGroups Response}} returns the associated topic partition assignment for each member of the group, and that assignment can be used to extract the corresponding committed offset(s). However, if the group state is Empty (i.e. when there are no active consumers in the group) there is no associated topic partition info in {{DescribeGroups Response}}. Therefore, a change in the protocol seems to be the only way to extract associated topic partitions of a group and the corresponding committed offsets.

Public Interfaces

This is the current schema for DescribeGroups (version 0).

DescribeGroups Request (Version: 0) => [group_ids] 
  group_ids => STRING
 
DescribeGroups Response (Version: 0) => [groups] 
  groups => error_code group_id state protocol_type protocol [members] 
    error_code => INT16
    group_id => STRING
    state => STRING
    protocol_type => STRING
    protocol => STRING
    members => member_id client_id client_host member_metadata member_assignment 
      member_id => STRING
      client_id => STRING
      client_host => STRING
      member_metadata => BYTES
      member_assignment => BYTES

As shown above, and as explained in the previous section, corresponding topic partitions can be extracted only through the individual (and active) members of the group. The list members is empty if the group is in Empty state, so with this protocol there is no way to extract offsets of an Empty group (problem 1).

Also, offset information for a Stable group is not stored in the above Response schema. Offsets associated with a Stable consumer group are currently extracted using the committed method of KafkaConsumer and by creating a new and dummy member in the group for the sole purpose of making committed calls on every topic partition of each of the other consumers in the group to extract the corresponding offsets. A cleaner approach would be extracting these offsets without having to create this dummy member (problem 2).

Proposed below is version 1 of the DescribeGroups schema that solves both problems above by including the group offset information directly into the Response schema (lines 17-21 below). Note that topic partitions reported under the offsets field are going to be a superset of topic partitions reported under members:member_assignment.

DescribeGroups Request (Version: 1) => [group_ids] 
  group_ids => STRING
 
DescribeGroups Response (Version: 1) => [groups] 
  groups => error_code group_id state protocol_type protocol [members] [offsets]
    error_code => INT16
    group_id => STRING
    state => STRING
    protocol_type => STRING
    protocol => STRING
    members => member_id client_id client_host member_metadata member_assignment 
      member_id => STRING
      client_id => STRING
      client_host => STRING
      member_metadata => BYTES
      member_assignment => BYTES
    offsets => topic partition offset metadata
      topic => STRING
      partition => INT32
      offset => INT64
      metadata => STRING

Proposed Changes

The proposal is to include a list offsets of tuples in the response to DescribeGroups request. Each of these tuples contains the following fields:

  • topic: topic from which a previous member of the group consumed.
  • partition: partition of the above topic for which offset information associated with the group exists
  • offset: associated committed offset of the above topic partition within the group
  • metadata: metadata associated with the above offset

Compatibility, Deprecation, and Migration Plan

There should be no impact to existing users of this protocol. The Request schema remains unchanged, and the Response schema is only expanded. There is no change in the existing Response structure, and, therefore, users who use version 0 of the schema should not be affected. The code behind kafka-consumer-groups.sh will also be updated (as requested in the JIRA) to return additional information (rows of topic partitions with no assigned consumer). In case it is necessary to provide the old output as well, a new parameter can be defined for the tool to report the new information; while the old way of running the tool still prints out the old output.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

  • No labels