Authors: Henry Cai, Thomas Thornton
Status
Current state: "Under Discussion"
Related KIP: KIP-1254
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-405 enables Kafka brokers to upload log segments to remote tiered storage. This opens the door to many new use cases, one of which is having consumers fetch from tiered storage directly.
Currently, when a new or lagging consumer needs to fetch messages from a while ago, and those messages are no longer present in the Kafka broker's local storage, the broker must download them from remote tiered storage and then transfer the data to the consumer. This process involves two data transfers, resulting in increased network latency and transfer costs. Critically, when a broker downloads a data segment from remote storage and stores it locally, it imposes additional IOPS on the network and consumes local disk capacity. Furthermore, it pollutes the page cache established for messages closer to the end of the log.
With the data segment file already present on remote tiered storage, there is no reason the consumer cannot fetch it directly from remote storage without affecting broker performance. To address this gap, we propose allowing consumers to fetch from remote tiered storage.
This feature will also be highly beneficial for serving large backfill requests from consumer clients in data analytics and streaming applications such as Pinot or Flink.
This KIP focuses on the broker-side design to support remote tiered storage fetches from the consumer; for the consumer-side design, please refer to KIP-1254.
Proposed Changes
Know the location of remote log segment file
In order to fetch from remote tiered storage, the consumer needs to be told the location of the remote log segment file for the fetch offset. This can be done either by (1) having the consumer subscribe to the tiered storage metadata topic __remote_log_metadata, filter to the partitions it is interested in, parse each metadata message, and track the state changes; or (2) having the leader broker notify the consumer about the location of the remote log segment file during the FetchRequest/FetchResponse exchange. We recommend the second option: have the leader broker notify the consumer about the location.
We favor the broker/consumer location exchange because:
- The __remote_log_metadata topic is a congested topic that contains metadata for all tiered-storage-enabled topic partitions, and the client would also need to be aware that each metadata record goes through several state changes. The topic includes the metadata history of every remote log segment since day 1, even for segments already removed from tiered storage, so a lot of reading, filtering, and caching would need to happen on the consumer side;
- With many programming languages to support on the consumer side, and given the trend of moving complex consumer logic to the broker side, it is more natural to have the broker keep track of the remote tiered storage segment files and exchange that information with the consumer. This mirrors the design choice (whether the consumer or the broker keeps track of the preferred read replica) in KIP-392: Allow consumers to fetch from closest replica, which also favors having the broker exchange the preferred read replica information with the consumer.
We propose to extend the Fetch API to convey the remote log segment location information.
The consumer client will initiate the FetchRequest with an extra boolean field, RemoteLogSegmentLocationRequested, which indicates the client is able to read from remote storage and is interested in the location of the remote log segment file. When the leader broker receives a FetchRequest from the consumer and finds that it no longer has the requested fetch offset in local storage, it can respond with empty Records plus the extra information about the location of the remote log segment. This is very similar to how KIP-392 responds with empty Records and a preferred read replica in the FetchResponse.
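The broker-side decision described above can be sketched as follows. This is an illustrative stand-in, not the actual KafkaApis code path; the class, method, and enum names are hypothetical.

```java
// Sketch of how the broker might decide whether to return records or a
// remote segment location when serving a fetch. Names are illustrative.
public class RemoteFetchDecision {
    public enum Action { SERVE_LOCAL, RETURN_REMOTE_LOCATION, DOWNLOAD_AND_SERVE }

    /**
     * fetchOffset: the offset the consumer asked for.
     * localLogStartOffset: the earliest offset still on the broker's local disk.
     * locationRequested: mirrors the proposed RemoteLogSegmentLocationRequested flag.
     */
    public static Action decide(long fetchOffset, long localLogStartOffset,
                                boolean locationRequested) {
        if (fetchOffset >= localLogStartOffset) {
            return Action.SERVE_LOCAL; // data still present in local storage
        }
        // Offset only exists in tiered storage: either hand back the segment
        // location (new behavior) or download and serve (old behavior).
        return locationRequested ? Action.RETURN_REMOTE_LOCATION
                                 : Action.DOWNLOAD_AND_SERVE;
    }
}
```

Clients that do not set the flag fall through to today's broker-mediated path, which keeps the change backward compatible.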
Security and Access Control
The broker will perform the standard access and authorization checks on the fetch request based on the Kafka Access Control configuration.
However, since the consumer can later read the object in the object store using the location information in the FetchResponse, some organizations might have concerns about consumer access to the object store. Most object stores have their own ACL configurations (for example, S3's IAM role/policy configuration) to manage specific client access. For organizations that want a simple way to add another layer of protection, the broker can protect the location information with a signing mechanism provided by the cloud provider (for example, presigned URLs in S3 or SAS tokens in Azure); this way the client will only be able to access a specific object in the object store within a limited time window.
If we choose this design, we will add a method getSignedRemoteLogSegmentLocation(RemoteLogSegmentId, CustomMetadata) to the RemoteStorageManager interface to retrieve the signed URL for the segment location from the cloud provider, and add a field SignedRemoteLogSegmentLocation to the FetchResponse to return it to the consumer.
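A minimal sketch of the proposed interface addition, with a toy in-memory implementation to show the call shape. The exact signature and types are not final; the fake URL format below is purely illustrative.

```java
import java.util.Optional;

// Hypothetical sketch of the proposed RemoteStorageManager addition.
public class SignedLocationSketch {
    public interface RemoteStorageManagerExt {
        // Returns a time-limited signed URL (e.g. an S3 presigned URL or an
        // Azure SAS URL) for the given segment, derived from the segment id
        // and the plugin's optional custom metadata.
        String getSignedRemoteLogSegmentLocation(String remoteLogSegmentId,
                                                 Optional<byte[]> customMetadata);
    }

    // Toy stand-in that fabricates a fake signature, only to show usage.
    public static class FakeManager implements RemoteStorageManagerExt {
        @Override
        public String getSignedRemoteLogSegmentLocation(String id, Optional<byte[]> meta) {
            return "https://bucket.example/" + id + "?sig=FAKE";
        }
    }
}
```

A real implementation would delegate to the cloud provider's SDK (e.g. S3's presigner), which enforces the expiry on the store side.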
Fetch and Read Remote Log segment file
Once the consumer knows the location of the remote log segment file, it can read from remote tiered storage directly, parse the content of the log segment file, and convert the messages into ConsumerRecord objects. We will address this in a separate KIP, since the work depends on the programming language on the consumer side. (Note: we have published KIP-1254 for the enhancements needed on the Java consumer side.)
Handle Transaction Messages
We plan to support transactional messages in phase II of this project, so we did not include transactional message support structures as mandatory fields in this KIP. Nonetheless, here is the idea for how to support transactional messages:
If the consumer is reading with the ReadUncommitted isolation level, no special filtering is needed, since it should be able to read all messages from the server log.
If the consumer is reading with the ReadCommitted isolation level, two pieces of special logic are needed on the consumer side to process messages involved in transactions based on transaction boundaries:
- LSO (Last Stable Offset) marks the end of the last committed transaction; the client should not see messages beyond the LSO. When a leader broker generates a FetchResponse, it normally makes sure not to send messages beyond the LSO. When the consumer reads server logs directly from remote storage, it needs to stop at the LSO boundary. Fortunately, the LSO field is present in FetchResponseData.PartitionData, so the consumer can use it to stop consumption;
- AbortedTransactions contains the message offsets of all messages in aborted transactions; the consumer client will need to filter out those messages before they can be seen by the user. There are two possible implementation ideas (we favor option b, having the broker retrieve the aborted transaction content):
- AbortedTransactions are stored in the TransactionIndex file, which works together with the ProducerSnapshot index file; both indexes are stored on the broker as well as in remote tiered storage. The leader broker can return the locations of those two index files to the consumer client, and the consumer will download both from remote storage and then do the filtering accordingly;
- The broker can retrieve the aborted transactions from remote tiered storage and then set them in the FetchResponse.PartitionData structure. Although this is an extra download on the broker side, the index file is normally small (transaction aborts should be rare), so downloading it should not cause much of a performance issue on the broker, and it greatly reduces the complexity of the consumer code;
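The consumer-side ReadCommitted filtering described above can be sketched on plain offsets. This is a simplification: it models each aborted transaction as an inclusive offset range, whereas the real FetchResponse carries the transaction's first offset and relies on abort markers for the end; the names are illustrative, not Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of ReadCommitted filtering when the consumer reads the segment
// directly: stop at the LSO, then drop offsets inside aborted transactions.
public class ReadCommittedFilter {
    // Simplified model: an aborted transaction covering [firstOffset, lastOffset].
    public record AbortedRange(long firstOffset, long lastOffset) {}

    /** Keep only offsets strictly below the LSO and outside any aborted range. */
    public static List<Long> filter(List<Long> offsets, long lastStableOffset,
                                    List<AbortedRange> aborted) {
        List<Long> visible = new ArrayList<>();
        for (long o : offsets) {
            if (o >= lastStableOffset) break; // never read past the LSO boundary
            boolean inAborted = aborted.stream()
                .anyMatch(r -> o >= r.firstOffset() && o <= r.lastOffset());
            if (!inAborted) visible.add(o);   // drop messages from aborted txns
        }
        return visible;
    }
}
```

With option b, the broker supplies the aborted ranges in the FetchResponse, so the consumer only needs this filtering step and no index downloads.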
Handle Queue
Status: Share groups are already supported with broker-mediated remote fetches.
Share groups (queues) work with tiered storage today via the broker's existing remote fetch mechanism. When a ShareFetchRequest requires data from remote storage, the broker fetches the segment via RemoteLogManager, manages record distribution through SharePartition.acquire(), and returns records to consumers via ShareFetchResponse. This flow is implemented via DelayedShareFetch and PendingRemoteFetches.
Direct consumer fetch (the focus of this KIP) does not apply to share groups. The share partition leader must maintain control of record distribution, acquisition state, timeouts, and acknowledgments. This coordination requires the broker to read records before assigning them to specific consumers, making metadata-only responses (segment location without records) incompatible with the share group model.
For ShareFetchRequest/ShareFetchResponse, the protocol changes in this KIP (RemoteLogSegmentLocationRequested field) are not applicable. Share groups continue using broker-mediated fetches as they do today.
Consumer reads a subset of messages
Sometimes the consumer might only want to read a subset of messages within an offset range. A Kafka log segment file can be very big (on the order of GBs), so having the consumer read the whole segment and then find the offset range might not be performant. We need the OffsetIndex file to quickly narrow down the search. There are two options to expose the offset index to the consumer, with the second option preferred:
- When the leader broker responds with the FetchResponse, it can also add the location of the OffsetIndex file (which is stored on remote tiered storage) and have the client download the index file and do the necessary index search;
- The broker downloads and reads the OffsetIndex file and sets the corresponding begin/end byte positions in the response to the consumer client. This way the client only needs to download the portion of the data log segment within the small range marked by the begin/end file positions. Since the OffsetIndex file is small compared to the log segment file, and this approach simplifies the consumer-side logic, we prefer this option;
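The index lookup in the preferred option amounts to two lookups in a sorted offset-to-position map. The sketch below uses a toy in-memory TreeMap rather than Kafka's actual OffsetIndex class; the names are illustrative.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of narrowing the byte range to download from an offset index.
public class OffsetIndexRange {
    public record ByteRange(long beginPosition, long endPosition) {}

    /**
     * index maps indexed offset -> byte position in the segment file.
     * Returns the smallest [begin, end) byte range guaranteed to contain
     * all messages with offsets in [startOffset, endOffset].
     */
    public static ByteRange rangeFor(TreeMap<Long, Long> index,
                                     long startOffset, long endOffset,
                                     long segmentSizeBytes) {
        // Largest indexed offset <= startOffset gives a safe begin position.
        Map.Entry<Long, Long> beginEntry = index.floorEntry(startOffset);
        long begin = beginEntry != null ? beginEntry.getValue() : 0L;
        // Smallest indexed offset > endOffset bounds the end position;
        // fall back to the segment size if the range runs to the end.
        Map.Entry<Long, Long> endEntry = index.higherEntry(endOffset);
        long end = endEntry != null ? endEntry.getValue() : segmentSizeBytes;
        return new ByteRange(begin, end);
    }
}
```

Because the index is sparse, the range can overshoot slightly on both sides; the client still parses record batches within the range and discards offsets outside the requested window.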
Version compatibility check with broker
Over time the broker might evolve the storage format of the log files, making earlier versions of the client unable to read the log files. To protect against this kind of version incompatibility, we can introduce another field in the FetchRequest: SupportedStorageFormatVersions. The consumer fills in this field with the server-side storage format versions it understands (we can mark the current storage format as ApacheKafkaV1), and the server can fall back to the old behavior if it decides the client version is too low. We mark this field as a list type containing all the versions the client supports, so that when the client is newer than the server (the forward-compatibility case), the server can still respond with the new behavior if the server's older version is still in the list of versions the client understands.
This new field in the FetchRequest can also handle the case where some Kafka-API-compatible vendors (e.g. Apache Pulsar, WarpStream, AutoMQ) use their own proprietary storage formats.
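The negotiation described above reduces to the broker picking its newest format that also appears in the client's list. A minimal sketch, assuming the "ApacheKafkaV1" naming suggested above (other format names are made up for illustration):

```java
import java.util.List;

// Sketch of storage-format negotiation via SupportedStorageFormatVersions.
public class StorageFormatNegotiation {
    /**
     * The broker picks its newest format that the client also supports,
     * or returns null to fall back to broker-mediated fetches when there
     * is no overlap (e.g. the client is too old, or a proprietary format).
     */
    public static String negotiate(List<String> clientSupported,
                                   List<String> brokerFormatsNewestFirst) {
        for (String format : brokerFormatsNewestFirst) {
            if (clientSupported.contains(format)) return format;
        }
        return null; // no common format: serve records the old way
    }
}
```

Returning the records themselves on a null result keeps older clients working unchanged.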
Public Interfaces
Protocol Changes
We will extend the FetchRequest API by adding a new field, RemoteLogSegmentLocationRequested, to indicate that the client has the capability to read from remote storage and is interested in getting the location of the remote log segment file on remote storage.
{
  "validVersions": "0-18",
  "fields": [
    { "name": "RackId", ... },
    { "name": "RemoteLogSegmentLocationRequested", "type": "bool", "versions": "18+", "default": "false", "ignorable": true,
      "about": "Indicates whether the client is able to read from remote storage and is interested in getting the location of the remote log segment file" }
  ]
}
We will extend the FetchResponse API in order to convey the location of the remote log segment file to the consumer client.
{
  "validVersions": "0-18",
  "fields": [
    { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+", "fields": [
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PreferredReadReplica", ... },
        { "name": "RemoteLogSegmentId", "type": "uuid", "versions": "18+", "ignorable": true,
          "about": "The uuid of the remote log segment" },
        { "name": "RemoteLogSegmentCustomMetadata", "type": "bytes", "versions": "18+", "ignorable": true,
          "about": "The byte[] content of the custom metadata of the remote log segment" }
      ]}
    ]}
  ]
}
Two new fields are added to FetchResponse.Partitions message structure:
- RemoteLogSegmentId: this is the uuid field in https://github.com/apache/kafka/blob/4.2/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentId.java#L32
- RemoteLogSegmentCustomMetadata: this is the byte[] content of the optional CustomMetadata class in https://github.com/apache/kafka/blob/4.2/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java#L384; some implementations of RemoteStorageManager use this class to store the file path of the remote log segment file.
These two fields should be sufficient to resolve the physical file path on remote tiered storage.
Compatibility, Deprecation, and Migration Plan
The change is backwards compatible with previous versions. The new field RemoteLogSegmentLocationRequested in the consumer FetchRequest defaults to false so as not to change the FetchResponse behavior. If the receiving broker does not support this feature, it will resort to the old behavior of fetching old messages from remote tiered storage and sending them back to the consumer client.
Rejected Alternatives
- Having the consumer subscribe to the tiered storage metadata topic __remote_log_metadata, filter to the partitions it is interested in, parse each metadata message, and track the state changes. We don't recommend this approach because:
- The __remote_log_metadata topic is a congested topic that contains metadata for all tiered-storage-enabled topic partitions, and each metadata record goes through several state changes. The topic includes the metadata history of every remote log segment since day 1, even for segments already removed from tiered storage, so a lot of reading, filtering, and caching would need to happen on the consumer side;
- With many programming languages to support on the consumer side, and given the trend of moving complex consumer logic to the broker side, it is more natural to have the broker keep track of the remote tiered storage segment files and exchange that information with the consumer. This mirrors the design choice (whether the consumer or the broker keeps track of the preferred read replica) in KIP-392: Allow consumers to fetch from closest replica, which also favors having the broker exchange the preferred read replica information with the consumer.
- Launching new brokers to serve the large backfill read use cases. When the consumer asks the leader broker to serve a FetchRequest, the leader broker responds with those new brokers' ids in the FetchResponse, and the consumer subsequently asks those new brokers to serve the FetchRequest. This is very similar to how the leader broker responds with preferredReadReplica in KIP-392. The new brokers still download log segments from remote storage before they can serve FetchRequests. In this approach the network latency/cost is not avoided, but the currently running brokers are not affected by reads from large backfill consumers. This approach needs minimal code changes on the consumer side, but it has the following drawbacks:
- To avoid affecting the performance of the currently running brokers, we would need to group brokers into different pools (one pool to serve normal produce/consume use cases, another to serve backfill or other maintenance tasks), which is a new concept for the Kafka community;
- The user needs to manually start/stop the backfill-dedicated brokers before/after each backfill task;
- To quickly bootstrap those new brokers to a ready-to-serve state, we need KIP-1023: Follower fetch from tiered offset to be implemented first. With KIP-1023, a new broker only needs to replicate a few minutes of data from the leader broker to be considered in-sync (the rest of the data can be downloaded lazily from tiered storage later);
- The network cost/latency of the 2-hop data transfer (remote storage to broker, broker to consumer) is still there.