Status
Current state: ["Under Discussion"]
Discussion thread: here
JIRA: KAFKA-15777
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
When remote storage is enabled on a topic and a consumer is backfilling, the data is read from remote storage. Currently, Kafka remote storage supports reading data only from the first remote partition in a given FETCH request (see KAFKA-14915). The default value of max.partition.fetch.bytes is 1 MB. Reading only 1 MB of data per FETCH request from remote storage adds round-trip overhead for the consumer, and cloud provider connectors are often tuned to read data in chunks of 4 MB.
The proposal is to introduce remote.max.partition.fetch.bytes in the consumer config, which allows the user to tune the value depending on their storage plugin. The user might want to trade off the number of calls to remote storage against the number of bytes returned to the client in the FETCH response.
Public Interfaces
- Introduce a new remote.max.partition.fetch.bytes consumer config. If it is not configured, the consumer falls back to the max.partition.fetch.bytes value.
- Bump FetchRequest to v18 to add the RemotePartitionMaxBytes field, which propagates the value configured on the consumer to the broker.
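To make the fallback in the first item concrete, here is a minimal sketch using plain java.util.Properties. Note that remote.max.partition.fetch.bytes is the config proposed in this KIP and does not exist in released clients; the class and method names (RemoteFetchConfigSketch, effectiveRemoteMaxBytes) are hypothetical and only illustrate the intended resolution order.

```java
import java.util.Properties;

// Sketch only: "remote.max.partition.fetch.bytes" is the config PROPOSED by this KIP.
// The class and helper names here are hypothetical, not part of the Kafka client API.
class RemoteFetchConfigSketch {
    static final String MAX_PARTITION_FETCH_BYTES = "max.partition.fetch.bytes";
    static final String REMOTE_MAX_PARTITION_FETCH_BYTES = "remote.max.partition.fetch.bytes";
    static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1024 * 1024; // 1 MB default

    // Consumer properties that keep local reads at 1 MB and ask for 4 MB from remote storage.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty(MAX_PARTITION_FETCH_BYTES, String.valueOf(1024 * 1024));
        props.setProperty(REMOTE_MAX_PARTITION_FETCH_BYTES, String.valueOf(4 * 1024 * 1024));
        return props;
    }

    // Resolve the remote limit: use the proposed config if present,
    // otherwise fall back to max.partition.fetch.bytes (or its 1 MB default).
    static int effectiveRemoteMaxBytes(Properties props) {
        String local = props.getProperty(
                MAX_PARTITION_FETCH_BYTES, String.valueOf(DEFAULT_MAX_PARTITION_FETCH_BYTES));
        return Integer.parseInt(props.getProperty(REMOTE_MAX_PARTITION_FETCH_BYTES, local));
    }

    public static void main(String[] args) {
        Properties props = consumerProps();
        System.out.println("remote limit = " + effectiveRemoteMaxBytes(props));
        props.remove(REMOTE_MAX_PARTITION_FETCH_BYTES);
        System.out.println("remote limit after fallback = " + effectiveRemoteMaxBytes(props));
    }
}
```

With both values set, the remote limit resolves to 4 MB; once the proposed key is removed, the same lookup returns the max.partition.fetch.bytes value.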
Proposed Changes
The proposal is to introduce a new consumer config, remote.max.partition.fetch.bytes, to configure the number of bytes returned from remote storage in a FETCH request. The reasons for not reusing the existing max.partition.fetch.bytes config are:
- The default value of fetch.max.bytes is 50 MB and the default value of max.partition.fetch.bytes is 1 MB.
- If the user increases max.partition.fetch.bytes to 4 MB, the new value applies to all partitions in the FETCH request, i.e. to reads from local storage too.
- Assume that the consumer is reading data from a topic with 64 partitions and that 16 partition leaders are co-located on a single broker.
  - The broker allocates 12 buffers of 4 MB each (12 x 4 MB = 48 MB, the most that fits within the 50 MB fetch.max.bytes limit), which might trigger young-gen and old-gen GC when there are many FETCH requests from different consumers.
  - The broker does not use BufferPool to allocate the buffers.
  - Allocating big byte buffers has a direct impact on GC.
- A remote read is triggered for only one partition in a given FETCH request, so we want to allocate the big byte buffer (4 MB) only for remote read requests.
- The assumption made is that there will be fewer RemoteRead requests compared to the LocalRead requests. Kafka performs faster when reading the data from PageCache instead of disk.
- This will also simplify the RemoteStorageManager plugin implementation.
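The buffer arithmetic in the list above can be checked with a small sketch. It assumes a simplified model in which every partition in the response gets a buffer of exactly its configured maximum and the total is capped by fetch.max.bytes; real brokers may exceed these limits in the cases described in KIP-74. The class and method names are hypothetical.

```java
// Sketch only: a simplified model of per-FETCH buffer allocation on one broker.
// Class and method names are hypothetical and not taken from the Kafka code base.
class FetchBufferEstimateSketch {
    static final long MB = 1024L * 1024L;

    // How many full per-partition buffers fit into one FETCH response.
    static long buffersPerFetch(int partitionsOnBroker, long partitionMaxBytes, long fetchMaxBytes) {
        return Math.min(partitionsOnBroker, fetchMaxBytes / partitionMaxBytes);
    }

    // Bytes allocated when every partition uses the same limit.
    static long uniformBytes(int partitionsOnBroker, long partitionMaxBytes, long fetchMaxBytes) {
        return buffersPerFetch(partitionsOnBroker, partitionMaxBytes, fetchMaxBytes) * partitionMaxBytes;
    }

    // Bytes allocated when only one partition is read from remote storage with the
    // larger limit and the rest are local reads (assumes at least one partition).
    static long splitBytes(int partitionsOnBroker, long localMaxBytes, long remoteMaxBytes, long fetchMaxBytes) {
        long total = remoteMaxBytes + (long) (partitionsOnBroker - 1) * localMaxBytes;
        return Math.min(total, fetchMaxBytes);
    }

    public static void main(String[] args) {
        // 16 leaders on the broker, fetch.max.bytes = 50 MB.
        System.out.println(uniformBytes(16, 4 * MB, 50 * MB) / MB + " MB with 4 MB for every partition");
        System.out.println(uniformBytes(16, 1 * MB, 50 * MB) / MB + " MB with 1 MB for every partition");
        System.out.println(splitBytes(16, 1 * MB, 4 * MB, 50 * MB) / MB + " MB with 4 MB for the remote read only");
    }
}
```

Under this model, raising max.partition.fetch.bytes to 4 MB costs 48 MB per FETCH, whereas a separate remote limit keeps the same request at 19 MB (one 4 MB remote buffer plus fifteen 1 MB local buffers).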
The FetchRequest schema (v18) adds the new field to each partition entry:
{ "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
  "about": "The topics to fetch.", "fields": [
  { "name": "Topic", "type": "string", "versions": "0-12", "entityType": "topicName", "ignorable": true,
    "about": "The name of the topic to fetch." },
  { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true,
    "about": "The unique topic ID."},
  { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
    "about": "The partitions to fetch.", "fields": [
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index." },
    { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
      "about": "The current leader epoch of the partition." },
    { "name": "FetchOffset", "type": "int64", "versions": "0+",
      "about": "The message offset." },
    { "name": "LastFetchedEpoch", "type": "int32", "versions": "12+", "default": "-1", "ignorable": false,
      "about": "The epoch of the last fetched record or -1 if there is none."},
    { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
      "about": "The earliest available offset of the follower replica. The field is only used when the request is sent by the follower."},
    { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
      "about": "The maximum bytes to fetch from this partition. See KIP-74 for cases where this limit may not be honored." },
    { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "17+", "taggedVersions": "17+", "tag": 0, "ignorable": true,
      "about": "The directory id of the follower fetching." },
    // New field added to FetchPartition
    { "name": "RemotePartitionMaxBytes", "type": "int32", "versions": "18+", "ignorable": true,
      "about": "The maximum bytes to fetch from this partition from remote storage. See KIP-74 for cases where this limit may not be honored." }
  ]}
]},
If remote.max.partition.fetch.bytes is not configured, or the FETCH request comes from an old client, the max.partition.fetch.bytes value is used instead.
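The fallback rule can be sketched from the broker's point of view as follows. This is only an illustration: the class and method names are hypothetical, and treating a non-positive RemotePartitionMaxBytes as "not set" is an assumption, since the schema above does not declare a default for the field.

```java
// Sketch only: how a broker might pick the byte limit for one partition in a FETCH request.
// Names are hypothetical; the "non-positive means not set" rule is an assumption.
class RemoteFetchLimitSketch {

    // readsFromRemote: whether this partition's fetch offset is served from remote storage.
    // partitionMaxBytes: the existing PartitionMaxBytes field.
    // remotePartitionMaxBytes: the proposed RemotePartitionMaxBytes field
    //                          (assumed <= 0 when an old client did not send it).
    static int maxBytesFor(boolean readsFromRemote, int partitionMaxBytes, int remotePartitionMaxBytes) {
        if (!readsFromRemote) {
            return partitionMaxBytes; // local reads keep the existing limit
        }
        // Remote read: prefer the new field, fall back for old clients or unset config.
        return remotePartitionMaxBytes > 0 ? remotePartitionMaxBytes : partitionMaxBytes;
    }

    public static void main(String[] args) {
        int oneMb = 1024 * 1024;
        int fourMb = 4 * oneMb;
        System.out.println("local read:            " + maxBytesFor(false, oneMb, fourMb));
        System.out.println("remote read, new field: " + maxBytesFor(true, oneMb, fourMb));
        System.out.println("remote read, old client: " + maxBytesFor(true, oneMb, 0));
    }
}
```

Only the remote read from a new client gets the larger limit; local reads and requests from old clients keep the PartitionMaxBytes value.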
Compatibility, Deprecation, and Migration Plan
The change is fully compatible with the existing code. There won't be any impact to the existing users.
Test Plan
The patch will be covered by unit tests and integration tests.
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.