...

  1. LSO (Last Stable Offset) is the offset of the first message in the earliest transaction that is still open (or the high watermark if no transaction is open); every transaction below it has been committed or aborted. A read_committed client should not see messages at or beyond the LSO. When a leader broker builds a FetchResponse normally, it already ensures that it does not return messages beyond the LSO. When the consumer reads server logs directly from remote storage, it must enforce the LSO boundary itself. Fortunately, the LSO is present in FetchResponseData.PartitionData (the LastStableOffset field), so the consumer can use it to stop consuming at the right point;
  2. Aborted Transactions lists the aborted transactions that overlap the fetched range (each entry carries a producer ID and the transaction's first offset). The consumer client needs to filter out the messages of those transactions before they are returned to the user. There are two possible implementation ideas, and we favor option 2 (the broker retrieves the aborted transaction data):
    1. Aborted transactions are recorded in the TransactionIndex file, which works together with the ProducerSnapshot file; both are stored on the broker and in remote tiered storage. The leader broker can return the locations of these two files to the consumer client, which then downloads them from remote storage and filters accordingly;
    2. The broker retrieves the aborted transactions from remote tiered storage and sets them in the FetchResponse.PartitionData structure. Although this adds a download on the broker side, the transaction index is normally small (aborted transactions should be rare), so fetching it should not cause significant performance overhead on the broker, and it greatly reduces the complexity of the consumer code;
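The two steps above (the broker collecting aborted transactions from the transaction index, as in the second option, and the consumer filtering records up to the LSO) can be sketched as follows. This is a minimal illustration under stated assumptions, not the Kafka client's implementation: `AbortedTxn`, `Batch`, `Kind`, `collectAbortedTxns` and `visibleBatches` are hypothetical names, filtering is done at batch granularity, and the approach (activate aborted transactions in order of first offset, skip transactional batches from those producers until their ABORT marker) is modeled loosely on how the Java consumer handles read_committed fetches.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

class TxnFilterSketch {

    // Hypothetical transaction index entry: producerId's aborted transaction
    // spans offsets [firstOffset, lastOffset].
    record AbortedTxn(long producerId, long firstOffset, long lastOffset) {}

    enum Kind { DATA, COMMIT_MARKER, ABORT_MARKER }

    // Hypothetical, simplified record batch read from a remote log segment.
    record Batch(long baseOffset, long lastOffset, long producerId,
                 boolean transactional, Kind kind) {}

    // Broker side (second option): keep only the aborted transactions that
    // overlap the fetch range [fetchOffset, upperBoundOffset).
    static List<AbortedTxn> collectAbortedTxns(List<AbortedTxn> index,
                                               long fetchOffset, long upperBoundOffset) {
        List<AbortedTxn> result = new ArrayList<>();
        for (AbortedTxn txn : index) {
            if (txn.lastOffset() >= fetchOffset && txn.firstOffset() < upperBoundOffset) {
                result.add(txn);
            }
        }
        return result;
    }

    // Consumer side: batches a read_committed user may see, stopping at the LSO.
    static List<Batch> visibleBatches(List<Batch> batches, List<AbortedTxn> aborted, long lso) {
        // Aborted transactions ordered by the offset where they start.
        PriorityQueue<AbortedTxn> pending =
                new PriorityQueue<>(Comparator.comparingLong(AbortedTxn::firstOffset));
        pending.addAll(aborted);
        Set<Long> abortedProducers = new HashSet<>();
        List<Batch> visible = new ArrayList<>();
        for (Batch b : batches) {
            if (b.baseOffset() >= lso) {
                break; // nothing at or beyond the LSO is exposed
            }
            // Activate every aborted transaction that starts at or before this batch ends.
            while (!pending.isEmpty() && pending.peek().firstOffset() <= b.lastOffset()) {
                abortedProducers.add(pending.poll().producerId());
            }
            if (b.kind() != Kind.DATA) {
                // Control markers are never returned; an ABORT marker closes
                // that producer's aborted transaction.
                if (b.kind() == Kind.ABORT_MARKER) {
                    abortedProducers.remove(b.producerId());
                }
                continue;
            }
            if (b.transactional() && abortedProducers.contains(b.producerId())) {
                continue; // data written by an aborted transaction
            }
            visible.add(b);
        }
        return visible;
    }

    public static void main(String[] args) {
        List<AbortedTxn> txnIndex = List.of(
                new AbortedTxn(7, 0, 2),       // overlaps the fetch range
                new AbortedTxn(5, 100, 105));  // outside the fetch range
        long lso = 8;                          // producer 10's transaction is still open
        List<AbortedTxn> aborted = collectAbortedTxns(txnIndex, 0, lso);
        List<Batch> log = List.of(
                new Batch(0, 1, 7, true, Kind.DATA),
                new Batch(2, 2, 7, true, Kind.ABORT_MARKER),
                new Batch(3, 4, -1, false, Kind.DATA),
                new Batch(5, 6, 9, true, Kind.DATA),
                new Batch(7, 7, 9, true, Kind.COMMIT_MARKER),
                new Batch(8, 9, 10, true, Kind.DATA));
        for (Batch b : visibleBatches(log, aborted, lso)) {
            System.out.println("visible batch at offset " + b.baseOffset());
        }
    }
}
```

Running `main` reports visible batches at offsets 3 and 5 only: producer 7's batch is dropped as aborted, both control markers are skipped, and producer 10's open transaction at offset 8 is held back by the LSO. With the second option, only `visibleBatches` would run on the consumer, because the broker has already done the index lookup.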

Handling Queues

Share groups (queues) exchange messages between the ShareConsumer and the broker through ShareFetchRequest/ShareFetchResponse rather than FetchRequest/FetchResponse.

Status: Share groups are already supported with broker-mediated remote fetches.

Share groups (queues) work with tiered storage today via the broker's existing remote fetch mechanism. When a ShareFetchRequest requires data from remote storage, the broker fetches the segment via RemoteLogManager, manages record distribution through SharePartition.acquire(), and returns records to consumers via ShareFetchResponse. This flow is implemented via DelayedShareFetch and PendingRemoteFetches.

Direct consumer fetch (the focus of this KIP) does not apply to share groups. The share partition leader must maintain control of record distribution, acquisition state, timeouts, and acknowledgments. This coordination requires the broker to read records before assigning them to specific consumers, making metadata-only responses (segment location without records) incompatible with the share group model.
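To illustrate why the share partition leader needs the records in hand before assigning them, here is a hypothetical model of the in-flight state it keeps between SPSO and SPEO. It assumes the record states described in KIP-932 (available, acquired, acknowledged, archived) and a per-record delivery-count limit; `InFlightSketch`, `acquire`, `acknowledge` and `expire` are illustrative names, not the `SharePartition` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class InFlightSketch {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    // Hypothetical per-offset bookkeeping held by the share partition leader.
    static final class InFlightRecord {
        State state = State.AVAILABLE;
        String owner;       // consumer currently holding the record, if any
        int deliveryCount;  // how many times the record has been acquired
    }

    private final TreeMap<Long, InFlightRecord> inFlight = new TreeMap<>(); // offsets in [SPSO, SPEO)
    private final int maxDeliveries;

    InFlightSketch(long spso, long speo, int maxDeliveries) {
        for (long o = spso; o < speo; o++) inFlight.put(o, new InFlightRecord());
        this.maxDeliveries = maxDeliveries;
    }

    // Hand up to maxRecords available offsets to one consumer.
    List<Long> acquire(String consumerId, int maxRecords) {
        List<Long> acquired = new ArrayList<>();
        for (var e : inFlight.entrySet()) {
            if (acquired.size() == maxRecords) break;
            InFlightRecord r = e.getValue();
            if (r.state == State.AVAILABLE) {
                r.state = State.ACQUIRED;
                r.owner = consumerId;
                r.deliveryCount++;
                acquired.add(e.getKey());
            }
        }
        return acquired;
    }

    // Only the consumer that acquired a record may acknowledge it.
    void acknowledge(String consumerId, long offset) {
        InFlightRecord r = inFlight.get(offset);
        if (r != null && r.state == State.ACQUIRED && consumerId.equals(r.owner)) {
            r.state = State.ACKNOWLEDGED;
            r.owner = null;
        }
    }

    // Acquisition lock expired: make the record available again, or archive it
    // once it has been delivered maxDeliveries times.
    void expire(long offset) {
        InFlightRecord r = inFlight.get(offset);
        if (r != null && r.state == State.ACQUIRED) {
            r.state = r.deliveryCount >= maxDeliveries ? State.ARCHIVED : State.AVAILABLE;
            r.owner = null;
        }
    }

    State state(long offset) { return inFlight.get(offset).state; }

    public static void main(String[] args) {
        InFlightSketch sp = new InFlightSketch(100, 104, 2);
        System.out.println(sp.acquire("c1", 2)); // [100, 101]
        System.out.println(sp.acquire("c2", 3)); // [102, 103]
        sp.acknowledge("c1", 100);
        sp.expire(101);                          // one delivery so far: AVAILABLE again
        System.out.println(sp.acquire("c2", 3)); // [101]
        sp.expire(101);                          // second delivery expired: ARCHIVED
        System.out.println(sp.state(100) + " " + sp.state(101)); // ACKNOWLEDGED ARCHIVED
    }
}
```

Because `acquire` hands out concrete offsets and tracks who owns each one, the leader must already know which records exist in the range; a response carrying only a remote segment location would leave this bookkeeping with nothing to assign.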

For ShareFetchRequest/ShareFetchResponse, the protocol changes in this KIP (the RemoteLogSegmentLocationRequested field) are not applicable, and share groups continue using broker-mediated fetches as they do today. The broker also acts as the share partition leader, managing and distributing the in-flight messages (those between the share-partition start offset, SPSO, and the share-partition end offset, SPEO) to share consumers; supporting direct fetch while preserving this logic would require nominating one of the consumers to act as share partition leader.

Consumer reads a subset of messages

...