...

When fetching records from a compacted topic, the fetched record batches may have offset gaps which correspond to records the log cleaner removed. This simply results in gaps in the range of offsets of the in-flight records.

Reading transactional records

Each consumer in a consumer group has its own isolation level which controls how it handles records which were produced in transactions. For a share group, the concept of isolation level applies to the entire group, not each consumer.

The isolation level of a share group is controlled by the group configuration group.share.isolation.level.

For the read_uncommitted isolation level, which is the default, the share group consumes all transactional and non-transactional records.

For the read_committed isolation level, the share group only consumes committed records. The share-partition leader itself is responsible for keeping track of the commit and abort markers and filtering out transactional records which have been aborted. So, the set of records which are eligible to become in-flight records are non-transactional records and committed transactional records only. The SPEO can only move up to the last stable offset.

This processing has to occur on the broker because none of the clients receives all of the records. It can be performed with shallow iteration of the log.

In-flight records example

...

When a batch of records is first read from the log and added to the in-flight records for a share-partition, the broker does not know whether the set of records between the batch’s base offset and the last offset contains any gaps, as might occur for example as a result of log compaction. When the broker does not know which offsets correspond to records, the batch is considered an unmaterialized record batch. Rather than forcing the broker to iterate through all of the records in all cases, which might require decompressing every batch, the broker can send unmaterialized record batches to consumers. It initially assumes that all offsets between the base offset and the last offset correspond to records. When the consumer processes the batch, it may find gaps and it reports these using the ShareFetch or ShareAcknowledge API. This means that the presence of unmaterialized record batches containing gaps might temporarily inflate the number of in-flight records, but this will be resolved by the consumer acknowledgements.
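The accounting described above can be illustrated with a minimal sketch. The class and method names (InFlightBatch, report_gaps, in_flight_count) are hypothetical and are not part of the broker; the sketch only shows how the in-flight count is inflated until the consumer reports the gaps.

```python
from dataclasses import dataclass, field


@dataclass
class InFlightBatch:
    """Hypothetical model of one unmaterialized batch in the in-flight records."""
    base_offset: int
    last_offset: int
    gaps: set = field(default_factory=set)  # offsets reported by the consumer as gaps

    def in_flight_count(self) -> int:
        # Until gaps are reported, every offset in the range is assumed
        # to correspond to a record.
        return (self.last_offset - self.base_offset + 1) - len(self.gaps)

    def report_gaps(self, offsets) -> None:
        # Keep only gap offsets that fall inside this batch's offset range.
        for offset in offsets:
            if self.base_offset <= offset <= self.last_offset:
                self.gaps.add(offset)


batch = InFlightBatch(base_offset=100, last_offset=109)
assumed = batch.in_flight_count()   # all offsets assumed to be records
batch.report_gaps([103, 104, 107])  # consumer found three compacted offsets
actual = batch.in_flight_count()    # count after the gaps are acknowledged
```

In this sketch the count drops only after the consumer's report arrives, which is the temporary inflation the paragraph refers to.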

...

The share-partition leader clearly has to look within the data returned from the replica manager in order to understand the record batches it fetches. This means that records retrieved using a share group are not able to benefit from the zero-copy optimization.

By iterating over the record batches but not iterating over the individual records within, the share-partition leader is able to understand the log without having to decompress the records. There is one exception to this and that is to do with reading the transaction end markers as described in the next section.

The share-partition leader does not maintain an explicit cache of records that it has fetched. As a result, it may occasionally have to re-fetch records for redelivery, which is an unusual case.

Reading transactional records

Each consumer in a consumer group has its own isolation level which controls how it handles records which were produced in transactions. In a consumer group, a consumer using read_committed isolation level is only able to fetch records up to the last stable offset (LSO). It is also responsible for filtering out transactional records which were aborted. This filtering happens in the client.

For a share group, the concept of isolation level applies to the entire group, not each consumer. The isolation level of a share group is controlled by the group configuration group.share.isolation.level.

For the read_uncommitted isolation level, which is the default, the share group consumes all non-transactional and transactional records.

For the read_committed isolation level, the share group only consumes non-transactional records and committed transactional records. The set of records which are eligible to become in-flight records are non-transactional records and committed transactional records only. The SPSO cannot move past the last stable offset, so an open transaction blocks the progress of the share group with read_committed isolation level. The share-partition leader itself is responsible for keeping track of the commit and abort markers, and filtering out transactional records which have been aborted.

Let’s have a look at the structure of the log for transactional records and see how the share-partition leader processes the log to build up the in-flight records in the presence of transactions.

A log segment consists of a sequence of RecordBatch structures. They are interpreted as follows:

  • isTransactional=false is a batch of non-transactional records - these are potentially in-flight records

  • isTransactional=true, isControl=false is a batch of transactional records - these are potentially in-flight records if the transaction commits

  • isTransactional=true, isControl=true is a batch of control records - within these batches are the EndTxnMarker records which indicate the commit or abort of a transaction

So, only the last of these needs to be deeply inspected to read the records, but these batches are not compressed and they’re also very small.
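The three cases can be expressed as a small classification function. This is an illustrative sketch only: BatchKind, classify_batch and needs_deep_inspection are hypothetical names, and the two boolean parameters stand in for the isTransactional and isControl flags of a RecordBatch.

```python
from enum import Enum


class BatchKind(Enum):
    NON_TRANSACTIONAL = "non-transactional"
    TRANSACTIONAL = "transactional"
    CONTROL = "control"


def classify_batch(is_transactional: bool, is_control: bool) -> BatchKind:
    # Mirrors the three cases listed above, using only the batch-level flags.
    if not is_transactional:
        return BatchKind.NON_TRANSACTIONAL
    if is_control:
        return BatchKind.CONTROL
    return BatchKind.TRANSACTIONAL


def needs_deep_inspection(kind: BatchKind) -> bool:
    # Only control batches have to be opened to read the records within.
    return kind is BatchKind.CONTROL


kinds = [
    classify_batch(False, False),
    classify_batch(True, False),
    classify_batch(True, True),
]
```

Because the decision uses only batch-level flags, the data batches never need to be decompressed to be classified.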

The records are fetched from the replica manager using FetchIsolation.TXN_COMMITTED. This means that the replica manager only returns records up to the last stable offset, so any transactional records fetched were part of transactions which have already completed and therefore have a transaction marker in the log before the LSO.

When a RecordBatch of non-transactional records is fetched, the records are immediately added to the set of in-flight records in Available state. When a RecordBatch of transactional records is fetched, the records are added to the set of in-flight records in a transient Uncommitted state, tagged with the producer ID and producer epoch which act as the transaction identifier. When a RecordBatch of transactional control records is fetched, the EndTxnMarker within is inspected to see whether it is committing or aborting the transaction. If the transaction is committed, the Uncommitted records for that transaction become Available. If the transaction is aborted, the Uncommitted records for that transaction are discarded.
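These state transitions can be sketched as follows. The InFlightRecords class and its methods are hypothetical and track offsets only; the real share-partition leader keeps richer per-record state. The sketch assumes that the (producer ID, producer epoch) pair identifies the open transaction, as described above.

```python
from dataclasses import dataclass, field


@dataclass
class InFlightRecords:
    """Hypothetical offset-only model of the in-flight records."""
    available: list = field(default_factory=list)
    # (producer_id, producer_epoch) -> offsets in the transient Uncommitted state
    uncommitted: dict = field(default_factory=dict)

    def add_non_transactional(self, offsets):
        # Non-transactional records become Available immediately.
        self.available.extend(offsets)

    def add_transactional(self, producer_id, producer_epoch, offsets):
        # Transactional records wait in Uncommitted, tagged with their transaction.
        key = (producer_id, producer_epoch)
        self.uncommitted.setdefault(key, []).extend(offsets)

    def end_transaction(self, producer_id, producer_epoch, committed):
        # Called when the EndTxnMarker in a control batch has been inspected.
        offsets = self.uncommitted.pop((producer_id, producer_epoch), [])
        if committed:
            self.available.extend(offsets)
            self.available.sort()
        # On abort the offsets are simply dropped.


records = InFlightRecords()
records.add_non_transactional([0, 1])
records.add_transactional(7, 0, [2, 3])
records.add_transactional(8, 0, [4])
records.add_non_transactional([5])
records.end_transaction(7, 0, committed=True)   # commit marker for producer 7
records.end_transaction(8, 0, committed=False)  # abort marker for producer 8
```

After both markers are processed, the committed offsets 2 and 3 join the Available set and the aborted offset 4 is gone.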

If a transaction was long-running, it may be necessary to read a lot of records in order to discover the control record. If the number of in-flight records has reached its limit and there are any Uncommitted records, the log is scanned looking for transaction control records beyond the SPEO, building up a summary of the (producer ID, producer epoch, outcome) information that is found. This can then subsequently be used to process transactional records as they are being added to the in-flight records.
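A minimal sketch of the look-ahead scan, assuming each batch beyond the SPEO is represented as a dictionary with hypothetical keys (is_transactional, is_control, producer_id, producer_epoch, committed). It collects the (producer ID, producer epoch, outcome) summary in log order and skips data batches without reading their records.

```python
def scan_for_outcomes(batches):
    """Collect (producer_id, producer_epoch, outcome) from control batches only."""
    summary = []
    for batch in batches:
        if batch["is_transactional"] and batch["is_control"]:
            outcome = "commit" if batch["committed"] else "abort"
            summary.append((batch["producer_id"], batch["producer_epoch"], outcome))
    return summary


# Hypothetical batches found beyond the SPEO.
log_beyond_speo = [
    {"is_transactional": True, "is_control": False, "producer_id": 7, "producer_epoch": 0},
    {"is_transactional": False, "is_control": False},
    {"is_transactional": True, "is_control": True, "producer_id": 7,
     "producer_epoch": 0, "committed": True},
    {"is_transactional": True, "is_control": True, "producer_id": 8,
     "producer_epoch": 2, "committed": False},
]
summary = scan_for_outcomes(log_beyond_speo)
```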

Share sessions

The ShareFetch API works very much like incremental fetch using a concept called a share session. Each share session contains a set of topic-partitions which are managed in the share-partition leaders. The share-partition leader manages the fetching of records and the in-flight record state for its share-partitions. The consumer adds and removes topic-partitions from its share session using the ShareFetch API just like the Fetch API is used for incremental fetch. With the Fetch API, the consumer specifies the fetch offset. With the ShareFetch API, the consumer just fetches records and the share-partition leader decides which records to return.
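The incremental behaviour can be pictured with a small sketch. ShareSession and apply are hypothetical names, not the ShareFetch request schema; the point is only that each request carries changes to the set of topic-partitions and never a fetch offset.

```python
class ShareSession:
    """Hypothetical broker-side view of one consumer's share session."""

    def __init__(self):
        self.partitions = set()  # (topic, partition) pairs currently in the session

    def apply(self, add=(), remove=()):
        # Each request only names the partitions to add or remove; the rest
        # of the session is carried over unchanged. No fetch offset is given.
        self.partitions |= set(add)
        self.partitions -= set(remove)
        return sorted(self.partitions)


session = ShareSession()
first = session.apply(add=[("orders", 0), ("orders", 1)])
second = session.apply(add=[("payments", 0)], remove=[("orders", 0)])
```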

...

The share session provides a way to maintain context for a consumer in a share group across a sequence of ShareFetch and ShareAcknowledge requests. A share session is created using a ShareFetch request, then used by ShareFetch and ShareAcknowledge requests, and finally closed by either a ShareFetch or a ShareAcknowledge request. When the connection between a client and broker is disconnected, any share session is automatically closed.

...