Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

 


Status

Current stateAccepted

...

The following table summarizes this:

Version of OffsetCommitCommit TimestampOffset RetentionExpiration Timestamp
0[ZooKeeper based offset management - out of scope]
1 - no explicit commit timestampCurrent timestampBroker's offsets.retention.minutesCommit Timestamp + Offset Retention
1 - with explicit commit timestampPartition-specific timestamp in the requestBroker’s offsets.retention.minutes
2, 3Current timestampRequest’s retention_time

For versions 1-3, once the expiration timestamp is reached, the offset is removed from the offset cache (during the next cleanup) regardless of the group state. KAFKA-4682 reports an issue related to this offset expiration, where committed offsets are removed even when there are still active, but rarely committing, consumers in the (Stable) group.

...

This proposed change has an impact on the existing offset commit value schema. There is an expire_timestamp field in this schema that, as a result of expiring all group offsets at the same time, would become redundant (as it would repeat the same value for each offset in the group).  


Code Block
Offset Commit Value Schema (Version: 1) =>
  offset => Long
  metadata => String
  commit_timestamp => Long
  expire_timestamp => Long

...

The default retention time for group offsets can be customized through the existing offsets.retention.minutes broker configuration. If, in the future, a need arises for enforcing a per group retention configuration, it can be implemented via a separate KIP. 


There are also a couple particular cases that need to be addressed with this new semantics:

...

Unfortunately, there is no notification mechanism in place for member subscription change within a group. Therefore, a poll mechanism can be implemented to run at specific intervals and check whether group subscription has deviated from what is stored in the cache. One place to do this is the repeating offset cleanup scheduled jobs, which by default run every 10 minutes, making them a good choice as the group subscription check will not be executed very frequently. At every execution of this job we collect a list of all topic partitions the group is consuming from (this can be calculated based on the data in each group member’s metadata), and cross reference it with the stored offsets for the group. If there are partitions the group has offset for but no longer consumes from, and offsets.retention.minutes has passed since their last commit timestamp, the corresponding offsets will be removed from the offset cache.

Note: This feature was not implemented as part of the KIP implementation and was intentionally left out for future implementation.

Standalone (Simple) Consumer

...

The following table summarize how the new offset expiration semantics would be implemented.

Group StateAdditional Check in Offset Cleanup JobAction if Check Holds

= Empty

(protocolType != None)

current timestamp - current_state_timestamp ≥ broker's offsets.retention.minutes

  1. Remove all group offsets
  2. Transition the group to Dead
Empty

(Non-subscribed partitions = partitions group has offset for - partitions group is consuming from)

∀partition ∈ non-subscribed partitions:

  • current timestamp - partition's commit_timestamp broker's offsets.retention.minutes
Remove offset of partition
= Empty
(protocolType = None) 

current timestamp - partition's commit_timestamp broker's offsets.retention.minutes
Remove offset of partition

Note that there are different valid protocolType values, such as consumer and stream, and the above semantics applies to them all.

...