Current state: Discussion
Discussion thread: here
Released: <Kafka Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
KAFKA-3853 asks for an improvement to the describe option of the consumer group command for new (Java API based) consumers. This command, when passed a consumer group that has no consumers (i.e., when the group state is Empty), currently reports an error indicating that there is no active member:
The requested improvement is to return the offsets within the group (leaving the consumer column empty) instead of returning the error message above. The error message can still be printed to stderr as a warning.
When the group is Stable (i.e., when there are active consumers in the group), the above command returns the topic partition assignment of each member of the group, and that assignment can be used to extract the corresponding committed offset(s). However, if the group state is Empty (i.e., when there are no active consumers in the group), the DescribeGroups response carries no topic partition information. Therefore, the DescribeGroups response in its current form does not help here.
The OffsetFetch protocol can be used to extract the offsets associated with given topic partitions in a consumer group. The problem is that when a consumer group is in the Empty state, or even when it is Stable but not all of its topics are being consumed, there is currently no way to obtain all the topic partitions it has consumed from so far. We can modify the behavior of the OffsetFetch protocol so that it returns all topic partitions associated with the group when it is passed an empty list of topic partitions.
This is the current schema for OffsetFetch (version 1, which applies to fetching from Kafka and not from ZooKeeper).
The first suggestion is to keep the same Response protocol but bump the version to 2, since the way the protocol is implemented is going to change. What changes in the implementation is that if an empty list is passed instead of a list of topic partitions, the API returns the offsets of all topic partitions associated with the group.
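The version-dependent behavior described above can be sketched in a few lines of Java. This is only an in-memory illustration, not broker code: the class OffsetFetchSketch, its commit and fetch methods, and the "topic-partition" string keys are hypothetical names introduced here. The sketch also assumes that version 1 returns entries only for the partitions it was asked about (so an empty list yields an empty result), and it uses -1 as a marker for "no committed offset".

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a group's committed offsets; not Kafka code.
class OffsetFetchSketch {
    // group id -> ("topic-partition" -> committed offset)
    private final Map<String, Map<String, Long>> committedByGroup = new LinkedHashMap<>();

    void commit(String group, String topicPartition, long offset) {
        committedByGroup.computeIfAbsent(group, g -> new LinkedHashMap<>())
                        .put(topicPartition, offset);
    }

    Map<String, Long> fetch(int version, String group, List<String> topicPartitions) {
        Map<String, Long> committed =
            committedByGroup.getOrDefault(group, Collections.<String, Long>emptyMap());
        // Proposed version 2 behavior: an empty list means "all partitions of the group".
        if (version >= 2 && topicPartitions.isEmpty()) {
            return new LinkedHashMap<>(committed);
        }
        // Assumed version 1 behavior: answer only for the partitions that were requested.
        Map<String, Long> result = new LinkedHashMap<>();
        for (String tp : topicPartitions) {
            result.put(tp, committed.getOrDefault(tp, -1L)); // -1 marks "no committed offset"
        }
        return result;
    }

    public static void main(String[] args) {
        OffsetFetchSketch broker = new OffsetFetchSketch();
        broker.commit("test-group", "orders-0", 42L);
        broker.commit("test-group", "orders-1", 17L);
        List<String> none = Collections.emptyList();
        System.out.println(broker.fetch(1, "test-group", none)); // {}
        System.out.println(broker.fetch(2, "test-group", none)); // {orders-0=42, orders-1=17}
        System.out.println(broker.fetch(2, "test-group", Arrays.asList("orders-1"))); // {orders-1=17}
    }
}
```

Gating the new behavior on the request version keeps version 1 callers unaffected, which is the property the compatibility discussion in this KIP relies on.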
The second suggestion concerns how the above API is accessed and called. Currently, the offset information for each topic partition in a Stable group is obtained by creating a "dummy" consumer in the group and using its committed interface to extract that offset information:
The committed call makes use of the OffsetFetch API to extract the offset of the given partition. The suggestion here is to add a method to AdminClient that extracts the offset information of a consumer group by calling the OffsetFetch API with an empty list of topic partitions. That call returns the offsets of all topic partitions associated with the consumer group:
One benefit of using this method instead of the committed method is that we no longer need to create the dummy consumer to retrieve offsets. The other benefit is that a single API call returns all offsets within the group, whereas the existing describe group implementation makes one API call for each topic partition in the group.
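The difference in the number of API calls can be illustrated with a small simulation. Everything in it is hypothetical and exists only for this sketch: CountingOffsetFetch stands in for the OffsetFetch API and counts the requests it serves, describePerPartition mimics the existing one-call-per-partition approach, and listGroupOffsets is an illustrative name for the proposed admin-side method, not a confirmed AdminClient signature.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupOffsetsSketch {
    // Hypothetical stand-in for the OffsetFetch API that counts the requests it serves.
    static class CountingOffsetFetch {
        private final Map<String, Long> committed;
        int requests = 0;

        CountingOffsetFetch(Map<String, Long> committed) {
            this.committed = new LinkedHashMap<>(committed);
        }

        // An empty list means "all partitions of the group", as proposed for version 2.
        Map<String, Long> fetch(List<String> topicPartitions) {
            requests++;
            Map<String, Long> result = new LinkedHashMap<>();
            for (Map.Entry<String, Long> e : committed.entrySet()) {
                if (topicPartitions.isEmpty() || topicPartitions.contains(e.getKey())) {
                    result.put(e.getKey(), e.getValue());
                }
            }
            return result;
        }
    }

    // Existing style: one request per topic partition, like calling committed() for each one.
    static Map<String, Long> describePerPartition(CountingOffsetFetch api, List<String> topicPartitions) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        for (String tp : topicPartitions) {
            offsets.putAll(api.fetch(Collections.singletonList(tp)));
        }
        return offsets;
    }

    // Proposed style: hypothetical admin-side helper that issues one request with an empty list.
    static Map<String, Long> listGroupOffsets(CountingOffsetFetch api) {
        return api.fetch(Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new LinkedHashMap<>();
        committed.put("orders-0", 42L);
        committed.put("orders-1", 17L);
        committed.put("payments-0", 5L);

        CountingOffsetFetch perPartition = new CountingOffsetFetch(committed);
        describePerPartition(perPartition, Arrays.asList("orders-0", "orders-1", "payments-0"));
        System.out.println(perPartition.requests); // 3

        CountingOffsetFetch single = new CountingOffsetFetch(committed);
        System.out.println(listGroupOffsets(single)); // {orders-0=42, orders-1=17, payments-0=5}
        System.out.println(single.requests); // 1
    }
}
```

Note that the per-partition variant also needs to know the partition list up front, which is exactly what is unavailable for a group in the Empty state.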
The proposal is to:
- Extend the implementation of the OffsetFetch API to handle the case where an empty list is passed as the list of topic partitions, in which case the offsets of all topic partitions associated with the group are returned.
- Add a method to AdminClient that makes use of the updated OffsetFetch API above and returns the offsets of all topic partitions associated with the consumer group.
Compatibility, Deprecation, and Migration Plan
With respect to the first proposed change above, compatibility could become a concern if current users somehow rely on a different behavior when an empty list is passed to the OffsetFetch API as the list of topic partitions. Users who want to stick to the current behavior of the API can keep using version 1. The change suggested in this KIP applies to version 2 of the API only.
With respect to the added interface in AdminClient, there is no compatibility issue, as that interface does not exist in the current implementation.
Rejected Alternatives
- Changing the DescribeGroups protocol so that it also returns the offset information for all topic partitions the group has consumed from since its creation. More details can be found here.
- Exposing the added OffsetFetch behavior through a new interface in KafkaConsumer, which would still require creating the dummy consumer in the group in order to retrieve offsets. More details can be found here.