Status

Current state: "Accepted"

Discussion thread: here 

JIRA: KAFKA-16780

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

  1. When a consumer sets the isolation level to READ_COMMITTED and reads a topic with no aborted or committed transactions, the broker may, in the worst case, have to traverse all the local log segments to collect aborted transactions, since the transaction index contains no entries. The search stops once it reaches a segment whose last stable offset is higher than the FETCH request's upper-boundary offset.
  2. The same logic applies when reading data from remote storage. When a FETCH request reads data from the first remote log segment, the broker has to fetch the offset, time, and transaction indexes of all the remaining remote log segments, and then continue into the local log segments before responding to the FETCH request. This increases the time taken to serve the request.

The EoS Abort Index design doc explains how the transaction index file filters out the aborted transaction records.

The issue arises when consumers use the READ_COMMITTED isolation level but read topics with no aborted or committed transactions. If the topic does use transactions, we expect each transaction to either commit or abort within 15 minutes (the default transaction.max.timeout.ms), so we may only have to search a few remote log segments to collect the aborted transactions.

The proposal is to add a txnIdxEmpty field to the RemoteLogSegmentMetadata event to denote whether the transaction index is empty when the log segment is uploaded to remote storage. This field helps reduce the number of calls to remote storage. Note that remote storage is agnostic to the clients, and we cannot control the behavior of the clients.

Public Interfaces

  1. A new tagged field, txnIdxEmpty (named TxnIndexEmpty in the JSON schema), will be added to RemoteLogSegmentMetadataRecord.json and RemoteLogSegmentMetadataSnapshotRecord.json to denote whether the transaction index was empty when the segment was uploaded. The change must be backward compatible, since the __remote_log_metadata topic can contain events written before it. We will therefore add the field as a tagged field and keep the version number at 0; older events that lack the tag are read with the field's default value, false.
RemoteLogSegmentMetadataRecord.json
{
  "name": "TxnIndexEmpty",
  "type": "bool",
  "versions": "0+",
  "about": "Flag to indicate if the transaction index is empty.",
  "taggedVersions": "0+",
  "tag": 0
}

The new field will be added to the RemoteLogSegmentMetadata and RemoteLogSegmentMetadataSnapshot classes. Their constructors will take the additional txnIdxEmpty parameter, and the getter, hashCode, equals, and toString methods will be updated accordingly.
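A minimal sketch of how the new flag might sit alongside the existing fields of a metadata class. `SegmentMetadataSketch` and `fromTaggedFields` are hypothetical simplifications, not the real RemoteLogSegmentMetadata API. The factory models the assumption that an event written before this change (no tag 0) reads as `txnIdxEmpty = false`, which keeps the old "fetch the indexes" behavior for such segments.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified stand-in for RemoteLogSegmentMetadata; only the fields
// needed to illustrate the new flag are included.
public final class SegmentMetadataSketch {
    private final String segmentId;
    private final long startOffset;
    private final long endOffset;
    private final boolean txnIdxEmpty; // true if the segment had no aborted-txn entries at upload time

    public SegmentMetadataSketch(String segmentId, long startOffset, long endOffset, boolean txnIdxEmpty) {
        this.segmentId = Objects.requireNonNull(segmentId);
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.txnIdxEmpty = txnIdxEmpty;
    }

    // Hypothetical decoding helper: a missing tag 0 (an event written before this change)
    // falls back to false, so the broker keeps fetching the indexes for old segments.
    public static SegmentMetadataSketch fromTaggedFields(String segmentId, long startOffset, long endOffset,
                                                         Map<Integer, Object> taggedFields) {
        boolean txnIdxEmpty = (Boolean) taggedFields.getOrDefault(0, Boolean.FALSE);
        return new SegmentMetadataSketch(segmentId, startOffset, endOffset, txnIdxEmpty);
    }

    public boolean isTxnIdxEmpty() {
        return txnIdxEmpty;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SegmentMetadataSketch)) return false;
        SegmentMetadataSketch that = (SegmentMetadataSketch) o;
        return startOffset == that.startOffset
                && endOffset == that.endOffset
                && txnIdxEmpty == that.txnIdxEmpty // the new field participates in equality
                && segmentId.equals(that.segmentId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(segmentId, startOffset, endOffset, txnIdxEmpty);
    }

    @Override
    public String toString() {
        return "SegmentMetadataSketch{segmentId=" + segmentId + ", startOffset=" + startOffset
                + ", endOffset=" + endOffset + ", txnIdxEmpty=" + txnIdxEmpty + "}";
    }
}
```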


2. A new method, nextSegmentWithTxnIndex, will be introduced in RemoteLogMetadataManager. Plugin implementors can optimize it to return the next segment that contains a transaction index for a given epoch and offset.

RemoteLogMetadataManager
/**
 * Returns the next segment metadata that contains aborted transaction entries for the given topic partition, epoch, and offset.
 * <ul>
 *     <li>The default implementation returns the segment metadata that matches the given epoch and offset,
 *     irrespective of whether it has a transaction index.</li>
 *     <li>A custom implementation can optimize this by returning the next segment metadata that contains a transaction index
 *     in the given epoch. If no segment in the given epoch has a transaction index, it returns empty.</li>
 * </ul>
 * @param topicIdPartition topic partition to search for.
 * @param epoch leader epoch for the given offset.
 * @param offset offset to search from.
 * @return The next segment metadata. Depending on the RLMM plugin implementation, the returned segment may or may not
 * have a transaction index; the caller handles both cases.
 * @throws RemoteStorageException if any storage-related error occurs.
 */
default Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition,
                                                                   int epoch,
                                                                   long offset) throws RemoteStorageException {
    return remoteLogSegmentMetadata(topicIdPartition, epoch, offset);
}
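One way a plugin might implement the optimization, sketched under assumptions: for each leader epoch it keeps an in-memory TreeMap of only the segments whose txnIdxEmpty flag is false, keyed by end offset, so the lookup is a single `ceilingEntry` call rather than a linear scan. `EpochSegmentIndex` and its `SegmentMetadata` record are hypothetical; the real method also takes a TopicIdPartition and an epoch and returns RemoteLogSegmentMetadata.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical per-epoch index an RLMM plugin might keep in memory; not part of Kafka's API.
public class EpochSegmentIndex {
    // Simplified segment metadata; segments within one epoch are assumed not to overlap.
    public record SegmentMetadata(long startOffset, long endOffset, boolean txnIdxEmpty) {}

    private final NavigableMap<Long, SegmentMetadata> byStartOffset = new TreeMap<>();
    // Only segments whose transaction index is non-empty, keyed by end offset.
    private final NavigableMap<Long, SegmentMetadata> withTxnIndexByEndOffset = new TreeMap<>();

    public void add(SegmentMetadata m) {
        byStartOffset.put(m.startOffset(), m);
        if (!m.txnIdxEmpty()) {
            withTxnIndexByEndOffset.put(m.endOffset(), m);
        }
    }

    // Analogue of the default behavior: the segment containing the offset, ignoring the flag.
    public Optional<SegmentMetadata> segmentContaining(long offset) {
        Map.Entry<Long, SegmentMetadata> e = byStartOffset.floorEntry(offset);
        if (e == null || e.getValue().endOffset() < offset) {
            return Optional.empty();
        }
        return Optional.of(e.getValue());
    }

    // Optimized analogue: the first segment with a non-empty txn index whose end offset is >= offset.
    // Because segments don't overlap, this is the segment containing the offset or a later one.
    public Optional<SegmentMetadata> nextSegmentWithTxnIndex(long offset) {
        Map.Entry<Long, SegmentMetadata> e = withTxnIndexByEndOffset.ceilingEntry(offset);
        return e == null ? Optional.empty() : Optional.of(e.getValue());
    }
}
```

Keeping a second map trades a little memory for a logarithmic lookup; a plugin backed by a database could achieve the same with an index on the flag.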


3. A new isEmpty method will be added to TransactionIndex to determine whether a segment has no aborted transaction entries. Note that the transaction index does not exist for a segment when there are no aborted transaction entries. We could instead check whether the transaction index file is `null`, but we avoid that because the implementation may change later.

boolean isTxnIdxEmpty = segment.txnIndex().file() == null;
TransactionIndex
public boolean isEmpty() {
    return !iterable().iterator().hasNext();
}
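A small sketch of where the flag could come from at upload time, assuming a simplified in-memory index. `TxnIndexUploadSketch`, its nested `TxnIndex`, and `txnIdxEmptyFlag` are hypothetical names; only the shape of `isEmpty()` mirrors the method above.

```java
import java.util.List;

// Hypothetical, simplified model of a local segment's transaction index and the
// upload step that records txnIdxEmpty; not Kafka's actual classes.
public class TxnIndexUploadSketch {
    public record AbortedTxn(long producerId, long firstOffset, long lastOffset, long lastStableOffset) {}

    public static final class TxnIndex {
        private final List<AbortedTxn> entries; // stands in for entries read from the index file

        public TxnIndex(List<AbortedTxn> entries) {
            this.entries = List.copyOf(entries);
        }

        public Iterable<AbortedTxn> iterable() {
            return entries;
        }

        // Same shape as the proposed TransactionIndex.isEmpty(): it asks whether any entry
        // exists rather than whether a backing file is present.
        public boolean isEmpty() {
            return !iterable().iterator().hasNext();
        }
    }

    // Value the uploader would put into the txnIdxEmpty field of the metadata event.
    public static boolean txnIdxEmptyFlag(TxnIndex index) {
        return index.isEmpty();
    }
}
```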


Proposed Changes

We propose adding a new field to the remote log metadata event that denotes whether the transaction index of a given remote log segment is empty. This reduces the number of calls to remote storage when consumers backfill with the isolation level set to READ_COMMITTED. Note that the transaction index is fetched from remote storage through the RemoteIndexCache, which fetches all three indexes: offset, time, and transaction index.

When the READ_COMMITTED isolation level is enabled in the consumer, the broker scans the aborted transaction entries up to the upper-boundary offset. The upper-boundary offset is either the max offset of the FETCH request or the base offset of the next segment. When the transaction indexes contain no aborted transaction entries, the broker scans all the transaction index files and returns an empty aborted transaction list in the FETCH response.
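The worst case described above can be modeled with a short sketch. It assumes segments are already ordered and start at the one containing the fetch offset, and keeps each transaction index in memory; `LocalAbortedTxnScan` and its records are hypothetical, not Kafka's classes. Because the search only completes when an entry's last stable offset reaches the upper bound, empty indexes never stop it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the broker-side local scan; not Kafka's actual code.
public class LocalAbortedTxnScan {
    public record AbortedTxn(long producerId, long firstOffset, long lastOffset, long lastStableOffset) {}

    // A segment with its transaction index entries held in memory.
    public record Segment(long baseOffset, List<AbortedTxn> txnIndex) {}

    public record ScanResult(List<AbortedTxn> abortedTxns, int segmentsVisited) {}

    // Segments are assumed to be ordered and to start at the segment containing fetchOffset.
    public static ScanResult collectAbortedTxns(List<Segment> segments, long fetchOffset, long upperBoundOffset) {
        List<AbortedTxn> collected = new ArrayList<>();
        int visited = 0;
        for (Segment segment : segments) {
            visited++;
            for (AbortedTxn txn : segment.txnIndex()) {
                // Keep transactions that overlap [fetchOffset, upperBoundOffset).
                if (txn.lastOffset() >= fetchOffset && txn.firstOffset() < upperBoundOffset) {
                    collected.add(txn);
                }
                // An entry whose last stable offset reaches the upper bound ends the search.
                if (txn.lastStableOffset() >= upperBoundOffset) {
                    return new ScanResult(collected, visited);
                }
            }
        }
        // No entry terminated the search, so every segment was visited.
        return new ScanResult(collected, visited);
    }
}
```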

While reading from remote storage, we may have to traverse most of the remote log segment metadata events. If txnIdxEmpty is false for a segment, we fetch its auxiliary indexes (offset, time, and transaction index) from remote storage; if it is true, we skip fetching them. Once the search has traversed all the segments or reached the upper-boundary offset, the collected aborted transaction entries are returned in the FETCH response.
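A minimal sketch of the remote traversal above, under simplifying assumptions: the segment metadata list is already ordered from the fetch offset, and remote storage is replaced by a hypothetical `FakeRemoteStorage` that counts index fetches. `RemoteAbortedTxnScan`, its records, and the `useFlag` switch are illustrative names only; with `useFlag` false the loop behaves as it would without the new field.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the remote read path; not Kafka's actual code.
public class RemoteAbortedTxnScan {
    public record AbortedTxn(long firstOffset, long lastOffset, long lastStableOffset) {}

    // Stand-in for RemoteLogSegmentMetadata carrying the new flag.
    public record SegmentMetadata(long startOffset, long endOffset, boolean txnIdxEmpty) {}

    // Stand-in for fetching the auxiliary indexes from remote storage; counts calls.
    public static final class FakeRemoteStorage {
        private final Map<Long, List<AbortedTxn>> txnIndexes; // keyed by segment start offset
        private int indexFetches = 0;

        public FakeRemoteStorage(Map<Long, List<AbortedTxn>> txnIndexes) {
            this.txnIndexes = txnIndexes;
        }

        public List<AbortedTxn> fetchTxnIndex(SegmentMetadata segment) {
            indexFetches++;
            return txnIndexes.getOrDefault(segment.startOffset(), List.of());
        }

        public int indexFetches() {
            return indexFetches;
        }
    }

    public static List<AbortedTxn> collectAbortedTxns(List<SegmentMetadata> segments, FakeRemoteStorage storage,
                                                      long fetchOffset, long upperBoundOffset, boolean useFlag) {
        List<AbortedTxn> collected = new ArrayList<>();
        for (SegmentMetadata segment : segments) {
            if (useFlag && segment.txnIdxEmpty()) {
                continue; // no aborted transactions here: skip the remote index fetch
            }
            for (AbortedTxn txn : storage.fetchTxnIndex(segment)) {
                if (txn.lastOffset() >= fetchOffset && txn.firstOffset() < upperBoundOffset) {
                    collected.add(txn);
                }
                if (txn.lastStableOffset() >= upperBoundOffset) {
                    return collected; // reached the upper-boundary offset: search complete
                }
            }
        }
        return collected; // traversed all segments
    }
}
```

With ten segments flagged as empty, the flag-aware traversal makes no index fetches while the flag-unaware one makes ten, which is the kind of difference the first test in the Test Plan is meant to assert.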

The change is backward compatible for the events that are saved in the __remote_log_metadata topic.

Compatibility, Deprecation, and Migration Plan

The existing clusters with Tiered Storage metadata will continue to work during/after a rolling upgrade to a version which contains this new change.

Test Plan

  1. A test will be added to assert the number of calls made to the Remote Storage Manager to fetch the indexes. The number of calls should be lower with this KIP than without it.
  2. Another test will assert that the topic-based RLMM implementation does not read metadata for all segments (i.e., it does not perform a linear search).

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
