Another case we want to handle is InvalidProducerIdMapping. This error occurs following expiration of the producerId. It's possible that another producerId has been installed in its place following expiration (if another producer instance has become active), or the mapping is empty. We can safely retry the InitProducerId with the logic in this KIP in order to detect which case it is:
- After receiving INVALID_PRODUCER_ID_MAPPING, the producer can send InitProducerId using the current producerId and epoch.
- If no mapping exists, the coordinator can generate a new producerId and return it. If a transaction is in progress on the client, it will have to be aborted, but the producer can continue afterwards.
- Otherwise if a different producerId has been assigned, then we can return INVALID_PRODUCER_EPOCH since that is effectively what has happened. This is intended to simplify error handling. The point is that there is a newer producer and it doesn't matter whether it has the same producer id or not.
Prolonged producer state retention: As proposed in KAFKA-7190, we will alter the behavior of the broker to retain the cached producer state even after it has been removed from the log. Previously we attempted to keep the producer state consistent with the the contents of the log so that we could rebuild it from the log if needed. However, it is rarely necessary to rebuild producer state, and it is more useful to retain the state we have as long as possible. Here we propose to remove it only when the transactional id expiration time has passed.
Simplified error handling: Much of the complexity in the error handling of the idempotent/transactional producer is a result of the UNKNOWN_PRODUCER_ID case. Since we are proposing to cache producer state for as long as the transactional id expiration time even after removal from the log, this should become a rare error, so we propose to simplify our handling of it. The current handling attempts to reason about the log start offset and whether or not the batch had been previously retried. If we are sure it is safe, then we attempt to adjust the sequence number of the failed request (and any inflight requests which followed). Not only is this behavior complex to implement, but continuing with subsequent batches introduces the potential for reorderingduplicates. Currently there is no easy way to prevent this from happening.
- Assignment of the epoch/sequence number to a record batch is permanent and happens at the time of the record send. We will remove the logic to adjust sequence numbers after failing a batch.
- When we encounter a fatal error for a batch, we will fail all subsequent batches for that partition which have been assigned a sequence number.
This will be simpler to implement and easier for users to reason about. Records will be guaranteed to be delivered in order . If any record fails delivery, then all subsequently sent records fail as well. up until the first fatal error and there will be no duplicates. For the transactional producer, the user can proceed by aborting the current transaction and ordering can still be guaranteed going forward. Internally, the producer will bump the epoch and reset sequence numbers for the next transaction. For the idempotent producer, the user can choose to fail or they can continue (with the possibility of duplication or reordering). If the user continues, the epoch will be bumped locally and the sequence number will be reset.
We will bump the InitProducerId API. The new schemas are provided below: