Status
Current state: "Draft"
JIRA: KAFKA-19554
1. Motivation
Currently, Kafka consumers can only limit the volume of messages fetched by size (e.g., fetch.max.bytes, max.partition.fetch.bytes). However, there is no way to limit the number of messages returned in a fetch request, either globally or per partition.
As Kafka evolves beyond a traditional message queue into a broader streaming platform and data pipeline backbone, many businesses are encountering rate-limiting requirements based on message count. These requirements are especially common in downstream services where message processing capacity is more naturally expressed in "messages per second" rather than raw bytes.
To work around this limitation, consumers often fetch large batches, truncate them manually, and commit offsets selectively. This approach:
Is inefficient (it fetches and then discards unused data)
Adds latency
Reduces throughput
Increases offset-management complexity
In fact, several companies have already modified Kafka internally to support this capability, which further highlights the practical need for a unified, officially supported solution.
2. Public Interfaces
2.1 New Fetch Request Field
We propose adding the following optional field to the FetchRequest protocol:
int32 MaxRecords (optional; analogous to the MaxRecords field in ShareFetchRequest)
This field specifies the maximum number of records to return per fetch request.
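For illustration, the new field might be declared in the FetchRequest schema along the following lines; note that the version range and default value shown here are placeholders, not final values:

```
{ "name": "MaxRecords", "type": "int32", "versions": "18+", "default": "-1",
  "about": "The maximum number of records to fetch, or -1 if there is no limit." }
```

A sentinel default such as -1 would let brokers distinguish "no limit requested" from an explicit limit, mirroring how other optional fetch parameters behave.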
2.2 Client Configuration
A corresponding consumer config will be added:
fetch.max.message.count
Type: int
Description: If greater than 0, the consumer will request no more than this number of messages per fetch. The broker will make a best-effort attempt to respect this limit.
3. Proposed Changes
3.1 Broker Changes
The broker will track the number of records added to each fetch response. Once the requested MaxRecords limit is reached (globally across partitions or per partition), fetch assembly will stop. This logic must be integrated with the existing byte-based limits (i.e., assembly stops when either the byte limit or the record-count limit is reached).
Due to batching and compression, exact counts may not be achievable without decompressing entire batches. As such, this implementation will adopt a best-effort approach:
If compressed, a batch will be counted as one "message" unless decompression is trivial (e.g., for uncompressed or simple formats).
For greater precision, clients can disable compression or configure predictable batch sizes.
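The stopping rule above can be sketched as follows. This is an illustrative toy, not the broker's actual code: batches are modeled as (bytes, recordCount) pairs, where an opaque compressed batch would contribute a best-effort count of 1.

```java
/**
 * Toy sketch of the proposed fetch-assembly limit check: include batches
 * until either the byte limit or the record-count limit would be exceeded.
 */
public class FetchLimitSketch {
    // Each batch is {sizeInBytes, recordCount}; returns how many batches fit.
    static int assemble(int[][] batches, int maxBytes, int maxRecords) {
        int bytes = 0, records = 0, included = 0;
        for (int[] batch : batches) {
            int batchBytes = batch[0];
            int batchRecords = batch[1]; // best effort: 1 for opaque compressed batches
            if (bytes + batchBytes > maxBytes || records + batchRecords > maxRecords) {
                break; // stop once either limit would be exceeded
            }
            bytes += batchBytes;
            records += batchRecords;
            included++;
        }
        return included;
    }

    public static void main(String[] args) {
        int[][] batches = { {100, 10}, {100, 10}, {100, 10} };
        // The byte limit permits all three batches, but a 25-record cap stops after two.
        System.out.println(assemble(batches, 1000, 25)); // prints 2
    }
}
```

Note that because whole batches are included or excluded, the response may contain fewer records than the cap; the limit is an upper bound, consistent with the best-effort semantics described above.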
3.2 Client Changes
The client will be updated to allow setting fetch.max.message.count. This value will be sent in the FetchRequest. The client logic for polling and offset management remains unchanged.
4. Compatibility, Deprecation, and Migration Plan
This feature is purely additive and fully backward-compatible:
Brokers that do not understand the new field will ignore it.
Clients will fall back to current behavior if the broker does not support the field.
There are no deprecated configurations or breaking changes.
This feature can be enabled or disabled by simply setting or omitting the new parameter.
5. Test Plan
Unit tests will cover:
Proper enforcement of message count limits.
Compatibility with existing byte-based limits.
Scenarios with compression enabled and disabled.
Integration tests will simulate fetches under real workloads to ensure throughput and correctness.
Performance benchmarks will ensure no regression in fetch performance when fetch.max.message.count is not set.
6. Rejected Alternatives
Enforcing message count limit on the client only: Already possible, but inefficient.
Enforcing message count precisely with full decompression: Potentially too expensive; the best-effort approach provides a practical compromise.
Relationship to ShareGroups / Queues for Kafka
The ShareGroups model (also known as the Queues for Kafka model) does introduce a MaxRecords field to control the number of records returned in a fetch response. However, this mechanism is limited to the internal protocol used within ShareGroups. The standard Kafka Consumer API (i.e., the traditional consumer model) does not support ShareFetchRequest or the MaxRecords field.
Therefore, the goal of this KIP is to formally support a similar capability in the standard FetchRequest protocol, enabling all users to directly control the number of records returned per fetch.