Status

Current state: "Draft"

JIRA: KAFKA-19554 (ASF JIRA)

1. Motivation

Currently, Kafka consumers can only limit the volume of data fetched by size (e.g., fetch.max.bytes, max.partition.fetch.bytes). There is no way to limit the number of messages returned by a fetch request, either globally or per partition.

...

In fact, several companies have already modified Kafka internally to support this feature, which underscores the practical need for a unified, officially supported solution.

...

2. Public Interfaces

2.1 New Fetch Request Field

We propose adding the following optional field to the FetchRequest protocol:

...

This field specifies the maximum number of messages to return per fetch request.

2.2 Client Configuration

A corresponding consumer config will be added:

...

  • Type: int

  • Description: If greater than 0, the consumer will request no more than this number of messages per fetch. The broker will make a best-effort attempt to respect this limit.

...
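As a sketch, the proposed option would be set like any other consumer config. The surrounding bootstrap, group, and deserializer values below are purely illustrative; only `fetch.max.message.count` is the new, proposed key:

```java
import java.util.Properties;

public class FetchCountConfigExample {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // illustrative
        props.put("group.id", "demo-group");                // illustrative
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Proposed config: cap each fetch at 500 messages (broker best-effort).
        props.put("fetch.max.message.count", "500");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("fetch.max.message.count"));
    }
}
```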

3. Proposed Changes

3.1 Broker Changes

  • The broker will monitor the number of messages added to each fetch response.

  • Once max_message_count is reached (globally across partitions or per partition), the fetch assembly will stop.

  • This logic must be integrated with the existing byte-based limits (e.g., stop if either the byte limit or message count is reached).
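The dual-limit check above can be sketched as follows. This is a simplified model, not actual broker code; the `Batch` representation and method names are hypothetical. Assembly stops as soon as either the byte budget or the message budget would be exceeded, but always returns at least one batch so consumers can make progress (mirroring the existing byte-limit behavior):

```java
import java.util.ArrayList;
import java.util.List;

public class FetchLimitSketch {
    /** A batch of records: its size on the wire and how many messages it holds. */
    record Batch(int sizeBytes, int messageCount) {}

    /** Select batches until either maxBytes or maxMessages would be exceeded. */
    static List<Batch> assemble(List<Batch> available, int maxBytes, int maxMessages) {
        List<Batch> selected = new ArrayList<>();
        int bytes = 0, messages = 0;
        for (Batch b : available) {
            boolean overBytes = bytes + b.sizeBytes() > maxBytes;
            boolean overCount = messages + b.messageCount() > maxMessages;
            // Stop at whichever limit is hit first; never return an empty
            // response, so a single oversized batch still gets delivered.
            if (!selected.isEmpty() && (overBytes || overCount)) break;
            selected.add(b);
            bytes += b.sizeBytes();
            messages += b.messageCount();
        }
        return selected;
    }
}
```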

...

  • If compressed, a batch will be counted as one "message" unless decompression is trivial (e.g., for uncompressed or simple formats).

  • For greater precision, clients can disable compression or configure predictable batch sizes.
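The counting rule described above can be sketched as a single helper (a simplification under this KIP's best-effort assumption, not broker code): when the records inside a compressed batch cannot be counted cheaply, the whole batch contributes one message toward the limit, while an uncompressed batch contributes its actual record count:

```java
public class MessageCountSketch {
    /**
     * Best-effort contribution of a batch toward the message-count limit:
     * an uncompressed batch counts its actual records; a compressed batch,
     * whose records would require decompression to count, counts as 1.
     */
    static int countedMessages(boolean compressed, int recordCount) {
        return compressed ? 1 : recordCount;
    }
}
```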

3.2 Client Changes

  • The client will be updated to allow setting fetch.max.message.count.

  • This value will be sent in the FetchRequest.

  • The client logic for polling and offset management remains unchanged.

...

4. Compatibility, Deprecation, and Migration Plan

  • This feature is purely additive and fully backward-compatible:

    • Brokers that do not understand the new field will ignore it.

    • Clients will fall back to current behavior if the broker does not support the field.

  • There are no deprecated configurations or breaking changes.

  • This feature can be enabled or disabled by simply setting or omitting the new parameter.

...

5. Test Plan

  • Unit tests will cover:

    • Proper enforcement of message count limits.

    • Compatibility with existing byte-based limits.

    • Scenarios with compression enabled and disabled.

  • Integration tests will simulate fetches under real workloads to ensure throughput and correctness.

  • Performance benchmarks will ensure no regression in fetch performance when fetch.max.message.count is not set.

...

6. Rejected Alternatives

  • Enforcing the message count limit on the client only: already possible today (e.g., via max.poll.records), but the broker still transfers full-size fetches, so it does not reduce network or broker overhead.

  • Enforcing message count precisely with full decompression: Potentially too expensive; the best-effort approach provides a practical compromise.

...