Current state: Under Discussion
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Idempotent/transactional semantics depend on the broker retaining state for each active producer id (e.g. epoch and sequence number). When the broker loses that state, whether due to segment deletion or a call to DeleteRecords, additional produce requests will result in the UNKNOWN_PRODUCER_ID error.
Currently the producer attempts to handle this error by comparing the last acknowledged offset with the log start offset from the produce response with the error. If the last acknowledged offset is smaller than the log start offset, then the producer assumes that the error is spurious. It resets the sequence number to 0 and retries using the existing epoch.
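The current heuristic can be sketched roughly as follows. This is an illustration only: the class, method, and enum names are hypothetical and do not correspond to the actual client code, and the behavior in the non-spurious branch is an assumption, since the text above only describes the spurious case.

```java
// Hypothetical sketch of the existing client-side heuristic described above.
// None of these names come from the Kafka client; they are for illustration.
final class UnknownProducerIdHeuristic {
    enum Action {
        RESET_SEQUENCE_AND_RETRY, // sequence reset to 0, existing epoch reused
        SURFACE_ERROR             // assumption: the error is not treated as spurious
    }

    static Action onUnknownProducerId(long lastAckedOffset, long logStartOffset) {
        // If everything the producer had acknowledged is below the log start
        // offset, the broker state was presumably lost to retention or
        // DeleteRecords, so the error is assumed to be spurious.
        if (lastAckedOffset < logStartOffset) {
            return Action.RESET_SEQUENCE_AND_RETRY;
        }
        return Action.SURFACE_ERROR;
    }
}
```

Note that the retry path keeps the same epoch, which is exactly what makes the reset unsafe.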
There are several problems with this approach:
Resetting the sequence number is fundamentally unsafe because it violates the uniqueness of produced records. Additionally, the lack of validation on the first write of a producer introduces the possibility of non-monotonic updates and hence, dangling transactions. In this KIP, we propose to address these problems and simplify the client handling.
Our proposal has two parts: 1) safe epoch incrementing, and 2) unknown producer fencing.
Safe Epoch Incrementing: When the producer receives an UNKNOWN_PRODUCER_ID error, in addition to resetting the sequence number, we propose to bump the epoch. For the idempotent producer, bumping the epoch can be done locally since its producer id is unique. What is missing at the moment is a safe way for the transactional producer to do the same. The basic problem is that the producer may have already been fenced by another instance, in which case we do not want to allow it to continue.
We propose to alter the InitProducerId API to accept an optional current epoch. When provided, the transaction coordinator will verify that it matches the current epoch and only allow the epoch bump if it does.
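A minimal sketch of the coordinator-side check, assuming the optional epoch is modeled as an OptionalInt. The class and field names are hypothetical, and retry handling, producer id validation, and epoch exhaustion are deliberately left out.

```java
import java.util.OptionalInt;

// Hypothetical sketch of the proposed check; not the coordinator's actual code.
final class EpochBumpCheck {
    static final class Result {
        final boolean fenced;
        final short epoch; // epoch the coordinator holds after handling the request

        Result(boolean fenced, short epoch) {
            this.fenced = fenced;
            this.epoch = epoch;
        }
    }

    static Result handleInitProducerId(short currentEpoch, OptionalInt requestEpoch) {
        // An epoch was provided but does not match: the caller has been
        // fenced by a newer instance, so the bump is refused.
        if (requestEpoch.isPresent() && requestEpoch.getAsInt() != currentEpoch) {
            return new Result(true, currentEpoch);
        }
        // Either no epoch was provided (a new producer instance) or the
        // provided epoch matches: bump the epoch.
        // Assumption: overflow of the 16-bit epoch is not handled here.
        return new Result(false, (short) (currentEpoch + 1));
    }
}
```

For example, with a current epoch of 5, a request carrying epoch 5 is bumped to 6, while a request carrying epoch 4 is rejected and the epoch stays at 5.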
To simplify the handling, the producer will take the following steps upon receiving the UNKNOWN_PRODUCER_ID error:
Of course the producer may fail to receive the response from the InitProducerId call, so we need to make this API safe for retries. In the worst case, a retry may span coordinator failover, so we need to record in the transaction log whether the bump was the result of a new producer instance or not. We propose to add a new field to the transaction state message for the "instance_epoch," which is the first epoch assigned to a new producer instance. When the coordinator receives a new InitProducerId request, we will use the following logic to update the epoch:
Unknown Producer Fencing: We propose to introduce a new DescribeTransactionState API which allows a broker to verify with the transaction coordinator whether a producer id has been fenced. This is used only when the broker sees a write with a sequence number 0 from an unknown producer.
In practice, we expect the need for this API to be rare. As proposed in KAFKA-7190, we will alter the behavior of the broker to retain the cached producer state even after it has been removed from the log. Instead it will be removed only when the transactional id expiration time has passed. Under some circumstances we may have to rebuild the producer state using the log. One example is partition reassignment. A new replica will only see the producers which have state in the log. If one of these replicas becomes a leader, we may see the UNKNOWN_PRODUCER_ID error, which will result in an epoch bump. But the monotonicity of producer writes will never be violated.
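The retention change can be illustrated with a small in-memory cache. This is only a sketch: the class and method names are hypothetical, and using the last update time as the basis for expiration is an assumption. The point is that advancing the log start offset no longer evicts entries; only the expiration timer does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a producer state cache that outlives log deletion.
final class ProducerStateCache {
    private static final class Entry {
        final short epoch;
        final int lastSequence;
        final long lastUpdateMs;

        Entry(short epoch, int lastSequence, long lastUpdateMs) {
            this.epoch = epoch;
            this.lastSequence = lastSequence;
            this.lastUpdateMs = lastUpdateMs;
        }
    }

    private final Map<Long, Entry> entries = new HashMap<>();
    private final long transactionalIdExpirationMs;

    ProducerStateCache(long transactionalIdExpirationMs) {
        this.transactionalIdExpirationMs = transactionalIdExpirationMs;
    }

    void update(long producerId, short epoch, int lastSequence, long nowMs) {
        entries.put(producerId, new Entry(epoch, lastSequence, nowMs));
    }

    // Segment deletion or DeleteRecords: cached entries are intentionally kept.
    void onLogStartOffsetAdvanced(long newLogStartOffset) {
        // no eviction here
    }

    // Entries are dropped only once the expiration time has passed.
    void expire(long nowMs) {
        entries.values().removeIf(e -> nowMs - e.lastUpdateMs >= transactionalIdExpirationMs);
    }

    boolean isKnown(long producerId) {
        return entries.containsKey(producerId);
    }
}
```

A replica that rebuilds its state from the log starts with an empty cache of this kind, which is why the UNKNOWN_PRODUCER_ID error remains possible after reassignment.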
Note that it is possible for a transaction to be completed while the DescribeTransactionState response is still inflight. The broker must verify after receiving the response that the producer state is still unknown.
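The re-verification step might look like the sketch below. All names are hypothetical, the local producer state is reduced to a simple map, and the fencing rule (a different producer id or an older epoch than the coordinator reports) is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of how a broker might act on a DescribeTransactionState
// response for a write with sequence 0 from an unknown producer.
final class UnknownProducerCheck {
    enum Decision { ACCEPT, FENCED, REVALIDATE_LOCALLY }

    // producerId -> epoch; a stand-in for the broker's local producer state
    private final Map<Long, Short> localState = new HashMap<>();

    void recordLocalState(long producerId, short epoch) {
        localState.put(producerId, epoch);
    }

    Decision onDescribeResponse(long writeProducerId, short writeEpoch,
                                long latestProducerId, short latestEpoch) {
        // The producer state may have appeared while the request was in
        // flight (for example because a transaction completed), so check
        // again before trusting the coordinator's answer.
        if (localState.containsKey(writeProducerId)) {
            return Decision.REVALIDATE_LOCALLY;
        }
        // Assumed fencing rule: a newer producer id or epoch exists.
        if (writeProducerId != latestProducerId || writeEpoch < latestEpoch) {
            return Decision.FENCED;
        }
        return Decision.ACCEPT;
    }
}
```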
We will bump the InitProducerId API. The new schemas are provided below:
InitProducerIdRequest => TransactionalId TransactionTimeoutMs ProducerId Epoch
  TransactionalId => NULLABLE_STRING
  TransactionTimeoutMs => INT32
  ProducerId => INT64 // NEW
  Epoch => INT16 // NEW

InitProducerIdResponse => Error ProducerId Epoch
  Error => INT16
  ProducerId => INT64
  Epoch => INT16
The producerId in the request is used to disambiguate requests following expiration of the transactionalId. After a transactional id has expired, its state is removed from the log. If the id is used again in the future, a new producerId will be generated.
As mentioned above, we will bump the version of the transaction state message to include the instance epoch.
Value => Version InstanceEpoch ProducerId ProducerEpoch TxnTimeoutDuration TxnStatus [TxnPartitions] TxnEntryLastUpdateTime TxnStartTime
  Version => 1 (INT16)
  InstanceEpoch => INT16 // NEW
  ProducerId => INT64
  ProducerEpoch => INT16
  TxnTimeoutDuration => INT32
  TxnStatus => INT8
  TxnPartitions => [Topic [Partition]]
    Topic => STRING
    Partition => INT32
  TxnEntryLastUpdateTime => INT64
  TxnStartTime => INT64
As described above, the instance epoch is assigned on the first call to InitProducerId of a producer instance. The first call uses an epoch of -1 and a producer id of -1 and will fence any active instance. The instance epoch does not change until a new producer instance with the same transactional id is initialized.
Additionally, this proposal introduces a new API to query transaction state. This will be used to check whether a producer id has been fenced.
DescribeTransactionStateRequest => [TransactionalId]
  TransactionalId => STRING

DescribeTransactionStateResponse => [Error ProducerId Epoch State Partitions]
  Error => INT16
  ProducerId => INT64
  Epoch => INT16
  State => STRING
  Partitions => [TopicName [PartitionId]]
    TopicName => STRING
    PartitionId => INT32
The response includes the latest producer id and the latest epoch. This API is analogous to the DescribeGroup API. The following errors are possible:
Only in the latter two cases will the latest producer id and epoch be provided in the response.
The main problem from a compatibility perspective is dealing with the existing producers which reset the sequence number to 0 but continue to use the same epoch. We believe that caching the producer state even after it is no longer retained in the log will make the UNKNOWN_PRODUCER_ID error unlikely in practice. Furthermore, even if the sequence number is reset, the fencing check should still be valid. So we expect the behavior to continue to work as expected even with the additional protection.
The new DescribeTransactionState API and the new version of the transaction state message will not be used until the inter-broker version supports them. We expect the usual process of two rolling bounces for upgrading the cluster.
We considered fixing this problem in Kafka Streams by being less aggressive with record deletion for repartition topics. This might make the problem less likely, but it would not eliminate it, and we would like to have a general solution for all EOS users.