Status
Current state: "Accepted"
Discussion thread: here
Vote thread: here
JIRA: KAFKA-19519
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The current group coordinator implementation uses max message size to determine the maximum buffer size for the coordinator append cache. This causes some issues:
- Operational fragility: Large max message size values (common in event streaming pipelines) create artificially high memory pressure on the coordinator, risking OutOfMemoryError.
- Unnecessary coupling: the coordinator buffers should be independent from producer message limits.
While recent improvements (KAFKA-19427) implement dynamic buffer growth starting from 512KB, the upper bound remains tied to message.max.bytes. This KIP proposes full decoupling.
Public Interfaces
This KIP introduces two new configurations: group.coordinator.append.max.buffer.size in GroupCoordinatorConfig and share.coordinator.append.max.buffer.size in ShareCoordinatorConfig. Both are dynamically updatable.
| Name | Type | Importance | Default | Valid values | Description |
|---|---|---|---|---|---|
| group.coordinator.append.max.buffer.size | Int | MEDIUM | 1 * 1024 * 1024 + Records.LOG_OVERHEAD | > 512 * 1024 | The largest append buffer size allowed by the GroupCoordinator (it is recommended not to exceed the maximum allowed message size). |
| share.coordinator.append.max.buffer.size | Int | MEDIUM | 1 * 1024 * 1024 + Records.LOG_OVERHEAD | > 512 * 1024 | The largest append buffer size allowed by the ShareCoordinator (it is recommended not to exceed the maximum allowed message size). |
Extend the CoordinatorRuntime Builder interface to allow different coordinator implementations to supply their buffer size config.
public Builder<S, U> withMaxBufferSize(Supplier<Integer> maxBufferSizeSupplier) {
    this.maxBufferSizeSupplier = maxBufferSizeSupplier;
    return this;
}
Introduce two metrics to enhance the observability of the coordinator's buffer cache.
| MBean | Description |
|---|---|
| kafka.server:type=group-coordinator-metrics, name=coordinator-append-buffer-size-bytes, protocol=(consumer\|share) | The current total size in bytes of the append buffers being held in the coordinator's cache. |
| kafka.server:type=group-coordinator-metrics, name=coordinator-append-buffer-skip-cache-count, protocol=(consumer\|share) | The count of oversized append buffers that were discarded instead of being cached upon release. |
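The accounting behind these two metrics can be sketched as follows. This is illustrative only: Kafka would register gauges and counters through its Metrics framework, not raw AtomicLongs, and the method names here are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

class AppendBufferMetrics {
    // Backs coordinator-append-buffer-size-bytes: total bytes of append
    // buffers currently held in the coordinator's cache.
    final AtomicLong cachedBufferBytes = new AtomicLong();

    // Backs coordinator-append-buffer-skip-cache-count: number of
    // oversized buffers discarded instead of cached on release.
    final AtomicLong skipCacheCount = new AtomicLong();

    void onBufferCached(int capacity) {
        cachedBufferBytes.addAndGet(capacity);
    }

    void onBufferEvicted(int capacity) {
        cachedBufferBytes.addAndGet(-capacity);
    }

    void onBufferDiscarded() {
        skipCacheCount.incrementAndGet();
    }
}
```

A rising skip-cache count would signal that records routinely exceed append.max.buffer.size and the limit may be worth raising.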
Proposed Changes
The proposal introduces two new configurations to control the maximum append buffer size specifically for the CoordinatorRuntime, allowing independent sizing from max message size limits.
Compatibility, Deprecation, and Migration Plan
The max buffer size of the new coordinator is currently transparent to users, and this config is a soft limit. If a record exceeds append.max.buffer.size but is smaller than message.max.bytes, it will still be written successfully; however, its buffer will not be cached. Therefore, this will not affect any existing functionality.
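A minimal sketch of this soft-limit behavior, under the assumption of a simple buffer pool (class and method names are illustrative, not the actual CoordinatorRuntime code):

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

class AppendBufferPool {
    private final int maxCachedBufferSize; // append.max.buffer.size (soft limit)
    private final ArrayDeque<ByteBuffer> cache = new ArrayDeque<>();

    AppendBufferPool(int maxCachedBufferSize) {
        this.maxCachedBufferSize = maxCachedBufferSize;
    }

    // A record larger than the soft limit still gets a buffer: the limit
    // governs caching, not allocation. If the cached buffer is too small
    // it is dropped and a fresh one is allocated.
    ByteBuffer acquire(int recordSize) {
        ByteBuffer cached = cache.poll();
        if (cached != null && cached.capacity() >= recordSize) {
            cached.clear();
            return cached;
        }
        return ByteBuffer.allocate(recordSize);
    }

    // On release, an oversized buffer is discarded instead of cached, so
    // a one-off large record cannot pin memory indefinitely.
    boolean release(ByteBuffer buffer) {
        if (buffer.capacity() > maxCachedBufferSize) {
            return false; // skip cache
        }
        cache.offer(buffer);
        return true;
    }
}
```

The key point is that acquire() never fails because of the soft limit; only release() consults it, which is why existing write behavior is unaffected.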
Test Plan
The patch will include both unit and integration tests to ensure full coverage.
Rejected Alternatives
- Constraining valid values to be less than the max message size
  - This would require ensuring that both max.message.bytes and message.max.bytes are never set smaller than max.buffer.size. However, this would not only introduce complexity in the validation logic when modifying configs but also require changes to the valid value definitions of those two configs.
  - In practice, the purpose of this constraint was only to indicate that setting an excessively large buffer size is unnecessary, since records that large won't actually be produced.