Authors: Thomas Thornton, Henry Cai
Status
Current state: Under Discussion
Depends On: KIP-1248
Discussion thread: [DISCUSS] KIP-1254: Kafka Consumer Support for Remote Tiered Storage Fetch
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-1248 introduces protocol changes that allow consumers to read remote log segments directly from remote storage, reducing network bandwidth/cost and disk IOPS on brokers.
However, the broker-side protocol changes are only one half of the solution. Consumer clients must be able to:
- Resolve the remote storage location
- Authenticate against the remote storage
- Parse the log segments retrieved from remote storage
- Handle transactions and filter out aborted transactions for read_committed consumers
This KIP defines the general consumer-client interface, to be implemented in the respective consumer-client languages.
Proposed Changes
High-level Flow
- Consumer sends a FetchRequest with RemoteLogSegmentLocationRequested=true
- Broker returns RemoteLogSegmentId and CustomMetadata in FetchResponse
- Consumer stores the remote segment location in SubscriptionState
- On the next fetch cycle, consumer invokes the configured RemoteStorageFetcher plugin
- The plugin fetches data from remote storage (e.g., S3, GCS, Azure Blob)
- Records are added to FetchBuffer for application consumption
Parsing
The core record parsing logic exists in the clients module:
Class | Purpose |
RemoteLogInputStream | Iterates RecordBatch objects from an InputStream |
MemoryRecords | Wraps a ByteBuffer as readable records |
CompletedFetch | Transaction filtering for read_committed |
DefaultRecordBatch | Batch parsing (message format v2+) |
Consumer parsing flow:
- Download byte range from remote storage via RemoteStorageFetcher
- Use RemoteLogInputStream.nextBatch() to iterate over RecordBatch objects
- Copy batches to ByteBuffer and wrap as MemoryRecords
- Use existing CompletedFetch logic for transaction filtering
This is the same parsing flow the broker uses internally.
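The iteration step can be illustrated with a self-contained sketch. The class below (SimpleBatchStream, a hypothetical name) is a simplified stand-in for RemoteLogInputStream: real Kafka batches carry a full v2 batch header, while here each "batch" is just a 4-byte big-endian length followed by its payload.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Simplified stand-in for RemoteLogInputStream: iterate "batches" from a
// downloaded byte range until the stream is exhausted.
class SimpleBatchStream {
    private final DataInputStream in;

    SimpleBatchStream(InputStream in) { this.in = new DataInputStream(in); }

    /** Returns the next batch payload, or null once the stream is exhausted. */
    byte[] nextBatch() {
        try {
            int len = in.readInt();        // real code parses the batch header here
            byte[] batch = new byte[len];
            in.readFully(batch);
            return batch;
        } catch (EOFException eof) {
            return null;                   // end of the downloaded range
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the real flow, each RecordBatch returned by RemoteLogInputStream.nextBatch() is copied into a ByteBuffer and wrapped via MemoryRecords.readableRecords(...) before transaction filtering.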
Optimization for partial reads: To avoid downloading entire segments (possibly multiple GBs):
- The broker includes the byte position hints derived from the OffsetIndex
- The consumer requests only the byte range needed via startPosition/endPosition
This approach keeps complexity on the broker side and simplifies the consumer.
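As an illustration of the partial-read optimization, the hypothetical helper below turns the broker's byte position hints (derived from the OffsetIndex) into an HTTP Range header for an object-store GET. Note the convention mismatch it bridges: endPosition is exclusive (-1 meaning read to the end of the segment), while HTTP byte ranges are inclusive on both ends.

```java
// Hypothetical helper: map (startPosition, endPosition) byte hints onto an
// HTTP Range header value for a ranged object-store GET.
class ByteRangeSketch {
    static String rangeHeader(int startPosition, int endPosition) {
        if (endPosition < 0)
            return "bytes=" + startPosition + "-";          // read to end of segment
        return "bytes=" + startPosition + "-" + (endPosition - 1); // HTTP ranges are inclusive
    }
}
```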
Consumer Architecture
The Kafka consumer has two runtime modes: AsyncKafkaConsumer (the new threading model with separate application and network threads) and ClassicKafkaConsumer (the legacy single-threaded model). Both modes will support direct remote storage fetches with the same RemoteStorageFetchManager plugin, but the internal flow differs slightly. We describe each below.
AsyncKafkaConsumer Path
Within the network thread:
- FetchRequestManager reads SubscriptionState which now includes RemoteLogSegmentId and RemoteLogSegmentCustomMetadata
- FetchRequestManager constructs UnsentRequest objects with remote segment location information
- NetworkClientDelegate routes requests
- Standard requests -> NetworkClient -> KafkaBroker
- RemoteStorageRequests -> RemoteStorageClient -> RemoteTierStorage
- RemoteStorageNetworkClient.send() invokes RemoteStorageFetcher.fetchLogSegment
- RemoteStorageNetworkClient.poll() reads the InputStream, converts server log batches to MemoryRecords
- FetchRequestManager.handleFetchSuccess creates FetchResponse
- FetchRequestManager appends the FetchResponse to FetchBuffer
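The routing step above can be sketched as follows (types simplified and hypothetical): an unsent request that carries a remote segment location is dispatched to the remote storage client, everything else to the broker as usual.

```java
import java.util.Optional;

// Sketch of the routing decision in NetworkClientDelegate.
class RoutingSketch {
    record Request(Optional<String> remoteLogSegmentId) {} // simplified UnsentRequest

    static String route(Request req) {
        return req.remoteLogSegmentId().isPresent()
            ? "RemoteStorageNetworkClient"   // direct remote storage fetch
            : "NetworkClient";               // standard broker fetch
    }
}
```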
ClassicKafkaConsumer Path
The legacy path follows similar logic:
- Fetcher checks SubscriptionState for remote segment location
- Routes to RemoteStorageNetworkClient for remote fetches
- Same conversion logic applies via callback to FetchBuffer
Transactions
Raw segments may contain messages from aborted transactions. The handling depends on isolation level:
- read_uncommitted: No filtering required; the consumer reads all messages
- read_committed: The consumer must filter out aborted transactions
- LSO (Last Stable Offset): Present in FetchResponseData.PartitionData. Consumer stops at LSO boundary
- Aborted Transactions: Broker retrieves aborted transaction info and includes it in FetchResponse.PartitionData. Consumer filters out batches from aborted transactions.
We favor the broker providing transaction info rather than having consumers download transaction index files, as:
- Transaction aborts are very rare, so the index is typically small
- Greatly reduces consumer-side complexity
- Maintains existing transaction filtering semantics
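A heavily simplified sketch of the read_committed filtering (the real logic lives in CompletedFetch and also honors control batches): a transactional batch is dropped if its producer has an aborted transaction starting at or before the batch's base offset, and nothing is read past the LSO.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified read_committed filtering over downloaded batches.
class TxnFilterSketch {
    record Batch(long producerId, long baseOffset, boolean transactional) {}
    record AbortedTxn(long producerId, long firstOffset) {}

    static List<Batch> filterAborted(List<Batch> batches, List<AbortedTxn> aborted, long lso) {
        List<Batch> out = new ArrayList<>();
        for (Batch b : batches) {
            if (b.baseOffset() >= lso) break; // never read past the last stable offset
            boolean isAborted = b.transactional() && aborted.stream()
                .anyMatch(a -> a.producerId() == b.producerId()
                            && a.firstOffset() <= b.baseOffset());
            if (!isAborted) out.add(b);
        }
        return out;
    }
}
```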
Share Groups (Queues)
Status: Not supported in this KIP.
Share groups require the broker to mediate record distribution via SharePartition.acquire(), which manages complex state including record acquisition, timeouts, and acknowledgments. Direct consumer fetch would require significant architectural changes to maintain this coordination.
This KIP focuses on standard consumer groups (AsyncKafkaConsumer/ClassicKafkaConsumer) for historical data reads. Share groups are typically used for real-time workloads rather than backfilling historical data, making this an edge case.
Fallback
The consumer falls back to broker-mediated fetch when remote fetch fails:
Condition | Behavior |
Remote read timeout (fetch.remote.read.timeout.ms exceeded) | Re-issue FetchRequest with RemoteLogSegmentLocationRequested=false |
Remote connection timeout (fetch.remote.connect.timeout.ms exceeded) | Re-issue FetchRequest with RemoteLogSegmentLocationRequested=false |
Authentication failure | Re-issue FetchRequest with RemoteLogSegmentLocationRequested=false |
RemoteStorageFetcher not configured | Never set RemoteLogSegmentLocationRequested=true |
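The fallback rule in the table above reduces to a simple pattern (names hypothetical, fetch results modeled as strings): any remote-fetch failure causes the consumer to re-issue the fetch with RemoteLogSegmentLocationRequested=false and read through the broker.

```java
import java.util.function.Supplier;

// Sketch of the fallback decision: try the direct remote read, fall back to
// a broker-mediated fetch on any failure (timeout, auth error, ...).
class FallbackSketch {
    static String fetchWithFallback(Supplier<String> remoteFetch, Supplier<String> brokerFetch) {
        try {
            return remoteFetch.get();      // direct read from remote storage
        } catch (RuntimeException e) {     // timeout, auth failure, ...
            return brokerFetch.get();      // RemoteLogSegmentLocationRequested=false
        }
    }
}
```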
Version Compatibility
To handle storage format evolution, consumers include SupportedStorageFormatVersions in FetchRequest:
- Consumer populates with storage format versions it can parse (e.g., ApacheKafkaV1)
- Broker falls back to traditional fetch if client version is incompatible
- Supports forward compatibility: older format versions in the client's supported list continue to work
This also enables Kafka-API-compatible vendors with proprietary storage formats to participate (e.g., Apache Pulsar, WarpStream, AutoMQ).
This design intentionally shifts segment parsing from the broker to the consumer to reduce broker load. While this couples the consumer to the on-disk format, SupportedStorageFormatVersions ensures graceful fallback when formats evolve. The consumer remains decoupled from storage backends via the RemoteStorageFetcher plugin interface.
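The broker-side compatibility gate can be sketched as a simple membership check (method and parameter names hypothetical): the broker serves the remote segment location only when the segment's storage format appears in the consumer's SupportedStorageFormatVersions, and otherwise falls back to a traditional fetch.

```java
import java.util.List;

// Sketch of the broker-side format compatibility check.
class FormatGateSketch {
    static boolean canServeDirect(String segmentFormatVersion, List<String> supportedFormatVersions) {
        return supportedFormatVersions.contains(segmentFormatVersion);
    }
}
```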
Authorization
The security model is implementation-specific to each RemoteStorageFetcher plugin. We offer some thoughts on the different approaches, but this entirely depends on specific cloud-provider and is not specified in this KIP:
Presigned URLs (Preferred):
- Broker generates a temporary pre-signed URL for remote storage object
- Passed to client via CustomMetadata
- No long-term credentials needed by consumer
- Broker ACLs remain authoritative
Local Credentials:
- Consumer uses its own credential to access remote storage
- Broker passes unsigned location via CustomMetadata
- Suitable when consumers have standing permissions
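For the presigned-URL approach, one possible encoding of the opaque CustomMetadata bytes is sketched below (the "expiryEpochMs|url" layout is purely illustrative; real plugins would define their own, likely versioned, metadata format). The consumer decodes the bytes and refuses to use an expired URL, falling back to a broker fetch instead.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical CustomMetadata encoding for a presigned URL with an expiry.
class PresignedMetadataSketch {
    static byte[] encode(long expiryEpochMs, String url) {
        return (expiryEpochMs + "|" + url).getBytes(StandardCharsets.UTF_8);
    }

    /** Returns the URL if still valid, or null to signal broker fallback. */
    static String decodeIfValid(byte[] customMetadata, long nowMs) {
        String s = new String(customMetadata, StandardCharsets.UTF_8);
        int sep = s.indexOf('|');
        long expiry = Long.parseLong(s.substring(0, sep));
        return nowMs < expiry ? s.substring(sep + 1) : null;
    }
}
```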
New or Changed Public Interfaces
RemoteStorageFetcher
A pluggable, cloud-provider agnostic interface for fetching data from remote storage:
/**
 * Interface for fetching log segment data directly from remote tiered storage.
 * Implementations are cloud-provider specific (e.g., S3, GCS, Azure Blob).
 */
public interface RemoteStorageFetcher {
    /**
     * Fetch log segment data from remote storage.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment
     * @param startPosition start byte position in the segment (inclusive)
     * @param endPosition end byte position in the segment (exclusive), or -1 to read to end of segment
     * @return InputStream of the requested log segment data
     * @throws RemoteStorageException if there are errors fetching the segment
     */
    InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                int startPosition,
                                int endPosition) throws RemoteStorageException;
}
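A toy implementation of this contract against an in-memory "object store" is shown below. The signature is simplified (a String key stands in for RemoteLogSegmentMetadata, and no RemoteStorageException is thrown), but it honors the same position semantics: startPosition inclusive, endPosition exclusive, -1 meaning read to the end of the segment.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;

// Toy fetcher over an in-memory map of segment bytes.
class InMemoryFetcher {
    private final Map<String, byte[]> store;

    InMemoryFetcher(Map<String, byte[]> store) { this.store = store; }

    InputStream fetchLogSegment(String segmentKey, int startPosition, int endPosition) {
        byte[] segment = store.get(segmentKey);
        int end = endPosition < 0 ? segment.length : endPosition;
        return new ByteArrayInputStream(Arrays.copyOfRange(segment, startPosition, end));
    }

    /** Convenience for demos: drain a stream to a UTF-8 string. */
    static String readAll(InputStream in) {
        try {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```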
Protocol Changes
FetchRequest (version bump)
Field | Type | Description |
RemoteLogSegmentLocationRequested | boolean | Whether consumer requests remote segment location |
SupportedStorageFormatVersions | []string | Storage format versions the consumer can parse |
FetchResponse (version bump)
Field | Type | Description |
RemoteLogSegmentId | UUID | Unique identifier of the remote segment |
RemoteLogSegmentCustomMetadata | bytes | Provider-specific metadata |
Note: existing abortedTransactions field will be populated by brokers for remote segments.
Consumer Configs
Config | Type | Default | Description |
fetch.remote.enabled | boolean | false | Controls whether consumers set RemoteLogSegmentLocationRequested=true |
remote.storage.fetcher.class | string | null | Implementation class for the RemoteStorageFetcher interface |
remote.storage.fetcher.class.path | string | null | Classpath for loading the RemoteStorageFetcher implementation |
fetch.remote.connect.timeout.ms | int | 1000 | Connection timeout for remote storage; triggers fallback on timeout |
fetch.remote.read.timeout.ms | int | 5000 | Read timeout for remote storage; triggers fallback on timeout |
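A hypothetical consumer configuration enabling direct remote fetch (the fetcher class name is illustrative):

```properties
fetch.remote.enabled=true
remote.storage.fetcher.class=com.example.kafka.S3RemoteStorageFetcher
fetch.remote.connect.timeout.ms=1000
fetch.remote.read.timeout.ms=5000
```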
New Metrics
New consumer metrics will be added following the existing kafka.consumer:type=consumer-fetch-manager-metrics naming convention including:
- Remote fetch request rate & totals
- Remote fetch bytes rate & totals
- Remote fetch latency (avg/max)
- Remote fetch errors and fallbacks
Detailed metric definitions will be finalized during implementation.
Class Changes Summary
Class | Change Type | Description |
RemoteStorageFetcher | New interface | Cloud-agnostic interface for fetching from remote storage |
RemoteStorageNetworkClient | New class | Routes requests to remote storage, converts responses |
SubscriptionState | Modified | Add RemoteLogSegmentId, RemoteLogSegmentCustomMetadata fields |
UnsentRequest | Modified | Add remote segment location fields |
FetchCollector | Modified | Extract remote segment location from FetchResponse |
Migration Plan & Compatibility
Client-Broker Compatibility
Scenario | Behavior |
New brokers, old consumers | Consumer never sets RemoteLogSegmentLocationRequested=true |
Old brokers, new consumers | Broker ignores the field and behaves as before |
Format mismatch | Broker checks SupportedStorageFormatVersions, falls back if incompatible |
Rollout Steps
- Upgrade brokers to support KIP-1248
- Deploy RemoteStorageFetcher implementation
- Upgrade consumers with new client library
- Enable fetch.remote.enabled=true per consumer group
- Monitor remote-fetch-* metrics for issues
Downgrade
Setting fetch.remote.enabled=false immediately reverts consumers to standard broker-mediated fetches. No data loss or protocol issues occur.
Rejected Alternatives
Alternative | Reason for Rejection |
Broker-side filtering | Defeats the purpose of reducing broker load |
Reuse RemoteStorageManager | Too bulky for read-only client use, violates interface segregation |
Consumer downloads index files | Adds complexity; broker can provide needed info more efficiently |

