Status
Current state: Accepted
Discussion thread: here
JIRA: here
Pull Request: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka Streams treats repartition topics differently from regular topics. Instead of setting arbitrary retention criteria and having the broker clean up old records, Kafka Streams sets infinite retention on repartition topics and explicitly deletes records once they have been committed to the next topic in their Topology. Currently, this is done every time the Task is committed, so explicit "delete records" requests are sent every commit.interval.ms milliseconds.
When commit.interval.ms is set very low, for example when processing.guarantee is set to exactly_once_v2 (which lowers the default commit.interval.ms to 100 ms), delete records requests are sent extremely frequently. This can reduce throughput and cause the brokers to log a high volume of messages.
Public Interfaces
...
New configuration options
Name | Type | Importance | Default | Description |
---|---|---|---|---|
repartition.purge.interval.ms | Long | LOW | 30000 | The minimum interval, in milliseconds, between deletions of fully consumed records from repartition topics. A purge occurs only after at least this much time has elapsed since the last purge, but may be delayed until later. (Note: unlike commit.interval.ms, the default for this value remains unchanged when processing.guarantee is set to exactly_once_v2.) |
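The defaults described in the table can be illustrated with a small sketch. It assumes only what this page states: commit.interval.ms defaults to 30000 ms and drops to 100 ms under exactly_once_v2, while repartition.purge.interval.ms stays at 30000 ms. The class and method names are hypothetical, and this is not how StreamsConfig actually resolves defaults.

```java
import java.util.Properties;

// Hypothetical sketch of default resolution; not Kafka Streams' StreamsConfig logic.
// Only the config keys and default values are taken from the KIP text.
final class PurgeConfigDefaults {
    static final String COMMIT_INTERVAL = "commit.interval.ms";
    static final String PURGE_INTERVAL = "repartition.purge.interval.ms";
    static final String PROCESSING_GUARANTEE = "processing.guarantee";

    // Effective commit interval: an explicit user value wins; otherwise 100 ms under
    // exactly_once_v2 (the only EOS mode checked here, for simplicity) and 30000 ms otherwise.
    static long commitIntervalMs(Properties props) {
        String explicit = props.getProperty(COMMIT_INTERVAL);
        if (explicit != null) {
            return Long.parseLong(explicit);
        }
        boolean eos = "exactly_once_v2".equals(props.getProperty(PROCESSING_GUARANTEE));
        return eos ? 100L : 30_000L;
    }

    // Effective purge interval: an explicit user value wins; otherwise 30000 ms,
    // independent of processing.guarantee.
    static long purgeIntervalMs(Properties props) {
        String explicit = props.getProperty(PURGE_INTERVAL);
        return explicit != null ? Long.parseLong(explicit) : 30_000L;
    }

    public static void main(String[] args) {
        Properties eos = new Properties();
        eos.setProperty(PROCESSING_GUARANTEE, "exactly_once_v2");
        System.out.println(commitIntervalMs(eos)); // prints 100
        System.out.println(purgeIntervalMs(eos));  // prints 30000
    }
}
```

The point of the sketch is the asymmetry: enabling exactly_once_v2 changes only the commit default, so purges stay at most once every 30 seconds.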
Proposed Changes
Adding a new configuration option, repartition.purge.interval.ms, that configures the period at which these explicit record deletions are sent will resolve the issue by enabling users to tune commit.interval.ms and repartition.purge.interval.ms separately. We will still wait for a commit before explicitly deleting repartition records, but we will only do so if the time since the last record deletion is at least repartition.purge.interval.ms. This means that commit.interval.ms is effectively the lower bound for the interval between purges: records cannot be purged more often than they are committed.
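The gating rule above can be sketched as a small state machine that is consulted after each commit. This is a minimal sketch under stated assumptions, not Kafka Streams' implementation: the class PurgeGate, its methods, the assumption that the purge clock starts at time 0, and the perfectly regular commit schedule in the simulation are all hypothetical.

```java
// Hypothetical sketch of the purge-gating rule; not Kafka Streams' implementation.
final class PurgeGate {
    private final long purgeIntervalMs;
    // Time of the last purge; assumed to start at the task's start time.
    private long lastPurgeMs;

    PurgeGate(long purgeIntervalMs, long startMs) {
        this.purgeIntervalMs = purgeIntervalMs;
        this.lastPurgeMs = startMs;
    }

    // Called after each successful commit. Returns true if a "delete records"
    // request should be sent now, i.e. if at least purgeIntervalMs has elapsed
    // since the last purge; otherwise the purge is deferred to a later commit.
    boolean onCommit(long nowMs) {
        if (nowMs - lastPurgeMs >= purgeIntervalMs) {
            lastPurgeMs = nowMs;
            return true;
        }
        return false;
    }

    // Simulates perfectly regular commits every commitIntervalMs up to durationMs
    // and counts how many of them trigger a purge.
    static int countPurges(long commitIntervalMs, long purgeIntervalMs, long durationMs) {
        PurgeGate gate = new PurgeGate(purgeIntervalMs, 0L);
        int purges = 0;
        for (long t = commitIntervalMs; t <= durationMs; t += commitIntervalMs) {
            if (gate.onCommit(t)) {
                purges++;
            }
        }
        return purges;
    }

    public static void main(String[] args) {
        // exactly_once_v2 defaults: 600 commits in 60 s, but only 2 purges.
        System.out.println(countPurges(100L, 30_000L, 60_000L)); // prints 2
        // Commits every 45 s: every commit purges, so the commit interval is the bound.
        System.out.println(countPurges(45_000L, 30_000L, 90_000L)); // prints 2
    }
}
```

Under the previous behaviour, the first simulation would have sent 600 delete requests in 60 seconds (one per commit). The second shows the lower bound: when commits are less frequent than the purge interval, every commit purges, so the effective purge interval equals commit.interval.ms.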
Compatibility, Deprecation, and Migration Plan
- The interval between explicit delete requests for repartition records will no longer be coupled to commit.interval.ms. Default behaviour is unchanged for users who modify neither setting, since repartition.purge.interval.ms defaults to 30000 ms, the default value of commit.interval.ms. However:
  - When commit.interval.ms is explicitly modified by the user, old repartition records will no longer be deleted on every commit.
  - When processing.guarantee is set to exactly_once_v2, the default commit.interval.ms is changed internally to 100 ms, but the default repartition.purge.interval.ms is not reduced. Old repartition records will therefore no longer be deleted on every commit, so these users benefit from the reduced number of delete requests.
- Users can regain this coupling by explicitly configuring both commit.interval.ms and repartition.purge.interval.ms to the same value.
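Restoring the old coupling is purely a configuration change. A minimal sketch, assuming an application that already builds its Streams settings as java.util.Properties; the helper name withCoupledPurge is hypothetical, and plain string keys are used so the snippet compiles without Kafka on the classpath (a real application could use the corresponding StreamsConfig constants instead).

```java
import java.util.Properties;

// Hypothetical helper: copies the given Streams settings and sets both intervals
// to the same value, so repartition records are again purged on every commit.
final class CoupledPurgeConfig {
    static Properties withCoupledPurge(Properties base, long intervalMs) {
        Properties props = new Properties();
        props.putAll(base); // leave the caller's Properties untouched
        String value = Long.toString(intervalMs);
        props.setProperty("commit.interval.ms", value);
        props.setProperty("repartition.purge.interval.ms", value);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("processing.guarantee", "exactly_once_v2");
        // Other required settings (application.id, bootstrap.servers, ...) omitted.
        Properties props = withCoupledPurge(base, 100L);
        System.out.println(props.getProperty("repartition.purge.interval.ms")); // prints 100
    }
}
```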
Rejected Alternatives
- Purging after exactly the configured amount of time has elapsed was rejected, as it would necessitate a design that would likely have a negative performance or correctness impact.
- Purging after a specified multiple of commits was rejected, as it would be tightly coupled to the value of another config parameter (commit.interval.ms). This would likely cause an unintended change to purge behaviour whenever the commit interval was reconfigured, including implicitly when processing.guarantee is changed.
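The coupling problem with the "multiple of commits" alternative shows up in simple arithmetic. The sketch below compares it (with a multiple of N = 1, chosen for illustration) against the adopted time-based interval of 30000 ms, for the default commit interval and for the 100 ms exactly_once_v2 default. The method names are hypothetical, and the time-based calculation assumes perfectly regular commits.

```java
// Hypothetical comparison of the rejected and adopted designs; not Kafka code.
final class PurgeCouplingComparison {
    // Rejected design: purge on every n-th commit, so the purge period scales
    // with commit.interval.ms.
    static long everyNCommitsPeriodMs(long commitIntervalMs, int n) {
        return commitIntervalMs * n;
    }

    // Adopted design, assuming perfectly regular commits: a purge happens on the
    // first commit at least purgeIntervalMs after the previous purge, so the
    // period is purgeIntervalMs rounded up to a multiple of commitIntervalMs.
    static long timeBasedPeriodMs(long commitIntervalMs, long purgeIntervalMs) {
        long commitsPerPurge = (purgeIntervalMs + commitIntervalMs - 1) / commitIntervalMs;
        return Math.max(1L, commitsPerPurge) * commitIntervalMs;
    }

    public static void main(String[] args) {
        long[] commitIntervals = {30_000L, 100L}; // default, then exactly_once_v2 default
        for (long commit : commitIntervals) {
            System.out.println("commit.interval.ms=" + commit
                    + " every-commit period=" + everyNCommitsPeriodMs(commit, 1)
                    + " time-based period=" + timeBasedPeriodMs(commit, 30_000L));
        }
    }
}
```

With the default commit interval both designs purge every 30000 ms. Switching to exactly_once_v2 silently shrinks the commit-multiple period to 100 ms, while the time-based period stays at 30000 ms.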