Status
Current state: Accepted
Discussion thread: here
JIRA: KAFKA-20066
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In the Kafka protocol, when a consumer commits offsets or a producer tries to add offsets to a transaction, the request includes the member's epoch (or generation) in the consumer group. This allows the group coordinator to fence zombie commit requests, that is, commit requests that include an offset for a partition that has since been reassigned to a different member. Without such a guard, a zombie offset commit could overwrite the offsets of the new owner, or its offsets could be committed to the consumer offsets topic without being included in the result of the new owner's offset fetch request.
In consumer groups based on KIP-848, when the group coordinator receives an offset commit request that includes a client-side member epoch and a member ID, it performs the check:
Client-Side Member Epoch == Broker-Side Member Epoch
If the check fails, the coordinator returns a STALE_MEMBER_EPOCH error for regular offset commits and an ILLEGAL_GENERATION error for transactional offset commits. If the member epoch sent in the request equals the current broker-side member epoch, KIP-848 guarantees that the partition cannot also be owned by a different member at the same or a larger epoch. This is therefore sufficient to fence zombie commits. Note that we assume zombie commits always contain offsets for partitions that were owned by the member at the member epoch sent in the request; a correct client-side implementation of the protocol never commits offsets for partitions that the member did not own at that epoch.
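The current check can be summarized in a few lines. This is a minimal sketch, not the group coordinator's actual code: `StrictCommitFencing`, `CommitError`, and `validate` are hypothetical names, and errors are modeled as a small enum instead of Kafka's own error type.

```java
/**
 * Hypothetical, simplified model of the offset commit fencing check that a
 * KIP-848 group coordinator applies before this KIP. Not the actual broker code.
 */
final class StrictCommitFencing {

    /** The subset of protocol errors relevant to this check. */
    enum CommitError { NONE, STALE_MEMBER_EPOCH, ILLEGAL_GENERATION }

    private StrictCommitFencing() {}

    /**
     * @param clientMemberEpoch the member epoch sent in the commit request
     * @param brokerMemberEpoch the member epoch currently stored by the coordinator
     * @param transactional     true for a transactional offset commit
     */
    static CommitError validate(int clientMemberEpoch, int brokerMemberEpoch, boolean transactional) {
        if (clientMemberEpoch == brokerMemberEpoch) {
            return CommitError.NONE;
        }
        // The same condition fails; only the error code depends on the request type.
        return transactional ? CommitError.ILLEGAL_GENERATION : CommitError.STALE_MEMBER_EPOCH;
    }
}
```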
It's important to note that commits can also be fenced because a member falls out of the group (e.g., because it does not revoke partitions within the rebalance timeout). In this case, the member's commits will be fenced solely based on the member ID (which is not part of the group anymore). We will ignore this case in this KIP and only consider zombie commits from members that are still part of the group.
Downsides of the current approach
This fencing is, however, unnecessarily strict. Assume, for example, a member owns P1 at epoch 1. The broker-side member epoch is bumped to 2, but the member still has P1 assigned at epoch 2. The member may not learn about the new broker-side member epoch in time and submit an offset commit for P1 with epoch 1. This is not a zombie commit request as defined above (because P1 was not reassigned to a different member), but it will still be rejected by a KIP-848 group coordinator.
The trouble with this fencing mechanism is that it is very difficult to avoid the broker-side member epoch being bumped concurrently with an offset commit. From the client's perspective, the broker-side member epoch may be bumped at any time while a heartbeat to the group coordinator is in flight. Guaranteeing that the member epoch sent in an offset commit request is up to date would require ensuring that no consumer group heartbeat request is in flight at the same time.
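The race can be replayed in a few lines. This is a hypothetical sketch: `StrictCheckRace` and its methods are illustrative names, and the in-flight heartbeat is modeled as a plain update of a local variable rather than a real request.

```java
/**
 * Hypothetical replay of the race described above: the member still owns P1,
 * but its commit carries the epoch that was current when the commit was prepared.
 */
final class StrictCheckRace {

    private StrictCheckRace() {}

    /** The pre-KIP check: only the exact broker-side member epoch is accepted. */
    static boolean strictCheckAccepts(int clientMemberEpoch, int brokerMemberEpoch) {
        return clientMemberEpoch == brokerMemberEpoch;
    }

    /** Returns whether the commit for P1 is accepted after a concurrent epoch bump. */
    static boolean commitAcceptedAfterConcurrentBump() {
        int brokerMemberEpoch = 1;                    // the member owns P1 at epoch 1
        int epochInCommitRequest = brokerMemberEpoch; // the client prepares a commit for P1 with epoch 1

        // A heartbeat in flight bumps the broker-side epoch to 2; P1 stays assigned to the member.
        brokerMemberEpoch = 2;

        // The commit, still carrying epoch 1, reaches the coordinator and is rejected.
        return strictCheckAccepts(epochInCommitRequest, brokerMemberEpoch);
    }
}
```

The commit is not a zombie, since P1 never left the member, yet it is rejected.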
Why a broker-side fix is warranted
This problem is particularly hard to solve on the client side for transactional offset commits with the current Java consumer and producer implementations, because the heartbeat is sent by the consumer, but the transactional commit is initiated by the producer. The producer has no way of knowing when a consumer group heartbeat is in flight. The member epoch is passed from the Java consumer to the Java producer in the ConsumerGroupMetadata object, which is passed to sendOffsetsToTransaction. By the time the transactional offset commit is sent, the member epoch may be stale, and the broker returns an ILLEGAL_GENERATION error. This forces the Java producer into an abortable error state and surfaces the error to the user as a CommitFailedException; the only way for the user to recover is to abort the transaction.
While aborting a transaction is in principle a valid strategy for an application to recover from the ILLEGAL_GENERATION error, aborting transactions means throwing away work and restarting from an earlier point, which can hurt performance.
Conceptual Design
In this KIP, we therefore propose to relax the condition for offset commit fencing. To derive a more relaxed check, we need to identify an epoch that separates zombie commits from commits of the current owner. As mentioned above, zombie commit requests are commit requests that include a combination of partition, member ID, and member epoch such that the member owned the partition at that epoch, but the partition has since been reassigned to a different member.
On the level of a single partition, a relaxed offset commit check can be defined using an assignment epoch for each member and each partition assigned to it: the epoch at which the partition was assigned to that member. To fence zombie commit requests, we can reject all offset commit requests from a member that either does not have the partition assigned or that carry a member epoch smaller than the member's assignment epoch for that partition. In other words, a commit for an assigned partition is accepted only if
Assignment Epoch <= Client-Side Member Epoch <= Broker-Side Member Epoch
Using this check, all commits of the current partition owner will be accepted, since the client-side member epoch of the current owner is always larger than or equal to the assignment epoch (a partition that is revoked in one epoch is never reassigned in the same epoch). All zombie commits from that member will be rejected: if a partition was owned by member A at the client-side member epoch (which we assume for zombie commits) but has since been reassigned to member B, there are two possible cases:
Member A currently does not have the partition assigned
Member A does currently have the partition assigned, but then it must have been reassigned to member A after being assigned to member B. By KIP-848, this cannot all happen in the same epoch, so we must have
Assignment Epoch > Client-Side Member Epoch.
In both cases, the zombie commit will therefore be rejected by the relaxed check.
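The conceptual check and both zombie cases can be expressed for a single partition as follows. This is a minimal sketch under stated assumptions: partitions are plain integers, a member's assignment epochs are kept in a `Map`, and `RelaxedCheckConcept.accepts` is a hypothetical name. It deliberately omits the exception for unassigned partitions described next.

```java
import java.util.Map;

/**
 * Hypothetical sketch of the conceptual relaxed check for one partition, before
 * the exception for unassigned partitions is added. Partitions are plain ints
 * and epochs are stored per member in a map; both are simplifying assumptions.
 */
final class RelaxedCheckConcept {

    private RelaxedCheckConcept() {}

    /**
     * @param assignmentEpochs  partition -> epoch at which it was assigned to this member
     * @param partition         partition whose offset is being committed
     * @param clientMemberEpoch member epoch sent in the commit request
     * @param brokerMemberEpoch member epoch currently stored by the coordinator
     */
    static boolean accepts(Map<Integer, Integer> assignmentEpochs, int partition,
                           int clientMemberEpoch, int brokerMemberEpoch) {
        Integer assignmentEpoch = assignmentEpochs.get(partition);
        if (assignmentEpoch == null) {
            // Case 1: the member does not currently own the partition.
            return false;
        }
        // Case 2 (partition came back to this member) fails the lower bound,
        // because the new assignment epoch is larger than the zombie's epoch.
        return assignmentEpoch <= clientMemberEpoch && clientMemberEpoch <= brokerMemberEpoch;
    }
}
```

For the motivating example (P1 assigned at epoch 1, broker-side epoch bumped to 2), a commit carrying epoch 1 now passes, since 1 <= 1 <= 2. If P1 had instead moved to member B and back to member A at, say, epoch 6, a zombie commit carrying epoch 1 fails because 6 > 1.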
Note that, as before, we will allow commits for partitions that are not owned by the member, if and only if the offset commit request contains the most recent broker-side member epoch. This is done to make the new rule strictly weaker than the previous check, which also allowed such commits.
Proposed Changes
Introducing an epoch per assigned partition
We extend the model of a consumer group with one integer per assigned partition for each member of a group. This includes both partitions directly assigned to the member and partitions pending revocation. The assignment epoch is set to the epoch in which the partition was assigned to the member, and we have the invariant AssignmentEpoch <= MemberEpoch <= TargetAssignmentEpoch <= GroupEpoch.
The AssignmentEpoch is added as a field to TopicPartitions in ConsumerGroupCurrentMemberAssignmentValue, so that it can be stored in and replayed from the __consumer_offsets topic.
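The extended member model and its invariant can be sketched as an immutable value. This is an illustration only: `MemberAssignment` is a hypothetical record, partitions of a single topic are modeled as integers, and the real record groups partitions by topic.

```java
import java.util.Map;
import java.util.OptionalInt;

/**
 * Hypothetical in-memory view of one member's assignment with the proposed
 * per-partition assignment epochs. Partitions of a single topic are modeled as
 * ints; the real record groups partitions by topic.
 */
record MemberAssignment(
        int memberEpoch,
        int targetAssignmentEpoch,
        int groupEpoch,
        Map<Integer, Integer> assignedPartitions,            // partition -> assignment epoch
        Map<Integer, Integer> partitionsPendingRevocation) { // partition -> assignment epoch

    MemberAssignment {
        assignedPartitions = Map.copyOf(assignedPartitions);
        partitionsPendingRevocation = Map.copyOf(partitionsPendingRevocation);
        // Invariant: AssignmentEpoch <= MemberEpoch <= TargetAssignmentEpoch <= GroupEpoch.
        if (memberEpoch > targetAssignmentEpoch || targetAssignmentEpoch > groupEpoch) {
            throw new IllegalArgumentException("epochs violate MemberEpoch <= TargetAssignmentEpoch <= GroupEpoch");
        }
        requireNotAfter(assignedPartitions, memberEpoch);
        requireNotAfter(partitionsPendingRevocation, memberEpoch);
    }

    private static void requireNotAfter(Map<Integer, Integer> assignmentEpochs, int memberEpoch) {
        assignmentEpochs.forEach((partition, epoch) -> {
            if (epoch > memberEpoch) {
                throw new IllegalArgumentException("partition " + partition + " has assignment epoch "
                        + epoch + " > member epoch " + memberEpoch);
            }
        });
    }

    /** Assignment epoch of an assigned or pending-revocation partition, if any. */
    OptionalInt assignmentEpoch(int partition) {
        Integer epoch = assignedPartitions.get(partition);
        if (epoch == null) {
            epoch = partitionsPendingRevocation.get(partition);
        }
        return epoch == null ? OptionalInt.empty() : OptionalInt.of(epoch);
    }
}
```

Partitions pending revocation keep their assignment epoch, so commits for them are validated exactly like commits for assigned partitions until the revocation completes.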
Relaxing the offset commit validation
We replace the current offset commit validation check
Client-Side Member Epoch == Broker-Side Member Epoch
by
Assignment Epoch <= Client-Side Member Epoch <= Broker-Side Member Epoch
Here, for simplicity, we take the assignment epoch of a partition that is not assigned to the member to be the broker-side member epoch. As a result, commits for partitions that are not assigned to the member are accepted as long as the committer uses an up-to-date member epoch, matching the current implementation.
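The full rule, including the default for unassigned partitions, fits in a single expression. This is a minimal sketch with a hypothetical class name; as before, a member's assignment epochs (covering assigned partitions and partitions pending revocation) are assumed to be available as a `Map` from partition to epoch.

```java
import java.util.Map;

/**
 * Hypothetical sketch of the relaxed validation, including the rule for
 * partitions that are not assigned to the committing member.
 */
final class RelaxedOffsetCommitValidation {

    private RelaxedOffsetCommitValidation() {}

    /**
     * Checks AssignmentEpoch <= clientMemberEpoch <= brokerMemberEpoch for one partition.
     *
     * @param assignmentEpochs partition -> assignment epoch, covering both assigned
     *                         partitions and partitions pending revocation
     */
    static boolean accepts(Map<Integer, Integer> assignmentEpochs, int partition,
                           int clientMemberEpoch, int brokerMemberEpoch) {
        // An unassigned partition behaves as if it had been assigned at the broker-side
        // epoch, so it is only accepted with an up-to-date epoch, as before this KIP.
        int assignmentEpoch = assignmentEpochs.getOrDefault(partition, brokerMemberEpoch);
        return assignmentEpoch <= clientMemberEpoch && clientMemberEpoch <= brokerMemberEpoch;
    }
}
```

Whenever the assignment epoch equals the broker-side member epoch, the two inequalities collapse to Client-Side Member Epoch == Broker-Side Member Epoch, which is why the new rule is never stricter than the old one.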
Handling of commits without generation ID / member epoch
Requests that do not contain a valid member epoch, which can come from the admin client or from a consumer with self-assigned partitions, are handled the same way as before. That is, for a non-existing group, a simple group is created (which is not affected by this KIP). Offset commits with member epoch or generation ID -1 are accepted unconditionally if the group is empty or the commit is transactional.
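The handling of commits without a member epoch can be read as a small decision table. This is a hypothetical sketch: `UnversionedCommitHandling`, `Outcome`, and `classify` are illustrative names, and the final fallback only marks cases whose existing handling is not described here and is not modeled.

```java
/**
 * Hypothetical decision table for offset commits that carry no member epoch
 * (member epoch or generation ID -1), e.g. from the admin client or a consumer
 * with self-assigned partitions. This handling is unchanged by the KIP.
 */
final class UnversionedCommitHandling {

    static final int NO_EPOCH = -1;

    enum Outcome {
        /** The request carries an epoch and goes through the relaxed epoch check. */
        EPOCH_CHECK,
        /** No group exists: a simple group is created (unaffected by this KIP). */
        CREATE_SIMPLE_GROUP,
        /** Accepted without any epoch check. */
        ACCEPT,
        /** Any other case keeps its existing handling, which this sketch does not model. */
        UNCHANGED_HANDLING
    }

    private UnversionedCommitHandling() {}

    static Outcome classify(boolean groupExists, boolean groupEmpty, boolean transactional, int memberEpoch) {
        if (memberEpoch != NO_EPOCH) {
            return Outcome.EPOCH_CHECK;
        }
        if (!groupExists) {
            return Outcome.CREATE_SIMPLE_GROUP;
        }
        if (groupEmpty || transactional) {
            return Outcome.ACCEPT;
        }
        return Outcome.UNCHANGED_HANDLING;
    }
}
```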
Handling of legacy record members
When ConsumerGroupCurrentMemberAssignmentValue does not contain assignment epochs, they will be initialized to the broker-side member epoch when the record is replayed.
Handling of static members
When a static member leaves the group, the assignment epochs will be set to 0 in the current assignment record. When the static member rejoins, partitions will therefore be considered to be assigned "from the beginning" (i.e., from epoch 0) to the new member ID. All commits using the old memberId will be fenced, and all commits using the new memberId will be permitted (since all client-side member epochs are larger than 0).
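The effect of resetting assignment epochs to 0 can be illustrated by moving a static member's partitions to its new member ID. This is a hypothetical sketch that collapses the leave and rejoin steps into one call (`replaceStaticMember`), and it models members as a `Map` from member ID to partition-to-assignment-epoch maps; neither reflects the coordinator's actual data structures.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of how resetting assignment epochs to 0 lets a rejoining
 * static member commit under its new member ID while the old ID is fenced.
 * Members are keyed by member ID; each maps partition -> assignment epoch.
 */
final class StaticMemberReplacement {

    private StaticMemberReplacement() {}

    /** Moves the old member's partitions to the new member ID with assignment epoch 0. */
    static Map<String, Map<Integer, Integer>> replaceStaticMember(
            Map<String, Map<Integer, Integer>> members, String oldMemberId, String newMemberId) {
        Map<String, Map<Integer, Integer>> updated = new HashMap<>(members);
        Map<Integer, Integer> previous = updated.remove(oldMemberId);
        if (previous == null) {
            throw new IllegalArgumentException("unknown member " + oldMemberId);
        }
        Map<Integer, Integer> reset = new HashMap<>();
        previous.keySet().forEach(partition -> reset.put(partition, 0)); // assigned "from the beginning"
        updated.put(newMemberId, reset);
        return updated;
    }

    /** Relaxed check; a member ID that is no longer in the group is always fenced. */
    static boolean accepts(Map<String, Map<Integer, Integer>> members, String memberId, int partition,
                           int clientMemberEpoch, int brokerMemberEpoch) {
        Map<Integer, Integer> assignmentEpochs = members.get(memberId);
        if (assignmentEpochs == null) {
            return false;
        }
        int assignmentEpoch = assignmentEpochs.getOrDefault(partition, brokerMemberEpoch);
        return assignmentEpoch <= clientMemberEpoch && clientMemberEpoch <= brokerMemberEpoch;
    }
}
```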
Handling during protocol upgrade
If we upgrade from the classic protocol, we will set the assignment epoch of all partitions to the generation ID of the group.
If we downgrade to the classic protocol, the assignment epochs will be abandoned, since they are not required in the classic protocol.
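The conversion in both directions is a simple mapping. This is an illustrative sketch only: `ClassicProtocolConversion` and its methods are hypothetical, and a classic member's owned partitions are assumed to be available as a collection of partition numbers of a single topic.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical helpers for protocol upgrades and downgrades. A classic member's
 * owned partitions are modeled as a collection of partition numbers.
 */
final class ClassicProtocolConversion {

    private ClassicProtocolConversion() {}

    /** Upgrade: every partition gets the group's generation ID as its assignment epoch. */
    static Map<Integer, Integer> fromClassic(Collection<Integer> ownedPartitions, int generationId) {
        Map<Integer, Integer> assignmentEpochs = new HashMap<>();
        for (int partition : ownedPartitions) {
            assignmentEpochs.put(partition, generationId);
        }
        return assignmentEpochs;
    }

    /** Downgrade: the classic protocol has no use for assignment epochs, so they are dropped. */
    static Set<Integer> toClassic(Map<Integer, Integer> assignmentEpochs) {
        return new TreeSet<>(assignmentEpochs.keySet());
    }
}
```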
Public Interfaces
ConsumerGroupCurrentMemberAssignmentValue
We add a new field, AssignmentEpochs, to TopicPartitions. To keep the representation compact, we store it as an array aligned with Partitions, that is, for every entry in Partitions there is exactly one entry in AssignmentEpochs. The field is nullable and tagged. For legacy records that do not include assignment epochs, the field is null, and the assignment epochs of all partitions in assignedPartitions and partitionsPendingRevocation are considered to be equal to memberEpoch.
{
"apiKey": 8,
"type": "coordinator-value",
"name": "ConsumerGroupCurrentMemberAssignmentValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
...
],
"commonStructs": [
{ "name": "TopicPartitions", "versions": "0+", "fields": [
...
{ "name": "AssignmentEpochs", "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 0, "type": "[]int32", "default": null,
"about": "The epoch at which each partition was assigned to the member. Used to fence zombie commit requests. Has the same length as Partitions. If null, all assignment epochs are considered to be equal to the member epoch." }
]}
]
}
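Reading the aligned arrays back, including the fallback for legacy records, can be sketched as follows. This is a hypothetical helper, not the generated message class: the `Partitions` and `AssignmentEpochs` arrays of one TopicPartitions entry are assumed to be available as lists, with `null` standing for a record written without the tagged field.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical decoding of one TopicPartitions entry into partition -> assignment epoch. */
final class AssignmentEpochDecoding {

    private AssignmentEpochDecoding() {}

    /**
     * @param partitions       the Partitions array of the entry
     * @param assignmentEpochs the AssignmentEpochs tagged field, or null for legacy records
     * @param memberEpoch      the member epoch stored in the same record
     */
    static Map<Integer, Integer> decode(List<Integer> partitions, List<Integer> assignmentEpochs,
                                        int memberEpoch) {
        if (assignmentEpochs != null && assignmentEpochs.size() != partitions.size()) {
            throw new IllegalArgumentException("AssignmentEpochs must have the same length as Partitions");
        }
        Map<Integer, Integer> result = new HashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            // Legacy records: every partition falls back to the member epoch.
            int epoch = assignmentEpochs == null ? memberEpoch : assignmentEpochs.get(i);
            result.put(partitions.get(i), epoch);
        }
        return result;
    }
}
```

Storing a parallel array keeps the existing Partitions field untouched, so readers that ignore the tagged field still see exactly the same assignment.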
Compatibility, Deprecation, and Migration Plan
The record above uses a tagged field, so no record version bump is needed.
If a member assignment record contains no AssignmentEpochs, the field defaults to null. In this case, the assignment epoch of all partitions is initialized to the current member epoch of the member. This means the original offset commit check is used, preserving safety. The next update of the ConsumerGroupCurrentMemberAssignmentValue record will include the assignment epochs. When rolling back, the new field is ignored, and the original offset commit check is used.
Test Plan
We will extend the existing integration and unit tests to cover the new behavior.
Rejected Alternatives
- One could attempt to reach from the producer into the consumer background thread to make consumer heartbeats and transactional offset commits mutually exclusive. This would solve the problem, since member epochs are only bumped when a heartbeat is in flight. However, this would be more of a workaround, and would likely require a larger change to the Java client APIs.
- We considered tracking a single RevocationEpoch per member, the last epoch in which a partition was revoked from the member, instead of an assignment epoch per assigned partition. While this would have reduced the chance of hitting the race conditions, it turned out there were still cases where you could run into spurious ILLEGAL_GENERATION errors.
- If we use a consumer rebalance listener to always commit any open transactions in onPartitionsRevoked and abort any open transactions in onPartitionsLost, we can actually make sure that the ILLEGAL_GENERATION error can always be safely retried. The reason is that the member epoch cannot be bumped during the execution of the rebalance listener, and partitions cannot be revoked without executing the rebalance listener. So one option discussed was to simply retry the error, since we know that all partitions we are trying to commit are still assigned to us; only the member epoch was outdated. However, this would make the usage of transactions in Kafka even more complicated. Instead, we want to resolve the problem on the broker side and simplify client-side usage of transactions. Furthermore, this would require significant changes in the Java producer, since it can only enter an abortable state, enter a fatal state, or retry a request immediately. Implementing a retry after refreshing the consumer group metadata would have required a new way of interacting with the producer: we would have to implement an “application-retriable” kind of error, that is, handing control flow back to the application without entering an error state, without retrying the commit immediately, and without indicating “success” to the application.
References
- A similar mechanism was already implemented in KIP-1071: Streams Rebalance Protocol
