Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: KAFKA-20576

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Kafka producer's current memory allocation strategy reserves at least the full "batch.size" of pool memory for each partition that becomes active (starts a batch), regardless of how much data the batch ultimately holds before it is sent. Pool reservation therefore scales with the active-partition count, uniformly across all active partitions and independently of how much data each one carries.

Under the default partitioning logic (partitioner.class unset and sticky behaviour introduced by KIP-480/KIP-794), this is not particularly problematic: records without keys stay on one partition per topic at a time, switching after at least batch.size bytes have been produced to the partition. A pure keyless workload's memory footprint stays bounded regardless of the number of partitions. 

But this allocation strategy becomes problematic under other common scenarios where the active-partition count grows large:

  • Keyed records on many-partition topics: hash(key) % num_partitions may activate all partitions concurrently, depending on the keys. With high-cardinality keys (e.g., per-user, per-account), the active set approaches the number of partitions.
  • Custom partitioners that distribute across partitions (e.g., RoundRobinPartitioner): even keyless records activate all partitions.
  • Mixed workloads in a single producer (e.g., keyless logs to topic A + keyed events to topic B): the keyed topic's active-partition count drives buffer.memory consumption even when that topic carries only a small fraction of the producer's records, because pool reservation scales with active-partition count, not data volume.

This situation is becoming particularly troublesome as Kafka deployments shift toward object-storage-backed architectures. The higher end-to-end produce latency in these deployments requires higher "batch.size" to amortize the per-request cost, increasing the risk of memory failures.

Applications are forced to choose between two bad outcomes under the existing strategy:

  • Small "batch.size" => poor throughput on high-latency clusters.
  • Large "batch.size" => "buffer.memory" must be sized for active_partition_count × batch.size, where active_partition_count depends on partition count, partitioning strategy, and observed key distribution. None of these factors are stable across deploys: a workload running fine on keyless records can hit BufferExhaustedException or OOM the moment a code change starts producing keyed records, or as more partitions are added to the topic and the workload activates them.

This KIP proposes a new "incremental" memory allocation strategy in the KafkaProducer, where memory usage scales with actual throughput:

  • per-partition memory ≈ per-partition throughput × linger.ms (capped at batch.size, which closes the batch early when reached), versus active_partitions × batch.size under the existing "full" allocation strategy
  • memory usage becomes more predictable, because it is based on workload-independent factors (target throughput and linger) rather than on the active-partition count

The following examples illustrate the impact of the new incremental strategy on memory usage and on memory sizing predictability.

Memory usage: Let's consider a mixed workload like the one mentioned above: a single producer sending keyless records to topic A (high volume, e.g., logs) and keyed records to topic B (low volume, e.g., events to a 1000-partition topic with default partitioning logic and high-cardinality keys).

At the default batch size (16384 bytes), topic A's throughput is tightly limited: each batch only carries 16384 bytes, so per-partition throughput is bounded by a few batches per round-trip, roughly max.in.flight.requests.per.connection × 16384 bytes / RTT. Increasing batch.size to 4 MiB would unblock throughput, but under the "full" strategy topic A still reserves ~4 MiB (one sticky-active partition at a time) while topic B activates all 1000 partitions concurrently (hash-distributed keyed records), each reserving 4 MiB regardless of the actual volume of data flowing per partition. The producer would need buffer.memory of roughly 4 GiB (1000 × 4 MiB for topic B alone), or calls to send() would hit BufferExhaustedException once topic B's reservation exhausts the pool, even though its data volume is small.

Under the incremental strategy and the same batch.size = 4 MiB, topic A still gets the full batching benefit (4 MiB batches when traffic justifies it), while topic B's per-partition reservation scales with the actual data per partition per linger window (potentially far below the ~4 GiB used under the "full" allocation, given that it is a low-volume topic). Similar savings apply to any workload where per-batch data falls short of batch.size: hot-cold partition distributions (skewed key traffic), bursty workloads with quiet periods, and over-provisioned batch.size settings.
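To put rough numbers on the mixed-workload example above, the following is a minimal sketch of the arithmetic, not producer code. The class and method names are hypothetical, and topic B's per-partition rate (10 KiB/s) and linger.ms = 100 are assumed values that the example above does not specify. The incremental estimate rounds up to whole chunks, using the internal chunk size min(16384, batch.size) described later in this KIP.

```java
// Illustrative arithmetic only; names and topic B's traffic figures are assumptions.
final class MixedWorkloadSizing {
    static final long KIB = 1024L;
    static final long MIB = 1024L * KIB;

    /** "full": every active partition reserves batch.size up front. */
    static long fullBytes(int activePartitions, long batchSize) {
        return activePartitions * batchSize;
    }

    /**
     * "incremental": each active partition holds roughly throughput x linger,
     * rounded up to whole chunks (at least one) and capped at batch.size.
     */
    static long incrementalBytes(int activePartitions, long bytesPerSecPerPartition,
                                 long lingerMs, long batchSize, long chunkSize) {
        long buffered = bytesPerSecPerPartition * lingerMs / 1000;
        long chunks = Math.max(1, (buffered + chunkSize - 1) / chunkSize);
        long perPartition = Math.min(chunks * chunkSize, batchSize);
        return activePartitions * perPartition;
    }

    public static void main(String[] args) {
        long batchSize = 4 * MIB;
        long chunkSize = Math.min(16384, batchSize);
        // Topic B: 1000 active partitions; 10 KiB/s per partition and linger.ms = 100 are assumed.
        long full = fullBytes(1000, batchSize);
        long incremental = incrementalBytes(1000, 10 * KIB, 100, batchSize, chunkSize);
        System.out.println("full:        " + full + " bytes");
        System.out.println("incremental: " + incremental + " bytes");
    }
}
```

With these assumed inputs, topic B holds about 4000 MiB under "full" and about 16 MiB under "incremental" (one 16 KiB chunk per active partition).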

Memory sizing predictability: Let's consider a workload that fully utilizes batch.size per partition: 5 GiB/s aggregate across a 100-partition topic with high-cardinality keys, batch.size = 4 MiB, linger.ms = 100ms. At ~50 MiB/s × 100ms = ~5 MiB per partition per linger window > batch.size, each batch fills before linger fires. Both strategies converge to ~batch.size per active partition: total = 100 × 4 MiB = 400 MiB. Even with comparable memory, the incremental strategy simplifies sizing.

  • "full" requires sizing buffer.memory for max_active_partitions × batch.size, where max_active_partitions depends on partitioner choice and key distribution (can shift at runtime). 
  • "incremental" uses aggregate_throughput × linger.ms, which operators control. During lower-traffic periods, assuming all partitions remain active, "full" still reserves 400 MiB until batches close, even when much less data is actually flowing. Under "incremental", the reservation (bytes held by open batches) shrinks during a low-traffic phase that follows a high-traffic one, because memory is reserved incrementally as records arrive. Pool memory allocated during the high-traffic period remains allocated, just as in the "full" strategy; the key difference is that it stays in the pool as free memory that any partition can use, rather than being locked in open batches that do not need it.
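For reference, the arithmetic behind the 5 GiB/s example can be written out as follows. This only restates the figures given above (1 GiB = 1024 MiB) and introduces no new assumptions.

```latex
\begin{align*}
\text{per-partition throughput} &= \frac{5\ \text{GiB/s}}{100} = 51.2\ \text{MiB/s} \\
\text{data per linger window} &= 51.2\ \text{MiB/s} \times 0.1\ \text{s} = 5.12\ \text{MiB} > 4\ \text{MiB} \\
\text{per-partition memory} &\approx \min(5.12\ \text{MiB},\ 4\ \text{MiB}) = 4\ \text{MiB} \\
\text{total} &\approx 100 \times 4\ \text{MiB} = 400\ \text{MiB}
\end{align*}
```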

The new incremental strategy is delivered as an opt-in producer configuration, with the current behaviour preserved as the default, allowing safe incremental rollout and rollback capability.  It builds on top of the same existing producer configs for batching behaviour ("batch.size", "linger.ms", "buffer.memory").

Public Interfaces

New producer configuration

  • "buffer.memory.allocation.strategy" (type: String; values: "full", "incremental")
    • controls how the producer's "BufferPool" reserves memory for record batches. 
    • Supported values: "full" or "incremental" (case-insensitive)
      • "full": current behaviour; at least a full batch.size is reserved upfront for each active partition (more if the record is larger)
      • "incremental": lazy memory allocation based on linked buffers.
        • allocate what is estimated to be needed per record, then grow as needed
        • memory consumption scales with actual data buffered (per-partition memory ≈ per-partition throughput × linger.ms, capped at batch.size)
        • buffer.memory sizing becomes more predictable, based on workload-independent factors (target throughput and linger time) rather than the active-partition count.  
        • a single record larger than batch.size still forms its own batch, same as under the "full" strategy today.
    • "full" remains the default value
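As an illustration, opting in could look like the sketch below. It uses only java.util.Properties so that it runs without the Kafka client on the classpath. The key "buffer.memory.allocation.strategy" is the config proposed by this KIP and is not recognized by clients that predate it; the class name, bootstrap address, and sizes are placeholder assumptions.

```java
import java.util.Properties;

// Hypothetical helper: builds producer properties that opt in to the proposed strategy.
final class IncrementalProducerConfig {
    // Config key proposed by this KIP; not available in clients that predate it.
    static final String ALLOCATION_STRATEGY = "buffer.memory.allocation.strategy";

    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("batch.size", "4194304");               // 4 MiB, illustrative
        props.put("linger.ms", "100");                    // illustrative
        props.put("buffer.memory", "536870912");          // 512 MiB, illustrative
        props.put(ALLOCATION_STRATEGY, "incremental");    // default would be "full"
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The existing batching configs keep their meaning; only the strategy line differs from a producer running with the default behaviour.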

Proposed Changes

Internal changes to the producer to support the new incremental memory allocation strategy based on lazy memory allocation with linked buffers. 

  • New strategy enabled if the producer sets the new config "buffer.memory.allocation.strategy=incremental"
  • If enabled, instead of reserving the full "batch.size" upfront when a batch is created, the producer allocates the chunks estimated to be needed for the record and keeps them linked together to back the batch. The chunks are returned to the pool when the batch completes.
  • On send, the list of chunks/buffers is written to the network via scatter-gather, avoiding the extra copy that flattening into a single buffer would require. Wire bytes are byte-for-byte equivalent so what the broker receives remains unchanged. 
  • The new incremental strategy is implemented via a new chunked-buffer type, a sibling of the existing single-buffer type used in the current implementation.
  • The new config determines which buffer type the producer uses, allowing the two allocation strategies to coexist in the codebase.
  • Applications using the default "full" strategy continue to use the existing single-buffer path unchanged; applications on "incremental" use the chunked buffer.
  • Partitioning behaviour remains unchanged. The new "incremental" strategy changes how memory backing a batch is reserved, not how records are assigned or when batches are considered full.
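The scatter-gather point in the list above can be illustrated with plain java.nio. This is a minimal sketch and not the producer's network code: a temporary FileChannel stands in for the socket channel (both implement GatheringByteChannel), and the class and method names are hypothetical. It shows that writing a list of chunks with one gathering write yields the same bytes as first copying them into a single buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

final class GatheringWriteSketch {
    /** Baseline: copy every chunk into one contiguous array (the copy a gathering write avoids). */
    static byte[] flatten(ByteBuffer[] chunks) {
        int total = 0;
        for (ByteBuffer chunk : chunks) total += chunk.remaining();
        ByteBuffer flat = ByteBuffer.allocate(total);
        // duplicate() shares the content but leaves the original chunk's position untouched
        for (ByteBuffer chunk : chunks) flat.put(chunk.duplicate());
        return flat.array();
    }

    /** Writes the chunks via GatheringByteChannel#write(ByteBuffer[]) and returns what reached the channel. */
    static byte[] gatherWrite(ByteBuffer[] chunks) {
        try {
            Path tmp = Files.createTempFile("chunks", ".bin");
            try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
                long remaining = 0;
                for (ByteBuffer chunk : chunks) remaining += chunk.remaining();
                while (remaining > 0) {
                    remaining -= channel.write(chunks); // may write fewer bytes than requested
                }
                return Files.readAllBytes(tmp);
            } finally {
                Files.delete(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteBuffer[] chunks = {
            ByteBuffer.wrap(new byte[] {1, 2, 3}),
            ByteBuffer.wrap(new byte[] {4, 5}),
            ByteBuffer.wrap(new byte[] {6})
        };
        byte[] flat = flatten(chunks);
        byte[] written = gatherWrite(chunks);
        System.out.println("identical bytes: " + Arrays.equals(flat, written));
    }
}
```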

Internal allocation strategy

When a record arrives, the producer will allocate all chunks estimated to be needed for the record, in a single pool call. 

  • The number of chunks needed for a record is calculated based on the batch's projected size including the new record, reusing the producer's existing ratio-aware estimation (MemoryRecordsBuilder#estimatedBytesWritten): 
    • projected batch size = batch header + (uncompressed bytes written so far + new record's uncompressed upper-bound size) × estimated compression ratio × safety factor (ratio and safety factor applied only when compression is used)
      • This mirrors the existing hasRoomFor check that decides whether a batch can accept another record, but intentionally differs in the size used for the new record:
        • hasRoomFor uses the new record's uncompressed size (pessimistic). It determines how many records go into a batch. Under-estimating the record here would over-pack the batch, risking a batch that exceeds the broker's max.message.bytes and triggers the expensive reject/split flow. This logic remains unchanged and is used by both strategies.
        • chunk sizing uses the new record's compressed size (optimistic). It only reserves memory and never changes a batch's content (so it cannot trigger reject/split). If it under-estimates, the growth fallback covers it (pool first, heap if exhausted). Using the expected compressed size keeps reserved memory closer to the data actually buffered.
    • Allocate only the chunks needed to cover the gap between that estimation and the chunk capacity the batch already holds.
    • Exact for uncompressed records (no ratio or safety factor applied), with a few bytes of headroom that guarantees the reservation never under-provisions (same behaviour as in the existing "full" strategy). With this, mid-record growth is never needed for uncompressed data.
    • For compressed data it reserves memory proportional to the bytes the batch is expected to emit after compression
    • Chunks are returned when the batch completes (same lifecycle as the single buffer under the "full" strategy today). Chunks that received no data by the time the batch closes (e.g., reservation overshoot when data compresses better than estimated) can be returned to the pool sooner, at batch close.
  • If multiple chunks are needed, they are acquired in a single pool call with the same waiter semantics the BufferPool has today for allocations larger than the poolable size: the request joins the FIFO waiters queue as one unit, progressively reserves memory as it is freed, and either completes with all its chunks or returns everything it had reserved to the pool (e.g., on timeout or producer close) and signals the next waiter.
  • All chunks have the same fixed size, set internally to min(16384, batch.size) and floored at 128 bytes so a chunk always fits the record-batch header. 
  • The chunk size is not exposed as a producer config in this KIP. It is kept as an internal grouping unit used to build the batch (may be tuned/lowered depending on the performance validations and benchmarks). The "batch.size" remains the user-facing batching size config. 
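The sizing rules in the list above can be sketched as follows. This is a minimal illustration under stated assumptions, not the proposed implementation: the class and method names are hypothetical, and the batch header size, compression ratio, and safety factor are passed in as parameters rather than taken from the producer's internals.

```java
// Hypothetical helper illustrating the chunk-sizing rules; not Kafka client code.
final class ChunkSizingSketch {
    static final int MIN_CHUNK_SIZE = 128;
    static final int MAX_CHUNK_SIZE = 16384;

    /** Chunk size: min(16384, batch.size), floored at 128 bytes. */
    static int chunkSize(int batchSize) {
        return Math.max(MIN_CHUNK_SIZE, Math.min(MAX_CHUNK_SIZE, batchSize));
    }

    /**
     * Projected batch size once the new record is appended.
     * The ratio and safety factor are applied only when compression is used.
     */
    static long projectedBatchSize(int headerBytes, long uncompressedWritten, long recordUpperBound,
                                   boolean compressed, double ratio, double safetyFactor) {
        long payload = uncompressedWritten + recordUpperBound;
        if (!compressed) {
            return headerBytes + payload;
        }
        return headerBytes + (long) Math.ceil(payload * ratio * safetyFactor);
    }

    /** Chunks to acquire: only the gap between the projection and the capacity already held. */
    static int chunksToAcquire(long projectedSize, int chunksHeld, int chunkSize) {
        long gap = projectedSize - (long) chunksHeld * chunkSize;
        if (gap <= 0) {
            return 0;
        }
        return (int) ((gap + chunkSize - 1) / chunkSize);
    }

    public static void main(String[] args) {
        int chunk = chunkSize(4 * 1024 * 1024);
        // Assumed inputs: 61-byte header, empty batch, 20000-byte uncompressed record.
        long projected = projectedBatchSize(61, 0, 20000, false, 1.0, 1.0);
        System.out.println("chunk size: " + chunk);
        System.out.println("chunks for first record: " + chunksToAcquire(projected, 0, chunk));
    }
}
```

For a later record on the same batch, chunksHeld is the number of chunks already linked to the batch, so a record that fits in the remaining capacity acquires nothing.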

Mid-batch allocation for new record

When a new record arrives mid-batch:

  • the producer first tries a non-blocking pool acquire for all the additional chunks needed (a single atomic multi-chunk call). So an open batch keeps growing as long as pool memory is available without blocking.
  • If the non-blocking acquire fails (pool exhausted), the producer will mark the current batch as "closed for appends" (no more records are added, compressor buffers are released, and the batch becomes eligible to drain). The actual batch close and memory deallocation then happen in the Sender thread. This matches the current behaviour when a batch fills up.
  • The producer will then block on the pool to allocate the chunks for the new record (up to max.block.ms).
  • If max.block.ms elapses, BufferExhaustedException is returned via the failed future, same as today's first-record path.

This is the same close-and-block path used today in the "full" strategy for first-record allocation, with the same send() and max.block.ms contract.

The difference introduced by this mid-batch behaviour is in the blocking semantics: under the incremental strategy, send() may block (or fail with BufferExhaustedException after max.block.ms) on records other than the first of a batch when the pool is exhausted. Under the "full" strategy this never occurs in practice because the full batch is reserved upfront.
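The close-and-block flow described above can be modelled with a toy pool. This is a minimal sketch under stated assumptions, not the BufferPool implementation: a fair java.util.concurrent.Semaphore with one permit per free chunk stands in for the pool, a single producer thread is assumed, and all class, method, and enum names are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of the append path for one partition; not Kafka client code.
final class MidBatchAppendSketch {
    enum Outcome { APPENDED_WITHOUT_BLOCKING, STARTED_NEW_BATCH, BUFFER_EXHAUSTED }

    private final Semaphore freeChunks;                                   // one permit per free pool chunk
    private final AtomicInteger closedBatchChunks = new AtomicInteger();  // held by batches awaiting the sender
    private int openBatchChunks;                                          // held by the open batch (producer thread only)

    MidBatchAppendSketch(int poolChunks) {
        this.freeChunks = new Semaphore(poolChunks, true); // fair: blocked acquirers are served FIFO
    }

    Outcome append(int chunksNeeded, long maxBlockMs) {
        // Open batch: try a non-blocking, all-or-nothing acquire for the extra chunks.
        if (openBatchChunks > 0 && freeChunks.tryAcquire(chunksNeeded)) {
            openBatchChunks += chunksNeeded;
            return Outcome.APPENDED_WITHOUT_BLOCKING;
        }
        // Pool exhausted (or no open batch): close the current batch for appends so it can
        // drain, then follow the first-record path and block for up to max.block.ms.
        closedBatchChunks.addAndGet(openBatchChunks);
        openBatchChunks = 0;
        try {
            if (freeChunks.tryAcquire(chunksNeeded, maxBlockMs, TimeUnit.MILLISECONDS)) {
                openBatchChunks = chunksNeeded;
                return Outcome.STARTED_NEW_BATCH;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Outcome.BUFFER_EXHAUSTED; // stands in for BufferExhaustedException
    }

    /** Sender side: closed batches complete and their chunks go back to the pool. */
    void senderCompletesClosedBatches() {
        freeChunks.release(closedBatchChunks.getAndSet(0));
    }

    public static void main(String[] args) {
        MidBatchAppendSketch partition = new MidBatchAppendSketch(4);
        System.out.println(partition.append(3, 10)); // first record of a batch
        System.out.println(partition.append(1, 10)); // fits in the pool, no blocking
        System.out.println(partition.append(1, 10)); // pool exhausted and nothing is released
        partition.senderCompletesClosedBatches();
        System.out.println(partition.append(1, 10)); // memory is back, new batch starts
    }
}
```

In the sketch, the batch is closed before the thread blocks, so the blocked caller never holds an open, non-drainable batch.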

Blocking behaviour

  • First record for a batch blocks for memory for "max.block.ms" and may throw BufferExhaustedException (same as today's "full" strategy). 
  • Following records appended to the open batch:
    • try a non-blocking pool allocation when more chunks are needed; if that succeeds the append never blocks
    • if the non-blocking acquire fails (pool exhausted), the current batch is closed and the new record follows the "first-record path", blocking for the pool memory needed. 

  • This ensures that the producer thread holds no open (non-drainable) batch before blocking on pool memory (same principle as today's "full" strategy).

Direct heap allocation 

  • Direct heap allocation is triggered in the same cases as in today's "full" strategy:
    • compressed data exceeding the estimation → similar trigger but with improved handling, to try the pool first (non-blocking) before falling back to direct allocation. See "Mid-record growth with compression" section below.
    • batch exceeding the broker's max.message.bytes that is split into sub-batches → as above, similar trigger, improved handling

Mid-record growth with compression

The number of chunks initially reserved for a record could end up being insufficient when compression is used: when the estimated ratio is lower than the actual one, or when the compressor's internal buffers flush bytes from prior records into the current record's write. In such cases, additional chunks are acquired as the record is being written: a non-blocking pool allocation is tried first; if the pool is exhausted, the producer falls back to direct heap allocation for the chunks needed and closes the batch for early send.

This mid-record growth path is never required when producing uncompressed data. For compressed data it is bounded by the compressor's internal buffer size (each flush emits at most a buffer's worth of compressed bytes beyond what was reserved), plus the ratio estimation error over the batch's data.
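A minimal sketch of this growth path follows, with a queue of pre-allocated buffers standing in for the pool. It is an illustration under stated assumptions rather than the proposed implementation: the names are hypothetical, and the point at which the real code detects that more capacity is needed is not modelled.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of mid-record growth for one compressed batch; not Kafka client code.
final class MidRecordGrowthSketch {
    private final Deque<ByteBuffer> freePoolChunks = new ArrayDeque<>();
    private final List<ByteBuffer> batchChunks = new ArrayList<>();
    private final int chunkSize;
    private boolean closedForAppends;
    private int heapFallbacks;

    MidRecordGrowthSketch(int pooledChunks, int chunkSize) {
        this.chunkSize = chunkSize;
        for (int i = 0; i < pooledChunks; i++) {
            freePoolChunks.add(ByteBuffer.allocate(chunkSize));
        }
    }

    /** Called when the compressed output needs more capacity than the chunks reserved. */
    void growByOneChunk() {
        ByteBuffer chunk = freePoolChunks.poll(); // non-blocking: null when the pool is exhausted
        if (chunk == null) {
            chunk = ByteBuffer.allocate(chunkSize); // direct heap allocation outside the pool
            heapFallbacks++;
            closedForAppends = true;                // close the batch so it is sent early
        }
        batchChunks.add(chunk);
    }

    boolean isClosedForAppends() { return closedForAppends; }
    int heapFallbacks() { return heapFallbacks; }
    int chunkCount() { return batchChunks.size(); }

    public static void main(String[] args) {
        MidRecordGrowthSketch batch = new MidRecordGrowthSketch(1, 16384);
        batch.growByOneChunk(); // served from the pool
        batch.growByOneChunk(); // pool empty, falls back to the heap
        System.out.println("chunks=" + batch.chunkCount()
                + " heapFallbacks=" + batch.heapFallbacks()
                + " closed=" + batch.isClosedForAppends());
    }
}
```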

Observability

The existing bufferpool-wait-* and buffer-exhausted-* metrics continue to cover buffer.memory pressure and guide its tuning, same as today. The mid-batch close-and-block path goes through the same pool allocation path that already records these metrics.

Compatibility, Deprecation, and Migration Plan

No impact on existing users. The default value of "buffer.memory.allocation.strategy" is "full", which preserves the current behaviour unchanged.

Migrating to the new strategy only requires a client configuration change to set "buffer.memory.allocation.strategy=incremental"

Out of scope for this KIP: once the new incremental strategy has been sufficiently validated, we could consider making it the default. At that point we could also revisit and increase the default "batch.size", currently set at 16384 bytes. The incremental strategy makes larger batches affordable, so the batch.size default could be relaxed to let applications accumulate more data and focus mainly on tuning the linger time.

Test Plan

  • All existing producer tests must pass unchanged, with "buffer.memory.allocation.strategy = full" (the default), guaranteeing applications that don't opt in are not affected.
  • New unit and integration tests covering the "incremental" path.
  • Extensive performance benchmarks across diverse workloads, considering different partitioning strategies, compression types, client configurations, broker-side related configurations impacting batch splitting, etc.

Rejected Alternatives

Customer-facing configs to control the incremental chunk-based allocation (similar to KIP-782)

  • This KIP deliberately avoids expanding the public API surface with new configs to control batch sizing. Instead, it relies on the existing producer configs ("batch.size", "linger.ms", "buffer.memory"), just making internal implementation changes to allocate memory more efficiently. 
  • Once this improvement becomes available and gains usage across multiple workloads, we can assess if additional tuning flexibility is really needed, and follow-up with separate KIPs. 
  • Direct heap allocation for a new record mid-batch when the pool is exhausted. This was initially considered instead of closing the batch and blocking in send() to allocate for the new record. This alternative would maintain the current blocking pattern (send() blocking only on the first record of a batch), but it would introduce a new direct-allocation path that could be hit to allocate one record per batch. The close-and-block pattern, which already exists in the "full" strategy for the first record, was favoured instead.
    • This rejected alternative was the main motivation for adding new pool exhaustion metrics (bufferpool-overflow-bytes-*, to get visibility into the new direct-allocation path). The proposed metrics were discarded along with this approach.
  • Calculate number of chunks needed based on the record's uncompressed upper-bound size. Simple and never under-provisions, but for compressed data it would over-reserve more pool memory than the batch will emit while it is open. Rejected in favour of a ratio-aware estimation (see Internal allocation strategy).

