Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-15777

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When remote storage is enabled on a topic and a consumer is backfilling (reading older data), the data is read from remote storage. Currently, Kafka reads from remote storage for only the first remote partition in a given FETCH request (see KAFKA-14915). The default value of max.partition.fetch.bytes is 1 MB. Reading only 1 MB of data per FETCH request from remote storage increases the number of round trips, and hence the overall read time, for the consumer, while cloud provider connectors are often tuned to read data in chunks of 4 MB.
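As a rough illustration of this cost, the sketch below counts the FETCH round trips needed to backfill a given amount of data from a single remote partition. It assumes, as described above, that only one remote partition is served per FETCH request, and additionally assumes that every request returns a full per-partition chunk. The class and method names are hypothetical; this is back-of-the-envelope arithmetic, not Kafka code.

```java
/** Back-of-the-envelope arithmetic only; names are hypothetical, not Kafka APIs. */
public class RemoteBackfillEstimate {

    static final long MB = 1024L * 1024L;

    /**
     * Number of FETCH round trips needed to read totalBytes from one remote
     * partition when each request returns at most perFetchBytes.
     * Assumes every request returns a full chunk (ceiling division).
     */
    static long roundTrips(long totalBytes, long perFetchBytes) {
        if (perFetchBytes <= 0) {
            throw new IllegalArgumentException("perFetchBytes must be positive");
        }
        return (totalBytes + perFetchBytes - 1) / perFetchBytes;
    }

    public static void main(String[] args) {
        long backfill = 1024 * MB; // 1 GiB of remote data for a single partition
        System.out.println("1 MB per fetch: " + roundTrips(backfill, 1 * MB)); // prints 1024
        System.out.println("4 MB per fetch: " + roundTrips(backfill, 4 * MB)); // prints 256
    }
}
```

Under these assumptions, raising the remote limit from 1 MB to 4 MB cuts the number of remote round trips for the same backfill by a factor of four.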

The proposal is to introduce a new consumer config, remote.max.partition.fetch.bytes, which allows users to tune the value for their storage plugin. Users may want to balance the number of calls to remote storage against the number of bytes returned to the client in the FETCH response.

Public Interfaces

  • Introduce a new consumer config, remote.max.partition.fetch.bytes. If it is not configured, fall back to the max.partition.fetch.bytes value.
  • Bump FetchRequest to v18 and add the RemotePartitionMaxBytes field, which propagates the value configured on the consumer to the broker.
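A minimal sketch of the consumer-side fallback described in the first bullet, assuming the consumer configuration is available as a java.util.Properties object. The remote.max.partition.fetch.bytes key is the config proposed here and does not exist in the current Kafka client; the class and the resolveRemotePartitionMaxBytes helper are hypothetical.

```java
import java.util.Properties;

/** Sketch of the proposed consumer-side fallback; class and helper names are hypothetical. */
public class RemoteFetchConfig {

    // Existing consumer config and its default value (1 MiB).
    static final String MAX_PARTITION_FETCH_BYTES = "max.partition.fetch.bytes";
    static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1024 * 1024;

    // Config proposed by this KIP (not part of the released Kafka client).
    static final String REMOTE_MAX_PARTITION_FETCH_BYTES = "remote.max.partition.fetch.bytes";

    static int maxPartitionFetchBytes(Properties props) {
        String value = props.getProperty(MAX_PARTITION_FETCH_BYTES);
        return value == null ? DEFAULT_MAX_PARTITION_FETCH_BYTES : Integer.parseInt(value.trim());
    }

    /** Value to send as RemotePartitionMaxBytes: the remote config if set, else max.partition.fetch.bytes. */
    static int resolveRemotePartitionMaxBytes(Properties props) {
        String remote = props.getProperty(REMOTE_MAX_PARTITION_FETCH_BYTES);
        return remote == null ? maxPartitionFetchBytes(props) : Integer.parseInt(remote.trim());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Nothing configured: falls back to the max.partition.fetch.bytes default.
        System.out.println(resolveRemotePartitionMaxBytes(props)); // prints 1048576

        props.setProperty(REMOTE_MAX_PARTITION_FETCH_BYTES, String.valueOf(4 * 1024 * 1024));
        System.out.println(resolveRemotePartitionMaxBytes(props)); // prints 4194304
    }
}
```

Keeping max.partition.fetch.bytes at its 1 MB default while setting only remote.max.partition.fetch.bytes to 4 MB confines the larger limit to remote reads, which is the trade-off this proposal argues for.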

Proposed Changes

The proposal is to introduce a new consumer config, remote.max.partition.fetch.bytes, which configures the maximum number of bytes returned from remote storage for a partition in a FETCH request. The reasons for not reusing the existing max.partition.fetch.bytes config are:

  1. The default value of fetch.max.bytes is 50 MB, and the default value of max.partition.fetch.bytes is 1 MB.
  2. If the user increases max.partition.fetch.bytes to 4 MB, the new limit applies to all partitions in the FETCH request, including partitions read from local storage.
  3. Assume that a consumer is reading from a topic with 64 partitions, and the leaders of 16 of those partitions are on a single broker:
    1. For one FETCH request, the broker allocates up to 12 buffers of 4 MB each (12 x 4 MB = 48 MB, the most that fits under the 50 MB fetch.max.bytes limit), which might trigger young-generation and old-generation GC when there are many FETCH requests from different consumers.
    2. The broker does not use a BufferPool to allocate these buffers.
    3. Allocating large byte buffers has a direct impact on GC.
  4. A remote read is triggered for only one partition in a given FETCH request, and we want to allocate the large (4 MB) byte buffer only for remote reads.
  5. The assumption is that there will be fewer remote read requests than local read requests. Kafka performs better when reading data from the page cache than from disk.
  6. This will also simplify the RemoteStorageManager plugin implementation. 
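The buffer arithmetic in item 3 can be reproduced with a short sketch. It assumes the simplified allocation model used in the list above: the broker allocates one buffer of the per-partition limit for each partition it serves, and stops adding whole partitions once the next one would exceed fetch.max.bytes. It ignores the KIP-74 rule that lets the first partition exceed the limits, and the "remote-only" case (one 4 MB remote partition, the rest local at 1 MB) is an illustrative assumption. The class and method names are hypothetical.

```java
/** Rough model of per-FETCH buffer allocation; the model and all names are assumptions. */
public class FetchBufferEstimate {

    static final long MB = 1024L * 1024L;

    /** Whole buffers of perPartitionBytes that fit, capped by partition count and fetch.max.bytes. */
    static long buffersPerFetch(int partitionsOnBroker, long perPartitionBytes, long fetchMaxBytes) {
        return Math.min(partitionsOnBroker, fetchMaxBytes / perPartitionBytes);
    }

    /** Total bytes allocated when every partition uses the same per-partition limit. */
    static long uniformLimitBytes(int partitionsOnBroker, long perPartitionBytes, long fetchMaxBytes) {
        return buffersPerFetch(partitionsOnBroker, perPartitionBytes, fetchMaxBytes) * perPartitionBytes;
    }

    /** Total bytes when one partition is read remotely with a larger limit and the rest locally. */
    static long remoteOnlyLimitBytes(int partitionsOnBroker, long localBytes, long remoteBytes,
                                     long fetchMaxBytes) {
        // Assumes remoteBytes <= fetchMaxBytes; the remaining budget goes to local partitions.
        long localBuffers = buffersPerFetch(partitionsOnBroker - 1, localBytes, fetchMaxBytes - remoteBytes);
        return remoteBytes + localBuffers * localBytes;
    }

    public static void main(String[] args) {
        long fetchMax = 50 * MB; // default fetch.max.bytes
        // max.partition.fetch.bytes raised to 4 MB for all 16 partitions: 12 buffers.
        System.out.println(uniformLimitBytes(16, 4 * MB, fetchMax) / MB); // prints 48
        // 4 MB only for the single remote partition, 1 MB for the 15 local ones.
        System.out.println(remoteOnlyLimitBytes(16, 1 * MB, 4 * MB, fetchMax) / MB); // prints 19
    }
}
```

Under this model, limiting the 4 MB buffer to the single remote partition roughly halves the worst-case allocation per FETCH request for the 16-partition example.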


FetchRequest.json

{ "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Topic", "type": "string", "versions": "0-12", "entityType": "topicName", "ignorable": true,
        "about": "The name of the topic to fetch." },
      { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
          "about": "The current leader epoch of the partition." },
        { "name": "FetchOffset", "type": "int64", "versions": "0+",
          "about": "The message offset." },
        { "name": "LastFetchedEpoch", "type": "int32", "versions": "12+", "default": "-1", "ignorable": false,
          "about": "The epoch of the last fetched record or -1 if there is none."},
        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The earliest available offset of the follower replica.  The field is only used when the request is sent by the follower."},
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." },
        { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "17+", "taggedVersions": "17+", "tag": 0, "ignorable": true,
          "about": "The directory id of the follower fetching." },
        // New field added to FetchPartition
        { "name": "RemotePartitionMaxBytes", "type": "int32", "versions": "18+", "ignorable": true,
          "about": "The maximum bytes to fetch from this partition when reading from remote storage.  See KIP-74 for cases where this limit may not be honored." }
      ]}]},


If remote.max.partition.fetch.bytes is not configured, or the FETCH request comes from an older client that sends a version below v18, the max.partition.fetch.bytes value (sent as PartitionMaxBytes) is used instead.
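A minimal sketch of how a broker might pick the per-partition limit for a remote read under this fallback rule. For requests below v18, RemotePartitionMaxBytes is not on the wire, so PartitionMaxBytes is used. The sketch also treats a non-positive RemotePartitionMaxBytes as "not set"; that sentinel is an assumption, since the schema above declares no default. The class and method names are hypothetical and are not taken from Kafka's broker code.

```java
/** Hypothetical broker-side helper; not actual Kafka broker code. */
public class RemoteFetchLimit {

    // First FetchRequest version that carries RemotePartitionMaxBytes (per this proposal).
    static final short REMOTE_PARTITION_MAX_BYTES_MIN_VERSION = 18;

    /**
     * Returns the byte limit to apply when this partition is served from remote storage.
     *
     * @param requestVersion          FetchRequest version sent by the client
     * @param partitionMaxBytes       PartitionMaxBytes from the request
     * @param remotePartitionMaxBytes RemotePartitionMaxBytes from the request (ignored below v18)
     */
    static int remoteReadLimit(short requestVersion, int partitionMaxBytes, int remotePartitionMaxBytes) {
        boolean fieldPresent = requestVersion >= REMOTE_PARTITION_MAX_BYTES_MIN_VERSION;
        // Assumption: a non-positive value means the client did not set the field.
        if (fieldPresent && remotePartitionMaxBytes > 0) {
            return remotePartitionMaxBytes;
        }
        return partitionMaxBytes;
    }

    public static void main(String[] args) {
        int oneMb = 1024 * 1024;
        int fourMb = 4 * oneMb;
        System.out.println(remoteReadLimit((short) 17, oneMb, fourMb)); // prints 1048576 (old client)
        System.out.println(remoteReadLimit((short) 18, oneMb, fourMb)); // prints 4194304 (field honored)
        System.out.println(remoteReadLimit((short) 18, oneMb, 0));      // prints 1048576 (field not set)
    }
}
```

Because the consumer already applies its own fallback before sending a v18 request, the non-positive check in this sketch is only a defensive guard.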

Compatibility, Deprecation, and Migration Plan

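The change is backward compatible, and existing users are not affected: consumers that do not set remote.max.partition.fetch.bytes keep using max.partition.fetch.bytes, and older clients that send FETCH requests below v18 get the same fallback.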

Test Plan

The patch will be covered by unit and integration tests.

Rejected Alternatives

Reusing the existing max.partition.fetch.bytes config for remote reads was considered and rejected for the reasons listed under Proposed Changes.
