You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 5 Next »

Status

Current state"Draft"

Discussion thread:

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In KRaft, we use record batches as a mechanism for atomicity. When performing a metadata update that generates a number of related records, we will commit them atomically as a batch to the KRaft quorum. This ensures that the entire batch will be appended to the KRaft layer, or none of it will. This atomicity is very important when it comes to controller failover. If a controller commits a partial set of records and then crashes, the uncommitted records will be lost.

One limitation of the current approach is that the maximum batch size is limited by the maximum fetch size in the Raft layer. Currently, this value is hard coded to 8kb. In extreme cases, the controller may generate a set of atomic records that exceed this size. 

This KIP details an approach to allow the controller to generate atomic transactions of records that can exceed the maximum batch size.

Public Interfaces

Three simple marker records will be introduced to the KRaft metadata layer. These records will represent the beginning and end of a transaction.

{
  "apiKey": TBD
  "type": "metadata",
  "name": "BeginMarkerRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": []
}


{
  "apiKey": TBD
  "type": "metadata",
  "name": "EndMarkerRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": []
}


{
  "apiKey": TBD
  "type": "metadata",
  "name": "AbortMarkerRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": []
}



A new metadata.version will be introduced to gate the usage of these new records. 

Proposed Changes

Changes to the KRaft Controller

We define a new concept in the controller called a record transaction. A transaction consists of a BeginMarker, a number of records, and either an EndMarker or AbortMarker. These records can be committed atomically to the Raft layer in multiple batches. By allowing a set of records to span across several record batches, we overcome the current batch size limitation and further decouple the controller from Raft layer implementation details.


We can define a few invariants to help narrow the design:

  • Only a single record transaction may exist at a time
  • A transaction cannot be interleaved with other records
  • All scheduled writes to Raft must be atomic, including the two marker records


For example, when creating a topic with a very high partition count, we may end up with a set of atomic commits that looks like:


Untitled drawing.svg

Controller Failover

If a transaction is not finalized when a controller crashes, the newly elected controller can abort it by inserting an AbortMarker. This state is easily detectable during failover by the new controller by examining its log. An incomplete batch is only expected during failover and would occur at the end of the log.

An incomplete batch would look like

  • BatchMarkerRecord(markerType=StartBatch)
  • Record1
  • Record2

There will be control records in the log after the partial transaction, but these are not exposed to the controller as records.


Record Visibility

The high watermark of the Raft layer is used to control visibility of metadata updates in the controller. Since we are defining transactions that may span across Raft commits, we must not make a transaction visible to readers until the EndMarker/AbortMarker is processed.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the KIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

  • No labels