Status
Current state: Accepted, target 4.2.0
Discussion thread: https://lists.apache.org/thread/9fn7n7n3wys9y6olmcz5p56q9n3tp45v
Vote thread: https://lists.apache.org/thread/hv6b73moxyn47tlw08hw2cn0vdvhj7tv
JIRA: https://issues.apache.org/jira/browse/KAFKA-19020
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In shared fetch operations, the maxFetchRecords parameter currently functions as a soft limit, meaning the actual number of records returned to the client may exceed this threshold. This is because records are read and packaged into the ShareFetchResponse in whole batches, so the number of records in the response can exceed maxFetchRecords; currently, the last acquired offset is always the last offset of the final batch.
The current implementation offers certain benefits, but it also has drawbacks:
Pros:
- Records are consistently delivered in batches, and no records that have not been acquired are sent to clients. This ensures no extra overhead is incurred on either the client or server side. If the client aims for maximum throughput and has no strict limit on the number of returned records, the current implementation is acceptable.
Cons:
- Unpredictable Record Counts:
- In scenarios where record processing is time-consuming, consumers want to strictly limit the number of records delivered (even down to single-record delivery) to reduce lock timeouts and redelivery.
- By adjusting the max.poll.records parameter, clients can directly control consumer utilization (the proportion of time consumers spend processing messages vs. idling). In the record_limit mode proposed below, the number of records returned to the consumer in each fetch never exceeds max.poll.records, enabling predictable throughput. For instance, if 500 messages are in the queue and no consumers are running when the messages enter the queue, the first consumer to start will fetch at most max.poll.records messages in a single request, rather than all 500 at once.
Public Interfaces
Request Schema
{
"apiKey": 78,
"type": "request",
"listeners": ["broker"],
"name": "ShareFetchRequest",
// Version 0 was used for early access of KIP-932 in Apache Kafka 4.0 but removed in Apache Kafka 4.1.
//
// Version 1 is the initial stable version (KIP-932).
+ // Version 2 adds ShareAcquireMode as described in KIP-1206
+ "validVersions": "1-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
"about": "The group identifier." },
{ "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The member ID." },
{ "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
"about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
{ "name": "MaxWaitMs", "type": "int32", "versions": "0+",
"about": "The maximum time in milliseconds to wait for the response." },
{ "name": "MinBytes", "type": "int32", "versions": "0+",
"about": "The minimum bytes to accumulate in the response." },
{ "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
"about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
{ "name": "MaxRecords", "type": "int32", "versions": "1+",
"about": "The maximum number of records to fetch. This limit can be exceeded for alignment of batch boundaries." },
{ "name": "BatchSize", "type": "int32", "versions": "1+",
"about": "The optimal number of records for batches of acquired records and acknowledgements." },
+ { "name": "ShareAcquireMode", "type": "int8", "versions": "2+",
+ "about": "The acquire mode to control the fetch behavior: 0 - batch_optimized, 1 - record_limit." },
{ "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
"about": "The topics to fetch.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID.", "mapKey": true },
{ "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
"about": "The partitions to fetch.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
"about": "The partition index." },
{ "name": "PartitionMaxBytes", "type": "int32", "versions": "0",
"about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
{ "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
"about": "Record batches to acknowledge.", "fields": [
{ "name": "FirstOffset", "type": "int64", "versions": "0+",
"about": "First offset of batch of records to acknowledge."},
{ "name": "LastOffset", "type": "int64", "versions": "0+",
"about": "Last offset (inclusive) of batch of records to acknowledge."},
{ "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
"about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
]}
]}
]},
{ "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+",
"about": "The partitions to remove from this share session.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions indexes to forget." }
]}
]
}
ShareFetch
Add a ShareAcquireMode field to the ShareFetch class.
ShareAcquireMode
A new class org.apache.kafka.clients.consumer.internals.ShareAcquireMode is added.
import java.util.Locale;

public enum ShareAcquireMode {
    BATCH_OPTIMIZED("batch_optimized"),
    RECORD_LIMIT("record_limit");

    public final String name;

    ShareAcquireMode(final String name) {
        this.name = name;
    }

    /**
     * Case-insensitive acquire mode lookup by string name.
     */
    public static ShareAcquireMode of(final String name) {
        return ShareAcquireMode.valueOf(name.toUpperCase(Locale.ROOT));
    }
}
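As a sketch, the case-insensitive lookup could be exercised like this (the enum is repeated from above so the snippet compiles standalone; the example class name is illustrative):

```java
import java.util.Locale;

// Repeated from the definition above so this snippet is self-contained.
enum ShareAcquireMode {
    BATCH_OPTIMIZED("batch_optimized"),
    RECORD_LIMIT("record_limit");

    public final String name;

    ShareAcquireMode(final String name) {
        this.name = name;
    }

    public static ShareAcquireMode of(final String name) {
        return ShareAcquireMode.valueOf(name.toUpperCase(Locale.ROOT));
    }
}

public class ShareAcquireModeExample {
    public static void main(String[] args) {
        // The lookup is case-insensitive, so either config value maps cleanly.
        System.out.println(ShareAcquireMode.of("record_limit"));         // RECORD_LIMIT
        System.out.println(ShareAcquireMode.of("Batch_Optimized").name); // batch_optimized
    }
}
```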
Client API changes
Consumer configuration
| Configuration | Description | VALUES |
|---|---|---|
| share.acquire.mode | Controls how a share consumer acquires records. The behavior of the max.poll.records client configuration changes depending on the selected acquire mode: if set to record_limit, the number of records returned in each poll() will not exceed the value of max.poll.records. | Type: String Default: "batch_optimized" Optional values: "batch_optimized" or "record_limit" |
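A minimal sketch of how a share consumer might be configured with the proposed key (the bootstrap address and group id are placeholders, and the helper method is hypothetical):

```java
import java.util.Properties;

public class ShareConsumerConfigExample {
    // Hypothetical helper building share-consumer properties; the key
    // "share.acquire.mode" is the configuration proposed in this KIP.
    static Properties shareConsumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "my-share-group");          // placeholder share group
        // record_limit: each poll() returns at most max.poll.records records.
        props.put("share.acquire.mode", "record_limit");
        props.put("max.poll.records", "6");
        return props;
    }

    public static void main(String[] args) {
        // A KafkaShareConsumer would be constructed with these properties.
        System.out.println(shareConsumerProps().getProperty("share.acquire.mode")); // record_limit
    }
}
```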
Proposed Changes
Currently, maxFetchRecords in the ShareFetchRequest (corresponding to the max.poll.records parameter provided by the client) acts as a soft limit on the number of records sent back to the consumer. This proposal introduces a ShareAcquireMode field in the ShareFetchRequest. When "record_limit" mode is selected, the broker acquires records only up to maxFetchRecords; in "batch_optimized" mode, it can go up to batch boundaries for optimization.
Record-limit and Batch-Optimized mode example
An example of the two modes looks like this:
batch_optimized:
- The batches produced onto a topic-partition each contain 5 records, so the batches cover offsets 0-4 inclusive, 5-9 inclusive, 10-14 inclusive.
- A share consumer with `max.poll.records=6, batch.size=2` fetches records from the topic.
- Two whole batches of records are acquired, from offsets 0 to 9 inclusive, which is 10 records in total.
- Records acquired: ShareAcquireRecord[0] (offsets 0 to 4) and ShareAcquireRecord[1] (offsets 5 to 9).
record_limit:
- The batches produced onto a topic-partition each contain 5 records, so the batches cover offsets 0-4 inclusive, 5-9 inclusive, 10-14 inclusive.
- A share consumer with `max.poll.records=6, batch.size=2` fetches records from the topic.
- The broker returns two entire batches (offsets 0-9 inclusive) but acquires only records 0-5 inclusive. In this case, the broker has returned record data that has not been acquired by this consumer (offsets 6 to 9).
- Records acquired: ShareAcquireRecord[0] (offsets 0 to 5).
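The acquisition arithmetic in the two examples above can be sketched as a simplified model (not the broker implementation; the method name and parameters are illustrative):

```java
public class AcquireModeSketch {
    // Given batches identified by their last offsets, return the last acquired
    // offset when acquisition starts at offset 0 with the given record limit.
    static long lastAcquiredOffset(long[] batchLastOffsets, int maxFetchRecords, boolean recordLimit) {
        long acquired = 0;
        long last = -1;
        for (long batchLast : batchLastOffsets) {
            long batchFirst = last + 1;
            long batchCount = batchLast - batchFirst + 1;
            if (recordLimit && acquired + batchCount > maxFetchRecords) {
                // record_limit: stop exactly at the limit, mid-batch if necessary.
                return batchFirst + (maxFetchRecords - acquired) - 1;
            }
            acquired += batchCount;
            last = batchLast;
            if (acquired >= maxFetchRecords) {
                // batch_optimized: the limit may be exceeded to finish the batch.
                return last;
            }
        }
        return last;
    }

    public static void main(String[] args) {
        long[] batches = {4, 9, 14}; // batches 0-4, 5-9, 10-14
        System.out.println(lastAcquiredOffset(batches, 6, false)); // 9 (batch_optimized)
        System.out.println(lastAcquiredOffset(batches, 6, true));  // 5 (record_limit)
    }
}
```

With `max.poll.records=6`, batch_optimized finishes the second batch (10 records, last offset 9), while record_limit stops at offset 5, matching the examples above.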
In record_limit mode, records that have not been acquired may be sent to clients, resulting in overhead on both the client and server side. Therefore, this mode should be used in the following scenarios:
- There is a need to limit the number of records acquired.
- To reduce this overhead, the producer's batch size configuration should be kept small.
Batching
Given that the primary objective of this change is to address cases with high processing times, if "record_limit" mode is enabled, the ShareFetchResponse will contain only one batch, with the maximum number of records upper bounded by max(BatchSize, MaxRecords).
Compatibility, Deprecation, and Migration Plan
This change is designed to be fully backward compatible:
- The default value of the newly introduced configuration is set to "batch_optimized", enabling compatibility with older versions of the broker.
Test Plan
Since this is a fairly small change, unit tests should be sufficient.
Rejected Alternatives
- Control the record count at the log level to avoid additional overhead: when reading records from the log, at least one batch is read, and this batch may be compressed. Controlling the number of records at the log level would require decompression, resulting in additional CPU consumption. Therefore, it makes sense to avoid this CPU cost at the expense of additional network transmission overhead.
- Adjusting the record batching so that a single batch (e.g., records 0-4 inclusive) is returned in record_limit mode is straightforward, but the code assumes batch-boundary alignment for other operations as well. As a result, this approach is not preferred.

