Child pages
  • KIP-211: Revise Expiration Semantics of Consumer Group Offsets

Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

To make up for the per-offset expiration timestamp we lose in the new version of offset commit value schema, a new field is added in the group metadata value schema that indicates when the group last transitioned to Empty state, that defaults to Long.MaxValue.changed state.

Code Block
Group Metadata Value Schema (Version: 1) =>
  protocol_type => String
  generation => Int
  protocol => String
  leader => String
  members => [member]
    ...
Code Block
Group Metadata Value Schema (Version: 2) =>
  protocol_type => String
  generation => Int
  protocol => String
  leader => String
  emptycurrent_state_timestamp => Long
  members => [member]
    ...

...

The expiration time of offsets in a group will be when the group becomes Empty plus retention time of offsets.retention.minutes (assuming during that time the group does not become active again). Whenever the group transitions to Empty state, emptycurrent_state_timestamp resets to the value of current timestamp. Then, during any scheduled offset cleanup task, if "current timestamp" minus emptycurrent_state_timestamp is greater than or equal to broker's offsets.retention.minutes for any group, all offsets in that group will be removed and the group will transition to Dead state.

Note that consumers may rejoin the group while the group is in Empty state. As soon as that happens and , the group transitions out of Empty state changes, empty_state_timestamp resets to the default Long.MaxValue which practically disables the expiration timer, and that practically disables offset expiration. This is a breakdown of group states and how the offsets expiration works in those states:

  • Stable: Group offsets will not expire in this state. The field empty_state_timestamp is Long.MaxValue.
  • PreparingRebalance: Group offsets will not expire in this state. The field empty_state_timestamp is Long.MaxValue.
  • CompletingRebalance: Group offsets will not expire in this state. The field empty_state_timestamp is Long.MaxValue.
  • Empty: The field empty_state_timestamp is set to when group last transitioned to this state. If the group stays in this for offsets.retention.minutes, the following offset cleanup scheduled task will remove all offsets in the group (as explained above).
  • Dead: Group offsets have already expired in this stateexpired (group deletion); or the group is unloaded from the coordinator cache (coordinator change).

The default retention time for group offsets can be customized through the existing offsets.retention.minutes broker configuration. If, in the future, a need arises for enforcing a per group retention configuration, it can be implemented via a separate KIP.

...

Unfortunately, there is no notification mechanism in place for member subscription change within a group. Therefore, a poll mechanism can be implemented to run at specific intervals and check whether group subscription has deviated from what is stored in the cache. One place to do this is the repeating offset cleanup scheduled jobs, which by default run every 10 minutes, making them a good choice as the group subscription check will not be executed very frequently. At every execution of this job we collect a list of all topic partitions the group is consuming from (this can be calculated based on the the data in each group member’s metadata), and cross reference it with the stored offsets for the group. If there are partitions the group has offset for but no longer consumes from, and offsets.retention.minutes has passed since their last commit timestamp, the corresponding offsets will be removed from the offset cache.

...

Group StateAdditional Check in Offset Cleanup JobAction if Check Holds

= Empty

(protocolType = Some("consumer"))

current timestamp - emptycurrent_state_timestamp ≥ broker's offsets.retention.minutes

  1. Remove all group offsets
  2. Transition the group to Dead
Empty

(Non-subscribed partitions = partitions group has offset for - partitions group is consuming from)

∀partition ∈ non-subscribed partitions:

  • current timestamp - partition's commit_timestamp broker's offsets.retention.minutes
Remove offset of partition
= Empty
(protocolType = None) 

current timestamp - partition's commit_timestamp broker's offsets.retention.minutes
Remove offset of partition

...