Status
Current state: Accepted
Discussion thread: https://lists.apache.org/thread/rd4hosz31y5lv9nqm85qkj727jw024mq
Voting thread: https://lists.apache.org/thread/xjhfqh8xdwlh7q1r70vd5ol6bhcc1c6g
JIRA: KAFKA-19943
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The current design of Kafka Streams assumes that the local state store and its corresponding changelog topic remain synchronized via the checkpoint file. However, a "zombie data" edge case occurs when an instance restarts using stale local files after a downtime exceeding the changelog's delete.retention.ms. This happens specifically when the locally checkpointed offset is still available on the broker (often because other long-lived entries keep the partition populated), which prevents an automatic out-of-range reset. In this scenario the broker has already purged the deletion tombstones, but the local store still contains the old records; the new instance resumes from its apparently valid offset without ever receiving the "delete" instruction, resulting in the "magical" reappearance of deleted entities.
Public Interfaces
New property:
| Name | Type | Default | Importance | Description |
|---|---|---|---|---|
| state.cleanup.dir.max.age.ms | Long | -1 (disabled) | Low | Time-based threshold for purging local state directories and checkpoint files during application startup. Only state directories that have not been modified for at least state.cleanup.dir.max.age.ms will be removed. |

New field in StreamsConfig: `public static final long STATE_CLEANUP_DIR_MAX_AGE_MS_DISABLED = -1;`
Proposed Changes
We propose introducing a new configuration, state.cleanup.dir.max.age.ms (Long, default: -1), which defines a time-based threshold for purging local state directories and checkpoint files during application startup. When a KafkaStreams instance initializes, it compares the age of the local state against this value; if the threshold is exceeded, the local data is deleted. This forces the application to rebuild its state entirely from the changelog topic.
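The snippet below sketches how a user might enable the proposed configuration. The property key and default come from this KIP; the application id, bootstrap servers, and the 7-day value are illustrative placeholders, and the helper class is hypothetical rather than part of the Kafka Streams API.

```java
import java.util.Properties;

public class StreamsConfigExample {
    // Hypothetical helper: builds the Properties a KafkaStreams instance would
    // be constructed with. Only the last entry is new behavior from this KIP.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "zombie-free-app");       // placeholder
        props.put("bootstrap.servers", "localhost:9092");     // placeholder
        // Wipe any local state directory not modified for 7 days before
        // processing starts, forcing a rebuild from the changelog topic.
        props.put("state.cleanup.dir.max.age.ms",
                  Long.toString(7L * 24 * 60 * 60 * 1000));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("state.cleanup.dir.max.age.ms"));
    }
}
```

Choosing a value somewhat below the changelog topic's delete.retention.ms ensures the local state is discarded before tombstones it has not yet consumed can expire on the broker.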
By triggering this wipe before processing begins, the application ensures the local store is reconstructed using only the broker's current "source of truth." This effectively purges "zombie" records—stale data that persists in local files after its corresponding deletion tombstones have already expired and been removed from the compacted changelog.
The operational logic for this configuration is defined by the following considerations:
- Threshold-Based State Management: Using a millisecond threshold allows users to align local state expiration with broker-side retention settings. This ensures that the local state is refreshed before it risks becoming inconsistent with the available changelog history. This approach also manages the trade-offs for Global State Stores; while they may update less frequently, the user-defined limit ensures a wipe only occurs when the data is genuinely considered stale, making the recovery process for both regular and global stores predictable and intentional.
- Default Behavior: The configuration defaults to -1, which disables the feature. This ensures that, by default, KafkaStreams maintains its existing behavior of attempting to reuse local state to minimize recovery time, unless a specific cleanup window is explicitly defined by the user.
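The startup-time decision described above can be sketched as a pure age check. This is a simplified illustration under the assumption that the state directory's age is derived from its last-modified timestamp; the class and method names are hypothetical and do not reflect the actual Kafka Streams implementation.

```java
public class StateDirAgeCheck {
    // Mirrors the proposed STATE_CLEANUP_DIR_MAX_AGE_MS_DISABLED constant.
    static final long DISABLED = -1L;

    // Returns true when the local state directory should be wiped at startup.
    static boolean shouldWipe(long lastModifiedMs, long nowMs, long maxAgeMs) {
        if (maxAgeMs == DISABLED) {
            return false; // default: keep existing behavior, reuse local state
        }
        return nowMs - lastModifiedMs >= maxAgeMs;
    }

    public static void main(String[] args) {
        long now = 2_000_000_000L;                      // fixed clock for the example
        long sevenDays = 7L * 24 * 60 * 60 * 1000;      // configured threshold
        long tenDaysAgo = now - 10L * 24 * 60 * 60 * 1000;
        long oneDayAgo = now - 24L * 60 * 60 * 1000;
        System.out.println(shouldWipe(tenDaysAgo, now, sevenDays)); // stale -> wipe
        System.out.println(shouldWipe(oneDayAgo, now, sevenDays));  // fresh -> keep
        System.out.println(shouldWipe(tenDaysAgo, now, DISABLED));  // disabled -> keep
    }
}
```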
Compatibility, Deprecation, and Migration Plan
This change is backward compatible, as it only introduces new functionality. To ensure no impact on existing users, state.cleanup.dir.max.age.ms will be disabled by default. As this is a purely additive feature, no deprecation of existing APIs or migration steps for current Kafka Streams applications are required.
Test Plan
Unit Tests: Add tests to ensure the state directory is purged during initialization when the property is enabled.
Manual Verification: I have a local reproduction of the "zombie record" issue (simulating tombstone expiration during downtime). I will use this to verify that the new property triggers a full rebuild and ensures state consistency with the changelog.
Rejected Alternatives
- We considered storing a "last-write" timestamp in the checkpoint file and comparing it against the broker's delete.retention.ms configuration during startup. If the checkpoint age exceeded the retention period, the state would be wiped. This was rejected due to the complexities of clock synchronization across different VMs and clusters.
- Another alternative involved having brokers track a "deletion watermark" that clients could query. If the local checkpointed offset was lower than the broker's current deletion watermark, the client would know to trigger a local wipe. This was rejected as overly complex for the problem at hand. Given that rebuilding a state store from a compacted changelog is already a core Kafka Streams operation (local files are already removed after a delay when a task is migrated), a simple configuration-based toggle is a more practical and maintainable solution.