The rest of this section explains how these suggested changes help in implementing the new group expiration semantics.
Offsets in a group expire offsets.retention.minutes after the group becomes Empty (assuming the group does not become active again during that time). Whenever the group transitions to Empty, its empty_state_timestamp is reset to the current timestamp. Then, during any scheduled offset cleanup task, if "current timestamp" minus empty_state_timestamp is greater than or equal to the broker's offsets.retention.minutes for any group, all offsets in that group will be removed and the group will transition to Dead.
The default retention time for group offsets can be customized through the existing
offsets.retention.minutes broker configuration. If, in the future, a need arises for enforcing a per group retention configuration, it can be implemented via a separate KIP.
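The group-level check described above can be sketched as follows. This is a minimal illustration rather than broker code: the `Group` class, the `expire_empty_group` function, and the millisecond timestamps are all hypothetical, and the terminal state name `Dead` is an assumption based on Kafka's group state names.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional

MINUTE_MS = 60_000


@dataclass
class Group:
    """Hypothetical in-memory stand-in for a broker's group metadata."""
    state: str = "Stable"
    empty_state_timestamp: Optional[int] = None  # ms, set when the group becomes Empty
    offsets: Dict[str, int] = field(default_factory=dict)  # partition -> committed offset

    def transition_to_empty(self, now_ms: int) -> None:
        # Every transition to Empty resets the timestamp to "now".
        self.state = "Empty"
        self.empty_state_timestamp = now_ms


def expire_empty_group(group: Group, now_ms: int, retention_minutes: int) -> bool:
    """Run by the periodic cleanup task; returns True if the group's offsets were removed."""
    if group.state != "Empty" or group.empty_state_timestamp is None:
        return False
    if now_ms - group.empty_state_timestamp < retention_minutes * MINUTE_MS:
        return False
    group.offsets.clear()
    group.state = "Dead"  # assumed name of the terminal state
    return True
```

If the group becomes active and later Empty again, `transition_to_empty` restarts the retention clock, which matches the "assuming the group does not become active again" caveat.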
There are also a couple of corner cases that need to be addressed with these new semantics:
- If a group consumer unsubscribes from a topic but continues to consume from other subscribed topics, the offset information of that unsubscribed topic’s partitions should be deleted at the appropriate time.
- The standalone (simple) consumer does not use Kafka's group management mechanism, and requires special handling when it comes to offset expiration.
Unsubscribing from a Topic
Unfortunately, there is no notification mechanism in place for member subscription changes within a group. Therefore, a poll mechanism can be implemented to run at specific intervals and check whether the group subscription has deviated from what is stored in the cache. One place to do this is the repeating scheduled offset cleanup job, which by default runs every 10 minutes. This makes it a good choice, as the group subscription check will not be executed very frequently. At every execution of this job we collect a list of all topic partitions the group is consuming from (this can be calculated based on the data in each group member’s metadata), and cross-reference it with the stored offsets for the group. If there are partitions the group has offsets for but no longer consumes from, and
offsets.retention.minutes has passed since their last commit timestamp, the corresponding offsets will be removed from the offset cache.
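The cross-referencing step can be sketched as below. The function name, the `(topic, partition)` tuples, and the plain dictionary standing in for the offset cache are hypothetical; the sketch only assumes that each member's metadata yields the set of partitions it consumes from.

```python
from typing import Dict, Iterable, Set, Tuple

MINUTE_MS = 60_000
TopicPartition = Tuple[str, int]


def expire_unsubscribed_offsets(
    member_partitions: Iterable[Set[TopicPartition]],
    commit_timestamps: Dict[TopicPartition, int],
    now_ms: int,
    retention_minutes: int,
) -> Set[TopicPartition]:
    """Remove offsets of partitions the group no longer consumes from.

    commit_timestamps maps each partition with a stored offset to its last
    commit timestamp (ms) and is mutated in place.
    """
    # Everything any member of the group currently consumes from.
    consumed: Set[TopicPartition] = set().union(*member_partitions)
    retention_ms = retention_minutes * MINUTE_MS
    expired = {
        tp
        for tp, committed_at in commit_timestamps.items()
        if tp not in consumed and now_ms - committed_at >= retention_ms
    }
    for tp in expired:
        del commit_timestamps[tp]
    return expired
```

Offsets of partitions that are still consumed are never touched here, regardless of how old their last commit is.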
Standalone (Simple) Consumer
The standalone consumer uses Kafka for offset storage only. For this consumer the group state is always Empty, and the corresponding protocol type is None. Since the expiration mechanism described above does not work for these consumers, the offset of a partition expires for them once offsets.retention.minutes has passed since its last commit timestamp.
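A rough sketch of the standalone case follows. It assumes a standalone consumer can be recognised by an Empty state combined with a missing protocol type; the function names and the dictionary of commit timestamps are hypothetical.

```python
from typing import Dict, Optional, Set, Tuple

MINUTE_MS = 60_000
TopicPartition = Tuple[str, int]


def is_standalone(state: str, protocol_type: Optional[str]) -> bool:
    # Assumption: no group management means no protocol type was ever set.
    return state == "Empty" and protocol_type is None


def expire_standalone_offsets(
    commit_timestamps: Dict[TopicPartition, int],
    now_ms: int,
    retention_minutes: int,
) -> Set[TopicPartition]:
    """Expire each partition's offset independently, based on its own last commit (ms)."""
    retention_ms = retention_minutes * MINUTE_MS
    expired = {
        tp for tp, committed_at in commit_timestamps.items()
        if now_ms - committed_at >= retention_ms
    }
    for tp in expired:
        del commit_timestamps[tp]
    return expired
```

Because each partition is judged by its own commit timestamp, a standalone consumer that keeps committing to one partition retains that offset while idle partitions age out.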
The following table summarizes how the new offset expiration semantics would be implemented.
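|Group State||Additional Check in Offset Cleanup Job||Action if Check Holds|
|Empty (group uses Kafka's group management)||current timestamp - empty_state_timestamp ≥ offsets.retention.minutes||Remove all offsets of the group|
|Not Empty (group has active members)||(Non-subscribed partitions = partitions group has offset for - partitions the group is consuming from) ∀partition ∈ non-subscribed partitions: current timestamp - partition's last commit timestamp ≥ offsets.retention.minutes||Remove offset of partition|
|Empty (standalone consumer)||current timestamp - partition's last commit timestamp ≥ offsets.retention.minutes||Remove offset of partition|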
Another Related Change
When group names are automatically generated by the console consumer they are very likely not to be reused. Therefore, it makes sense to skip storing offsets for them by default to avoid one of the top factors for offset cache size growth. The proposal is to disable auto offset commit by default in this situation. Implementing this change would become more critical once
Compatibility, Deprecation, and Migration Plan
- The new protocol does not allow clients to customize the retention time of specific offsets in the group. The old consumers, however, could still commit offsets with a customized retention time. Such old consumers will continue to be supported:
  - If a consumer uses the old API without customizing the retention time, the new approach will be applied; i.e., the broker’s offsets.retention.minutes config will be used as the retention time of its offsets once it becomes Empty. The same retention will be used for offsets of partitions the group no longer consumes from (or is subscribed to).
  - If a consumer uses the old API with a customized retention time, the provided retention time will become the retention time of the offsets in question from the offset commit timestamp (this fully matches the current behavior). In this scenario, version 1 of offset commit value schema (with the expire_timestamp field) will be used.
- This should be rare, but clients who rely on the consumer's auto offset commit functionality when the group name is auto-generated by the console consumer will need to manually set auto offset commit to true.
- Making all group offsets expire at the same time: Even though this is a good solution for when the group becomes
Empty, it fails to address the scenario where the group stops consuming from a particular partition and causes those offsets to remain while the group exists, which leads to unnecessary expansion of the group metadata cache.
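The compatibility rule for old clients described in the migration plan above can be illustrated with a small sketch. It assumes that an offset committed with a custom retention carries an explicit expiry (the expire_timestamp field of the version 1 schema) and that other offsets carry none; the function name and parameters are hypothetical, and the unsubscribed-partition case is deliberately left out.

```python
from typing import Optional

MINUTE_MS = 60_000


def offset_expired(
    now_ms: int,
    expire_timestamp_ms: Optional[int],
    empty_state_timestamp_ms: Optional[int],
    retention_minutes: int,
) -> bool:
    """Decide whether a single stored offset has expired.

    expire_timestamp_ms: set only when an old client supplied a custom
        retention time with its commit (assumed to be stored as an absolute expiry).
    empty_state_timestamp_ms: when the group last became Empty, or None
        if the group currently has members.
    """
    if expire_timestamp_ms is not None:
        # Old behavior: expiry is tied to the commit, independent of group state.
        return now_ms >= expire_timestamp_ms
    if empty_state_timestamp_ms is None:
        # New behavior: offsets of a group with members do not expire here.
        return False
    return now_ms - empty_state_timestamp_ms >= retention_minutes * MINUTE_MS
```

Keeping the two rules in separate branches shows that a custom retention never interacts with the group's Empty timestamp.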