...
Code Block |
---|
CONCURRENT_PRODUCER_COMMIT(85, "This producer attempted to commit offset to a topic partition which is owned by another producer in this generation.", ConcurrentProducerCommitException::new), |
We also need to make sure that we fence the old producer rather than the new one, so a new `GenerationId` field shall be added to `TxnOffsetCommitRequest`:
Code Block |
---|
TxnOffsetCommitRequest => TransactionalId GroupId ProducerId ProducerEpoch Offsets GenerationId
TransactionalId => String
GroupId => String
ProducerId => int64
ProducerEpoch => int16
Offsets => Map<TopicPartition, CommittedOffset>
GenerationId => int32 // NEW |
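To illustrate how a generation id lets the coordinator tell an old producer from a new one, here is a minimal sketch in Java. It is not the actual broker implementation: the class `GenerationFence`, its methods, and the `Result` values are hypothetical names introduced only for this example, and the rule "reject any commit whose generation differs from the group's current generation" is an assumption about how the new field would be checked.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical coordinator-side check; names are illustrative, not Kafka's API. */
final class GenerationFence {
    /** Outcome of validating the GenerationId carried by a commit request. */
    enum Result { ACCEPT, REJECT_GENERATION_MISMATCH, REJECT_UNKNOWN_GROUP }

    // groupId -> generation the consumer group is currently in
    private final Map<String, Integer> currentGeneration = new HashMap<>();

    /** Records that a rebalance finished and the group entered a new generation. */
    void onRebalance(String groupId, int generationId) {
        currentGeneration.put(groupId, generationId);
    }

    /**
     * Assumed rule: a commit is accepted only if its generation matches the
     * group's current one, so a producer left over from an earlier generation
     * is the one that gets fenced.
     */
    Result validate(String groupId, int requestGenerationId) {
        Integer current = currentGeneration.get(groupId);
        if (current == null) {
            return Result.REJECT_UNKNOWN_GROUP;
        }
        return current == requestGenerationId
                ? Result.ACCEPT
                : Result.REJECT_GENERATION_MISMATCH;
    }
}
```

In this sketch, a producer that joined in generation 1 and tries to commit after the group has moved to generation 2 is rejected, while a producer carrying generation 2 is accepted.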
To avoid concurrent processing during an upgrade, we also want to introduce an exception that lets the consumer back off:
...