...

  • Only a single record transaction may exist at a time
  • A transaction cannot be interleaved with other records
  • All scheduled writes to Raft must be atomic, including the two marker records

For example, when creating a topic with a very high partition count, we may end up with a transaction that spans several atomic commits. An example topic creation using a transaction with two atomic batches:

Record               Record in transaction   Record in batch
BeginMarkerRecord    1                       1
TopicRecord          2                       2
PartitionRecord      3                       3
PartitionRecord      4                       4
PartitionRecord      5                       1
PartitionRecord      6                       2
ConfigRecord         7                       3
EndMarkerRecord      8                       4
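The layout in the table can be reproduced with a short sketch. It is written in Python for brevity and is illustrative only: `wrap_in_transaction`, `to_batches`, `layout`, and the batch size of 4 are assumptions made for this example, not names or limits taken from the controller.

```python
from typing import List, Tuple


def wrap_in_transaction(records: List[str]) -> List[str]:
    """Surround the payload records with the two marker records."""
    return ["BeginMarkerRecord"] + records + ["EndMarkerRecord"]


def to_batches(records: List[str], max_batch_size: int) -> List[List[str]]:
    """Split records into consecutive atomic batches of at most max_batch_size."""
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be at least 1")
    return [records[i:i + max_batch_size]
            for i in range(0, len(records), max_batch_size)]


def layout(batches: List[List[str]]) -> List[Tuple[str, int, int]]:
    """Return (record, position in transaction, position in batch) rows."""
    rows = []
    position_in_txn = 0
    for batch in batches:
        for position_in_batch, record in enumerate(batch, start=1):
            position_in_txn += 1
            rows.append((record, position_in_txn, position_in_batch))
    return rows


# Hypothetical payload: one topic, four partitions, one config record.
payload = ["TopicRecord"] + ["PartitionRecord"] * 4 + ["ConfigRecord"]
batches = to_batches(wrap_in_transaction(payload), max_batch_size=4)
rows = layout(batches)
```

With these inputs the eight records fall into two batches of four, and the marker records occupy the first slot of the first batch and the last slot of the second.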

Record Visibility

As the controller sends records to Raft, it will optimistically apply the record to its in-memory state. The record is not considered durable (i.e., "committed") until the high watermark of the Raft log increases beyond the offset of the record. Once a record is durable, it is made visible outside the controller to external clients. Since we are defining transactions that may span across Raft commits, we must extend our criteria for visibility to include whole transactions. When the controller sees that a partial transaction has been committed, it must not expose it until the entire transaction is committed. This is how the controller will provide atomicity for a transaction.
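The extended visibility rule can be sketched as a function of the log and the high watermark. This is a simplified Python model, not controller code: the log is a list of record type names indexed by offset, records at offsets below `high_watermark` are treated as committed, and `last_visible_offset` is a hypothetical helper name.

```python
from typing import List

BEGIN = "BeginMarkerRecord"
END = "EndMarkerRecord"
ABORT = "AbortMarkerRecord"


def last_visible_offset(log: List[str], high_watermark: int) -> int:
    """Return the highest offset that may be exposed, or -1 if there is none.

    A committed record outside a transaction is visible immediately.
    A committed record inside a transaction stays hidden until the
    transaction's end or abort marker has also been committed.
    """
    visible = -1
    in_transaction = False
    for offset in range(min(high_watermark, len(log))):
        record = log[offset]
        if record == BEGIN:
            in_transaction = True
        elif record in (END, ABORT):
            in_transaction = False
            visible = offset
        elif not in_transaction:
            visible = offset
    return visible


example_log = ["ConfigRecord", BEGIN, "TopicRecord", "PartitionRecord", END]
```

In `example_log`, a high watermark of 4 commits only part of the transaction, so visibility stays at offset 0. Once the high watermark reaches 5, the end marker is committed and the whole transaction becomes visible at once.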

Controller Failover

Another aspect of atomicity for transactions is dealing with controller failover. If a transaction has not been completed when a controller crashes, there will be a partial transaction committed to the log. The remaining records for this partial transaction are forever lost, and so we must abort the entire transaction. This will be done by the newly elected controller inserting an AbortMarkerRecord.

A partial transaction is easy to detect during failover by examining the log. An incomplete transaction is only expected during failover and would occur at the end of the log.

Using the above example, an incomplete transaction might look like:

  • BeginMarkerRecord
  • TopicRecord
  • PartitionRecord
  • PartitionRecord

There will be control records in the Raft log after this partial transaction (e.g., records pertaining to Raft election), but these are not exposed to the controller as records.
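The failover check described above can be sketched as a single scan of the log. This is a minimal Python model under stated assumptions: the log is a list of record type names as seen by the controller (Raft control records already filtered out), and `find_incomplete_transaction` and `recover_on_failover` are hypothetical helper names.

```python
from typing import List, Optional

BEGIN = "BeginMarkerRecord"
END = "EndMarkerRecord"
ABORT = "AbortMarkerRecord"


def find_incomplete_transaction(log: List[str]) -> Optional[int]:
    """Return the offset of the begin marker of an unfinished transaction,
    or None if every transaction in the log has been ended or aborted."""
    open_at = None
    for offset, record in enumerate(log):
        if record == BEGIN:
            open_at = offset
        elif record in (END, ABORT):
            open_at = None
    return open_at


def recover_on_failover(log: List[str]) -> List[str]:
    """Return a copy of the log, with an abort marker appended if the log
    ends in a partial transaction."""
    if find_incomplete_transaction(log) is not None:
        return log + [ABORT]
    return list(log)


partial_log = [BEGIN, "TopicRecord", "PartitionRecord", "PartitionRecord"]
recovered_log = recover_on_failover(partial_log)
```

Because only one transaction may exist at a time, tracking a single open begin marker is enough. After recovery the log ends with an AbortMarkerRecord and no transaction remains open.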

...

Broker Support

The records fetched by the brokers will include everything up to the Raft high watermark, which may include partial transactions. The broker must buffer these records and not make them visible to its internal components. Once an EndMarkerRecord or AbortMarkerRecord is seen, the buffered records can be processed or discarded, respectively.
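The broker-side buffering can be sketched as a small state machine. This is an illustrative Python model, not the broker's implementation: `TransactionBuffer`, `handle`, and `applied` are hypothetical names, and records are represented by their type names only.

```python
from typing import List, Optional

BEGIN = "BeginMarkerRecord"
END = "EndMarkerRecord"
ABORT = "AbortMarkerRecord"


class TransactionBuffer:
    """Holds back records that belong to an open transaction."""

    def __init__(self) -> None:
        # Records that have been made visible to internal components.
        self.applied: List[str] = []
        # Buffered records of the open transaction, or None if none is open.
        self._pending: Optional[List[str]] = None

    def handle(self, record: str) -> None:
        if record == BEGIN:
            self._pending = []
        elif record == END:
            # Transaction completed: process the buffered records together.
            self.applied.extend(self._pending or [])
            self._pending = None
        elif record == ABORT:
            # Transaction aborted: discard the buffered records.
            self._pending = None
        elif self._pending is not None:
            self._pending.append(record)
        else:
            # Record outside any transaction: visible right away.
            self.applied.append(record)


buffer = TransactionBuffer()
for fetched in [BEGIN, "TopicRecord", "PartitionRecord"]:
    buffer.handle(fetched)
visible_before_end = list(buffer.applied)
buffer.handle(END)
visible_after_end = list(buffer.applied)
```

Nothing is visible while the transaction is open, and both buffered records appear together once the end marker arrives. An abort marker in place of the end marker would leave `applied` unchanged.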

Snapshots

Since snapshots are already atomic as a whole, there is no need for transaction records. On both the broker and controller, we generate snapshots based on in-memory state using arbitrary batch sizes. This design does not call for any special in-memory treatment of metadata transactions – they are simply a mechanism of atomicity to use within the log itself. Records applied to memory will not know if they came from a non-atomic batch, atomic batch, or transaction. 

Compatibility, Deprecation, and Migration Plan

...

These new records will be incompatible with older versions of the broker and controller, and so we will gate their usage by introducing a new metadata.version. If the metadata log is downgraded to a version that does not support metadata transactions, we will simply exclude them from the downgrade snapshot that is produced. Note that lossy downgrades of the metadata log are detailed in KIP-778 and not yet fully implemented as of Kafka 3.3.

Test Plan

Describe in few sentences how the KIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

...