Current state: Approved
Discussion thread: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
We have seen hanging transactions in Kafka where the last stable offset (LSO) does not update, we can’t clean the log (if the topic is compacted), and read_committed consumers get stuck.
This can happen when a message gets stuck or delayed due to networking issues or a network partition, the transaction aborts, and then the delayed message finally comes in. The delayed message can also violate exactly-once semantics (EOS) if it arrives after the next addPartitionsToTxn request. Effectively, a message from a previous (aborted) transaction may become part of the next transaction.
Hanging transactions can also occur when a buggy client writes to a partition before it adds the partition to the transaction. In both cases, we want the server to have some control to prevent these incorrect records from being written and either causing hanging transactions or violating EOS by including records in the wrong transaction.
The best way to avoid this issue is to:
1. Uniquely identify transactions by bumping the producer epoch after every commit/abort marker. That way, each transaction can be identified by (producer id, epoch).
2. Remove the addPartitionsToTxn call and implicitly just add partitions to the transaction on the first produce request during a transaction.
We avoid the late arrival case because the transaction is uniquely identified and fenced AND we avoid the buggy client case because we remove the need for the client to explicitly add partitions to begin the transaction.
Of course, 1 and 2 require client-side changes, so for older clients, those approaches won’t apply.
3. To cover older clients, we will ensure a transaction is ongoing before we write to a transaction. We can do this by querying the transaction coordinator and caching the result.
Of course, for older clients, we may include a record from the previous transaction if the new transaction has started and we receive a late message. This issue can only be avoided with the new clients through the epoch bump.
We will bump the ProduceRequest/Response and TxnOffsetCommitRequest/Response versions to indicate that the client is using the new protocol, which does not require explicitly adding partitions to transactions and instead adds them implicitly. The bump will also support new errors:
ABORTABLE_ERROR can be returned on any server-side failure so that the server can indicate to the client that the transaction should be aborted. See more details in Return Error for Non-Zero Sequence on New Producers below.
Additionally, we will update the END_TXN_RESPONSE to include a bumped epoch for clients to use. We will also bump the request/response version.
We will bump the versioning for AddPartitionsToTxn to add support for "verifyOnly" mode used for old clients to verify the partitions are in a given transaction. We will also add support to batch multiple waiting requests by transactional ID. This newer version of the request will require CLUSTER authorization so clients will not be able to use this version.
On the client side, newer clients with the bumped produce version can read this epoch and use it for future requests.
Older clients will receive INVALID_TXN_STATE errors when there is not an ongoing transaction for the partition. This error is already supported by older clients.
We will add a new configuration to turn off the verification feature. This can be used by performance-conscious users who are concerned about the second AddPartitionsToTxn call to verify the partition and less concerned about hanging transactions and other transaction issues.
This configuration will not apply to the adding of the new partitions for newer clients since we do not expect performance differences there. For now the default will be true. If performance tests show a significant change, that can be reviewed again.
val TransactionPartitionVerificationEnableDoc = "Enable verification that checks that the partition has been added to the transaction before we write transactional records to the partition"
VerificationTimeMs – number of milliseconds from adding partition info to the manager to the time the response is sent. This will include the round trip to the transaction coordinator if it is called. This will also account for verifications that fail before the coordinator is called.
VerificationFailureRate – rate of verifications that returned in failure either from the AddPartitionsToTxn response or through errors in the manager.
Bump Epoch on Each Transaction for New Clients (1)
In order to provide better guarantees around transactions, we should change semantics to ALWAYS bump the epoch upon the commit or abort of a transaction on newer clients (those with a higher produce version). This will allow us to uniquely identify a transaction with a producer ID and epoch. By being able to uniquely identify a transaction, we can better tell where one transaction ends and the next begins. This covers both the case where a message from a previous transaction incorrectly gets added to a new one and the hanging transaction case.
As we do now, we will ensure that any produce requests are using the correct epoch. Messages from previous transactions will be fenced because they will have an older epoch.
Upon the end of the transaction, the client sends the EndTxnRequest. When the transaction coordinator receives this, it will write the prepare commit message with a bumped epoch and send WriteTxnMarkerRequests with the bumped epoch. Finally, it will send the EndTxnResponse with the bumped epoch to the client. Newer clients will read this epoch and set it as their own, using it for the next transaction.
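The epoch-bump flow above can be sketched as a small in-memory model. This is only an illustration under stated assumptions: the class and method names (EpochBumpSketch, register, endTxn, isFenced) are hypothetical and are not Kafka's coordinator code, and epoch exhaustion is ignored.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: a coordinator that bumps the producer epoch on every EndTxn.
class EpochBumpSketch {
    // producerId -> current epoch (illustrative in-memory state only)
    private final Map<Long, Short> currentEpoch = new HashMap<>();

    void register(long producerId, short epoch) {
        currentEpoch.put(producerId, epoch);
    }

    // Models EndTxn handling: validate the epoch, bump it, and return the new
    // epoch that would be carried in the markers and in the EndTxnResponse.
    short endTxn(long producerId, short requestEpoch) {
        Short current = currentEpoch.get(producerId);
        if (current == null || current != requestEpoch) {
            throw new IllegalStateException("unknown producer or stale epoch");
        }
        short bumped = (short) (current + 1); // epoch exhaustion is ignored here
        currentEpoch.put(producerId, bumped);
        return bumped;
    }

    // A record is fenced when it carries an epoch older than the current one.
    boolean isFenced(long producerId, short recordEpoch) {
        Short current = currentEpoch.get(producerId);
        return current == null || recordEpoch < current;
    }
}
```

In this model, a record delayed from the transaction that ran under epoch 5 is rejected once the transaction ends and the epoch becomes 6, which is the property the proposal relies on.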
Return Error for Non-Zero Sequence on New Producers
For new clients, we can once again return an error for sequences that are non-zero when there is no producer state present on the server. This will indicate we missed the 0 sequence and we don't yet want to write to the log. Previously this error was UNKNOWN_PRODUCER_ID, but historically this error code was handled in a complicated way. That's why we propose introducing the new code ABORTABLE_ERROR, which new clients handle by aborting the transaction. We already have an error message field to indicate the error more specifically.
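A minimal sketch of the sequence check described above, assuming a simplified broker that only tracks which producer IDs it has state for. The names SequenceCheckSketch and validateAppend are hypothetical and chosen for illustration only.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the "non-zero sequence without producer state" check.
class SequenceCheckSketch {
    enum Result { OK, ABORTABLE_ERROR }

    // Producer IDs for which this (simplified) broker already holds state.
    private final Set<Long> knownProducers = new HashSet<>();

    Result validateAppend(long producerId, int firstSequence) {
        // No state and a non-zero sequence means sequence 0 was missed,
        // so the batch is rejected instead of being written to the log.
        if (!knownProducers.contains(producerId) && firstSequence != 0) {
            return Result.ABORTABLE_ERROR;
        }
        knownProducers.add(producerId);
        return Result.OK;
    }
}
```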
Implicitly Add Partitions to Transactions on Produce for New Clients (2)
Each broker contains cached entries about producer state that contain a field for the first offset of a transaction, currentTxnFirstOffset. When the broker receives a produce request with a newer version, it can check whether a transaction is ongoing by checking if this field is populated. If it is, the broker continues handling the produce request.
If there is not any state in the broker for the partition, this is the first time producing to the partition for this transaction and we need to add it to the transaction implicitly. We will put the produce request in purgatory and send a request to the transaction coordinator to add the partition to the transaction.
We will wait for the response before continuing the produce request: if the transaction later needs to be aborted, the coordinator must know which partitions are involved, so we don’t want to write to the partition before it is stored in the transaction manager. Upon returning from the transaction coordinator, we can set the transaction as ongoing on the leader by populating currentTxnFirstOffset through the typical produce request handling.
This field will serve as a marker that the partition was added to the transaction and will persist across leadership changes since the information is persisted to disk. When the transaction markers are written to abort/commit the transaction this offset field is cleared.
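The leader-side decision described above can be sketched for a single partition as follows. This is a simplified, synchronous model under stated assumptions: the class ImplicitAddSketch and its members are hypothetical, the purgatory wait and the coordinator round trip are replaced by a counter, and a transaction marker is modeled as occupying one offset.

```java
// Hypothetical single-partition model of implicit partition addition on produce.
class ImplicitAddSketch {
    // null means no ongoing transaction is known on this leader.
    private Long currentTxnFirstOffset = null;
    private long logEndOffset = 0L;
    // Counts simulated AddPartitionsToTxn round trips to the coordinator.
    int addPartitionCalls = 0;

    // Handles one transactional produce request and returns the assigned offset.
    long handleProduce() {
        if (currentTxnFirstOffset == null) {
            // First write of this transaction to the partition: the real flow
            // parks the request in purgatory until the coordinator responds.
            addPartitionCalls++;
            currentTxnFirstOffset = logEndOffset;
        }
        return logEndOffset++;
    }

    // Writing a commit/abort marker clears the first-offset field.
    void writeTxnMarker() {
        logEndOffset++;
        currentTxnFirstOffset = null;
    }

    boolean txnOngoing() {
        return currentTxnFirstOffset != null;
    }
}
```

Only the first produce request of each transaction pays for the coordinator call in this model; later requests see the populated field and proceed directly.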
We can also make this call for the __consumer_offsets topic when the producer makes the sendOffsetsToTransaction call. When the producer contacts the group coordinator via TxnOffsetCommitRequest, the coordinator broker can instead make the AddPartitionsToTxnRequest call to the transaction coordinator to add that offset partition. This means that AddOffsetsToTxnRequest is deprecated for new clients.
Ensure Ongoing Transaction for Older Clients (3)
As mentioned above, the broker contains information about whether a transaction is ongoing through currentTxnFirstOffset. When the broker receives a produce request with an older version, it checks if there is any state for the transaction on the broker. If there is, we continue writing the produce request.
If there is not any state in the broker for the partition, we need to verify that a transaction is ongoing. We will put the produce request in purgatory and verify through the AddPartitionsToTxnRequest to the transaction coordinator using the verifyOnly flag. In the verify mode of the request, we check if the transaction is already added to the transaction coordinator for the given partition. If we do not already see the transaction during verify mode, we can return INVALID_TXN_STATE for that partition. If the partition and transaction are there, we can return with no error.
We need to make sure the transaction is ongoing and the partition is part of the transaction. If not, and the addPartitions call returned INVALID_TXN_STATE, we return that error in the produce response. Upon verifying this, we can continue handling the produce request, which puts currentTxnFirstOffset in the cache. In-memory state can also be stored to indicate the partition has been verified. We will check this in-memory state before the record is written to avoid any race where a control marker is written after sending the verify response but before the record is written.
All future produce requests for this transaction can be verified without the request to the transaction coordinator. The currentTxnFirstOffset will serve as a marker that the validation completed and will persist across leadership changes since the information is persisted to disk. When the transaction markers are written to abort/commit the transaction, this offset field is cleared.
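The verification path for older clients can be sketched as below. The model is deliberately simplified and every name in it (VerifyOnlySketch, clientAddsPartition, handleProduce, verifyCalls) is hypothetical: coordinator state and leader state are plain sets, and the verifyOnly round trip is a synchronous lookup rather than a request parked in purgatory.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of verifyOnly handling for produce requests from older clients.
class VerifyOnlySketch {
    enum Error { NONE, INVALID_TXN_STATE }

    // Stand-in for coordinator state: partitions the client explicitly added.
    private final Set<String> partitionsInTxn = new HashSet<>();
    // Stand-in for leader state: partitions whose currentTxnFirstOffset is populated.
    private final Set<String> leaderOngoing = new HashSet<>();
    // Counts simulated verifyOnly AddPartitionsToTxn requests.
    int verifyCalls = 0;

    // Models the client's own AddPartitionsToTxn call.
    void clientAddsPartition(String partition) {
        partitionsInTxn.add(partition);
    }

    Error handleProduce(String partition) {
        if (leaderOngoing.contains(partition)) {
            return Error.NONE; // already verified for this transaction
        }
        verifyCalls++;
        if (!partitionsInTxn.contains(partition)) {
            return Error.INVALID_TXN_STATE; // nothing is written to the log
        }
        leaderOngoing.add(partition); // the append would populate currentTxnFirstOffset
        return Error.NONE;
    }
}
```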
One thing to consider here is the concurrency and potential race conditions this solution may see. One potential issue is for an abort marker to be in flight and/or written between the time the verification response comes in and the time the produce request is written to the log. On the WriteTxnMarkerRequest, we update the state while the log is locked and the marker is written. Then, in the callback when we try to handle the produce in the log, we can check the state one more time to make sure no marker came in (and ended the transaction) before we wrote to it.
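One way to picture the re-check is a token that is handed out when verification starts and invalidated under the log lock when a marker is written. The sketch below assumes that design for illustration; the class VerificationGuardSketch and its methods are hypothetical names, not the broker's implementation.

```java
// Hypothetical model of re-checking in-memory verification state under the log lock.
class VerificationGuardSketch {
    private final Object logLock = new Object();
    // Token created when verification starts; cleared when a marker is written.
    private Object verificationGuard = null;
    private int recordsWritten = 0;

    Object startVerification() {
        synchronized (logLock) {
            verificationGuard = new Object();
            return verificationGuard;
        }
    }

    // The marker write and the state update happen under the same lock.
    void writeTxnMarker() {
        synchronized (logLock) {
            verificationGuard = null;
        }
    }

    // Called from the verification callback: append only if no marker
    // ended the transaction since the token was handed out.
    boolean appendIfStillVerified(Object guard) {
        synchronized (logLock) {
            if (guard == null || guard != verificationGuard) {
                return false;
            }
            recordsWritten++;
            return true;
        }
    }

    int recordsWritten() {
        synchronized (logLock) {
            return recordsWritten;
        }
    }
}
```

Because both the marker write and the final check take the same lock, a marker either lands before the check (and the append is refused) or after the append (and the record belongs to the transaction the marker ends).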
Compatibility, Deprecation, and Migration Plan
All new clients will use the new produce version (with compatible brokers) and will see the benefits of epoch bumps and of implicitly adding partitions to each transaction.
New clients also need to be updated to remove the explicit AddPartitionsToTxn calls in order to reap the performance benefit.
New clients will also take advantage of clearly defined retriable and abortable errors. These changes will apply to both Produce and TxnOffsetCommit requests.
For the current Java client, we would return the already existing INVALID_TXN_STATE error with a message indicating the transaction was not ongoing. This error will result in the batch failing, the transaction being aborted, and the sequence number being adjusted so as not to cause issues with out-of-order sequences. Non-Java clients should also have this handling, but if not, a comparable approach can be used. Some clients treat this error as fatal, and that is preferable to writing the state. In many cases, the producer issuing this request is a zombie and the abortable/fatal distinction should not matter.
We also have a few more error codes that will be returned due to the interaction with the coordinator. Those are
COORDINATOR_NOT_AVAILABLE , and
The first error is abortable on the Java client and fatal on librdkafka and perhaps other clients. This should be OK since in most cases the request is from a zombie or misconfigured client. The others are trickier since the Java client treats all retriable errors as retriable, but not all clients do. To avoid fatal errors as a result of coordinator changes and concurrent transactions, we opt to use NOT_ENOUGH_REPLICAS with a specific error message for debugging. There are trade-offs for old clients, since we cannot make the optimal choice, which is to have one error that means retry and another that means abort.
There are slightly different semantics for TxnOffsetCommits and how they handle errors.
INVALID_PID_MAPPING are fatal for this request. Instead, we will use
UNKNOWN_MEMBER_ID which is abortable. All the retriable errors will be converted to
NOT_ENOUGH_REPLICAS is not handled. These will not contain the unique error message since the api does not support that.
Unit/Integration testing will be done to test the various hanging transaction scenarios described above. Tests will also be done to ensure client compatibility between various versions.
Use a more specific error that works for older Java clients but may be fatal on other older clients
KIP-890 Part 1 tries to address hanging transactions on old clients. Thus, the produce version cannot be bumped and no new errors can be added. The older Java client has a notion of retriable and abortable errors: retriable errors are defined as such by extending the retriable error class, fatal errors are defined explicitly, and abortable errors are the remainder. However, many other clients treat unspecified errors as fatal, which means many retriable errors kill the application. Stuck between specific errors that Java clients handle correctly (i.e., they retry) and specific errors that are fatal in cases that should not be fatal, we opted for a middle ground: a non-specific error, with a message in the response giving the specifics.
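The three-way classification used by the older Java client, as described above, can be sketched like this. The exception classes here are hypothetical stand-ins defined only for the example; they are not the Kafka client's exception hierarchy.

```java
// Hypothetical model of how a client sorts errors into retry, fatal, and abort.
class ErrorClassificationSketch {
    enum Handling { RETRY, FATAL, ABORT }

    // Retriable errors are recognized by extending a common base class.
    static class RetriableSketchException extends RuntimeException {}
    static class CoordinatorMovedSketchException extends RetriableSketchException {}

    // Fatal errors are listed explicitly.
    static class FencedSketchException extends RuntimeException {}

    // Any other error falls through to "abortable".
    static class OtherSketchException extends RuntimeException {}

    static Handling classify(RuntimeException e) {
        if (e instanceof RetriableSketchException) {
            return Handling.RETRY;
        }
        if (e instanceof FencedSketchException) {
            return Handling.FATAL;
        }
        return Handling.ABORT;
    }
}
```

A client that instead treats every unlisted error as fatal would map OtherSketchException to FATAL, which is the incompatibility the error choice above tries to work around.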
InvalidRecord Error Returned When No Ongoing Transactions for Older Clients
INVALID_RECORD was a proposed error to send when the partition was not part of the transaction since it was already returned by the producer and treated as an abortable error. This seemed like a good choice for older clients. However, this error has not existed as long as transactions have existed, so some clients may not parse this error. Therefore, INVALID_TXN_STATE, which is also an abortable error, should be used instead.
Overhaul Transactions Protocol + Begin Transactions
Part of the reason why hanging transactions are an issue is that the client must explicitly add partitions to a transaction. As part of removing the AddPartitionsToTxn request, there was some consideration to redoing the protocol from scratch and introducing a unique transactional ID per transaction. This could remove the reliance on the producer ID + epoch.
BeginTxn – send to the txn coordinator to signal the beginning of a transaction, producer is given a unique transactional id
Produce – we send produce requests with the given transactional ID to the leaders and implicitly add them to the transaction.
EndTxn – send to the txn coordinator to signal the end of the transaction – commit markers are written to all the leaders, transactional id is fenced.
Though there may be some reason to remove reliance on producer IDs (in case we want to make changes there in the future), the immediate need was not apparent.
Begin Transactions Marker
Building off of the bumping epoch on each transaction approach, we could also try to identify unique transactions by including a begin transactions marker. This would accomplish a few goals:
Clearly mark in the log the start of a transaction which could aid in debugging
Persist the information about the start of a transaction locally – we wouldn’t have to send the data to new leaders since it would be contained in the log.
Supplement the epoch bump approach (or potentially implement without epoch bumps) so that the transaction coordinator and leader can guard against hanging transactions/records from a previous transaction being added to a new one.
Using the existing WriteTxnMarkerRequest pipeline means this won’t be a huge overhaul, though we may want to make the call synchronous to avoid the “concurrent transactions” issue we see when writing the end marker (i.e., some time to propagate that requires retries, the same issue we saw with the coordinator → leader path).
There are some cons to this approach which led us to ultimately decide not to take this path, at least in v1:
It requires changing the record format, which may not be too difficult
We still need to send the transaction state to the leader to write the marker (and persist it to all the replicas) which takes some time
We could still implement this approach at a later date, after implementing the currently proposed solution.
Return LogStart and LogEndOffsets in WriteTxnMarkers Request
The idea is that this would provide more information for debugging. However, the information is found in ProducerStateManager and is hard to propagate up to the response, especially since this request is handled as an append to the log. Any changes to those methods would also affect the produce path.
Instead, perhaps we could include a log message with this information from the broker side.
AddPartitionsToTxn Optimization for Older Clients
We could introduce an optimization to store the state on the leader on the addPartitionsToTxn request (issuing a new API from the TxnCoordinator to the leader). This would make the first request after adding the partition faster since we may already have the state and can produce without an extra API call. However, in the case where the message is too slow, we would need to keep retrying until the information came in or we time out (since the transaction is not ongoing and such state will never arrive). Failing fast is pretty important, so we decided against this approach.
DescribeTransactions call for Older Clients
Originally the plan was to use the DescribeTransactions call to check if a transaction was ongoing. However, the logic to send from the leader is already pretty complicated, so it will be easier to handle a single request type.