Status

Current state: "Accepted"

Discussion thread: here 

Vote thread: here 

JIRA: KAFKA-19519

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The current group coordinator implementation uses the maximum message size (message.max.bytes) to determine the maximum buffer size for the coordinator append cache. This causes two issues:

  1. Operational fragility: Large max message size values (common in event streaming pipelines) create artificially high memory pressure on the coordinator, risking OutOfMemoryError.
  2. Unnecessary coupling: the coordinator buffers should be independent from producer message limits.

While recent improvements (KAFKA-19427) implement dynamic buffer growth starting from 512KB, the upper bound remains tied to message.max.bytes. This KIP proposes full decoupling.

Public Interfaces

This KIP will introduce two new configs: group.coordinator.append.max.buffer.size in GroupCoordinatorConfig, and share.coordinator.append.max.buffer.size in ShareCoordinatorConfig, both of which will be dynamically updatable.

| Name | Type | Importance | Default | Valid values | Description |
|------|------|------------|---------|--------------|-------------|
| group.coordinator.append.max.buffer.size | Int | MEDIUM | 1 * 1024 * 1024 + Records.LOG_OVERHEAD | > 512 * 1024 | The largest buffer size allowed by the GroupCoordinator (it is recommended not to exceed the maximum allowed message size). |
| share.coordinator.append.max.buffer.size | Int | MEDIUM | 1 * 1024 * 1024 + Records.LOG_OVERHEAD | > 512 * 1024 | The largest buffer size allowed by the ShareCoordinator (it is recommended not to exceed the maximum allowed message size). |
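
For illustration, the new settings could be configured like any other broker config (values below are examples only, not recommendations):

```properties
# Illustrative server.properties fragment using the names from this KIP.
# 2097152 = 2 MiB; both values must stay above the 512 KiB lower bound.
group.coordinator.append.max.buffer.size=2097152
share.coordinator.append.max.buffer.size=2097152
```

Since both configs are dynamically updatable, they could also be changed at runtime without a broker restart.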


Extend the CoordinatorRuntime Builder interface to allow different coordinator implementations to supply their buffer size config.

    public Builder<S, U> withMaxBufferSize(Supplier<Integer> maxBufferSizeSupplier) {
        this.maxBufferSizeSupplier = maxBufferSizeSupplier;
        return this;
    }
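
To illustrate the intent of the Supplier-based builder method, here is a minimal, self-contained sketch (the class names are illustrative stand-ins, not the actual Kafka classes): because the runtime re-reads the supplier rather than capturing a fixed value, dynamic updates to the config take effect without rebuilding the runtime.

```java
import java.util.function.Supplier;

// Minimal sketch of the proposed wiring (illustrative names, not Kafka code):
// the coordinator supplies its buffer-size config as a Supplier so that
// dynamic config updates are picked up on every read.
public class MaxBufferSizeSketch {

    // Stand-in for CoordinatorRuntime: reads the supplier on each use.
    static class RuntimeSketch {
        private final Supplier<Integer> maxBufferSizeSupplier;
        RuntimeSketch(Supplier<Integer> supplier) { this.maxBufferSizeSupplier = supplier; }
        int currentMaxBufferSize() { return maxBufferSizeSupplier.get(); }
    }

    // Stand-in for the runtime's Builder, with the method proposed above.
    static class Builder {
        private Supplier<Integer> maxBufferSizeSupplier = () -> 512 * 1024;
        Builder withMaxBufferSize(Supplier<Integer> maxBufferSizeSupplier) {
            this.maxBufferSizeSupplier = maxBufferSizeSupplier;
            return this;
        }
        RuntimeSketch build() { return new RuntimeSketch(maxBufferSizeSupplier); }
    }

    // Stand-in for GroupCoordinatorConfig backed by dynamic broker config.
    static class Config {
        private volatile int appendMaxBufferSize = 1024 * 1024;
        int appendMaxBufferSize() { return appendMaxBufferSize; }
        void update(int newSize) { appendMaxBufferSize = newSize; }
    }

    public static void main(String[] args) {
        Config config = new Config();
        RuntimeSketch runtime = new Builder()
            .withMaxBufferSize(config::appendMaxBufferSize)
            .build();
        System.out.println(runtime.currentMaxBufferSize()); // 1048576
        config.update(2 * 1024 * 1024);                     // dynamic update
        System.out.println(runtime.currentMaxBufferSize()); // 2097152
    }
}
```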

Introduce two metrics to enhance the observability of the coordinator's buffer cache.

| MBean | Description |
|-------|-------------|
| kafka.server:type=group-coordinator-metrics,name=coordinator-append-buffer-size-bytes,protocol=(consumer\|share) | The current total size in bytes of the append buffers being held in the coordinator's cache. |
| kafka.server:type=group-coordinator-metrics,name=coordinator-append-buffer-skip-cache-count,protocol=(consumer\|share) | The count of oversized append buffers that were discarded instead of being cached upon release. |

Proposed Changes

The proposal introduces two new configurations to control the maximum append buffer size specifically for the CoordinatorRuntime, allowing the buffers to be sized independently of the max message size limits.


Compatibility, Deprecation, and Migration Plan

  • The max buffer size of the coordinator is currently transparent to users, and the new config is a soft limit. If a record exceeds the append.max.buffer.size but is smaller than the message.max.bytes, it will still be written successfully; however, its buffer will not be cached upon release. Therefore, this will not affect any existing functionality.
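
The soft-limit behavior above can be sketched as follows (a hedged illustration with invented names, not the actual coordinator code): oversized records still get a buffer and are written, but on release the buffer is discarded rather than recycled, which is also what the proposed skip-cache metric would count.

```java
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

// Illustrative sketch of the soft limit: writes above append.max.buffer.size
// (but within message.max.bytes) succeed; only the caching of the buffer is
// skipped. Class and field names are hypothetical.
public class BufferCacheSketch {
    private final Deque<ByteBuffer> cache = new ConcurrentLinkedDeque<>();
    private final int maxBufferSize; // append.max.buffer.size (soft limit)
    int skipCacheCount = 0;          // would feed the skip-cache-count metric

    BufferCacheSketch(int maxBufferSize) { this.maxBufferSize = maxBufferSize; }

    // Acquire a buffer large enough for the record; a record bigger than the
    // soft limit still gets a (one-off) buffer, so the append succeeds.
    ByteBuffer acquire(int recordSize) {
        ByteBuffer cached = cache.pollFirst();
        if (cached != null) {
            if (cached.capacity() >= recordSize) return cached;
            cache.offerFirst(cached); // too small for this record; keep cached
        }
        return ByteBuffer.allocate(recordSize);
    }

    // Release after the append: recycle buffers within the soft limit,
    // discard (and count) oversized ones instead of caching them.
    void release(ByteBuffer buffer) {
        buffer.clear();
        if (buffer.capacity() <= maxBufferSize) {
            cache.offerFirst(buffer);
        } else {
            skipCacheCount++;
        }
    }
}
```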


Test Plan

The patch will include both unit and integration tests to ensure full coverage.

Rejected Alternatives

  • Restricting valid values to be smaller than the max message size
    • This would require ensuring that neither max.message.bytes nor message.max.bytes is set smaller than the append.max.buffer.size. That would not only introduce complexity in the validation logic when modifying configs but also require changes to the valid value definitions of those two configs.
    • In practice, the purpose of this constraint was only to indicate that setting an excessively large buffer size is unnecessary, since records that large won't actually be produced.
