
...

One limitation of the current approach is that the maximum batch size is constrained by the maximum fetch size in the Raft layer. Currently, this value is hard-coded to 8kb. In extreme cases, the controller may generate a set of atomic records that exceeds this size.

A practical use case is creating a topic with a very large number of partitions. We want either all of the records (TopicRecord + PartitionRecords) to be processed by the controller and brokers, or none of them. The only way we can achieve this atomicity today is by using an atomic Raft batch. With this KIP, we can atomically create topics with an arbitrary number of partitions.

This KIP details an approach to allow the controller to generate atomic transactions of records that can exceed the maximum batch size.
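For illustration, here is a minimal sketch of how a controller could wrap a large record set between marker records so that the transaction can span multiple Raft batches, each within the batch size limit. The record and batcher types (and the "BeginMarker" name) are hypothetical stand-ins for this sketch, not the actual controller classes or record names defined by the KIP.

```java
// Illustrative sketch only; the MetadataRecord type and marker names are
// hypothetical stand-ins, not the real metadata record classes.
import java.util.ArrayList;
import java.util.List;

public class TransactionBatcher {
    record MetadataRecord(String type, byte[] payload) {}

    // Raft-layer batch/fetch limit, per the KIP text above.
    static final int MAX_RAFT_BATCH_BYTES = 8 * 1024;

    /**
     * Wraps a large set of records in Begin/End markers and splits them into
     * multiple Raft batches. Atomicity now comes from the markers rather than
     * from placing everything in a single atomic batch.
     */
    static List<List<MetadataRecord>> toTransactionBatches(List<MetadataRecord> records) {
        List<List<MetadataRecord>> batches = new ArrayList<>();
        List<MetadataRecord> current = new ArrayList<>();
        int currentBytes = 0;

        current.add(new MetadataRecord("BeginMarker", new byte[0]));
        for (MetadataRecord r : records) {
            // Flush the current batch before it would exceed the Raft limit.
            if (currentBytes + r.payload().length > MAX_RAFT_BATCH_BYTES) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(r);
            currentBytes += r.payload().length;
        }
        current.add(new MetadataRecord("EndMarker", new byte[0]));
        batches.add(current);
        return batches;
    }
}
```

Consumers of the log (brokers and standby controllers) then treat everything between the markers as a single atomic unit, regardless of how many Raft batches it was written in.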

...

The records fetched by the brokers will include everything up to the Raft high watermark. This will include partial transactions. The broker must buffer these records and not make them visible to its internal components. Buffering will be done using the existing metadata infrastructure on the broker; i.e., we will continue applying records to MetadataDelta, but will not publish a new MetadataImage until an EndMarker or AbortMarker is seen, at which point the buffered records can be processed or discarded, respectively.
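A minimal sketch of this buffering, assuming simplified stand-ins for the broker's MetadataDelta and MetadataImage types (not the actual broker code): records are replayed into a pending delta as they arrive, a new image is published only when an EndMarker is seen, and the pending delta is discarded when an AbortMarker is seen.

```java
// Sketch only; the nested types are simplified stand-ins for the broker's
// metadata image classes, and the marker names follow the KIP text above.
public class BufferingMetadataListener {
    interface MetadataRecord { String type(); }
    static class MetadataImage { /* immutable published state */ }
    static class MetadataDelta {
        MetadataDelta(MetadataImage base) { /* track changes relative to base */ }
        void replay(MetadataRecord record) { /* accumulate the change */ }
        MetadataImage apply() { return new MetadataImage(); }
    }

    private MetadataImage publishedImage = new MetadataImage();
    private MetadataDelta pendingDelta = new MetadataDelta(publishedImage);

    void handleRecord(MetadataRecord record) {
        switch (record.type()) {
            case "EndMarker":
                // Transaction committed: fold the buffered changes into a new
                // image and make it visible to broker components.
                publishedImage = pendingDelta.apply();
                pendingDelta = new MetadataDelta(publishedImage);
                publish(publishedImage);
                break;
            case "AbortMarker":
                // Transaction aborted: drop the buffered changes.
                // (Simplified: assumes the delta only holds records from the
                // aborted transaction.)
                pendingDelta = new MetadataDelta(publishedImage);
                break;
            default:
                // Keep applying records to the delta; nothing is published yet.
                pendingDelta.replay(record);
        }
    }

    void publish(MetadataImage image) { /* hand the new image to broker components */ }
}
```

This keeps the buffering inside the existing MetadataDelta/MetadataImage machinery rather than introducing a separate record buffer on the broker.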

Snapshots

Since snapshots are already atomic as a whole, there is no need for transaction records. On both the broker and controller, we generate snapshots based on in-memory state using arbitrary batch sizes. This design does not call for any special in-memory treatment of metadata transactions – they are simply a mechanism of atomicity to use within the log itself. Records applied to memory will not know if they came from a non-atomic batch, atomic batch, or transaction. Snapshot generation is unchanged by this KIP.

...

An alternative approach would be to put transactions in the Raft layer. This would be possible, but likely a much larger effort with a more complex implementation. The Raft layer in Kafka does not enforce the single-writer semantics that we have on the controller, so Raft transaction support would need to deal with concurrent transactions (much like we do with EOS). This would also likely involve changing the RaftClient#Listener interface. Since the controller is the single-writer of the metadata log, we can simplify the design quite a bit by managing a single transaction in that layer.

Another disadvantage of buffering transactions in Raft is that we would have to be very "dumb" about the buffering. Since the Raft layer has no knowledge of the contents of the records, it could do nothing besides simply buffering the unfinished transaction records in memory. By handling transactions in the controller/broker, we can be much smarter and more efficient about how we buffer things.