You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

We have seen hanging transactions in Kafka where the last stable offset (LSO) does not update, we can’t clean the log (if the topic is compacted), and read_committed consumers get stuck.

This can happen when a message gets stuck or delayed due to networking issues or a network partition, the transaction aborts, and then the delayed message finally comes in. The delayed message case can also violate EOS if the delayed message comes in after the next addPartitionsToTxn request comes in. Effectively we may see a message from a previous (aborted) transaction become part of the next transaction.

Another way hanging transactions can occur is that a client is buggy and may somehow try to write to a partition before it adds the partition to the transaction. In both of these cases, we want the server to have some control to prevent these incorrect records from being written and either causing hanging transactions or violating Exactly once semantics (EOS) by including records in the wrong transaction.

The best way to avoid this issue is to:

  1. Uniquely identify transactions by bumping the producer epoch after every commit/abort marker. That way, each transaction can be identified by (producer id, epoch). 

  2. Remove the addPartitionsToTxn call and implicitly just add partitions to the transaction on the first produce request during a transaction.

We avoid the late arrival case because the transaction is uniquely identified and fenced AND we avoid the buggy client case because we remove the need for the client to explicitly add partitions to begin the transaction.

Of course, 1 and 2 require client-side changes, so for older clients, those approaches won’t apply.

3. To cover older clients, we will ensure a transaction is ongoing before we write to a transaction. We can do this by querying the transaction coordinator and caching the result.

Public Interfaces

We will bump the ProduceRequest/Response version to indicate the client is using the new protocol that doesn’t require adding partitions to transactions and will implicitly do so.

Additionally, we will update the END_TXN_RESPONSE to include a bumped epoch for clients to use. We will also bump the request/response version.

{
  "apiKey": 26,
  "type": "response",
  "name": "EndTxnResponse",
  // Starting in version 1, on quota violation, brokers send out responses before throttling.
  //
  // Version 2 adds the support for new error code PRODUCER_FENCED.
  //
  // Version 3 enables flexible versions.
  //
  // Version 4 returns the producer epoch.
  "validVersions": "0-4",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "ProducerEpoch", "type": "int16", "versions": "4+",
      "about": "The current epoch associated with the producer." },  
  ]
}


On the client side, newer clients with the bumped produce version can read this epoch and use it for future requests.

Older clients will receive INVALID_RECORD errors when there is not an ongoing transaction for the partition. But this error is already supported.

Proposed Changes

Bump Epoch on Each Transaction for New Clients (1)

In order to provide better guarantees around transactions, we should change semantics to ALWAYS bump the epoch upon the commit or abort of a transaction on newer clients (those with a higher produce version). This will allow us to uniquely identify a transaction with a producer ID and epoch. By being able to unique identify a transaction, we can better tell where one transaction ends and the next begins. This would cover the case where a message from a previous transaction incorrectly gets added to a new one and the hanging transaction case.

As we do now, we will ensure that any produce requests are using the correct epoch. Messages from previous transactions will be fenced because they will have an older epoch.

Upon the end of the transaction, the client sends the EndTxnRequest. When the transaction coordinator receives this, it will write the prepare commit message with a bumped epoch and send WriteTxnMarkerRequests with the bumped epoch. Finally, it will send the EndTxnResponse with the bumped epoch to the client. Newer clients will read this epoch and set it as their own -- using it for the next transaction.

Return Error for Non-Zero Sequence on New Producers

For new clients, we can once again return an error UNKNOWN_PRODUCER_ID for sequences that are non-zero when there is no producer state present on the server. This will indicate we missed the 0 sequence and we don't yet want to write to the log.

Implicitly Add Partitions to Transactions on Produce for New Clients (2)

Each broker contains cached entries about producer state that contain a field for the first offset of a transaction currentTxnFirstOffset. When we send a produce request with a newer version, we can check if we have a transaction Ongoing by checking if this field is populated. If it is there, continue handling the produce request.

If there is not any state in the broker for the partition, this is the first time producing to the partition for this transaction and we need to add it to the transaction implicitly. We will put the produce request in purgatory and send a request to the transaction coordinator to add the partition to the transaction.

This request will be synchronous and we will wait for the response before continuing the produce request in case we need to abort and need to know which partitions – we don’t want to write to it before we store in the transaction manager. Upon returning from the transaction coordinator, we can set the transaction as ongoing on the leader by populating currentTxnFirstOffset through the typical produce request handling.

This field will serve as a marker that the partition was added to the transaction and will persist across leadership changes since the information is persisted to disk. When the transaction markers are written to abort/commit the transaction this offset field is cleared.

Ensure Ongoing Transaction for Older Clients (3)

As mentioned above, the broker contains information about whether a transaction is ongoing through currentTxnFirstOffset. When we send a produce older version produce request, we check if there is any state for the transaction on the broker. If it is there, we continue writing the produce request.

If there is not any state in the broker for the partition, we need to verify if a transactions is ongoing. We will put the produce request in purgatory and verify through the DescribeTransactionRequest to the transaction coordinator. This response contains the transaction state which can be: Empty, Ongoing, PrepareCommit, PrepareAbort, CompleteCommit, CompleteAbort, Dead, PrepareEpochFence.

We need to make sure the transaction is Ongoing and the partition is part of the transaction. If not, return INVALID_RECORD. This means the call is synchronous in the produce request. Upon verifying this, we can continue handling the produce request that puts currentTxnFirstOffset in the cache.

All future produce requests for this transaction can be verified without the request to the transaction coordinator. The currentTxnFirstOffset will serve as a marker that the validation completed and will persist across leadership changes since the information is persisted to disk. When the transaction markers are written to abort/commit the transaction this offset field is cleared.

Race Condition

One thing to consider here is the concurrency and potential race conditions this solution may see. One potential issue is for an abort marker to be inflight and/or written in between when the describe response comes in and the produce request written to the log. On the WriteTxnMarkerRequest, we update the state while the log is locked and the marker is written. Then, in the callback when we try to handle the produce in the log, we can check the state one more time to make sure no marker came in (and ended the transaction) before we wrote to it.

Compatibility, Deprecation, and Migration Plan

New Clients

All new clients will use the new produce version (with compatible brokers) and will see the benefits of epoch bumps and implicit adding to the partition for each transaction.

New clients also need to be updated to remove addPartitionsToTxns in order to reap the performance benefit.

Old Clients

For the current Java client, we would return the already existing INVALID_RECORD error with a message indicating the transaction was not ongoing. This error will result in the batch failing and the sequence number being adjusted as to not cause issues with out of order sequence. Non-Java clients should also have this handling, but if not a comparable approach can be used.

Test Plan

Unit/Integration testing will be done to test the various hanging transaction scenarios described above. Tests will also be done to ensure client compatibility between various versions.

Rejected Alternatives

Overhaul Transactions Protocol + Begin Transactions

Part of the reason why hanging transactions are an issue is that the client must explicitly add partitions to a transaction. As part of removing the AddPartitionsToTxn request, there was some consideration to redoing the protocol from scratch and introducing a unique transactional ID per transaction. This could remove the reliance on the producer ID + epoch.

  1. BeginTxn – send to the txn coordinator to signal the beginning of a transaction, producer is given a unique transactional id

  2. Produce – we send produce requests with the given transactional ID to the leaders and implicitly add them to the transaction.

  3. EndTxn – send to the txn coordinator to signal the end of the transaction – commit markers are written to all the leaders, transactional id is fenced.

Though there may be some reason to remove reliance on producer IDs (in case we want to make changes there in the future), the immediate need was not apparent.

Begin Transactions Marker

Building off of the bumping epoch on each transaction approach, we could also try to identify unique transactions by including a begin transactions marker. This would accomplish a few goals:

  1. Clearly mark in the log the start of a transaction which could aid in debugging

  2. Persist the information about the start of a transaction locally – we wouldn’t have to send the data to new leaders since it would be contained in the log.

  3. Supplement the epoch bump approach (or potentially implement without epoch bumps) so that the transaction coordinator and leader can guard against hanging transactions/records from a previous transaction being added to a new one.

  4. Using the existing WriteTxnMarkerRequest pipeline means this won’t be a huge overhaul – though we may want to make the call synchronous to avoid the “concurrent transactions” issue we see when writing the end marker (ie, some time to propagate that requires retries – the same issues we saw with the coordinator → leader path)

There are some cons to this approach which led us to ultimately decided not to take this path, at least in v1:

  1. It requires changing the record format, which may not be too difficult

  2. We still need to send the transaction state to the leader to write the marker (and persist it to all the replicas) which takes some time

  3. We could still implement this approach at a later date if we implement the proposed current solution.

Return LogStart and LogEndOffsets in WriteTxnMarkers Request

The idea is that this would provide more information for debugging. However, the information is found in ProducerStateManager and hard to propagate up to the response – especially since this request is handled as and append to the log. Any changes to those methods would also affect the produce path.

Instead, perhaps we could include a log message with this information from the broker side.

AddPartitionsToTxn Optimization for Older Clients

We could introduce an optimization to store the state on the leader on the addPartitionsToTxn request (issuing the a new api from the TxnCoordinator to the leader). This would make the first request after adding the partition faster since we may already have the state and can produce without an extra API call. However, in the case where the message is too slow, we would need to keep retrying until the information came in or we timeout (since the transaction is not ongoing and such state will never arrive). Failing fast is pretty important, so we decided against this approach.

  • No labels