KIP-211: Revise Expiration Semantics of Consumer Group Offsets

...

The rest of this section explains how these suggested changes help in implementing the new group expiration semantics.

Transitioning to Empty State

The offsets of a group will expire at the time the group becomes Empty plus the retention time set by offsets.retention.minutes (assuming the group does not become active again during that period). Whenever the group transitions to the Empty state, empty_state_timestamp is reset to the current timestamp. Then, during each scheduled offset cleanup run, if the current timestamp minus empty_state_timestamp is greater than or equal to the broker's offsets.retention.minutes for a group, all offsets in that group are removed and the group transitions to the Dead state.
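A minimal Python sketch of the Empty-state bookkeeping described above, assuming a simplified in-memory group model. The Group class, its transition_to method, and the expire_empty_group function are hypothetical illustrations, not Kafka's actual group coordinator code; retention is passed in minutes to mirror offsets.retention.minutes and converted to milliseconds.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional

MS_PER_MINUTE = 60_000


@dataclass
class Group:
    """Hypothetical in-memory model of a consumer group (not Kafka's GroupMetadata)."""
    group_id: str
    state: str = "Stable"
    empty_state_timestamp: Optional[int] = None  # ms; reset on every transition to Empty
    offsets: Dict[str, int] = field(default_factory=dict)  # "topic-partition" -> offset

    def transition_to(self, new_state: str, now_ms: int) -> None:
        if new_state == "Empty":
            # Restart the retention clock each time the group becomes Empty.
            self.empty_state_timestamp = now_ms
        self.state = new_state


def expire_empty_group(group: Group, now_ms: int, retention_minutes: int) -> bool:
    """Runs the Empty-state check; returns True if the group was expired."""
    if group.state != "Empty" or group.empty_state_timestamp is None:
        return False
    if now_ms - group.empty_state_timestamp >= retention_minutes * MS_PER_MINUTE:
        group.offsets.clear()                # remove all group offsets
        group.transition_to("Dead", now_ms)  # then mark the group Dead
        return True
    return False
```

Because transition_to resets empty_state_timestamp on every transition to Empty, a group that rejoins and later empties again restarts its retention clock instead of inheriting the old one.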

...

The default retention time for group offsets can be customized through the existing offsets.retention.minutes broker configuration. If a per-group retention configuration is needed in the future, it can be implemented in a separate KIP.


There are also a couple of particular corner cases that need to be addressed with these new semantics:

  1. If a group consumer unsubscribes from a topic but continues to consume from other subscribed topics, the offset information of that unsubscribed topic’s partitions should be deleted at the appropriate time.

...

  2. A standalone (simple) consumer does not use Kafka's group management mechanism and requires special handling when it comes to offset expiration.

Unsubscribing from a Topic

...

Unfortunately, there is no mechanism in place that signals a change in a member's subscription within a group. Instead, a polling mechanism can run at fixed intervals and check whether the group's subscription has deviated from what is stored in the cache. A natural place for this is the recurring offset cleanup job, which runs every 10 minutes by default; because that interval is fairly long, the subscription check will not run very frequently. On every run of this job, we collect all topic partitions the group is consuming from (this can be derived from the metadata of each group member) and cross-reference them with the offsets stored for the group. For any partition the group has an offset for but no longer consumes from, the offset is removed from the offset cache once offsets.retention.minutes has passed since its last commit timestamp.
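The subscription cross-check could look roughly like the following Python sketch. It assumes that each member's metadata yields a set of subscribed topics and that a partition counts as consumed when its topic appears in any member's subscription; the function name expire_unsubscribed_offsets and the dictionary layout are hypothetical.

```python
from typing import Dict, Set, Tuple

TopicPartition = Tuple[str, int]  # (topic, partition)
MS_PER_MINUTE = 60_000


def expire_unsubscribed_offsets(
    offsets: Dict[TopicPartition, int],
    commit_timestamps: Dict[TopicPartition, int],
    member_subscriptions: Dict[str, Set[str]],
    now_ms: int,
    retention_minutes: int,
) -> Set[TopicPartition]:
    """Removes offsets of partitions no member consumes from once retention
    has elapsed since their last commit; returns the removed partitions."""
    # Topics the group is consuming from: union over all members' subscriptions.
    subscribed_topics: Set[str] = set().union(*member_subscriptions.values())
    # Partitions the group has offsets for but no longer consumes from.
    non_subscribed = {tp for tp in offsets if tp[0] not in subscribed_topics}
    retention_ms = retention_minutes * MS_PER_MINUTE
    expired = {
        tp for tp in non_subscribed
        if now_ms - commit_timestamps[tp] >= retention_ms
    }
    for tp in expired:
        del offsets[tp]
        del commit_timestamps[tp]
    return expired
```

Offsets of still-subscribed partitions are never touched here, so an active group keeps its progress on topics it is still reading.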

Standalone (Simple) Consumer

The standalone consumer uses Kafka for offset storage only. For such a consumer the group state is always Empty, and the corresponding protocolType is None. Since the Empty-state expiration mechanism described above does not work for these consumers, the offset of each partition will expire once offsets.retention.minutes has passed since that partition's last commit timestamp.

The following table summarizes how the new offset expiration semantics would be implemented.

| Group State | Additional Check in Offset Cleanup Job | Action if Check Holds |
| --- | --- | --- |
| = Empty (protocolType = Some("consumer")) | current timestamp - empty_state_timestamp ≥ broker's offsets.retention.minutes | 1. Remove all group offsets; 2. Transition the group to Dead |
| ≠ Empty | Non-subscribed partitions = partitions the group has offsets for - partitions the group is consuming from. ∀ partition ∈ non-subscribed partitions: current timestamp - partition's commit_timestamp ≥ broker's offsets.retention.minutes | Remove offset of partition |
| = Empty (protocolType = None) | current timestamp - partition's commit_timestamp ≥ broker's offsets.retention.minutes | Remove offset of partition |
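The three rows of the table can be combined into a single cleanup pass, sketched below in Python under simplifying assumptions: GroupView and cleanup are hypothetical names, group states are plain strings, each group exposes the set of topics its members subscribe to, and protocol types other than "consumer" or None are left untouched because the table does not cover them.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Set, Tuple

TopicPartition = Tuple[str, int]
MS_PER_MINUTE = 60_000


@dataclass
class GroupView:
    """Hypothetical snapshot of a group as seen by the cleanup job."""
    state: str                            # e.g. "Empty", "Stable", "Dead"
    protocol_type: Optional[str]          # "consumer" for group-managed, None for standalone
    empty_state_timestamp: Optional[int]  # ms
    offsets: Dict[TopicPartition, int]
    commit_timestamps: Dict[TopicPartition, int]
    subscribed_topics: Set[str] = field(default_factory=set)


def cleanup(group: GroupView, now_ms: int, retention_minutes: int) -> Set[TopicPartition]:
    """Applies the table's checks and returns the partitions whose offsets were removed."""
    retention_ms = retention_minutes * MS_PER_MINUTE

    def older_than_retention(tp: TopicPartition) -> bool:
        return now_ms - group.commit_timestamps[tp] >= retention_ms

    if group.state == "Empty" and group.protocol_type == "consumer":
        # Row 1: expire the whole group once it has been Empty long enough.
        if (group.empty_state_timestamp is not None
                and now_ms - group.empty_state_timestamp >= retention_ms):
            removed = set(group.offsets)
            group.offsets.clear()
            group.commit_timestamps.clear()
            group.state = "Dead"
            return removed
        return set()
    if group.state == "Empty" and group.protocol_type is None:
        # Row 3: standalone consumer, every partition expires on its own commit time.
        candidates = set(group.offsets)
    elif group.state not in ("Empty", "Dead"):
        # Row 2: active group, only partitions it no longer consumes from.
        candidates = {tp for tp in group.offsets if tp[0] not in group.subscribed_topics}
    else:
        return set()
    expired = {tp for tp in candidates if older_than_retention(tp)}
    for tp in expired:
        del group.offsets[tp]
        del group.commit_timestamps[tp]
    return expired
```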

Another Related Change

When group names are automatically generated by the console consumer, they are very unlikely to be reused. It therefore makes sense to skip storing offsets for such groups by default, which removes one of the main contributors to offset cache growth. The proposal is to disable auto offset commit by default in this situation. Implementing this change becomes more critical once KAFKA-3806 (KIP-186) lands, since it changes the default retention from 1 day to 7 days.
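A sketch of the proposed default, assuming a hypothetical helper, console_consumer_config, that assembles console consumer properties. group.id and enable.auto.commit are real consumer configs, but the generated group-name format and the helper itself are illustrative assumptions.

```python
import random
from typing import Dict


def console_consumer_config(user_props: Dict[str, str]) -> Dict[str, str]:
    """Hypothetical: fills in defaults for a console consumer run."""
    props = dict(user_props)
    if "group.id" not in props:
        # Assumed naming scheme for an auto-generated, throwaway group.
        props["group.id"] = f"console-consumer-{random.randint(0, 99_999)}"
        # Such a group is unlikely to be reused, so don't commit offsets
        # unless the user explicitly asked for it.
        props.setdefault("enable.auto.commit", "false")
    return props
```

An explicitly supplied enable.auto.commit value is left as the user set it, so clients that depend on auto commit with an auto-generated group can still opt back in.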

Compatibility, Deprecation, and Migration Plan

  • The new protocol does not allow clients to customize the retention time of specific offsets in the group. Old consumers, however, can still commit offsets with a customized retention time, and they will continue to be supported:
    • If a consumer uses the old API without customizing the retention time, the new approach will be applied; i.e., the broker's offsets.retention.minutes config will be used as the retention time of its offsets once the group becomes Empty. The same retention will be used for offsets of partitions the group no longer consumes from (or is subscribed to).
    • If a consumer uses the old API with a customized retention time, the provided retention time will become the retention time of the offsets in question, counted from the offset commit timestamp (this fully matches the current behavior). In this scenario, version 1 of the offset commit value schema (with the expire_timestamp field) will be used.
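The following Python sketch shows how a broker might pick the stored value format for commits from old clients. It assumes a sentinel of -1 for "no custom retention" and a newer value schema version (assumed here to be 2) that omits expire_timestamp; OffsetCommitValue, build_commit_value, and expired_by_own_timestamp are hypothetical names.

```python
from dataclasses import dataclass
from typing import Optional

DEFAULT_RETENTION_TIME = -1  # assumed sentinel: client did not customize retention


@dataclass
class OffsetCommitValue:
    offset: int
    metadata: str
    commit_timestamp: int            # ms
    expire_timestamp: Optional[int]  # ms; only present in value schema version 1
    schema_version: int


def build_commit_value(offset: int, metadata: str, now_ms: int,
                       retention_time_ms: int = DEFAULT_RETENTION_TIME) -> OffsetCommitValue:
    if retention_time_ms != DEFAULT_RETENTION_TIME:
        # Old API with custom retention: expiry counted from the commit (current behavior).
        return OffsetCommitValue(offset, metadata, now_ms, now_ms + retention_time_ms, 1)
    # No custom retention: the group-level rules decide when this offset expires.
    return OffsetCommitValue(offset, metadata, now_ms, None, 2)


def expired_by_own_timestamp(value: OffsetCommitValue, now_ms: int) -> bool:
    """Only version-1 values carry their own expiration time."""
    return value.expire_timestamp is not None and now_ms >= value.expire_timestamp
```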


...

  • This should be rare, but clients that rely on the consumer's auto offset commit while using a group name auto-generated by the console consumer will need to manually set auto offset commit (enable.auto.commit) to true.

Rejected Alternatives

  1. Making all group offsets expire at the same time: Even though this is a good solution for when the group becomes Empty, it fails to address the scenario where the group stops consuming from a particular partition. Those offsets would remain for as long as the group exists, leading to unnecessary growth of the group metadata cache.