Status
Current state: Draft
Discussion thread: here
JIRA: KAFKA-4682
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The offset of a topic partition within a consumer group typically expires once offsets.retention.minutes has elapsed since the last offset commit to that partition. KAFKA-4682 reports an issue related to this offset expiration: committed offsets are removed even when the (Stable) group still has active, but rarely committing, consumers.
In other words, if offsets.retention.minutes or longer has passed since an active consumer last committed an offset for a topic partition, that committed offset is removed from the consumer group metadata. If a rebalance then occurs or the consumer restarts, the last committed offset for that topic partition cannot be found. The consumer is then forced to start from the beginning or the end of the log, depending on the value of the auto.offset.reset configuration, which can lead to duplicate consumption or missed records. This situation can be avoided if offsets are preserved beyond offsets.retention.minutes after the last offset commit for as long as the group is still in the Stable state.
There are workarounds to this issue and some of them are described in KAFKA-4682, but they come with their own limitations and drawbacks.
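The expiration rule described above can be sketched as follows. This is a minimal, hypothetical model and not the broker's actual code. The class and method names are illustrative, and times are plain epoch milliseconds. Each partition's offset expires a fixed retention after its own last commit, whether or not the group is still Stable.

```java
import java.time.Duration;

// Hypothetical model of the pre-KIP rule: every committed offset gets its own
// expiration timestamp, derived only from that partition's last commit time.
final class PerPartitionExpiry {

    // Expiration timestamp for a single partition's committed offset.
    static long expireTimestamp(long lastCommitMs, long retentionMs) {
        return lastCommitMs + retentionMs;
    }

    // In this sketch the offset counts as expired once the clock reaches its
    // expiration timestamp, even if the group still has active members.
    static boolean isExpired(long lastCommitMs, long retentionMs, long nowMs) {
        return nowMs >= expireTimestamp(lastCommitMs, retentionMs);
    }

    public static void main(String[] args) {
        long retentionMs = Duration.ofMinutes(1440).toMillis(); // illustrative: 24 hours
        long lastCommitMs = 0L;                                  // a rarely committing consumer
        long nowMs = Duration.ofHours(25).toMillis();            // the group is still Stable

        // Prints "expired: true": the offset is lost although the consumer is alive.
        System.out.println("expired: " + isExpired(lastCommitMs, retentionMs, nowMs));
    }
}
```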
Public Interfaces
This is the current OffsetCommit protocol:
```
OffsetCommit Request (Version: 3) => group_id group_generation_id member_id retention_time [topics]
  group_id => STRING
  group_generation_id => INT32
  member_id => STRING
  retention_time => INT64
  topics => topic [partitions]
    topic => STRING
    partitions => partition offset metadata
      partition => INT32
      offset => INT64
      metadata => NULLABLE_STRING

OffsetCommit Response (Version: 3) => throttle_time_ms [responses]
  throttle_time_ms => INT32
  responses => topic [partition_responses]
    topic => STRING
    partition_responses => partition error_code
      partition => INT32
      error_code => INT16
```
The only change to this protocol is the removal of the retention_time field from the request. In the new version of the protocol, retention time is enforced solely through the broker config offsets.retention.minutes.
```
OffsetCommit Request (Version: 4) => group_id group_generation_id member_id [topics]
  group_id => STRING
  group_generation_id => INT32
  member_id => STRING
  topics => topic [partitions]
    topic => STRING
    partitions => partition offset metadata
      partition => INT32
      offset => INT64
      metadata => NULLABLE_STRING

OffsetCommit Response (Version: 4) => throttle_time_ms [responses]
  throttle_time_ms => INT32
  responses => topic [partition_responses]
    topic => STRING
    partition_responses => partition error_code
      partition => INT32
      error_code => INT16
```
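The two request versions can be modeled as plain Java records to make the shape of the change concrete. These types are hypothetical illustrations and not Kafka's real request classes. Field names follow the grammar above, and retention_time is taken to be in milliseconds. Version 4 is identical to Version 3 except that it has no retention field.

```java
import java.util.List;

// Hypothetical models of the OffsetCommit request bodies (not Kafka's classes).
record PartitionData(int partition, long offset, String metadata) {}

record TopicData(String topic, List<PartitionData> partitions) {}

// Version 3: the client may supply a retention_time (milliseconds).
record OffsetCommitRequestV3(String groupId, int groupGenerationId, String memberId,
                             long retentionTime, List<TopicData> topics) {}

// Version 4: no retention_time; the broker's offsets.retention.minutes applies.
record OffsetCommitRequestV4(String groupId, int groupGenerationId, String memberId,
                             List<TopicData> topics) {}

final class OffsetCommitVersions {
    // Mapping a v3 body onto v4 keeps everything except the client-supplied retention.
    static OffsetCommitRequestV4 toV4(OffsetCommitRequestV3 v3) {
        return new OffsetCommitRequestV4(v3.groupId(), v3.groupGenerationId(),
                                         v3.memberId(), v3.topics());
    }
}
```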
Proposed Changes
A more viable solution for KAFKA-4682 is to change how group offset expiration works: committed offsets are preserved for as long as the group is active (has consumers). The expiration clock starts ticking when all group members are gone and the group transitions to the Empty state. Under these semantics there is no longer any need to enforce individual offset retention times or to keep an individual expiration timestamp for each topic partition in the group, because all committed offsets in the group expire at the same time. As a result, the expireTimestamp field will be removed from the offset metadata message.
The group's offsets expire at the time the group becomes Empty plus the retention time given by offsets.retention.minutes, assuming the group does not become active again in the meantime. While the group is Empty, the expiration time is set when it enters that state. Once the clock reaches that time, the group transitions to the Dead state and all of its offsets expire and are removed.
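The lifecycle described above can be sketched as a small state machine. This is a hypothetical model and not the broker's group coordinator code. Class, method and state names are illustrative, and the intermediate rebalance states are folded into Stable. Offsets carry no per-partition expiration timestamp. The clock starts when the last member leaves, and all offsets are removed together once the group has been Empty for the full retention period.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of group-level offset expiration.
final class GroupOffsets {
    enum State { STABLE, EMPTY, DEAD }

    private final long retentionMs;                              // offsets.retention.minutes, in ms
    private final Map<String, Long> committed = new HashMap<>(); // "topic-partition" -> offset
    private State state = State.STABLE;
    private long emptySinceMs = -1L;                             // set when the group becomes Empty

    GroupOffsets(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // No per-partition expireTimestamp is stored alongside the offset.
    void commit(String topicPartition, long offset) {
        committed.put(topicPartition, offset);
    }

    // The last member left: the single group-wide expiration clock starts now.
    void onLastMemberLeft(long nowMs) {
        state = State.EMPTY;
        emptySinceMs = nowMs;
    }

    // A member rejoined before expiry: the group is active again, so the clock is cleared.
    void onMemberJoined() {
        state = State.STABLE;
        emptySinceMs = -1L;
    }

    // Periodic check: all offsets expire together once the Empty period reaches the retention.
    void maybeExpire(long nowMs) {
        if (state == State.EMPTY && nowMs >= emptySinceMs + retentionMs) {
            committed.clear();
            state = State.DEAD;
        }
    }

    State state() { return state; }

    Map<String, Long> committed() { return committed; }
}
```

In this sketch, committing rarely no longer matters while the group is Stable. Only the time spent Empty counts toward expiration.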
The default retention time for group offsets can be customized through the existing offsets.retention.minutes broker configuration. If a per-group retention configuration is needed in the future, it can be implemented in a separate KIP.
Compatibility, Deprecation, and Migration Plan
As mentioned above, under these new semantics all offsets in a group expire at the same time. The broker config offsets.retention.minutes determines when they expire, counting from the moment the group becomes Empty. Deployments with older clients that rely on individual retention overrides (sent through OffsetCommitRequest) will need to set offsets.retention.minutes to the maximum of those overrides and the current value of offsets.retention.minutes. This guarantees that offsets do not expire any earlier under the new semantics, because the expiration clock for a partition now starts no earlier (and possibly later) than before. It also simplifies handling of clients that use different versions of the API.
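The "no earlier" guarantee can be written as a short inequality. The notation is introduced here for illustration only:

- $c_p$ is the time of the last commit to partition $p$.
- $r_p$ is the retention that applied to $p$ under the old semantics (a client override or the broker default $R$).
- $t_E$ is the time the group becomes Empty.
- $R'$ is the new value of offsets.retention.minutes.

The argument assumes that commits are made by group members, so every commit precedes the group becoming Empty.

$$
\begin{aligned}
t^{\mathrm{old}}_p &= c_p + r_p \\
R' &= \max\Big(R,\ \max_{p} r_p\Big) \\
t^{\mathrm{new}} &= t_E + R' \;\ge\; c_p + r_p = t^{\mathrm{old}}_p \qquad \text{for every partition } p
\end{aligned}
$$

The last line holds because $c_p \le t_E$ under that assumption and $R' \ge r_p$ by construction.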
Backward and forward compatibility of the OffsetCommit API:
- Old client - New broker: Old clients send an OffsetCommitRequest of version 3 or earlier. Any custom partition-level retention is ignored. Instead, the broker-level retention of offsets.retention.minutes determines the expiry of all offsets within the group. As discussed above, offsets.retention.minutes must be set high enough to guarantee that previous retentions are not negatively impacted. Group offsets expire together.
- New client - Old broker: New clients do not send any topic-level retention, so the default retention set by the broker's offsets.retention.minutes applies, which is what new clients expect. However, group offsets still expire individually, because on an old broker expiration depends on the timing of the latest commit for each topic partition.
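For the "Old client - New broker" case, the required broker setting is the maximum of the current offsets.retention.minutes and every retention override that older clients send. The sketch below computes that value. It assumes the overrides have already been collected as retention_time values in milliseconds, and it treats -1 as "use the broker default"; both are assumptions, and the helper name is hypothetical. Milliseconds are rounded up to whole minutes so the result is never shorter than an override.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: the smallest offsets.retention.minutes that is at least as
// long as both the current setting and every client-supplied retention override.
final class RetentionMigration {

    // Assumed sentinel meaning "no override, use the broker default".
    static final long DEFAULT_RETENTION_MS = -1L;

    static long recommendedRetentionMinutes(long currentMinutes, List<Long> overridesMs) {
        long msPerMinute = TimeUnit.MINUTES.toMillis(1);
        long result = currentMinutes;
        for (long ms : overridesMs) {
            if (ms == DEFAULT_RETENTION_MS) {
                continue; // these clients already rely on the broker setting
            }
            // Round up so the new setting never undercuts the override.
            long minutes = (ms + msPerMinute - 1) / msPerMinute;
            result = Math.max(result, minutes);
        }
        return result;
    }

    public static void main(String[] args) {
        // Current setting: 1440 minutes. Overrides: 3 days, 90.5 minutes, and one default.
        long recommended = recommendedRetentionMinutes(
            1440, List.of(TimeUnit.DAYS.toMillis(3), 5_430_000L, DEFAULT_RETENTION_MS));
        System.out.println(recommended); // 4320
    }
}
```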
Rejected Alternatives