Status
Current state: Discussion
Discussion thread: here
JIRA: KAFKA-4682
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The offset of a topic partition within a consumer group typically expires when offsets.retention.minutes elapses after the last offset commit to that partition. KAFKA-4682 reports an issue related to this offset expiration, where committed offsets are removed even when there are still active, but rarely committing, consumers in the (Stable) group.
In other words, if offsets.retention.minutes or longer has passed since an active consumer last committed an offset for a topic partition, that committed offset is removed from the consumer group metadata. If there is then a rebalance, or the consumer restarts, the last committed offset for that topic partition will not be found, and the consumer is forced to start from the beginning or the end of the log (depending on the auto.offset.reset configuration), leading to potential duplicate consumption or missing records. This situation can be avoided if offsets are preserved beyond offsets.retention.minutes after the last offset commit, as long as the group is still in the Stable state.
There are workarounds to this issue, some of which are described in KAFKA-4682, but they come with their own limitations and drawbacks, as discussed in the JIRA.
Public Interfaces
This is the current OffsetCommit protocol:
OffsetCommit Request (Version: 3) => group_id group_generation_id member_id retention_time [topics]
  group_id => STRING
  group_generation_id => INT32
  member_id => STRING
  retention_time => INT64
  topics => topic [partitions]
    topic => STRING
    partitions => partition offset metadata
      partition => INT32
      offset => INT64
      metadata => NULLABLE_STRING

OffsetCommit Response (Version: 3) => throttle_time_ms [responses]
  throttle_time_ms => INT32
  responses => topic [partition_responses]
    topic => STRING
    partition_responses => partition error_code
      partition => INT32
      error_code => INT16
The only change made to this protocol is dropping the retention_time field from the request. In the new version of the protocol, retention time will be enforced through the broker configuration offsets.retention.minutes.
OffsetCommit Request (Version: 4) => group_id group_generation_id member_id [topics]
  group_id => STRING
  group_generation_id => INT32
  member_id => STRING
  topics => topic [partitions]
    topic => STRING
    partitions => partition offset metadata
      partition => INT32
      offset => INT64
      metadata => NULLABLE_STRING

OffsetCommit Response (Version: 4) => throttle_time_ms [responses]
  throttle_time_ms => INT32
  responses => topic [partition_responses]
    topic => STRING
    partition_responses => partition error_code
      partition => INT32
      error_code => INT16
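For illustration, below is a minimal sketch (not part of this KIP) of a typical offset commit from the Java consumer; the broker address, group id, and topic name are placeholders. Since the public KafkaConsumer API does not expose a per-commit retention time, under version 4 of the protocol offset retention is governed entirely by the broker's offsets.retention.minutes setting.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "example-group");           // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic")); // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            // process records ...
            consumer.commitSync(); // the commit carries no retention time in protocol version 4
        }
    }
}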
Proposed Changes
A more viable solution for KAFKA-4682 can be achieved by changing how group offset expiration works: committed offsets are preserved as long as the group is active (i.e., has consumers). The expiration timer starts ticking the moment all group members are gone and the group transitions into the Empty state. Under these semantics there is no longer a need to enforce individual offset retention times or to keep individual expiration timestamps for each topic partition in the group, because all committed offsets in the group will expire at the same time. As a result, the expireTimestamp field will be removed from the offset metadata message.
The group's offset expiration time will be the time the group becomes Empty plus the retention time offsets.retention.minutes (assuming the group does not become active again during that period). When the group is in the Empty state and the timer reaches the expiration time (set when the group entered that state), the group transitions to the Dead state and all of the group's offsets expire and are removed.
Note that consumers may rejoin the group while the group is in the Empty state. As soon as that happens and the group state changes, the expiration timer is disabled and the previously set expiration time of the group's offsets is invalidated (until the group re-enters the Empty state, at which point the expiration time is set again). Here is a breakdown of the group states and how offset expiration works in each of them:
- Stable: Group offsets will not expire in this state. The expiration timer is disabled.
- PreparingRebalance: Group offsets will not expire in this state. The expiration timer is disabled.
- CompletingRebalance: Group offsets will not expire in this state. The expiration timer is disabled.
- Empty: The expiration timer starts ticking as soon as the group enters this state.
- Dead: Group offsets have already expired in this state. When the group is in the Empty state and the expiration timer reaches the configured retention time, the offsets expire and the group transitions to the Dead state.
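To make this life cycle concrete, here is a simplified sketch of the proposed expiration semantics. The class, field, and method names below are hypothetical illustrations, not the actual group coordinator code.

// Simplified sketch of the proposed semantics (hypothetical names, not the real GroupMetadata code).
class GroupOffsetExpiration {
    enum State { STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE, EMPTY, DEAD }

    private State state = State.STABLE;
    private Long expirationTimestampMs = null; // set only while the group is Empty
    private final long retentionMs;            // offsets.retention.minutes converted to milliseconds

    GroupOffsetExpiration(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Called when the last member leaves the group.
    void onGroupBecameEmpty(long nowMs) {
        state = State.EMPTY;
        expirationTimestampMs = nowMs + retentionMs;
    }

    // Called when a consumer (re)joins while the group is Empty: the previously
    // set expiration time is invalidated and the timer is effectively disabled.
    void onMemberJoined() {
        state = State.PREPARING_REBALANCE;
        expirationTimestampMs = null;
    }

    // Called periodically by the offsets cleanup task.
    void maybeExpireOffsets(long nowMs) {
        if (state == State.EMPTY && expirationTimestampMs != null && nowMs >= expirationTimestampMs) {
            // remove all of the group's committed offsets at once ...
            state = State.DEAD;
        }
    }
}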
The default retention time for group offsets can be customized through the existing offsets.retention.minutes broker configuration. If, in the future, a need arises for enforcing a per-group retention configuration, it can be implemented via a separate KIP.
Compatibility, Deprecation, and Migration Plan
As mentioned above, under the new semantics all offsets in a group expire at the same time. The broker configuration offsets.retention.minutes determines when they expire (after the group becomes Empty). Deployments with older clients that rely on their individual retention overrides (sent through OffsetCommitRequest) will need to set offsets.retention.minutes to the maximum of those overrides and its current value. This simply guarantees that offsets will not expire any earlier under the new semantics (because the expiration timer for a partition now starts no earlier, and possibly later, than before). It also simplifies handling of clients that use different versions of the API.
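As a hypothetical example, if the largest retention override previously sent by any client was 7 days (10080 minutes), the broker configuration would be set to at least that value:

offsets.retention.minutes=10080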
Rejected Alternatives
- Preserving partition-level offset expiration and expiring offsets at different times once the group is empty: The approach proposed above seems cleaner and more intuitive.
- Starting the expiration timer for individual offsets once they are no longer being consumed: This likely requires rejected alternative #1 to be in place as well. Having all offsets expire at once is simpler. If growth of the offsets metadata cache is a concern, the proposal can be revised, or other ways of reducing its size can be considered (example).