Status

Current state: "Under Discussion"

Discussion thread: To be updated

JIRA: KAFKA-19759

Motivation

Streams applications often use persistent state stores (e.g., TimestampedKeyValueStore) to maintain state across records.
However, many use cases require that state entries expire automatically after a defined retention period (TTL), for example:

  • Orders or events that are only valid for 30 days.

  • Temporary correlation or deduplication stores.

  • Large stores where data must be bounded in size.

Today, developers must implement TTL manually by:

  • Maintaining timestamps in store values.

  • Scheduling cleanup punctuations using context.schedule().

  • Iterating over all entries and deleting expired ones.

This approach requires repeated boilerplate and is easy to implement inconsistently across topologies.
Moreover, users must manually handle timestamp regression and produce tombstones for changelog consistency.

This KIP proposes to add first-class, optional TTL support directly to the Kafka Streams state store API, simplifying user code and ensuring consistent cleanup behavior.

Public Interfaces

This KIP introduces a new builder option on state stores:

StoreBuilder<T> withTtl(Duration ttl);

Behavior

  • If ttl is provided, entries older than the TTL are automatically expired and removed.

  • If .withTtl() is not called (or ttl is null), the store behaves exactly as it does today, with no expiration.

  • Expired entries generate tombstones that are flushed to the changelog topic for correct state restoration and compaction.

  • The eviction policy uses WALL_CLOCK_TIME by default, so cleanup happens even during idle periods. (Open question: whether WALL_CLOCK_TIME is the right default.)
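
The behavior rules above can be sketched with a small in-memory model. This is only an illustration under stated assumptions: TtlBehaviorSketch and its methods are hypothetical names, it does not use the Kafka Streams API, and a plain list stands in for the changelog topic.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the proposed semantics; NOT the Kafka Streams API.
final class TtlBehaviorSketch {
    private final Map<String, Long> lastUpdateMs = new HashMap<>();     // key -> last-update timestamp
    private final List<String> changelogTombstones = new ArrayList<>(); // keys tombstoned so far
    private final Duration ttl; // null models a store built without withTtl()

    TtlBehaviorSketch(Duration ttl) {
        this.ttl = ttl;
    }

    void put(String key, long timestampMs) {
        lastUpdateMs.put(key, timestampMs);
    }

    boolean contains(String key) {
        return lastUpdateMs.containsKey(key);
    }

    List<String> tombstones() {
        return changelogTombstones;
    }

    // Removes entries whose age has reached the TTL and records one tombstone per removal.
    void expire(long nowMs) {
        if (ttl == null) {
            return; // TTL disabled: behave like a store without expiration
        }
        Iterator<Map.Entry<String, Long>> it = lastUpdateMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            String key = entry.getKey(); // read the key before removing the entry
            if (entry.getValue() + ttl.toMillis() <= nowMs) {
                it.remove();
                changelogTombstones.add(key);
            }
        }
    }
}
```

With a five-day TTL, an entry written at time 0 is removed by a sweep at day 6 and leaves one tombstone, while an entry written at day 3 survives. A model constructed with a null TTL never removes anything.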

Example Usage

Topology
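import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;

public class TTLExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Define the store builder with TTL (withTtl is the API proposed by this KIP)
        StoreBuilder<TimestampedKeyValueStore<String, String>> storeBuilder =
            Stores.timestampedKeyValueStoreBuilder(
                    Stores.persistentTimestampedKeyValueStore("myTimestampedStore"),
                    Serdes.String(),
                    Serdes.String())
                // TTL applies to the timestamped store
                .withTtl(Duration.ofDays(5));

        // Add the store builder to the topology; Kafka Streams builds the store itself
        builder.addStateStore(storeBuilder);

        // Define the stream processing logic and connect the store by name
        KStream<String, String> stream = builder.stream("input-topic");
        stream.process(MyProcessor::new, "myTimestampedStore");

        // Required configuration; the values below are placeholders
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ttl-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Build and start the Kafka Streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}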


Notes

  • Applies only to TimestampedKeyValueStore.

  • Future work may extend support to KeyValueStore, session stores, and window stores.

  • Current and Future Scope

    Without this KIP (today):
    TimestampedKeyValueStore stores ValueAndTimestamp<V> entries, which allows manual cleanup logic (e.g., using context.schedule() with entry.value.timestamp()).

    This KIP:
    Introduces native TTL support for TimestampedKeyValueStore only, enabling time-based cleanup without requiring users to manually schedule punctuators. The store will automatically evict entries older than the configured TTL based on the embedded record timestamp.

    Future Extension:
    In later iterations, TTL support can be generalized to KeyValueStore by internally maintaining a last-update timestamp for each key. This would allow the same .withTtl(Duration) API to apply uniformly, even for non-timestamped stores.

Proposed Changes

Internal Implementation

Extend TimestampedKeyValueStoreBuilder (and related store suppliers) to accept an optional TTL parameter via .withTtl(Duration).

When TTL is configured, the underlying store (e.g., RocksDB-based timestamped store) will:

  • Leverage the existing timestamp stored in ValueAndTimestamp<V> for each entry.

  • Periodically scan and evict entries whose age (current time minus the stored timestamp) has reached the TTL.

  • Emit tombstones for each removal and flush to the changelog, ensuring downstream consistency.

  • Use WALL_CLOCK_TIME as the default cleanup trigger, allowing eviction even when no new records arrive.
    (A future enhancement could allow selecting STREAM_TIME as the eviction basis; how this would work is an open question.)

Pass TTL metadata to the underlying store implementation (RocksDBStore).
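
To illustrate why the cleanup trigger matters, here is a minimal sketch comparing the two clocks. It assumes stream time is the highest record timestamp observed so far and that record timestamps and wall-clock time use the same epoch-millisecond scale. EvictionClockSketch and its members are hypothetical names, not Kafka Streams classes.

```java
// Hypothetical model: stream time advances only when records arrive,
// while wall-clock time keeps advancing during idle periods.
final class EvictionClockSketch {
    private long streamTimeMs = -1L; // no record observed yet

    // Stream time is the highest record timestamp seen so far; it never moves backwards.
    void observeRecord(long recordTimestampMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
    }

    long streamTimeMs() {
        return streamTimeMs;
    }

    // An entry is due for eviction once its age, measured on the chosen clock, reaches the TTL.
    static boolean isExpired(long entryTimestampMs, long ttlMs, long clockNowMs) {
        return clockNowMs - entryTimestampMs >= ttlMs;
    }

    public static void main(String[] args) {
        long ttlMs = 60_000L;
        EvictionClockSketch clock = new EvictionClockSketch();
        clock.observeRecord(1_000L);             // the only record; the input then goes idle
        long wallClockNowMs = 1_000L + 120_000L; // two minutes later by wall clock

        System.out.println("stream-time eviction: " + isExpired(1_000L, ttlMs, clock.streamTimeMs()));
        System.out.println("wall-clock eviction:  " + isExpired(1_000L, ttlMs, wallClockNowMs));
    }
}
```

Running main prints false for the stream-time check and true for the wall-clock check. With idle input, stream time stays at the last record's timestamp, so only a wall-clock trigger evicts the entry.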


Timestamp Tracking in TimestampedKeyValueStore


  • Track last-update timestamps in ValueAndTimestamp<V>.

  • Prevent regression when new records have older timestamps:

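@Override
public void put(K key, ValueAndTimestamp<V> value) {
    if (value == null) {
        // A null value is a delete, so there is no timestamp to merge
        super.put(key, null);
        return;
    }
    // Keep the larger timestamp so a late record cannot make the entry look older
    long ts = Math.max(value.timestamp(), getExistingTimestamp(key));
    super.put(key, ValueAndTimestamp.make(value.value(), ts));
}

// Returns the stored timestamp for the key, or 0 if the key is absent
private long getExistingTimestamp(K key) {
    ValueAndTimestamp<V> existing = this.get(key);
    return existing != null ? existing.timestamp() : 0L;
}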



To avoid regression scenarios where late or reordered records arrive with older timestamps, the store computes the timestamp used for eviction as:

Max Calculation
effectiveTimestamp = max(existingTimestamp, recordTimestamp);
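
As a worked example of the max calculation, the sketch below keeps one timestamp per key and merges each incoming record timestamp with Long::max. TimestampMergeSketch is a hypothetical stand-in for the store and uses only java.util.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of non-regressive timestamp tracking; NOT the Kafka Streams API.
final class TimestampMergeSketch {
    private final Map<String, Long> lastUpdateMs = new HashMap<>();

    // Stores max(existingTimestamp, recordTimestamp) and returns the effective timestamp.
    long put(String key, long recordTimestampMs) {
        return lastUpdateMs.merge(key, recordTimestampMs, Long::max);
    }

    public static void main(String[] args) {
        TimestampMergeSketch store = new TimestampMergeSketch();
        System.out.println(store.put("order-1", 100L)); // first write
        System.out.println(store.put("order-1", 50L));  // late record with an older timestamp
        System.out.println(store.put("order-1", 200L)); // newer record
    }
}
```

Running main prints 100, 100, and 200: the late record with timestamp 50 leaves the effective timestamp at 100, so it cannot bring the entry closer to expiry.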

Eviction Logic

  • Default trigger: WALL_CLOCK_TIME.

  • Optional future support: STREAM_TIME.

  • Periodic scan deletes expired entries and emits tombstones:

Context punctuation & Delete
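context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
    // The iterator must be closed; try-with-resources takes care of that
    try (KeyValueIterator<K, ValueAndTimestamp<V>> iter = store.all()) {
        while (iter.hasNext()) {
            KeyValue<K, ValueAndTimestamp<V>> entry = iter.next();
            // Expired once the last-update timestamp plus the TTL has been reached
            if (entry.value.timestamp() + ttlMs <= timestamp) {
                store.delete(entry.key); // with changelogging enabled, this writes the tombstone
                context.forward(entry.key, null); // also notify downstream processors
            }
        }
    }
});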


Together, these rules ensure that recently updated keys are not deleted prematurely and that the TTL is measured from the last-update timestamp stored with each entry.

Compatibility, Deprecation, and Migration Plan

  • Backward compatible:
    Existing applications without .withTtl() are completely unaffected.

  • No deprecations:
    TTL is an optional enhancement; existing APIs and store semantics remain unchanged.

  • Rolling upgrade safe:
    Older nodes simply ignore TTL configuration. Once upgraded, cleanup begins automatically without data loss. (Open question: this still needs to be confirmed.)

  • Changelog impact:
    Expired entries produce tombstones that are flushed to the changelog, allowing consistent recovery and downstream cleanup.

  • Migration path:
    Applications using manual punctuation-based cleanup can remove that logic once native TTL is adopted.
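
The changelog impact can be illustrated by replaying a changelog into an empty map. In this minimal sketch a null value models a tombstone. ChangelogReplaySketch and ChangelogRecord are hypothetical names, and the real restoration path in Kafka Streams is more involved.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of state restoration from a changelog; NOT the Kafka Streams API.
final class ChangelogReplaySketch {
    // One changelog record; a null value models a tombstone.
    static final class ChangelogRecord {
        final String key;
        final String value;

        ChangelogRecord(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Rebuilds the store contents by applying the records in offset order.
    static Map<String, String> restore(List<ChangelogRecord> changelog) {
        Map<String, String> state = new HashMap<>();
        for (ChangelogRecord record : changelog) {
            if (record.value == null) {
                state.remove(record.key); // tombstone: the entry expired or was deleted
            } else {
                state.put(record.key, record.value);
            }
        }
        return state;
    }
}
```

Replaying the records (a, 1), (b, 2), (a, tombstone) yields a state containing only b. Without the tombstone, a restored instance would bring back the expired entry a.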

Test Plan

  • Unit tests:

    • TTL configuration on builder.

    • Timestamp comparison and eviction correctness.

  • Integration tests:

    • Automatic expiration of entries after TTL.

    • Entries are not evicted before TTL expiry.

    • Eviction continues even during idle input (WALL_CLOCK_TIME).

    • Tombstone emission and correct state recovery.

  • Performance tests:
    Evaluate the impact of periodic cleanup and measure compaction stability under large state sizes. (Open question; needs further discussion.)
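
One way to make the expiry tests deterministic is to inject a clock that the test advances by hand, so no test has to sleep. The sketch below shows that idea on a tiny stand-in store. ManualClock and ExpiringKeys are hypothetical names, and an actual test would likely drive the real store through the Kafka Streams test utilities instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical test harness sketch; NOT the Kafka Streams API.
final class TtlClockHarnessSketch {
    // Clock that only moves when the test advances it.
    static final class ManualClock implements LongSupplier {
        private long nowMs;

        @Override
        public long getAsLong() {
            return nowMs;
        }

        void advance(long deltaMs) {
            nowMs += deltaMs;
        }
    }

    // Tiny stand-in for a TTL store that reads the current time from the injected clock.
    static final class ExpiringKeys {
        private final Map<String, Long> lastUpdateMs = new HashMap<>();
        private final long ttlMs;
        private final LongSupplier clock;

        ExpiringKeys(long ttlMs, LongSupplier clock) {
            this.ttlMs = ttlMs;
            this.clock = clock;
        }

        void put(String key) {
            lastUpdateMs.put(key, clock.getAsLong());
        }

        boolean contains(String key) {
            return lastUpdateMs.containsKey(key);
        }

        // Drops every key whose age has reached the TTL.
        void sweep() {
            long now = clock.getAsLong();
            lastUpdateMs.values().removeIf(ts -> ts + ttlMs <= now);
        }
    }
}
```

A test can write a key, advance the clock to one millisecond before the TTL, and assert the key is still present after a sweep. Advancing by one more millisecond and sweeping again should remove it, which covers both the "not evicted before TTL expiry" and the "evicted during idle input" cases.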

Rejected Alternatives

  • Manual cleanup via punctuation (current common approach)

    • Requires boilerplate code in each processor.

    • Risk of inconsistent or incorrect timestamp logic.

    • Harder to maintain and test across multiple applications.

  • Dedicated TTL store types (e.g., TTLTimestampedStore) (cf. KAFKA-4212)

    • Would fragment the API.

    • Adds unnecessary type complexity.

    • Making TTL a builder option keeps the API simple and unified.

  • Always-on TTL

    • Not suitable for all workloads (e.g., reference tables or long-living stores).

    • Must remain optional to preserve backward compatibility.


Summary

This KIP introduces optional TTL support for TimestampedKeyValueStore, leveraging the existing per-record timestamp to enable automatic, consistent, and efficient data expiry.
It removes the need for manual punctuation-based cleanup, reduces boilerplate, and ensures correctness through non-regressive timestamp handling and tombstones flushed to the changelog.
