Current state: Accepted
Discussion thread: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Currently, the only way for a client to limit the fetch response size is the per-partition response limit max_bytes, taken from the config setting max.partition.fetch.bytes.
So the maximum amount of memory the client can consume is max.partition.fetch.bytes * num_partitions, where num_partitions is the total number of partitions currently being fetched by the consumer.
This leads to the following problems:
- Since num_partitions can be quite large (several thousand), the memory required for fetch responses can reach several GB
- max.partition.fetch.bytes cannot be set arbitrarily low, since it must be greater than the maximum message size for fetch requests to work
- Memory usage is not easily predictable: it depends on consumer lag
This KIP proposes a new version of the fetch request with a new top-level parameter, max_bytes, to limit the size of the fetch response and solve the problems above.
In particular, if a consumer issues N parallel fetch requests, its memory consumption will not exceed N * max_bytes.
More precisely, the bound is min(N * max_bytes, max.partition.fetch.bytes * num_partitions), since the per-partition limit is still respected.
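The two bounds can be compared with a little arithmetic. The sketch below computes the old per-partition bound and the new v.3 bound. The class and method names are hypothetical. The example numbers (1 MiB per partition, 4000 partitions, one request with a 50 MiB limit) are illustrative and were not measured. Like the formula above, the sketch ignores the case where a single message is larger than max_bytes.

```java
class FetchMemoryBound {
    // Before fetch v.3: only the per-partition limit (max.partition.fetch.bytes) applies.
    static long perPartitionBound(long partitionMaxBytes, long numPartitions) {
        return partitionMaxBytes * numPartitions;
    }

    // With fetch v.3 and N parallel requests: min(N * max_bytes, per-partition bound).
    // Ignores responses that exceed max_bytes because of a single oversized message.
    static long v3Bound(long n, long maxBytes, long partitionMaxBytes, long numPartitions) {
        return Math.min(n * maxBytes, perPartitionBound(partitionMaxBytes, numPartitions));
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        // Illustrative numbers only.
        System.out.println(perPartitionBound(mib, 4000));    // 4194304000 (about 3.9 GiB)
        System.out.println(v3Bound(1, 50 * mib, mib, 4000)); // 52428800 (50 MiB)
    }
}
```

With few partitions, the per-partition term can still be the smaller one. For example, 10 partitions at 1 MiB each cap memory at 10 MiB regardless of max_bytes.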
This KIP introduces:
- A new fetch request (v.3) with a response size limit
- A new client-side config parameter, fetch.max.bytes: the client's fetch response size limit
- A new replication config parameter, replica.fetch.response.max.bytes: the limit used by the replication thread
- A new inter-broker protocol version, "0.10.1-IV0"; starting from this version, brokers use fetch request v.3 for replication
The proposed changes are straightforward. We introduce FetchRequest v.3 with a new top-level parameter, max_bytes:
Fetch Request (Version: 3) => replica_id max_wait_time min_bytes max_bytes [topics]
Fetch Response v.3 will remain the same as v.2.
The server processes partitions in the order they appear in the request.
If the top-level max_bytes parameter is Int.MAX_INT ("no limit"), the request is processed exactly as before.
Otherwise, for each partition except the first, the server fetches up to the corresponding per-partition limit max_bytes, but no more than the remaining response limit.
For the first partition, the server always fetches at least one message. Empty responses are returned for all partitions that did not fit into the response limit.
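The following is a simplified, hypothetical model of the server-side algorithm above. It is not the broker's code. Each partition is reduced to its per-partition max_bytes and the sizes of the whole messages available from its fetch offset. The names FetchV3Sketch, PartitionRequest and PartitionResponse are invented for illustration, and Integer.MAX_VALUE stands in for Int.MAX_INT. One point is an assumption: "first partition" is read as the first partition that has data. That reading makes the progress guarantee below hold even when the first requested partition is empty.

```java
import java.util.ArrayList;
import java.util.List;

class FetchV3Sketch {
    // Hypothetical model: per-partition max_bytes plus sizes (bytes) of whole messages.
    record PartitionRequest(String name, int partitionMaxBytes, List<Integer> messageSizes) {}

    // Message sizes returned for one partition; an empty list is an empty response.
    record PartitionResponse(String name, List<Integer> messageSizes) {}

    static List<PartitionResponse> fetch(int maxBytes, List<PartitionRequest> partitions) {
        // "No limit": process as before, using only the per-partition limits.
        boolean noLimit = maxBytes == Integer.MAX_VALUE;
        long remaining = maxBytes;     // response budget still available
        boolean responseEmpty = true;  // no message returned yet in this response
        List<PartitionResponse> responses = new ArrayList<>();

        for (PartitionRequest p : partitions) {  // partitions in request order
            long limit = noLimit ? p.partitionMaxBytes()
                                 : Math.min(p.partitionMaxBytes(), remaining);
            List<Integer> taken = new ArrayList<>();
            long used = 0;
            for (int size : p.messageSizes()) {
                // Progress rule (assumed reading): with a top-level limit, the first
                // message of the first partition with data is returned even if too big.
                boolean mustTake = !noLimit && responseEmpty && taken.isEmpty();
                if (used + size > limit && !mustTake) {
                    break;
                }
                taken.add(size);
                used += size;
            }
            if (!taken.isEmpty()) {
                responseEmpty = false;
            }
            if (!noLimit) {
                remaining = Math.max(0, remaining - used);  // later partitions may get nothing
            }
            responses.add(new PartitionResponse(p.name(), taken));
        }
        return responses;
    }

    public static void main(String[] args) {
        List<PartitionRequest> request = List.of(
                new PartitionRequest("A", 80, List.of(50, 50)),
                new PartitionRequest("B", 80, List.of(30, 30)),
                new PartitionRequest("C", 80, List.of(10)));
        for (PartitionResponse r : fetch(100, request)) {
            System.out.println(r.name() + " -> " + r.messageSizes());
        }
    }
}
```

With max_bytes = 100, main prints A -> [50], B -> [30], C -> [10]. The second message of A would exceed A's 80-byte limit. B then has only 50 bytes of budget left, and C has 20.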
This algorithm provides the following guarantees:
- A FetchRequest with top-level max_bytes != Int.MAX_INT always makes progress: if the server has messages, then at least one message is returned irrespective of max_bytes
- The FetchRequest response size will not exceed max(max_bytes, size of the first message in the first partition)
Since the new fetch request processes partitions in order and stops fetching data when the response limit is hit, clients should reorder partitions between requests to ensure fairness.
Consider the following example: suppose a client wants to fetch from 4 partitions, A, B, C and D, and partitions A and B are growing much faster than C and D.
If the client always fetches partitions in the order A, B, C, D, then the response limit may be hit before any messages are fetched from C and D.
In this scenario, the client won't get any messages from C and D until it catches up with A and B.
The solution is either to reorder partitions in the fetch request in round-robin fashion, so that the next request continues from the first partition that came back empty, or to randomly shuffle the partitions before each request.
Round-robin reordering seems fairer and more predictable, so we decided to use it in ReplicaFetcherThread and in the Java consumer.
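The round-robin reordering can be sketched in a few lines. This is a hypothetical illustration and not the code used in ReplicaFetcherThread or the Java consumer. It rotates the previous request order so that the next request starts at the first partition that came back empty. Keeping the order unchanged when no partition came back empty is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

class RoundRobinOrder {
    // Hypothetical helper: rotate the previous order so that it starts at the first
    // partition that came back empty; keep the order if none came back empty.
    static List<String> nextOrder(List<String> previousOrder, Set<String> emptyPartitions) {
        List<String> next = new ArrayList<>(previousOrder);
        for (int i = 0; i < next.size(); i++) {
            if (emptyPartitions.contains(next.get(i))) {
                Collections.rotate(next, -i);  // element at index i moves to index 0
                break;
            }
        }
        return next;
    }

    public static void main(String[] args) {
        // The example above: A and B filled the response, C and D came back empty.
        List<String> order = List.of("A", "B", "C", "D");
        System.out.println(nextOrder(order, Set.of("C", "D")));  // [C, D, A, B]
    }
}
```

The random alternative would simply call Collections.shuffle on a copy of the list before each request. Rotation is deterministic, and every starved partition moves to the front of the next request.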
Compatibility, Deprecation, and Migration Plan
The new fetch request is designed to work properly even if the top-level max_bytes is less than the message size. If max_bytes is Int.MAX_INT, the new request behaves exactly like the old one.
So we could even make this KIP completely transparent to users by setting the defaults for both fetch.max.bytes and replica.fetch.response.max.bytes to Int.MAX_INT.
However, since ReplicaFetcherThread and the Java consumer are ready for the new fetch request, we decided to use the following defaults:
fetch.max.bytes = 50MB
replica.fetch.response.max.bytes = 10MB
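For illustration, the defaults above can be written out explicitly. This sketch uses plain java.util.Properties so it runs without kafka-clients. In practice, the consumer properties would be passed to the KafkaConsumer constructor, and the broker setting would go into server.properties. Two points are assumptions of this sketch: reading "MB" as 1024 * 1024 bytes, and the hypothetical class and method names.

```java
import java.util.Properties;

class FetchLimitConfig {
    static final int MIB = 1024 * 1024;  // assumes "MB" means 1024 * 1024 bytes

    // Consumer side: fetch.max.bytes = 50MB, written out explicitly.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("fetch.max.bytes", String.valueOf(50 * MIB));
        return props;
    }

    // Broker side: limit used by the replication thread.
    static Properties brokerProps() {
        Properties props = new Properties();
        props.setProperty("replica.fetch.response.max.bytes", String.valueOf(10 * MIB));
        return props;
    }

    // Per the compatibility discussion, Int.MAX_INT means "no limit" (old behaviour).
    static Properties unlimitedConsumerProps() {
        Properties props = new Properties();
        props.setProperty("fetch.max.bytes", String.valueOf(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("fetch.max.bytes"));                // 52428800
        System.out.println(brokerProps().getProperty("replica.fetch.response.max.bytes")); // 10485760
    }
}
```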
Alternatives that were discussed and rejected:
- Deprecate the per-partition limit when adding the global response limit. Rejected because the per-partition limit can be useful for Kafka Streams (see the mailing list discussion).
- Randomly shuffle partitions on the server side. Pros: ensures fairness without client-side changes. Cons: non-deterministic behaviour on the server side; round-robin can easily be implemented on the client side.