Authors: Thomas Thornton, Henry Cai



Status

Current state: Under Discussion

Depends On: KIP-1248

Discussion thread: [DISCUSS] KIP-1254: Kafka Consumer Support for Remote Tiered Storage Fetch

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-1248 introduces protocol changes that let consumers read remote log segments directly from tiered storage, reducing network bandwidth/cost and disk IOPS on the brokers.

However, the broker-side protocol changes are only one half of the solution. Consumer clients must be able to:

  1. Resolve the remote storage location
  2. Authenticate against the remote storage
  3. Parse the log segments retrieved from remote storage
  4. Handle transactions and filter out aborted transactions for read_committed consumers

This KIP defines the general consumer-client interface, to be implemented in each of the respective consumer client languages.

Proposed Changes

High-level Flow

  1. Consumer sends a FetchRequest with RemoteLogSegmentLocationRequested=true
  2. Broker returns RemoteLogSegmentId and CustomMetadata in FetchResponse
  3. Consumer stores the remote segment location in SubscriptionState
  4. On the next fetch cycle, consumer invokes the configured RemoteStorageFetcher plugin
  5. The plugin fetches data from remote storage (e.g., S3, GCS, Azure Blob)
  6. Records are added to FetchBuffer for application consumption

Parsing

The core record parsing logic exists in the clients module:

| Class | Location | Purpose |
| --- | --- | --- |
| RemoteLogInputStream | clients/…/common/record/ | Iterates RecordBatch from InputStream |
| MemoryRecords | clients/…/common/record/ | Wraps ByteBuffer as usable records |
| CompletedFetch | clients/…/consumer/internals/ | Transaction filtering for read_committed |
| DefaultRecordBatch | clients/…/common/record/ | Batch parsing (v2+ format) |

Consumer parsing flow:

  1. Download byte range from remote storage via RemoteStorageFetcher
  2. Use RemoteLogInputStream.nextBatch() to iterate over RecordBatch objects
  3. Copy batches to ByteBuffer and wrap as MemoryRecords
  4. Use existing CompletedFetch logic for transaction filtering

This is the same parsing flow the broker uses internally.
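The framing that RemoteLogInputStream walks is simple enough to sketch with standard-library Java alone. The class below is an illustrative stand-in for step 2, not the real clients-module code, assuming the v2 on-disk layout in which each batch is prefixed by an 8-byte base offset and a 4-byte batch length:

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Simplified sketch of the framing RemoteLogInputStream iterates over:
// each on-disk batch is [baseOffset: int64][batchLength: int32][batchLength bytes].
public class SegmentBatchReader {
    static final int LOG_OVERHEAD = 12; // 8-byte base offset + 4-byte batch length

    private final DataInputStream in;

    public SegmentBatchReader(InputStream in) {
        this.in = new DataInputStream(in);
    }

    /** Returns the next full batch frame as a ByteBuffer, or null at end of stream. */
    public ByteBuffer nextBatch() throws IOException {
        byte[] header = new byte[LOG_OVERHEAD];
        int n = in.readNBytes(header, 0, LOG_OVERHEAD);
        if (n < LOG_OVERHEAD) return null; // end of segment (or truncated tail)
        int batchLength = ByteBuffer.wrap(header, 8, 4).getInt();
        byte[] payload = new byte[batchLength];
        in.readFully(payload);
        ByteBuffer frame = ByteBuffer.allocate(LOG_OVERHEAD + batchLength);
        frame.put(header).put(payload);
        frame.flip();
        return frame;
    }
}
```

A real implementation would hand each frame to MemoryRecords.readableRecords(...) (step 3) rather than returning raw buffers.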

Optimization for partial reads: To avoid downloading entire segments (possibly multiple GBs):

  • The broker includes the byte position hints derived from the OffsetIndex
  • The consumer requests only the byte range needed via startPosition/endPosition

This approach keeps complexity on the broker side and simplifies the consumer.
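As a sketch of the second bullet, an object-store-backed fetcher might translate the (startPosition, endPosition) pair into an HTTP Range header. The helper below is an assumption about one plausible mapping (endPosition exclusive, -1 meaning read to end of segment, matching the RemoteStorageFetcher contract):

```java
// Sketch: map the KIP's (startPosition, endPosition) pair onto an HTTP Range
// header value, as an object-store-backed RemoteStorageFetcher might do.
// endPosition is exclusive; -1 means "read to end of segment".
public class ByteRange {
    public static String toHttpRange(int startPosition, int endPosition) {
        if (startPosition < 0)
            throw new IllegalArgumentException("startPosition must be >= 0");
        if (endPosition == -1)
            return "bytes=" + startPosition + "-";                  // open-ended suffix
        if (endPosition <= startPosition)
            throw new IllegalArgumentException("empty range");
        return "bytes=" + startPosition + "-" + (endPosition - 1);  // HTTP ranges are inclusive
    }
}
```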

Consumer Architecture

The Kafka consumer has two runtime modes: AsyncKafkaConsumer (the new threading model with separate application and network threads) and ClassicKafkaConsumer (the legacy single-threaded model). Both modes will support direct remote storage fetches with the same RemoteStorageFetchManager plugin, but the internal flow differs slightly. We describe each below.

AsyncKafkaConsumer Path

Within the network thread:

  1. FetchRequestManager reads SubscriptionState which now includes RemoteLogSegmentId and RemoteLogSegmentCustomMetadata
  2. FetchRequestManager constructs UnsentRequest objects with remote segment location information
  3. NetworkClientDelegate routes requests
    1. Standard requests -> NetworkClient -> KafkaBroker
    2. RemoteStorageRequests -> RemoteStorageClient -> RemoteTierStorage
  4. RemoteStorageNetworkClient.send() invokes RemoteStorageFetcher.fetchLogSegment
  5. RemoteStorageNetworkClient.poll() reads the InputStream, converts server log batches to MemoryRecords
  6. FetchRequestManager.handleFetchSuccess creates FetchResponse
  7. FetchRequestManager appends the FetchResponse to FetchBuffer
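The routing in step 3 can be sketched as follows; all names here (RequestRouter, the boolean flag) are illustrative simplifications of the delegate's behavior, not actual consumer internals:

```java
import java.util.function.Consumer;

// Hypothetical sketch of NetworkClientDelegate routing: each unsent request goes
// either to the broker network client or to the remote storage client, depending
// on whether SubscriptionState carried a remote segment location for it.
public class RequestRouter {
    public static final class UnsentRequest {
        final boolean remoteStorageFetch; // true when a RemoteLogSegmentId was present
        public UnsentRequest(boolean remoteStorageFetch) { this.remoteStorageFetch = remoteStorageFetch; }
    }

    private final Consumer<UnsentRequest> networkClient;       // -> Kafka broker
    private final Consumer<UnsentRequest> remoteStorageClient; // -> remote tier storage

    public RequestRouter(Consumer<UnsentRequest> networkClient,
                         Consumer<UnsentRequest> remoteStorageClient) {
        this.networkClient = networkClient;
        this.remoteStorageClient = remoteStorageClient;
    }

    public void route(UnsentRequest req) {
        if (req.remoteStorageFetch) remoteStorageClient.accept(req);
        else networkClient.accept(req);
    }
}
```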

ClassicKafkaConsumer Path

The legacy path follows similar logic:

  1. Fetcher checks SubscriptionState for remote segment location
  2. Routes to RemoteStorageNetworkClient for remote fetches
  3. Same conversion logic applies via callback to FetchBuffer


Transactions

Raw segments may contain messages from aborted transactions. The handling depends on isolation level:

  • read_uncommitted: No filtering required; the consumer reads all messages
  • read_committed: The consumer must filter out aborted transactions
    • LSO (Last Stable Offset): Present in FetchResponseData.PartitionData; the consumer stops at the LSO boundary
    • Aborted Transactions: The broker retrieves aborted-transaction info and includes it in FetchResponse.PartitionData; the consumer filters out batches from aborted transactions
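A much-simplified sketch of the read_committed path above; the real filtering lives in CompletedFetch and additionally tracks control (abort-marker) batches, which this illustration omits:

```java
import java.util.ArrayList;
import java.util.List;

// Much-simplified read_committed filtering: the broker supplies the LSO and a
// list of aborted transactions (producerId, firstOffset); the consumer skips
// batches inside an aborted transaction and stops at the LSO boundary.
public class ReadCommittedFilter {
    public record Batch(long producerId, long baseOffset, boolean isTransactional) {}
    public record AbortedTxn(long producerId, long firstOffset) {}

    public static List<Batch> filter(List<Batch> batches, List<AbortedTxn> aborted, long lso) {
        List<Batch> out = new ArrayList<>();
        for (Batch b : batches) {
            if (b.baseOffset() >= lso) break; // nothing past the last stable offset
            boolean abortedBatch = b.isTransactional() && aborted.stream()
                    .anyMatch(a -> a.producerId() == b.producerId()
                                && b.baseOffset() >= a.firstOffset());
            if (!abortedBatch) out.add(b);
        }
        return out;
    }
}
```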

We favor the broker providing transaction info rather than having consumers download transaction index files, as:

  • Transaction aborts are very rare, so the index is typically small
  • Greatly reduces consumer-side complexity
  • Maintains existing transaction filtering semantics

Share Groups (Queues)

Status: Not supported in this KIP.

Share groups require the broker to mediate record distribution via SharePartition.acquire(), which manages complex state including record acquisition, timeouts, and acknowledgments. Direct consumer fetch would require significant architectural changes to maintain this coordination.

This KIP focuses on standard consumer groups (AsyncKafkaConsumer/ClassicKafkaConsumer) for historical data reads. Share groups are typically used for real-time workloads rather than backfilling historical data, making this an edge case.

Fallback

The consumer falls back to broker-mediated fetch when remote fetch fails:

| Condition | Behavior |
| --- | --- |
| Remote fetch timeout (fetch.remote.read.timeout.ms exceeded) | Re-issue FetchRequest with RemoteLogSegmentLocationRequested=false |
| Connection timeout (fetch.remote.connect.timeout.ms exceeded) | Re-issue FetchRequest with RemoteLogSegmentLocationRequested=false |
| Authentication failure | Re-issue FetchRequest with RemoteLogSegmentLocationRequested=false |
| RemoteStorageFetcher not configured | Never set RemoteLogSegmentLocationRequested=true |
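The fallback conditions can be condensed into a small policy sketch; the exception types chosen to represent each condition are assumptions for illustration:

```java
// Sketch of the fallback decision: any remote-fetch failure (timeouts, auth
// errors) causes the next FetchRequest to be re-issued with
// RemoteLogSegmentLocationRequested=false. Exception names are illustrative.
public class FallbackPolicy {
    /** Whether the consumer should request remote segment locations at all. */
    public static boolean remoteLocationRequested(boolean fetchRemoteEnabled, boolean fetcherConfigured) {
        return fetchRemoteEnabled && fetcherConfigured; // never true without a configured RemoteStorageFetcher
    }

    /** After a failed remote fetch, retry through the broker instead. */
    public static boolean shouldFallBackToBroker(Exception e) {
        return e instanceof java.net.SocketTimeoutException  // read timeout
            || e instanceof java.net.ConnectException        // connection timeout
            || e instanceof SecurityException;               // authentication failure
    }
}
```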

Version Compatibility

To handle storage format evolution, consumers include SupportedStorageFormatVersions in FetchRequest:

  • The consumer populates it with the storage format versions it can parse (e.g., ApacheKafkaV1)
  • The broker falls back to a traditional fetch if the client's supported versions are incompatible
  • This supports forward compatibility: older format versions that remain in the client's supported list continue to work as new versions are introduced

This also enables Kafka-API-compatible vendors with proprietary storage formats to participate (e.g., Apache Pulsar, WarpStream, AutoMQ).

This design intentionally shifts segment parsing from the broker to the consumer to reduce broker load. While this couples the consumer to the on-disk format, SupportedStorageFormatVersions ensures graceful fallback when formats evolve. The consumer remains decoupled from storage backends via the RemoteStorageFetcher plugin interface.
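The negotiation reduces to a simple membership check on the broker side, sketched below with format-version strings as in the example above:

```java
import java.util.List;

// Sketch of the broker-side compatibility check: serve a direct remote fetch
// only when the segment's storage format is in the client's
// SupportedStorageFormatVersions; otherwise fall back to a broker-mediated fetch.
public class FormatNegotiation {
    public static boolean canServeDirect(String segmentFormat, List<String> supportedStorageFormatVersions) {
        return supportedStorageFormatVersions.contains(segmentFormat);
    }
}
```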

Authorization

The security model is implementation-specific to each RemoteStorageFetcher plugin. We offer some thoughts on the different approaches, but this depends entirely on the specific cloud provider and is not specified in this KIP:

Presigned URLs (Preferred):

  • Broker generates a temporary pre-signed URL for remote storage object
  • Passed to client via CustomMetadata
  • No long-term credentials needed by consumer
  • Broker ACLs remain authoritative

Local Credentials:

  • Consumer uses its own credential to access remote storage
  • Broker passes unsigned location via CustomMetadata
  • Suitable when consumers have standing permissions

New or Changed Public Interfaces

RemoteStorageFetcher

A pluggable, cloud-provider agnostic interface for fetching data from remote storage:

/**
 * Interface for fetching log segment data directly from remote tiered storage.
 * Implementations are cloud-provider specific (e.g., S3, GCS, Azure Blob)
 */
public interface RemoteStorageFetcher {
    /** Fetch log segment data from remote storage
     * @param remoteLogSegmentMetadata metadata about the remote log segment
     * @param startPosition start byte position in the segment (inclusive)
     * @param endPosition end byte position in the segment (exclusive) or -1 to read to end of segment
     * @return InputStream of the requested log segment data
     * @throws RemoteStorageException if there are errors fetching the segment
     */
    InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, 
                                int startPosition,
                                int endPosition) throws RemoteStorageException;
}
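For illustration only, a hypothetical implementation backed by a presigned URL (one of the authorization options discussed above) might look like the following. The URI-based signature and helper names are simplifications of the proposed interface, which takes RemoteLogSegmentMetadata:

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical fetcher built on a presigned URL carried in CustomMetadata.
// All names here are illustrative, not part of the KIP.
public class PresignedUrlFetcher {
    private final HttpClient client = HttpClient.newHttpClient();

    /** Builds a ranged GET for [startPosition, endPosition); endPosition == -1 reads to end. */
    static HttpRequest rangedGet(URI presignedUrl, int startPosition, int endPosition) {
        String range = endPosition == -1
                ? "bytes=" + startPosition + "-"
                : "bytes=" + startPosition + "-" + (endPosition - 1); // HTTP ranges are inclusive
        return HttpRequest.newBuilder(presignedUrl).header("Range", range).GET().build();
    }

    public InputStream fetchLogSegment(URI presignedUrl, int startPosition, int endPosition)
            throws IOException, InterruptedException {
        HttpResponse<InputStream> resp =
                client.send(rangedGet(presignedUrl, startPosition, endPosition),
                            HttpResponse.BodyHandlers.ofInputStream());
        if (resp.statusCode() != 200 && resp.statusCode() != 206)
            throw new IOException("Remote storage returned HTTP " + resp.statusCode());
        return resp.body();
    }
}
```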

Protocol Changes

FetchRequest (version bump)

| Field | Type | Description |
| --- | --- | --- |
| RemoteLogSegmentLocationRequested | boolean | Whether the consumer requests the remote segment location |
| SupportedStorageFormatVersions | []string | Storage format versions the consumer can parse |

FetchResponse (version bump)

| Field | Type | Description |
| --- | --- | --- |
| RemoteLogSegmentId | UUID | Unique identifier of the remote segment |
| RemoteLogSegmentCustomMetadata | bytes | Provider-specific metadata |

Note: existing abortedTransactions field will be populated by brokers for remote segments.

Consumer Configs

| Config | Type | Default | Description |
| --- | --- | --- | --- |
| fetch.remote.enabled | boolean | false | Controls whether consumers set RemoteLogSegmentLocationRequested=true |
| remote.storage.fetcher.class | string | null | Implementation class for the RemoteStorageFetcher interface |
| remote.storage.fetcher.class.path | string | null | Classpath for loading the RemoteStorageFetcher implementation |
| fetch.remote.connect.timeout.ms | int | 1000 | Connection timeout for remote storage; triggers fallback on timeout |
| fetch.remote.read.timeout.ms | int | 5000 | Read timeout for remote storage; triggers fallback on timeout |
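Taken together, enabling direct remote fetches might look like this in a consumer's properties file (the fetcher class name and plugin path are illustrative):

```properties
# Enable direct remote storage fetches (sets RemoteLogSegmentLocationRequested=true)
fetch.remote.enabled=true
# Plugin implementing the RemoteStorageFetcher interface (class name illustrative)
remote.storage.fetcher.class=com.example.S3RemoteStorageFetcher
# Where to load the plugin from (path illustrative)
remote.storage.fetcher.class.path=/opt/kafka/plugins/remote-fetcher.jar
# Timeouts that trigger fallback to broker-mediated fetch
fetch.remote.connect.timeout.ms=1000
fetch.remote.read.timeout.ms=5000
```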

New Metrics

New consumer metrics will be added following the existing kafka.consumer:type=consumer-fetch-manager-metrics naming convention, including:

  • Remote fetch request rate & totals
  • Remote fetch bytes rate & totals
  • Remote fetch latency (avg/max)
  • Remote fetch errors and fallbacks

Detailed metric definitions will be finalized during implementation.

Class Changes Summary

| Class | Change Type | Description |
| --- | --- | --- |
| RemoteStorageFetcher | New interface | Cloud-agnostic interface for fetching from remote storage |
| RemoteStorageNetworkClient | New class | Routes requests to remote storage, converts responses |
| SubscriptionState | Modified | Add RemoteLogSegmentId, RemoteLogSegmentCustomMetadata fields |
| UnsentRequest | Modified | Add remote segment location fields |
| FetchCollector | Modified | Extract remote segment location from FetchResponse |

Migration Plan & Compatibility

Client-Broker Compatibility

| Scenario | Behavior |
| --- | --- |
| New brokers, old consumers | Consumer never sets RemoteLogSegmentLocationRequested=true |
| Old brokers, new consumers | Broker ignores the new fields and behaves as before |
| Format mismatch | Broker checks SupportedStorageFormatVersions and falls back if incompatible |

Rollout Steps

  1. Upgrade brokers to support KIP-1248
  2. Deploy RemoteStorageFetcher implementation
  3. Upgrade consumers with new client library
  4. Enable fetch.remote.enabled=true per consumer group
  5. Monitor remote-fetch-* metrics for issues

Downgrade

Disabling fetch.remote.enabled immediately reverts consumers to standard broker-mediated fetches. No data loss or protocol issues occur.

Rejected Alternatives

| Alternative | Reason for Rejection |
| --- | --- |
| Broker-side filtering | Defeats the purpose of reducing broker load |
| Reuse RemoteStorageManager | Too bulky for read-only client use; violates interface segregation |
| Consumer downloads index files | Adds complexity; the broker can provide the needed info more efficiently |
