
Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/895pgb85l08g2l63k99cw5dt2qpjkxb9 

JIRA: KAFKA-14305

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
{
  "apiKey": TBD,
  "type": "metadata",
  "name": "BeginTransactionRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    {"name": "Name", "type": "string", "versions": "0+", 
     "nullableVersions": "0+", "taggedVersions": "0+", "tag": 0,
     "about": "An optional textual description of this transaction. Should not exceed 255 bytes"}
  ]
}


Code Block
{
  "apiKey": TBD,
  "type": "metadata",
  "name": "EndTransactionRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": []
}

...

Code Block
{
  "apiKey": TBD,
  "type": "metadata",
  "name": "AbortTransactionRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    {"name": "Reason", "type": "string", "versions": "0+", 
     "nullableVersions": "0+", "taggedVersions": "0+", "tag": 0,
     "about": "An optional textual explanation of why this transaction was aborted. Should not exceed 255 bytes"}
  ]
}
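
To make the shape of these three records concrete, here is an illustrative Python sketch of them as plain dataclasses. It is not Kafka's code: Kafka generates Java classes from JSON schemas like the ones above, and the class layout, the MAX_TEXT_BYTES constant, and the _check_text helper are assumptions of this sketch. The schemas only state "Should not exceed 255 bytes" as guidance in the "about" text; rejecting longer values with a ValueError is this sketch's own choice.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical limit taken from the "Should not exceed 255 bytes" note in the schemas.
MAX_TEXT_BYTES = 255


def _check_text(value: Optional[str], field: str) -> None:
    """Raise ValueError if an optional text field is longer than MAX_TEXT_BYTES in UTF-8."""
    if value is not None and len(value.encode("utf-8")) > MAX_TEXT_BYTES:
        raise ValueError(f"{field} exceeds {MAX_TEXT_BYTES} bytes")


@dataclass(frozen=True)
class BeginTransactionRecord:
    # Tagged field 0, nullable: an optional description of the transaction.
    name: Optional[str] = None

    def __post_init__(self) -> None:
        _check_text(self.name, "Name")


@dataclass(frozen=True)
class EndTransactionRecord:
    """Marks the successful end of a transaction; carries no fields."""


@dataclass(frozen=True)
class AbortTransactionRecord:
    # Tagged field 0, nullable: an optional explanation of why the transaction was aborted.
    reason: Optional[str] = None

    def __post_init__(self) -> None:
        _check_text(self.reason, "Reason")
```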


A new metadata.version will be introduced to gate the usage of these new records. 

...

We define a new concept in the controller called a metadata transaction. A transaction consists of a BeginTransactionRecord, a number of records, and either an EndTransactionRecord or an AbortTransactionRecord. The records of a transaction may be written to the Raft layer across multiple batches and are still treated as a single atomic unit. By allowing a set of records to span several record batches, we overcome the current batch size limitation and further decouple the controller from Raft layer implementation details.

...

Record | Record in transaction | Record in batch
--- | --- | ---
BeginTransactionRecord | 1 | 1
TopicRecord | 2 | 2
PartitionRecord | 3 | 3
PartitionRecord | 4 | 4
PartitionRecord | 5 | 1
PartitionRecord | 6 | 2
ConfigRecord | 7 | 3
EndTransactionRecord | 8 | 4
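
The numbering in this example can be reproduced with a short Python sketch that frames a list of records with transaction markers and cuts the result into fixed-size batches. Records are represented by their type names only, the batch size of four is chosen purely to match the example, and the function names frame_transaction, split_into_batches, and positions are hypothetical.

```python
from typing import List, Tuple


def frame_transaction(records: List[str]) -> List[str]:
    """Wrap a list of record type names in Begin/End transaction markers."""
    return ["BeginTransactionRecord", *records, "EndTransactionRecord"]


def split_into_batches(records: List[str], max_batch: int) -> List[List[str]]:
    """Split records into consecutive batches of at most max_batch records."""
    if max_batch < 1:
        raise ValueError("max_batch must be positive")
    return [records[i:i + max_batch] for i in range(0, len(records), max_batch)]


def positions(batches: List[List[str]]) -> List[Tuple[str, int, int]]:
    """Return (record, position in transaction, position in batch), both 1-based."""
    rows: List[Tuple[str, int, int]] = []
    txn_pos = 0
    for batch in batches:
        for batch_pos, record in enumerate(batch, start=1):
            txn_pos += 1
            rows.append((record, txn_pos, batch_pos))
    return rows


# Usage: the transaction from the table, split into batches of four records.
txn = frame_transaction(["TopicRecord"] + ["PartitionRecord"] * 4 + ["ConfigRecord"])
for row in positions(split_into_batches(txn, 4)):
    print(row)
```

Note that the batch boundary (after the fourth record) does not line up with any transaction boundary: the markers, not the batches, delimit the transaction.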

Record Visibility

As the controller sends records to Raft, it optimistically applies them to its in-memory state. A record is not considered durable (i.e., "committed") until the high watermark of the Raft log advances beyond the record's offset. Once a record is durable, it is made visible outside the controller to external clients. Since transactions may span multiple Raft commits, we must extend the visibility criterion to cover whole transactions: when the controller sees that only part of a transaction has been committed, it must not expose any of the transaction's records until the entire transaction is committed. This is how the controller provides atomicity for a transaction.
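
A minimal Python sketch of the extended visibility rule, assuming the log is a list of record type names indexed by offset and that a record at offset o is committed once the high watermark is greater than o. The function name visible_end is hypothetical. It returns the exclusive end of the longest committed prefix that contains no incomplete transaction; records of an aborted transaction can fall inside that prefix and must still be discarded rather than exposed.

```python
from typing import List

BEGIN = "BeginTransactionRecord"
END = "EndTransactionRecord"
ABORT = "AbortTransactionRecord"


def visible_end(log: List[str], high_watermark: int) -> int:
    """Return the exclusive end offset of the committed prefix with no open transaction.

    A record at offset o is committed once high_watermark > o. Records inside a
    transaction are only covered once the transaction's End or Abort marker is committed.
    """
    stable = 0      # end of the last fully resolved prefix
    in_txn = False  # whether we are between a Begin marker and its End/Abort marker
    for offset in range(min(high_watermark, len(log))):
        record = log[offset]
        if record == BEGIN:
            in_txn = True
        elif record in (END, ABORT):
            in_txn = False
        if not in_txn:
            stable = offset + 1
    return stable
```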

For the broker, we must also prevent partial and aborted transactions from becoming visible to broker components. During normal operation, the records fetched by the brokers will include everything up to the Raft high watermark. This will include partial transactions. The broker must buffer these records and not make them visible to its internal components. This buffering can be achieved using the existing metadata infrastructure on the broker. That is, we will apply records to MetadataDelta, but not publish a new MetadataImage until an EndTransaction or AbortTransaction is seen. 
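
The broker-side buffering can be sketched in Python as follows. BufferingPublisher and its dictionaries are hypothetical, heavily simplified stand-ins for MetadataDelta and MetadataImage, and two behaviors are assumptions of the sketch: records outside a transaction are published immediately, one at a time, purely for simplicity; and on AbortTransaction the buffered changes are dropped so the aborted transaction never becomes visible.

```python
from typing import Dict, List, Tuple

# (record type, key, value); transaction markers use empty strings for key and value.
Record = Tuple[str, str, str]


class BufferingPublisher:
    """Toy broker-side loader: apply records to a delta, publish images atomically."""

    def __init__(self) -> None:
        self.image: Dict[str, str] = {}             # last published state
        self.delta: Dict[str, str] = {}             # applied but unpublished changes
        self.in_txn = False
        self.published: List[Dict[str, str]] = []   # history of published images

    def _publish(self) -> None:
        # Merge the pending delta into a new image and make it visible.
        self.image = {**self.image, **self.delta}
        self.delta = {}
        self.published.append(dict(self.image))

    def handle(self, record: Record) -> None:
        kind, key, value = record
        if kind == "BeginTransactionRecord":
            self.in_txn = True
        elif kind == "EndTransactionRecord":
            self.in_txn = False
            self._publish()
        elif kind == "AbortTransactionRecord":
            self.in_txn = False
            self.delta = {}  # discard everything buffered since the Begin marker
        else:
            self.delta[key] = value
            if not self.in_txn:
                self._publish()
```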

The metadata shell must also avoid showing partially committed metadata transactions.

Controller Failover

Another aspect of atomicity for transactions is dealing with controller failover. If a transaction has not been completed when a controller crashes, a partial transaction will be left committed in the log. The remaining records of this partial transaction are lost for good, so we must abort the entire transaction. The newly elected controller does this by writing an AbortTransactionRecord.

When the controller aborts a transaction, it will reset its in-memory state to the point before the transaction began. This effectively rolls back the applied records from the partially committed transaction. Because of the visibility guarantees mentioned in the previous section, there is no risk of these applied records being exposed to external clients.  
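
One way to picture the rollback is a Python sketch in which the controller remembers its state when a transaction begins and restores it on abort. ControllerState and its methods are hypothetical, and the full deep copy taken at the start of each transaction is an assumption made for brevity; it costs time proportional to the size of the state, which an incremental (copy-on-write) snapshot would avoid.

```python
import copy
from typing import Any, Dict, Optional


class ControllerState:
    """Toy controller state that can roll back a partially applied transaction."""

    def __init__(self) -> None:
        self.data: Dict[str, Any] = {}
        self._txn_snapshot: Optional[Dict[str, Any]] = None

    def begin_transaction(self) -> None:
        # Remember the state as it was just before the transaction started.
        self._txn_snapshot = copy.deepcopy(self.data)

    def apply(self, key: str, value: Any) -> None:
        # Records are applied optimistically, before they are committed.
        self.data[key] = value

    def end_transaction(self) -> None:
        # Keep the applied changes and forget the saved state.
        self._txn_snapshot = None

    def abort_transaction(self) -> None:
        # Restore the state saved at begin_transaction().
        if self._txn_snapshot is None:
            raise RuntimeError("no transaction in progress")
        self.data = self._txn_snapshot
        self._txn_snapshot = None
```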

Since we only allow a single transaction at a time, a partial transaction is easy to detect during failover: it can only occur at the end of the log. Using the above example, an incomplete transaction might look like:

  • BeginTransactionRecord
  • TopicRecord
  • PartitionRecord
  • PartitionRecord

There will be control records in the Raft log after this partial transaction (e.g., records pertaining to Raft elections), but these are not exposed to the controller directly. To abort this transaction, the controller writes an AbortTransactionRecord to the log and reverts its in-memory state.
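
The detection and abort steps can be sketched in Python, assuming the controller sees the log as a list of metadata record type names with Raft control records already filtered out, as described above. Because at most one transaction is in progress at a time, scanning backwards to the most recent transaction marker is enough. has_open_transaction and recover_after_failover are hypothetical names.

```python
from typing import List

BEGIN = "BeginTransactionRecord"
END = "EndTransactionRecord"
ABORT = "AbortTransactionRecord"


def has_open_transaction(log: List[str]) -> bool:
    """True if the most recent transaction marker in the log is a Begin marker."""
    for record in reversed(log):
        if record == BEGIN:
            return True
        if record in (END, ABORT):
            return False
    return False


def recover_after_failover(log: List[str]) -> List[str]:
    """Append an AbortTransactionRecord if the previous controller left a partial transaction."""
    if has_open_transaction(log):
        return log + [ABORT]
    return log
```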

Snapshots

Since snapshots are already atomic as a whole, there is no need for transaction records in them. On both the broker and the controller, we generate snapshots from in-memory state using arbitrary batch sizes. This design does not call for any special in-memory treatment of metadata transactions; they are simply a mechanism for atomicity within the log itself. Records applied to memory do not know whether they came from a non-atomic batch, an atomic batch, or a transaction. Snapshot generation is unchanged by this KIP.

...

These new records will be incompatible with older versions of the broker and controller, and so we will gate their usage by introducing a new metadata.version. If the metadata log is downgraded to a version that does not support metadata transactions, we will simply exclude them from the downgrade snapshot that is produced. Note that lossy downgrades of the metadata log are detailed in KIP-778 and not yet fully implemented as of Kafka 3.3.

...