Status
Current state: "Under Discussion"
Discussion thread: To be updated
JIRA: KAFKA-19759
Motivation
Streams applications often use persistent state stores (e.g., TimestampedKeyValueStore) to maintain state across records.
However, many use cases require that state entries expire automatically after a defined retention period (TTL), for example:
Orders or events that are only valid for 30 days.
Temporary correlation or deduplication stores.
Large stores where data must be bounded in size.
Today, developers must implement TTL manually by:
Maintaining timestamps in store values.
Scheduling cleanup punctuations using context.schedule().
Iterating over all entries and deleting expired ones.
This approach adds boilerplate to every topology and is easy to implement inconsistently across applications.
Moreover, users must manually handle timestamp regression and produce tombstones for changelog consistency.
This KIP proposes to add first-class, optional TTL support directly to the Kafka Streams state store API, simplifying user code and ensuring consistent cleanup behavior.
Public Interfaces
This KIP introduces a new builder option on state stores:
StoreBuilder<T> withTtl(Duration ttl);
Behavior
If ttl is provided, entries older than the TTL are automatically expired and removed.
If .withTtl() is not called (or ttl is null), the store behaves exactly as it does today, with no expiration.
Expired entries generate tombstones that are flushed to the changelog topic for correct state restoration and compaction.
The eviction policy uses WALL_CLOCK_TIME by default, so cleanup happens even during idle periods. (Open question: this default is questionable.)
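The behavior rules above can be read as a single predicate. The following is a minimal sketch, not the store's implementation: the class TtlExpiry and the method isExpired are hypothetical names introduced only for illustration, and the comparison mirrors the one used in the eviction sketch later in this KIP.

```java
import java.time.Duration;

/** Hypothetical helper for illustration; not part of Kafka Streams. */
final class TtlExpiry {
    private TtlExpiry() {}

    /**
     * Returns true when an entry should be treated as expired.
     * A null ttl models the case where withTtl() was never called.
     */
    static boolean isExpired(long entryTimestampMs, long nowMs, Duration ttl) {
        if (ttl == null) {
            return false; // TTL not configured: nothing ever expires
        }
        // Expired once the entry timestamp plus the TTL has been reached
        return entryTimestampMs + ttl.toMillis() <= nowMs;
    }
}
```

With a 5 second TTL, an entry stamped at 1000 ms is still live at 5999 ms and expired at 6000 ms.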
Example Usage
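import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;

public class TTLExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Define the store builder with TTL (withTtl is the API proposed by this KIP)
        StoreBuilder<TimestampedKeyValueStore<String, String>> storeBuilder =
            Stores.timestampedKeyValueStoreBuilder(
                    Stores.persistentTimestampedKeyValueStore("myTimestampedStore"),
                    Serdes.String(),
                    Serdes.String())
                // TTL applies to the timestamped store
                .withTtl(Duration.ofDays(5));
        // Add the store builder (not a built store) to the topology
        builder.addStateStore(storeBuilder);
        // Define the stream processing logic; the processor is connected to the store by name
        KStream<String, String> stream = builder.stream("input-topic");
        stream.process(() -> new MyProcessor(), "myTimestampedStore");
        // Build and start the Kafka Streams application
        // (application.id and bootstrap.servers must be set in the properties)
        KafkaStreams streams = new KafkaStreams(builder.build(), new Properties());
        streams.start();
    }
}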
Notes
Applies only to TimestampedKeyValueStore.
Future work may extend support to KeyValueStore, session, and window stores.
Current and Future Scope
Today:
TimestampedKeyValueStore stores ValueAndTimestamp<V> entries, allowing manual cleanup logic (e.g., using context.schedule with entry.value.timestamp()).
This KIP:
Introduces native TTL support for TimestampedKeyValueStore only, enabling time-based cleanup without requiring users to manually schedule punctuators. The store will automatically evict entries older than the configured TTL based on the embedded record timestamp.
Future extension:
In later iterations, TTL support can be generalized to KeyValueStore by internally maintaining a last-update timestamp for each key. This would allow the same .withTtl(Duration) API to apply uniformly, even for non-timestamped stores.
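To make the future extension more concrete, here is a rough sketch of how a plain key-value store could keep a last-update timestamp next to each value. Everything in it is an assumption for illustration: TimestampingStore and Stamped are hypothetical names, a HashMap stands in for the real store, and expired entries are simply hidden on read rather than removed by a periodic scan.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a TTL-aware plain key-value store. */
final class TimestampingStore<K, V> {
    /** Internal envelope: the user's value plus its last-update time. */
    private static final class Stamped<V> {
        final V value;
        final long lastUpdateMs;

        Stamped(V value, long lastUpdateMs) {
            this.value = value;
            this.lastUpdateMs = lastUpdateMs;
        }
    }

    private final Map<K, Stamped<V>> inner = new HashMap<>();
    private final long ttlMs;

    TimestampingStore(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Stores the value and records nowMs as its last-update timestamp. */
    void put(K key, V value, long nowMs) {
        inner.put(key, new Stamped<>(value, nowMs));
    }

    /** Returns the value, or null if the key is absent or older than the TTL. */
    V get(K key, long nowMs) {
        Stamped<V> stamped = inner.get(key);
        if (stamped == null || stamped.lastUpdateMs + ttlMs <= nowMs) {
            return null;
        }
        return stamped.value;
    }
}
```

The caller only ever sees V; the timestamp stays internal, which is what would let the same .withTtl(Duration) option apply to non-timestamped stores.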
Proposed Changes
Internal Implementation
Extend TimestampedKeyValueStoreBuilder (and related store suppliers) to accept an optional TTL parameter via .withTtl(Duration).
When TTL is configured, the underlying store (e.g., RocksDB-based timestamped store) will:
Leverage the existing timestamp stored in ValueAndTimestamp<V> for each entry.
Periodically scan and evict entries whose stored timestamp is older than the TTL threshold (timestamp + TTL <= current time).
Emit tombstones for each removal and flush them to the changelog, ensuring downstream consistency.
Use WALL_CLOCK_TIME as the default cleanup trigger, allowing eviction even when no new records arrive.
(A future enhancement could allow selecting STREAM_TIME as the eviction basis. Open question: how?)
Pass TTL metadata to the underlying store implementation (RocksDBStore).
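As a sketch of the builder side only, the optional TTL could be carried as a nullable field and exposed to the store as an Optional. TtlBuilderSketch is a hypothetical stand-in, not the real TimestampedKeyValueStoreBuilder, and rejecting zero or negative durations is an assumption that this KIP does not specify.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical stand-in for a store builder; not the real Kafka Streams class. */
final class TtlBuilderSketch {
    private final String storeName;
    private Duration ttl; // null means TTL is disabled (today's behavior)

    TtlBuilderSketch(String storeName) {
        this.storeName = storeName;
    }

    /** Records the optional TTL and returns this builder for chaining. */
    TtlBuilderSketch withTtl(Duration ttl) {
        // Assumption: non-positive durations are rejected; the KIP does not say so.
        if (ttl != null && (ttl.isZero() || ttl.isNegative())) {
            throw new IllegalArgumentException("ttl must be positive, got " + ttl);
        }
        this.ttl = ttl;
        return this;
    }

    /** The TTL to hand to the underlying store, or empty when TTL is disabled. */
    Optional<Duration> ttl() {
        return Optional.ofNullable(ttl);
    }

    String storeName() {
        return storeName;
    }
}
```

Keeping the field nullable means a builder that never calls withTtl() produces exactly the configuration it does today.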
Timestamp Tracking in TimestampedKeyValueStore
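Track last-update timestamps in ValueAndTimestamp<V>.
Prevent regression when new records have older timestamps:
    @Override
    public void put(K key, ValueAndTimestamp<V> value) {
        if (value == null) {
            // A null value is a delete; pass it through unchanged
            super.put(key, null);
            return;
        }
        // Never let a late record move the stored timestamp backwards
        long ts = Math.max(value.timestamp(), getExistingTimestamp(key));
        super.put(key, ValueAndTimestamp.make(value.value(), ts));
    }

    private long getExistingTimestamp(K key) {
        ValueAndTimestamp<V> existing = this.get(key);
        // 0L when the key is absent, so any record timestamp wins
        return existing != null ? existing.timestamp() : 0L;
    }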
To avoid regression scenarios where late or reordered records arrive with older timestamps, eviction logic will use:
effectiveTimestamp = max(existingTimestamp, recordTimestamp);
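A short worked example of this rule, as a sketch: the class and method names are hypothetical, and expiresAtMs is added only to show the consequence for the TTL deadline. Suppose a key was last updated at 10000 ms, the TTL is 5000 ms, and a late record arrives with timestamp 4000 ms. The effective timestamp stays at 10000 ms, so the entry becomes eligible for eviction at 15000 ms rather than 9000 ms.

```java
/** Hypothetical helper illustrating the non-regression rule. */
final class EffectiveTimestamp {
    private EffectiveTimestamp() {}

    /** The timestamp kept after a put: never older than what is already stored. */
    static long effectiveTimestamp(long existingTimestampMs, long recordTimestampMs) {
        return Math.max(existingTimestampMs, recordTimestampMs);
    }

    /** Earliest time at which the entry becomes eligible for eviction. */
    static long expiresAtMs(long existingTimestampMs, long recordTimestampMs, long ttlMs) {
        return effectiveTimestamp(existingTimestampMs, recordTimestampMs) + ttlMs;
    }
}
```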
Eviction Logic
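Default trigger: WALL_CLOCK_TIME.
Optional future support: STREAM_TIME.
Periodic scan deletes expired entries and emits tombstones:
    // Assumes store, ttlMs, and context are available in the enclosing processor
    context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
        try (KeyValueIterator<K, ValueAndTimestamp<V>> iter = store.all()) {
            while (iter.hasNext()) {
                KeyValue<K, ValueAndTimestamp<V>> entry = iter.next();
                // Expired once the last-update timestamp plus the TTL has been reached
                if (entry.value.timestamp() + ttlMs <= timestamp) {
                    // With changelogging enabled, delete() also writes the changelog tombstone
                    store.delete(entry.key);
                    // Forward a tombstone to downstream processors (typed Processor API)
                    context.forward(new Record<>(entry.key, null, timestamp));
                }
            }
        }
    });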
This ensures that recently updated keys are never prematurely deleted and that the TTL applies relative to the last-update timestamp stored in each record.
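The scan can be exercised without a running topology. The following is a simplified in-memory sketch under stated assumptions: TtlSweeper is a hypothetical name, a Map from key to last-update timestamp stands in for the state store, and the returned list stands in for the tombstones that would be written.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of one eviction pass. */
final class TtlSweeper {
    private TtlSweeper() {}

    /**
     * Removes expired entries from store (key -> last-update timestamp in ms)
     * and returns the evicted keys, i.e. the keys that would get a tombstone.
     */
    static List<String> sweep(Map<String, Long> store, long ttlMs, long nowMs) {
        List<String> tombstones = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + ttlMs <= nowMs) {
                // Read the key before removing: some Map implementations
                // reuse the entry object after removal.
                String key = entry.getKey();
                it.remove();
                tombstones.add(key);
            }
        }
        return tombstones;
    }
}
```

Entries that have not yet reached their deadline are left untouched, which matches the requirement that recently updated keys are never evicted early.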
Compatibility, Deprecation, and Migration Plan
Backward compatible:
Existing applications without .withTtl() are completely unaffected.
No deprecations:
TTL is an optional enhancement; existing APIs and store semantics remain unchanged.
Rolling upgrade safe:
Older nodes simply ignore TTL configuration. Once upgraded, cleanup begins automatically without data loss. (Open question: this claim is questionable.)
Changelog impact:
Expired entries produce tombstones that are flushed to the changelog, allowing consistent recovery and downstream cleanup.
Migration path:
Applications using manual punctuation-based cleanup can remove that logic once native TTL is adopted.
Test Plan
Unit tests:
TTL configuration on builder.
Timestamp comparison and eviction correctness.
Integration tests:
Automatic expiration of entries after TTL.
Entries are not evicted before TTL expiry.
Eviction continues even during idle input (WALL_CLOCK_TIME).
Tombstone emission and correct state recovery.
Performance tests:
Evaluate the impact of periodic cleanup and measure compaction stability under large state sizes. (Open question: highly questionable.)
Rejected Alternatives
Manual cleanup via punctuation (current common approach)
Requires boilerplate code in each processor.
Risk of inconsistent or incorrect timestamp logic.
Harder to maintain and test across multiple applications.
Dedicated TTL store types (e.g., TTLTimestampedStore), related: KAFKA-4212
Would fragment the API.
Adds unnecessary type complexity.
Making TTL a builder option keeps the API simple and unified.
Always-on TTL
Not suitable for all workloads (e.g., reference tables or long-living stores).
Must remain optional to preserve backward compatibility.
Summary notes:
This KIP introduces optional TTL support for TimestampedKeyValueStore, leveraging the existing per-record timestamp to enable automatic, consistent, and efficient data expiry.
It removes the need for manual punctuation-based cleanup, reduces boilerplate, and ensures correctness through non-regressive timestamp handling and tombstones flushed to the changelog.
