Status

Current state: Draft

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka transactions are often used with stream processing engines such as Apache Flink and Apache Spark, as well as with Kafka's native stream processing library, Kafka Streams. Users have highlighted issues (for example, KAFKA-20381 and KAFKA-19873) caused by the transaction timeout serving two potentially conflicting requirements. First, we want to ensure that the client applications performing the transactions are still alive; second, we want to ensure that the transactions have sufficient time to complete. If users set a low transaction timeout, transactions are rolled back promptly when applications fail, but there is a risk that transactions are aborted prematurely because they did not have quite enough time to complete. Conversely, if users set a high transaction timeout, transactions have sufficient time to complete, but a failed application's transaction also takes a long time to be aborted.

This KIP proposes to introduce an explicit liveness check for transactions, so that users who need a long transaction timeout can still recover promptly from application failures.

Proposed Changes


Public Interfaces

Kafka Protocol Changes

This KIP introduces the following new API:

  • TxnHeartbeat

Access Control

This table gives the ACLs required for the new API:

RPC          | Operation | Resource
TxnHeartbeat | WRITE     | Transactional ID

InitProducerId API

This KIP introduces version 7 of the InitProducerId API (or version 6, if this KIP is adopted before the APIs introduced by KIP-939 are marked as stable).

Request schema

{
  "apiKey": 22,
  "type": "request",
  "listeners": ["broker"],
  "name": "InitProducerIdRequest",
  // Version 1 is the same as version 0.
  //
  // Version 2 is the first flexible version.
  //
  // Version 3 adds ProducerId and ProducerEpoch, allowing producers to try to resume after an INVALID_PRODUCER_EPOCH error
  //
  // Version 4 adds the support for new error code PRODUCER_FENCED.
  //
  // Version 5 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
  //
  // Version 6 adds support for 2PC (KIP-939).
  //
  // Version 7 adds support for transaction heartbeats (KIP-1309).
  "validVersions": "0-7",
  "flexibleVersions": "2+",
  "fields": [
    { "name": "TransactionalId", "type": "string", "versions": "0+", "nullableVersions": "0+", "entityType": "transactionalId",
      "about": "The transactional id, or null if the producer is not transactional." },
    { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+",
      "about": "The time in ms to wait before aborting idle transactions sent by this producer. This is only relevant if a TransactionalId has been defined." },
    { "name": "TransactionHeartbeatIntervalMs", "type": "int32", "versions": "7+", "default": "0", "ignorable": true,
      "about": "The expected time in ms between heartbeats from this producer, or 0 if heartbeats are disabled. If greater than 0, the coordinator aborts idle transactions sent by this producer when no heartbeat has been received within 3 times this interval." },
    { "name": "ProducerId", "type": "int64", "versions": "3+", "default": "-1", "entityType": "producerId",
      "about": "The producer id. This is used to disambiguate requests if a transactional id is reused following its expiration." },
    { "name": "ProducerEpoch", "type": "int16", "versions": "3+", "default": "-1",
      "about": "The producer's current epoch. This will be checked against the producer epoch on the broker, and the request will return an error if they do not match." },
    { "name": "Enable2Pc", "type": "bool", "versions": "6+", "default": "false",
      "about": "True if the client wants to enable two-phase commit (2PC) protocol for transactions." },
    { "name": "KeepPreparedTxn", "type": "bool", "versions": "6+", "default": "false",
      "about": "True if the client wants to keep the currently ongoing transaction instead of aborting it." }
  ]
}

This KIP adds the new field TransactionHeartbeatIntervalMs.
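The abort rule in the field description can be sketched as a coordinator-side check. This is a minimal Java sketch, not the coordinator's actual code: `TxnExpiryCheck`, `shouldAbort` and its parameters are hypothetical names. It assumes that the existing transaction timeout still bounds the total duration of a transaction when heartbeats are enabled, and it reads "after 3 times this interval" as strictly more than 3 intervals since the last heartbeat.

```java
// Hypothetical sketch of the coordinator-side abort decision for heartbeating producers.
// Class, method and parameter names are illustrative; they are not defined by the KIP.
final class TxnExpiryCheck {
    // The KIP specifies abort after 3 times the heartbeat interval without a heartbeat.
    static final int MISSED_HEARTBEATS_BEFORE_ABORT = 3;

    private TxnExpiryCheck() {}

    /**
     * Returns true if the coordinator should proactively abort the ongoing transaction.
     *
     * @param nowMs                current time
     * @param txnStartMs           time the transaction started
     * @param lastHeartbeatMs      time of the most recent heartbeat (assumed to start at txnStartMs)
     * @param transactionTimeoutMs TransactionTimeoutMs from InitProducerId
     * @param heartbeatIntervalMs  TransactionHeartbeatIntervalMs from InitProducerId (0 = disabled)
     */
    static boolean shouldAbort(long nowMs, long txnStartMs, long lastHeartbeatMs,
                               int transactionTimeoutMs, int heartbeatIntervalMs) {
        // Assumption: the transaction timeout still applies as an upper bound.
        if (nowMs - txnStartMs > transactionTimeoutMs) {
            return true;
        }
        // Liveness check, only when heartbeats are enabled.
        if (heartbeatIntervalMs > 0) {
            long livenessWindowMs = (long) MISSED_HEARTBEATS_BEFORE_ABORT * heartbeatIntervalMs;
            return nowMs - lastHeartbeatMs > livenessWindowMs;
        }
        return false;
    }

    public static void main(String[] args) {
        int timeoutMs = 900_000; // 15-minute transaction timeout
        int intervalMs = 10_000; // 10-second heartbeat interval
        // Last heartbeat 25 s ago: within 3 x 10 s, keep the transaction open.
        System.out.println(shouldAbort(60_000, 0, 35_000, timeoutMs, intervalMs)); // false
        // Last heartbeat 31 s ago: liveness window exceeded, abort.
        System.out.println(shouldAbort(66_000, 0, 35_000, timeoutMs, intervalMs)); // true
    }
}
```

With a 15-minute transaction timeout and a 10-second heartbeat interval, a producer that stops heartbeating has its transaction aborted after roughly 30 seconds instead of up to 15 minutes, while a live producer still has the full timeout to finish.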

Response schema

Version 7 is the same as version 6.

If TransactionHeartbeatIntervalMs is non-zero and Enable2Pc is true, the error code is INVALID_REQUEST.

If TransactionHeartbeatIntervalMs is non-zero and greater than TransactionTimeoutMs, the error code is INVALID_REQUEST.
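The two validation rules can be expressed as a small check on the request fields. This Java sketch is illustrative only: `InitProducerIdValidation`, `validate` and the `Result` enum are hypothetical, and the enum constant merely mirrors the name of Kafka's INVALID_REQUEST error code. The KIP does not say how negative intervals are handled, so this sketch does not check for them.

```java
// Hypothetical sketch of broker-side validation for InitProducerId version 7 requests.
// Names are illustrative; only the two rules stated in the KIP are implemented.
final class InitProducerIdValidation {
    enum Result { NONE, INVALID_REQUEST }

    private InitProducerIdValidation() {}

    static Result validate(int transactionTimeoutMs, int heartbeatIntervalMs, boolean enable2Pc) {
        // Rule 1: heartbeats cannot be combined with two-phase commit.
        if (heartbeatIntervalMs != 0 && enable2Pc) {
            return Result.INVALID_REQUEST;
        }
        // Rule 2: the heartbeat interval must not exceed the transaction timeout.
        if (heartbeatIntervalMs != 0 && heartbeatIntervalMs > transactionTimeoutMs) {
            return Result.INVALID_REQUEST;
        }
        return Result.NONE;
    }

    public static void main(String[] args) {
        System.out.println(validate(60_000, 10_000, false)); // NONE
        System.out.println(validate(60_000, 10_000, true));  // INVALID_REQUEST
        System.out.println(validate(5_000, 10_000, false));  // INVALID_REQUEST
        System.out.println(validate(60_000, 0, true));       // NONE (heartbeats disabled)
    }
}
```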

TxnHeartbeat API

The TxnHeartbeat API is used by transactional producers to demonstrate their liveness to the transaction coordinator. Transactional producers send these requests only when heartbeats are enabled and two-phase commit is not in use. Heartbeats are typically enabled when producer failures need to be detected more quickly than the transaction timeout allows.
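The conditions under which a producer sends heartbeats can be sketched as a small policy object. This is a hypothetical Java helper, not part of the Kafka producer API: `TxnHeartbeatPolicy`, `heartbeatsEnabled` and `heartbeatDue` are illustrative names, and the sketch assumes a heartbeat becomes due once a full interval has elapsed since the previous one was sent.

```java
// Hypothetical producer-side helper deciding when a TxnHeartbeat request is due.
// Not part of the Kafka client API; names are illustrative.
final class TxnHeartbeatPolicy {
    private final boolean transactional;
    private final boolean enable2Pc;
    private final int heartbeatIntervalMs; // transaction.heartbeat.interval.ms

    TxnHeartbeatPolicy(boolean transactional, boolean enable2Pc, int heartbeatIntervalMs) {
        this.transactional = transactional;
        this.enable2Pc = enable2Pc;
        this.heartbeatIntervalMs = heartbeatIntervalMs;
    }

    /** Heartbeats are sent only by transactional producers that enable them and do not use 2PC. */
    boolean heartbeatsEnabled() {
        return transactional && !enable2Pc && heartbeatIntervalMs > 0;
    }

    /** True once a full interval has elapsed since the last heartbeat was sent. */
    boolean heartbeatDue(long lastHeartbeatSentMs, long nowMs) {
        return heartbeatsEnabled() && nowMs - lastHeartbeatSentMs >= heartbeatIntervalMs;
    }

    public static void main(String[] args) {
        TxnHeartbeatPolicy policy = new TxnHeartbeatPolicy(true, false, 10_000);
        System.out.println(policy.heartbeatDue(0, 9_999));  // false
        System.out.println(policy.heartbeatDue(0, 10_000)); // true
        TxnHeartbeatPolicy twoPc = new TxnHeartbeatPolicy(true, true, 10_000);
        System.out.println(twoPc.heartbeatDue(0, 60_000));  // false: 2PC producers never heartbeat
    }
}
```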

Request schema

{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["broker"],
  "name": "TxnHeartbeatRequest",
  // Version 0 is the first version (KIP-1309).
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
      "about": "The transactional id." },
    { "name": "ProducerId", "type": "int64", "versions": "0+", "default": "-1", "entityType": "producerId",
      "about": "The producer id. This is used to disambiguate requests if a transactional id is reused following its expiration." },
    { "name": "ProducerEpoch", "type": "int16", "versions": "0+", "default": "-1",
      "about": "The producer's current epoch. This will be checked against the producer epoch on the broker, and the request will return an error if they do not match." }
  ]
}

Response schema

{
  "apiKey": TBD,
  "type": "response",
  "name": "TxnHeartbeatResponse",
  // Version 0 is the first version (KIP-1309).
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - TRANSACTIONAL_ID_AUTHORIZATION_FAILED (version 0+)
  // - TRANSACTIONAL_ID_NOT_FOUND (version 0+)
  // - NOT_COORDINATOR (version 0+) 
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The error message, or null if there was no error." }
  ]
}

Open question: the TRANSACTION_ABORTABLE and UNKNOWN_PRODUCER_ID error codes may also need to be included in this list.
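The KIP lists the supported error codes but not how a producer should react to each. The Java sketch below shows one possible classification, assuming TxnHeartbeat follows the general pattern Kafka's transactional producer uses for other coordinator RPCs (retry while the coordinator is loading, rediscover the coordinator on NOT_COORDINATOR or COORDINATOR_NOT_AVAILABLE, fail on authorization errors). `TxnHeartbeatErrorHandling`, `actionFor` and the `Action` values are hypothetical, and treating TRANSACTIONAL_ID_NOT_FOUND and INVALID_REQUEST as fatal is an assumption of this sketch.

```java
// Hypothetical classification of TxnHeartbeat error codes on the producer side.
// The mapping is an assumption, not behaviour specified by the KIP.
final class TxnHeartbeatErrorHandling {
    enum Action { NONE, RETRY, FIND_COORDINATOR_AND_RETRY, FATAL }

    private TxnHeartbeatErrorHandling() {}

    static Action actionFor(String errorName) {
        switch (errorName) {
            case "NONE":
                return Action.NONE;
            case "COORDINATOR_LOAD_IN_PROGRESS":
                // Coordinator is loading transaction state; retry the same coordinator.
                return Action.RETRY;
            case "NOT_COORDINATOR":
            case "COORDINATOR_NOT_AVAILABLE":
                // Look up the transaction coordinator again, then retry.
                return Action.FIND_COORDINATOR_AND_RETRY;
            case "TRANSACTIONAL_ID_AUTHORIZATION_FAILED":
            case "TRANSACTIONAL_ID_NOT_FOUND":
            case "INVALID_REQUEST":
            default:
                // Treated as fatal in this sketch.
                return Action.FATAL;
        }
    }

    public static void main(String[] args) {
        System.out.println(actionFor("NOT_COORDINATOR")); // FIND_COORDINATOR_AND_RETRY
    }
}
```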

Configuration

Producer configuration

The following configuration property is added for producers.

Configuration | Description | Values
transaction.heartbeat.interval.ms | The expected time between heartbeats to the transaction coordinator. Heartbeats let the coordinator confirm that a transactional producer is still alive when liveness needs to be checked more frequently than transaction.timeout.ms allows. When enabled, a transactional producer sends heartbeats at this interval to prevent the coordinator from proactively aborting its transaction. If the transaction coordinator does not receive a heartbeat within 3 times this interval, it proactively aborts the transaction. | Default: 0 (disabled)
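A producer combining a long transaction timeout with the proposed heartbeat property might be configured as in the Java sketch below. `transaction.heartbeat.interval.ms` is the property proposed by this KIP and is not recognised by released Kafka clients; the broker address, transactional ID and values are examples. `HeartbeatConfigExample` and `failureDetectionBoundMs` are hypothetical helpers that estimate how long a failed producer's transaction stays open, assuming the transaction timeout still applies and ignoring the coordinator's periodic expiry-check interval.

```java
import java.util.Properties;

// Illustrative producer configuration using the proposed transaction.heartbeat.interval.ms.
// The property is a proposal of this KIP; class and method names are hypothetical.
final class HeartbeatConfigExample {
    static final int MISSED_HEARTBEATS_BEFORE_ABORT = 3;

    private HeartbeatConfigExample() {}

    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-transactional-app");
        // Long timeout: transactions get up to 15 minutes to complete.
        props.put("transaction.timeout.ms", "900000");
        // Proposed by this KIP: heartbeat every 10 seconds.
        props.put("transaction.heartbeat.interval.ms", "10000");
        return props;
    }

    /** Approximate time before a failed producer's transaction is aborted. */
    static long failureDetectionBoundMs(Properties props) {
        int intervalMs = Integer.parseInt(props.getProperty("transaction.heartbeat.interval.ms", "0"));
        // 60000 ms is the producer's default transaction.timeout.ms.
        int timeoutMs = Integer.parseInt(props.getProperty("transaction.timeout.ms", "60000"));
        if (intervalMs <= 0) {
            return timeoutMs; // heartbeats disabled: only the transaction timeout applies
        }
        return Math.min((long) MISSED_HEARTBEATS_BEFORE_ABORT * intervalMs, timeoutMs);
    }

    public static void main(String[] args) {
        System.out.println(failureDetectionBoundMs(producerProps())); // 30000
    }
}
```

Here a failed producer's transaction is aborted after about 30 seconds, while a healthy producer that keeps heartbeating can still use the full 15-minute transaction timeout.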

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the KIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
