Status

Current state: "Under Discussion"

Discussion thread: here

JIRA:<TBD>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Kafka producer's current memory allocation strategy reserves the full "batch.size" of pool memory upfront for each active partition, regardless of how much data the batch ultimately holds before it is sent. With many partitions and a "batch.size" tuned for high throughput in high-latency scenarios, this forces "partition_count * batch.size" of upfront memory reservation.

This pattern is becoming particularly troublesome as Kafka deployments shift toward object-storage-backed architectures. The higher end-to-end produce latency in these deployments requires a larger "batch.size" to amortize the per-request cost, increasing the risk of memory failures.

As an example: a producer targeting 10 MiB/s of aggregate throughput to a 1000-partition topic with RoundRobin partitioning struggles to achieve a meaningful fraction of that at the default "batch.size" of 16384 bytes. Each partition sends only 16384 bytes at a time over a high-latency link, so per-partition throughput is bounded by "16384 bytes / RTT". Increasing "batch.size" to 4 MiB unblocks throughput, but the producer would then need 4 MiB × 1000 partitions = 4000 MiB (about 3.9 GiB) of pool memory to accommodate all partitions simultaneously, regardless of the actual volume of data flowing per partition.
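The reservation arithmetic in this example can be checked with a few lines of Java. This is illustrative arithmetic only: the class and method names are hypothetical and are not part of the Kafka code base.

```java
// Illustrative arithmetic only; names are hypothetical, not Kafka APIs.
final class StaticReservationEstimate {

    /** Upfront pool memory needed when every active partition reserves a full batch. */
    static long upfrontBytes(int activePartitions, long batchSizeBytes) {
        // multiplyExact fails loudly instead of silently overflowing
        return Math.multiplyExact((long) activePartitions, batchSizeBytes);
    }

    public static void main(String[] args) {
        long defaultBatch = 16_384L;         // default batch.size
        long largeBatch = 4L * 1024 * 1024;  // 4 MiB

        // 1000 partitions at the default batch.size: 16,384,000 bytes (about 15.6 MiB)
        System.out.println(upfrontBytes(1000, defaultBatch));
        // 1000 partitions at 4 MiB: 4,194,304,000 bytes (about 3.9 GiB)
        System.out.println(upfrontBytes(1000, largeBatch));
    }
}
```

The result depends only on the partition count and "batch.size"; the data rate does not appear in the calculation, which is the problem this KIP targets.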

Applications are forced to choose between two bad outcomes:

  • Small "batch.size" => poor throughput on high-latency clusters.
  • Large "batch.size" => "buffer.memory" must scale with partition count (which can change dynamically) rather than with actual throughput. Memory has to be sized for "partition_count * batch.size", a homogeneous upfront allocation that eagerly assumes every partition fills its batch, even though some partitions may buffer far less data per linger window.

This KIP proposes introducing support for a dynamic memory allocation strategy in the KafkaProducer. The strategy is delivered as an opt-in producer configuration, with the current behaviour preserved as the default, allowing safe incremental rollout and rollback capability. 

With the new dynamic strategy enabled, memory utilization scales with the workload's throughput and "linger.ms", not with the unpredictable active-partition count. Applications can then safely increase "batch.size" to meet latency/throughput targets without the per-partition multiplier risk. The strategy initially builds on the existing producer configs for batching behaviour ("batch.size", "linger.ms", "buffer.memory").

Public Interfaces

New producer configuration

  • "buffer.memory.allocation.strategy" type String -> "static", "dynamic"
    • controls how the producer's "BufferPool" reserves memory for record batches. 
    • static: current (trunk) behaviour, full batch reserved upfront for all active partitions
    • dynamic: lazy memory allocation per-record that scales with actual data buffered. 
    • static remains the default value
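As a sketch of how an application might opt in, the snippet below builds producer properties with the proposed config. "buffer.memory.allocation.strategy" is the name proposed in this KIP and is not recognized by released clients; the bootstrap address and the sizing values are placeholders chosen for illustration.

```java
import java.util.Properties;

// Hypothetical example class; only the config keys come from Kafka or this KIP.
final class DynamicStrategyConfigExample {

    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("batch.size", "4194304");               // 4 MiB, illustrative
        props.setProperty("linger.ms", "100");                    // illustrative
        props.setProperty("buffer.memory", "67108864");           // 64 MiB, illustrative
        // Proposed by this KIP. Omitting it (or setting "static") keeps current behaviour.
        props.setProperty("buffer.memory.allocation.strategy", "dynamic");
        return props;
    }
}
```

The resulting Properties object would be passed to the KafkaProducer constructor together with the usual serializer settings.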

New "BufferPool" metrics

New producer metrics to surface direct-heap usage when the "dynamic" strategy falls back to heap allocation under pool pressure. These metrics report zero when the "static" strategy is in use. 

  • bufferpool-overflow-bytes-rate: The average bytes per second allocated outside the "BufferPool", computed over the metric sample window. Used to identify "buffer.memory" pressure (a sustained non-zero value indicates that "buffer.memory" is undersized for the workload), and as guidance for sizing "buffer.memory" (based on peak rates and the configured linger time).
  • bufferpool-overflow-bytes-total: Total heap bytes allocated outside the "BufferPool" since the producer started. Pairs with the previous "-rate" metric by exposing absolute values, which supports alerting and rate computations over arbitrary windows.
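To illustrate how the "-total" metric supports rate computations over arbitrary windows, the following minimal sketch derives a rate from two samples of the cumulative value. The class and method names are hypothetical; how the samples are collected (JMX, a metrics reporter, etc.) is left to the monitoring system.

```java
// Hypothetical helper; not part of the producer metrics API.
final class OverflowRate {

    /**
     * Average overflow bytes per second between two samples of the
     * cumulative bufferpool-overflow-bytes-total metric.
     */
    static double bytesPerSecond(double totalAtStart, double totalAtEnd, long windowMillis) {
        if (windowMillis <= 0) {
            throw new IllegalArgumentException("windowMillis must be positive");
        }
        return (totalAtEnd - totalAtStart) * 1000.0 / windowMillis;
    }
}
```

For example, two samples of 0 and 32768 bytes taken 2000 ms apart give an average of 16384 bytes per second.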

Proposed Changes

Internal changes to the producer to support the new dynamic memory allocation strategy based on lazy memory allocation with linked buffers. 

  • New strategy enabled if the producer sets the new config "buffer.memory.allocation.strategy=dynamic"
  • If enabled, instead of reserving the full "batch.size" upfront when a batch is created, the producer allocates fixed-size chunks as needed, keeps them linked together to back the batch, and returns them to the pool when the batch completes.
  • On send, the list of chunks/buffers is written to the network via scatter-gather, avoiding the extra copy that flattening into a single buffer would require. Wire bytes are byte-for-byte equivalent so what the broker receives remains unchanged. 
  • The new dynamic strategy is implemented via a new chunked-buffer type, sibling of the existing "ByteBufferOutputStream" (current single-buffer trunk implementation)
  • The new config determines which buffer type is used for the producer, allowing the two allocation strategies to coexist in the codebase. 
  • Customers using the default "static" strategy continue to use the existing single-buffer path unchanged; customers on "dynamic" use the chunked buffer.
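The chunked-buffer idea can be sketched as follows. This is a simplified illustration under stated assumptions, not the proposed implementation: the class name is hypothetical, chunks are allocated on the heap as a stand-in for pool allocation, and "writeTo" assumes a blocking channel.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a buffer backed by linked fixed-size chunks.
final class ChunkedBufferSketch {
    private final int chunkSize;
    private final List<ByteBuffer> chunks = new ArrayList<>();

    ChunkedBufferSketch(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    /** Appends bytes, adding a new chunk only when the last one is full. */
    void write(byte[] src) {
        int offset = 0;
        while (offset < src.length) {
            ByteBuffer last = chunks.isEmpty() ? null : chunks.get(chunks.size() - 1);
            if (last == null || !last.hasRemaining()) {
                last = ByteBuffer.allocate(chunkSize); // stand-in for a pool allocation
                chunks.add(last);
            }
            int n = Math.min(last.remaining(), src.length - offset);
            last.put(src, offset, n);
            offset += n;
        }
    }

    int chunkCount() {
        return chunks.size();
    }

    /** Views over the written bytes of each chunk, in order, ready for reading. */
    ByteBuffer[] readableChunks() {
        ByteBuffer[] views = new ByteBuffer[chunks.size()];
        for (int i = 0; i < views.length; i++) {
            ByteBuffer view = chunks.get(i).duplicate(); // shares content, independent position
            view.flip();
            views[i] = view;
        }
        return views;
    }

    /** Scatter-gather write: no flattening copy. Assumes a blocking channel. */
    long writeTo(GatheringByteChannel channel) throws IOException {
        ByteBuffer[] views = readableChunks();
        long written = 0;
        int first = 0;
        while (first < views.length) {
            written += channel.write(views, first, views.length - first);
            while (first < views.length && !views[first].hasRemaining()) {
                first++; // skip chunks that were fully written
            }
        }
        return written;
    }
}
```

Because the chunks are handed to the channel in order, the bytes on the wire are the same as if the data had first been copied into a single contiguous buffer.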

Internal allocation strategy

When a record arrives to be appended to a batch, the producer will allocate all chunks predicted to be needed for the record, in a single pool call. This will also consider any unfilled capacity from chunks reserved by previous records. 

  • Similar blocking behaviour and error as trunk: the first record for a batch blocks for memory for up to "max.block.ms" and may throw BufferExhaustedException. Subsequent records appended to the batch do not block or throw. They attempt a non-blocking pool allocation and fall back to direct heap allocation if the pool is exhausted.
    • This ensures the producer never blocks on pool memory while already holding some for a batch.
    • Whenever direct allocation is needed, the buffer is marked exhausted so that no more records are added.
    • This approach may incur more direct allocations under memory pressure than trunk, so metrics are added to provide visibility into this and to guide how to adjust "buffer.memory" to avoid it.
  • The number of chunks allocated is calculated based on the record's serialized size: exact for uncompressed records, and an estimation based on compression ratio if compression is used. Compression underestimation is handled by the mid-record growth path described below.
  • All chunks have the same fixed size, hardcoded at 16384 bytes. This is the same value as today's default "batch.size" but is decoupled from it: increasing "batch.size" does not change the chunk size.
  • The chunk size is not exposed as a producer config in this KIP. It is kept as an internal grouping unit used to build the batch, using the already well defined default value of 16384 bytes. The "batch.size" remains the user-facing batching size config.
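The chunk-count calculation described above can be sketched as below. The names and the exact formula are assumptions for illustration: the estimated compression ratio is modelled as compressed size divided by uncompressed size (1.0 for uncompressed data), and "unfilledCapacity" stands for the free bytes left in chunks reserved by previous records.

```java
// Hypothetical sketch; the real estimation logic may differ.
final class ChunkEstimate {
    static final int CHUNK_SIZE = 16_384; // fixed chunk size from this KIP

    /**
     * Number of new chunks to request from the pool in a single call.
     *
     * @param serializedSize           record size in bytes before compression
     * @param estimatedCompressionRatio assumed compressed/uncompressed ratio; 1.0 if uncompressed
     * @param unfilledCapacity         free bytes in chunks already held by the batch
     */
    static int chunksToAllocate(int serializedSize, double estimatedCompressionRatio, int unfilledCapacity) {
        long estimatedBytes = (long) Math.ceil(serializedSize * estimatedCompressionRatio);
        long neededBytes = Math.max(0L, estimatedBytes - unfilledCapacity);
        // ceiling division by the chunk size
        return (int) ((neededBytes + CHUNK_SIZE - 1) / CHUNK_SIZE);
    }
}
```

With a ratio of 1.0 the result is exact; with compression it is only an estimate, which is why a growth path is needed when the estimate turns out to be too low.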

Mid-record growth

The number of chunks initially reserved for a record may turn out to be insufficient when compression is used (e.g., compression underestimation, or extra bytes emitted when the compressor's internal buffers drain). In such cases, additional chunks are acquired while the record is being written. This mid-record growth path is never required when producing uncompressed data.

Each growth attempt tries a non-blocking pool allocation first. If it fails, it falls back to direct allocation to ensure that the write operation can complete, and it marks the batch as exhausted (so it gets closed and ready to send).

Given that the initial allocation already reserves the number of chunks estimated to be needed, growth events are bounded (improving as the compression ratio improves, and bounded by the buffer sizes of each compressor type).
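A minimal sketch of the growth path follows. "SimplePool" and "Batch" are hypothetical stand-ins introduced only for this illustration; they do not reflect the actual "BufferPool" API or batch classes.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of mid-record growth with heap fallback.
final class GrowthSketch {
    static final int CHUNK_SIZE = 16_384;

    /** Stand-in for the producer's pool: a bounded set of free chunks. */
    static final class SimplePool {
        private final Deque<ByteBuffer> free = new ArrayDeque<>();

        SimplePool(int chunkCount) {
            for (int i = 0; i < chunkCount; i++) {
                free.push(ByteBuffer.allocate(CHUNK_SIZE));
            }
        }

        /** Non-blocking: returns null when the pool has no free chunk. */
        ByteBuffer tryAllocate() {
            return free.poll();
        }
    }

    /** Stand-in for a batch backed by chunks. */
    static final class Batch {
        final List<ByteBuffer> chunks = new ArrayList<>();
        boolean exhausted;  // once true, no further records are appended
        long overflowBytes; // bytes allocated outside the pool

        /** Acquires one more chunk without ever blocking. */
        void grow(SimplePool pool) {
            ByteBuffer chunk = pool.tryAllocate();
            if (chunk == null) {
                // Pool exhausted: fall back to the heap so the write can complete,
                // and close the batch to further appends.
                chunk = ByteBuffer.allocate(CHUNK_SIZE);
                overflowBytes += CHUNK_SIZE;
                exhausted = true;
            }
            chunks.add(chunk);
        }
    }
}
```

In this sketch the overflow counter plays the role of the proposed overflow metrics: it only increases when a chunk had to be allocated outside the pool.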

Compatibility, Deprecation, and Migration Plan

No impact on existing users. The default value of "buffer.memory.allocation.strategy" is "static", which preserves the current behaviour unchanged.

Migrating to the new strategy only requires a client configuration change to set "buffer.memory.allocation.strategy=dynamic".

Out of scope for this KIP: once the new dynamic strategy has been sufficiently validated, we could consider making it the default. At that point we could also revisit and increase the default "batch.size", currently set at 16384 bytes. The dynamic strategy makes larger batches affordable, so we could consider relaxing the "batch.size" default and letting applications accumulate more data while focusing mainly on tuning the linger time.

Test Plan

  • All existing producer tests must pass unchanged with "buffer.memory.allocation.strategy=static" (the default), guaranteeing that applications that do not opt in are not affected.
  • New unit and integration tests covering the "dynamic" path.
  • Extensive performance benchmarks across diverse workloads, considering different partitioning strategies, compression types, client configurations, broker-side related configurations impacting batch splitting, etc.

Rejected Alternatives

Customer-facing configs to control the dynamic chunk-based allocation (similar to KIP-782)

  • This KIP deliberately avoids expanding the public API surface with new configs to control batch sizing. Instead, it relies on the existing producer configs ("batch.size", "linger.ms", "buffer.memory"), just making internal implementation changes to allocate memory more efficiently. 
  • Once this improvement becomes available and gains usage across multiple workloads, we can assess whether additional tuning flexibility is really needed, and follow up with separate KIPs.

