Status
Current state: Under discussion
Discussion thread: here
JIRA: here
Motivation
Kafka Streams allows users to configure many different memory limits to proactively manage memory usage and avoid OutOfMemory errors.
However, there is no such memory management on the restore code path. The `StoreChangelogReader` fetches batches of messages and buffers them inside `ChangelogMetadata`, with no limit enforced on this buffer, which can eventually lead to an OutOfMemoryError during backfills. On the normal processing path, Kafka Streams enforces a buffer limit via `buffered.records.per.partition` (default 1000), but `StoreChangelogReader` has no equivalent logic when a restore happens. A brief discussion is here as well.
This KIP introduces a new `StreamsConfig` setting, `restore.buffered.records.per.partition`, to cap the per-partition restore buffer.
Public Interfaces
1. New StreamsConfig configuration
package org.apache.kafka.streams;
public static final String RESTORE_BUFFERED_RECORDS_PER_PARTITION_CONFIG =
"restore.buffered.records.per.partition";
private static final String RESTORE_BUFFERED_RECORDS_PER_PARTITION_DOC =
"Maximum number of records to buffer per partition during state-store " +
"restoration. Bounds the memory used by the restore consumer's " +
"per-partition buffer in StoreChangelogReader. Distinct from the " +
"deprecated buffered.records.per.partition, which bounds the main " +
"processing queue and does not affect the restore path.";
There are no other public changes.
Proposed Changes
- Register the new config
In `streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java`, add the constant alongside the existing `BUFFERED_RECORDS_PER_PARTITION_CONFIG` block and register it in the `ConfigDef`:
define(RESTORE_BUFFERED_RECORDS_PER_PARTITION_CONFIG, Type.INT, 10000, atLeast(1), Importance.LOW, RESTORE_BUFFERED_RECORDS_PER_PARTITION_DOC)
- Read the config in `StoreChangelogReader`
`StoreChangelogReader` already takes a `StreamsConfig` in its constructor. Read the new value once and store it as:
private final int restoreMaxBufferedRecordsPerPartition;
The field is initialized in the constructor from `config.getInt(StreamsConfig.RESTORE_BUFFERED_RECORDS_PER_PARTITION_CONFIG)`.
- Enforce the buffer cap via pause / resume
`StoreChangelogReader` already pauses and resumes partitions on the restore consumer through `pauseResumePartitions(...)` and `updatePartitionsByType(...)`.
The new enforcement reuses the same mechanism with a minor enhancement:
- Before polling: `shouldPause()` method (`pollRecordsFromRestoreConsumer`). Every currently restoring partition whose `ChangelogMetadata.bufferedRecords.size() >= restoreMaxBufferedRecordsPerPartition` is added to the pause set. This prevents the restore consumer from fetching more data for partitions whose buffer is already full.
- After draining: `shouldResume()` method (`restoreChangelog`). Once records have been applied to the state store and `bufferedRecords.size() < restoreMaxBufferedRecordsPerPartition`, the partition becomes eligible for resume on the next `pauseResumePartitions(...)` call.
This is only a filter on top of the existing pause/resume functionality. Both paths, normal (`buffered.records.per.partition`) and restore (`restore.buffered.records.per.partition`), go through the same poll/buffer/restore loop.
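The pause and resume checks above can be sketched as a small toy model. This is only an illustration under stated assumptions, not the actual `StoreChangelogReader` code: the class `RestoreBufferCapSketch`, its `poll` and `restore` methods, and the use of partition names as plain strings are hypothetical stand-ins; only the two comparisons against the cap come from the proposal.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of the cap check; hypothetical, not the real StoreChangelogReader. */
class RestoreBufferCapSketch {
    // Stand-in for restoreMaxBufferedRecordsPerPartition.
    private final int maxBufferedRecords;
    // Partition name -> buffered records (stand-in for ChangelogMetadata.bufferedRecords).
    private final Map<String, Deque<String>> buffers = new HashMap<>();
    // Stand-in for the set of partitions paused on the restore consumer.
    private final Set<String> paused = new HashSet<>();

    RestoreBufferCapSketch(int maxBufferedRecords) {
        this.maxBufferedRecords = maxBufferedRecords;
    }

    int bufferSize(String partition) {
        Deque<String> buffer = buffers.get(partition);
        return buffer == null ? 0 : buffer.size();
    }

    // Pause when the buffer has reached the cap.
    boolean shouldPause(String partition) {
        return bufferSize(partition) >= maxBufferedRecords;
    }

    // Resume a paused partition once its buffer has dropped below the cap.
    boolean shouldResume(String partition) {
        return paused.contains(partition) && bufferSize(partition) < maxBufferedRecords;
    }

    boolean isPaused(String partition) {
        return paused.contains(partition);
    }

    /** Before polling: pause full partitions, then buffer a batch only if unpaused. */
    void poll(String partition, int fetchedRecords) {
        if (shouldPause(partition)) {
            paused.add(partition);
        }
        if (paused.contains(partition)) {
            return; // a paused partition receives no new records
        }
        Deque<String> buffer = buffers.computeIfAbsent(partition, p -> new ArrayDeque<>());
        for (int i = 0; i < fetchedRecords; i++) {
            buffer.add("record-" + i);
        }
    }

    /** After draining: apply up to n buffered records, then resume if below the cap. */
    void restore(String partition, int n) {
        Deque<String> buffer = buffers.get(partition);
        for (int i = 0; buffer != null && i < n && !buffer.isEmpty(); i++) {
            buffer.poll(); // stands in for writing the record to the state store
        }
        if (shouldResume(partition)) {
            paused.remove(partition);
        }
    }

    public static void main(String[] args) {
        RestoreBufferCapSketch sketch = new RestoreBufferCapSketch(3);
        sketch.poll("changelog-0", 2);
        sketch.poll("changelog-0", 2);
        sketch.poll("changelog-0", 2); // buffer is at or above the cap: partition is paused
        System.out.println("paused after polls: " + sketch.isPaused("changelog-0"));
        sketch.restore("changelog-0", 2);
        System.out.println("paused after restore: " + sketch.isPaused("changelog-0"));
    }
}
```

In this toy model the check runs before a batch is buffered, so a partition just below the cap can still exceed it by one fetched batch; whether the real implementation behaves the same way depends on details the proposal does not spell out.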
- Documentation
- Add the new config to the "Streams Configs" reference at `docs/streams/developer-guide/config-streams.html`, adjacent to `buffered.records.per.partition`, with a clear note that the two configs target different code paths.
- Add a short note in the upgrade guide for the target release, calling out the new default cap on restore-path buffering.
- Update the "Memory Management" section of the Streams developer guide to list restore-path buffering alongside the cache.
Compatibility, Deprecation, and Migration Plan
Compatibility
For users who do not set the new config, restore-path behavior changes from unbounded buffering to buffering capped at 10000 records per partition. For better restore throughput during large backfills, increase the config.
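To get a feel for what a record-count cap means in memory terms, a rough estimate multiplies the number of restoring partitions, the cap, and the average record size. The sketch below assumes hypothetical values (50 restoring partitions, 1024-byte records) and ignores per-object heap overhead, so it only indicates an order of magnitude.

```java
/** Back-of-the-envelope estimate; the partition count and record size are assumptions. */
class RestoreBufferEstimate {
    // Approximate payload bytes held in restore buffers when every partition is at the cap.
    static long estimateBytes(int restoringPartitions, int maxRecordsPerPartition, long avgRecordBytes) {
        return (long) restoringPartitions * maxRecordsPerPartition * avgRecordBytes;
    }

    public static void main(String[] args) {
        // 50 partitions x 10000 records (proposed default) x 1024 bytes per record
        long bytes = estimateBytes(50, 10_000, 1024);
        System.out.println(bytes + " bytes"); // prints "512000000 bytes"
    }
}
```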
Deprecation
Not applicable.
Migration
No user action is required. Users concerned about a throughput regression can explicitly set `restore.buffered.records.per.partition` to a larger value (e.g., `100000`) to approximate the previous unbounded behavior for typical deployments.
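A minimal sketch of raising the cap explicitly, assuming the config key proposed by this KIP. The key is not part of any released `StreamsConfig`, so it is written here as a string literal, and the application id and bootstrap server are placeholders.

```java
import java.util.Properties;

/** Illustrative only: builds Streams properties with the proposed restore cap raised. */
class RestoreBufferConfigExample {
    // Key proposed by this KIP; it does not exist in released Kafka versions.
    static final String RESTORE_BUFFERED_RECORDS_PER_PARTITION_CONFIG =
        "restore.buffered.records.per.partition";

    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put("application.id", "restore-cap-demo");   // placeholder value
        props.put("bootstrap.servers", "localhost:9092");  // placeholder value
        // Larger cap to approximate the previous unbounded restore behavior.
        props.put(RESTORE_BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100000);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProperties().get(RESTORE_BUFFERED_RECORDS_PER_PARTITION_CONFIG));
    }
}
```

The resulting `Properties` object would be passed to the `KafkaStreams` constructor in the usual way once the config exists.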
Test Plan
Unit and integration tests will be added. They mainly trigger restoration and assert that it finishes successfully.
Rejected Alternatives
We could reuse the existing `buffered.records.per.partition` config in `StoreChangelogReader`, but its default of 1000 looks too low for the restore path and could slow down restoration. Polling also has to be considered: if polling is fast and restoring is slow, we observe lag.