
KIP-360: Improve handling of unknown producer

Comment: Get rid of instance id and use instance epoch instead


Of course, the producer may fail to receive the response from the InitProducerId call, so we additionally need to make this API safe for retries. In the worst case, a retry may span coordinator failover, so we need to record in the transaction log whether the bump was the result of a new producer instance or not. We propose to add a new field to the transaction state message for the "instance_epoch," which is the first epoch assigned to a new producer instance. When the coordinator receives a new InitProducerId request, we will use the following logic to update the epoch:

  1. No epoch is provided: the current epoch will be bumped and the instance epoch will be set to the new value.
  2. Epoch is provided:
    1. Matches current epoch: the current epoch is bumped, but the instance epoch will stay the same.
    2. Epoch is less than current epoch but greater than or equal to instance epoch: the request is treated as a retry and the current epoch will be returned
    3. Epoch is less than instance epoch: return INVALID_PRODUCER_EPOCH
    4. Epoch is greater than current epoch: return INVALID_PRODUCER_EPOCH
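
The rules above can be sketched as a small function. This is only an illustration, not the coordinator's actual code: the names TxnState, handle_init_producer_id and InvalidProducerEpoch are hypothetical, -1 is taken to mean "no epoch provided" (the value the first call uses), rule 2.2 is assumed to apply only to epochs below the current epoch, and epoch overflow is ignored.

```python
from dataclasses import dataclass

NO_EPOCH = -1  # assumption: -1 means "no epoch provided", as in the first call


@dataclass
class TxnState:
    """Hypothetical view of the coordinator's state for one transactional id."""
    current_epoch: int
    instance_epoch: int


class InvalidProducerEpoch(Exception):
    """Stands in for the INVALID_PRODUCER_EPOCH error code."""


def handle_init_producer_id(state: TxnState, epoch: int = NO_EPOCH) -> int:
    """Apply the epoch rules and return the epoch the producer should use."""
    if epoch == NO_EPOCH:
        # Rule 1: new producer instance, so bump and record the instance epoch.
        state.current_epoch += 1
        state.instance_epoch = state.current_epoch
        return state.current_epoch
    if epoch == state.current_epoch:
        # Rule 2.1: same instance asks for a bump; instance epoch is unchanged.
        state.current_epoch += 1
        return state.current_epoch
    if epoch > state.current_epoch or epoch < state.instance_epoch:
        # Rules 2.3 and 2.4: the epoch cannot belong to the live instance.
        raise InvalidProducerEpoch(f"epoch {epoch} is not valid")
    # Rule 2.2: instance_epoch <= epoch < current_epoch, so this is a retry
    # of a bump that already happened; return the current epoch unchanged.
    return state.current_epoch
```

Because a retried request falls under rule 2.2, sending the same InitProducerId twice bumps the epoch only once.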

Unknown Producer Fencing: We propose to introduce a new inter-broker DescribeTransactionState API which allows a broker to verify with the transaction coordinator whether a producer id has been fenced. This is used only when the broker sees a write with a sequence number 0 from an unknown producer. 

In practice, we expect the need for this API to be rare. As proposed in KAFKA-7190, we will alter the behavior of the broker to retain the cached producer state even after it has been removed from the log. Instead, it will be removed only when the transactional id expiration time has passed. Under some circumstances we may have to rebuild the producer state using the log. One example is partition reassignment. A new replica will only see the producers which have state in the log. If one of these replicas becomes a leader, we may see the UNKNOWN_PRODUCER_ID error, which will result in an epoch bump. But the monotonicity of producer writes will never be violated.

Note that it is possible for a transaction to be completed while the DescribeTransactionState response is still inflight. The broker must verify after receiving the response that the producer state is still unknown.
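
A rough sketch of that re-check follows. Everything here is illustrative: the Decision values, the TxnDescription type and the function name are hypothetical, and the fencing test (a different producer id or a newer epoch at the coordinator) is an assumption about how the broker would interpret the response.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict


class Decision(Enum):
    ACCEPT = "accept"          # treat the write as valid for an unknown producer
    FENCED = "fenced"          # the coordinator knows a newer producer id or epoch
    REVALIDATE = "revalidate"  # state appeared meanwhile; use the normal checks


@dataclass
class TxnDescription:
    """Hypothetical subset of one DescribeTransactionState response entry."""
    producer_id: int
    epoch: int


def decide_unknown_producer_write(
    known_producers: Dict[int, int],  # producer id -> epoch cached by the broker
    producer_id: int,
    epoch: int,
    description: TxnDescription,
) -> Decision:
    # The transaction may have completed while the response was in flight,
    # so first confirm that the producer state is still unknown.
    if producer_id in known_producers:
        return Decision.REVALIDATE
    # Assumption: the writer is fenced if the coordinator reports a different
    # producer id or a newer epoch for the transactional id.
    if description.producer_id != producer_id or description.epoch > epoch:
        return Decision.FENCED
    return Decision.ACCEPT
```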

Public Interfaces

We will bump the InitProducerId API. The new schemas are provided below:

Code Block
InitProducerIdRequest => TransactionalId TransactionTimeoutMs ProducerId Epoch
 TransactionalId => NULLABLE_STRING
 TransactionTimeoutMs => INT32
 ProducerId => INT64           // NEW
 Epoch => INT16                // NEW

InitProducerIdResponse => Error ProducerId Epoch
 Error => INT16
 ProducerId => INT64
 Epoch => INT16
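
To make the request layout concrete, here is a minimal sketch that serializes only the request body, assuming the new fields are ProducerId (INT64) and Epoch (INT16) and using the Kafka protocol's standard big-endian primitive encodings (a NULLABLE_STRING is an INT16 length followed by UTF-8 bytes, with length -1 for null). The function names are hypothetical, and the request header is omitted.

```python
import struct
from typing import Optional


def encode_nullable_string(value: Optional[str]) -> bytes:
    """Encode a NULLABLE_STRING: INT16 length, then UTF-8 bytes; -1 means null."""
    if value is None:
        return struct.pack(">h", -1)
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_init_producer_id_request(
    transactional_id: Optional[str],
    transaction_timeout_ms: int,
    producer_id: int = -1,  # -1 on the first call of a producer instance
    epoch: int = -1,        # -1 on the first call of a producer instance
) -> bytes:
    """Serialize the request body in schema order (header not included)."""
    return (
        encode_nullable_string(transactional_id)
        + struct.pack(">i", transaction_timeout_ms)  # INT32
        + struct.pack(">q", producer_id)             # INT64
        + struct.pack(">h", epoch)                   # INT16
    )
```

A first call would pass only the transactional id and timeout, leaving the producer id and epoch at -1; later calls would pass the values returned by the coordinator.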

The producerId in the request is used to disambiguate requests following expiration of the transactionalId. After a transactional id has expired, its state is removed from the log. If the id is used again in the future, a new producerId will be generated.

As mentioned above, we will bump the version of the transaction state message to 1 to include the instance epoch. The new schema is provided below:

Code Block
Value => Version InstanceEpoch ProducerId ProducerEpoch TxnTimeoutDuration TxnStatus [TxnPartitions] TxnLastUpdateTime TxnStartTime
  Version => 1 (INT16)
  InstanceEpoch => INT16  // NEW
  ProducerId => INT64
  ProducerEpoch => INT16
  TxnTimeoutDuration => INT32
  TxnStatus => INT8
  TxnPartitions => [Topic [Partition]]
     Topic => STRING
     Partition => INT32
  TxnLastUpdateTime => INT64
  TxnStartTime => INT64

As described above, the instance epoch is assigned on the first call to InitProducerId of a producer instance. The first call uses an epoch of -1 and a producer id of -1 and will fence any active instance. The instance epoch does not change until a new producer instance with the same transactional id is initialized.

Additionally, this proposal introduces a new inter-broker API to query transaction state. This will be used to check whether a producer id has been fenced.

Code Block
DescribeTransactionStateRequest => [TransactionalId]
  TransactionalId => STRING

DescribeTransactionStateResponse => [Error ProducerId Epoch State Partitions]
  Error => INT16
  ProducerId => INT64
  Epoch => INT16
  State => STRING
  Partitions => [TopicName [PartitionId]]
    TopicName => STRING
    PartitionId => INT32

This API is analogous to the DescribeGroup API. The response includes the latest producer id and the latest epoch. The following errors are possible:


Only in the latter two cases will the latest producer id and epoch be provided in the response.