Status

Current state: Accepted

Discussion thread: here 

JIRA: KAFKA-20056

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka record headers are increasingly used to carry metadata such as schema identifiers, correlation IDs, tracing information, and feature flags. Today, Kafka Streams does not persist record headers in state stores: only the serialized key and value bytes are stored. As a consequence, any header-based semantics are lost once a record passes through a stateful operator and is materialized in a state store.

This creates several limitations:

  • Header-dependent serdes cannot be used consistently with state stores. Serdes that expect to read and/or write metadata in headers work correctly for input and output topics, but fail once data is materialized in a state store and restored via its changelog, because headers are not preserved.
  • Downstream processors cannot rely on header metadata after a stateful operation. For example, if a processor uses headers to drive its behavior (e.g., routing, schema selection, or feature toggles), those headers disappear once records are written to and read from a state store.
  • Interactive Queries cannot expose header metadata associated with stored records, which limits observability and end-to-end tracing when headers are part of the application’s semantics. (Note that exposing headers via Interactive Queries is outside the scope of this KIP.)

This KIP introduces header-aware state store interfaces and implementations that preserve record headers alongside the value and the timestamp.

The primary goals are:

  • Enable state stores that preserve record headers in a first-class way.
  • Provide a smooth, rolling-upgrade path from existing stores without headers.

Public Interfaces

We add five new header-aware store types:

  • TimestampedKeyValueStoreWithHeaders 
  • VersionedKeyValueStoreWithHeaders
  • TimestampedWindowStoreWithHeaders
  • SessionStoreWithHeaders
  • ReadOnlySessionStoreWithHeaders

For a seamless single rolling-bounce upgrade, we introduce the HeadersBytesStore interface, which provides a static helper method, convertToHeaderFormat, that converts legacy byte[] values into the new value-and-headers format. Additionally, byte stores implement this interface to indicate that they expect the new value-and-headers format.
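A minimal sketch of what convertToHeaderFormat could look like, assuming it mirrors TimestampedBytesStore.convertToTimestampedFormat from KIP-258 (a null value stays null). Because a legacy value carries no headers, conversion only has to prepend headers_size = 0, which is the single byte 0x00 in any varint encoding, followed by an empty headers_bytes section. The class name is hypothetical; this is an illustration, not the actual implementation.

```java
import java.util.Arrays;

// Hypothetical stand-in for HeadersBytesStore.convertToHeaderFormat.
final class ConvertToHeaderFormatSketch {

    static byte[] convertToHeaderFormat(final byte[] legacyValue) {
        if (legacyValue == null) {
            return null; // assumption: tombstones stay tombstones, as in KIP-258
        }
        // [headers_size = varint(0)][headers_bytes = empty][payload_bytes = legacy value]
        final byte[] converted = new byte[legacyValue.length + 1];
        converted[0] = 0x00;
        System.arraycopy(legacyValue, 0, converted, 1, legacyValue.length);
        return converted;
    }

    public static void main(final String[] args) {
        final byte[] legacy = {10, 20, 30};
        System.out.println(Arrays.toString(convertToHeaderFormat(legacy))); // [0, 10, 20, 30]
    }
}
```

The conversion never inspects the payload, so it works the same for plain, timestamped, or any other legacy value layout.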

Proposed Changes

High-level design

The core idea is to embed serialized headers into the stored value bytes while preserving the existing key/value store abstraction. The on-disk and in-memory store format becomes:

value := [ headers_size ][ headers_bytes ] [ payload_bytes ]

  • headers_size is the size of headers_bytes in bytes, stored as a varint. Knowing headers_size lets a reader locate the payload without deserializing the headers. When users scan large ranges and only care about the values, eagerly deserializing headers would add unnecessary overhead; headers_size enables lazy header deserialization, which is why we introduced it.
  • headers_bytes is the serialized form of the Headers object.
  • payload_bytes is the existing serialized value as produced by the value serde.
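The following sketch illustrates why headers_size enables lazy deserialization: a reader can jump straight to payload_bytes, or slice out headers_bytes, without parsing a single header. It assumes headers_size is a zigzag-encoded varint (the flavor Kafka's record format uses for its varints); the KIP does not pin this down, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper showing how headers_size splits a stored value.
final class StoredValueLayout {

    // Reads a zigzag-encoded varint (assumed encoding) and advances the buffer past it.
    static int readVarint(final ByteBuffer buf) {
        int raw = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            raw |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1);
    }

    // Returns payload_bytes without parsing any header: read headers_size, then skip that many bytes.
    static byte[] payload(final byte[] stored) {
        final ByteBuffer buf = ByteBuffer.wrap(stored);
        final int headersSize = readVarint(buf);
        buf.position(buf.position() + headersSize);
        final byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }

    // Returns the raw headers_bytes slice; it only needs to be parsed if a caller asks for headers.
    static byte[] rawHeaders(final byte[] stored) {
        final ByteBuffer buf = ByteBuffer.wrap(stored);
        final byte[] headers = new byte[readVarint(buf)];
        buf.get(headers);
        return headers;
    }

    public static void main(final String[] args) {
        // headers_size = 3 (zigzag varint 0x06), three placeholder header bytes, then payload {42, 43}
        final byte[] stored = {0x06, 1, 2, 3, 42, 43};
        System.out.println(Arrays.toString(payload(stored)));    // [42, 43]
        System.out.println(Arrays.toString(rawHeaders(stored))); // [1, 2, 3]
    }
}
```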

The headers_bytes field is serialized as an array of headers: first a varint header count, then, for each header, a varint key length followed by the UTF-8 key bytes, and a varint value length (-1 for a null value) followed by the raw value bytes. Header order is preserved during both serialization and deserialization.

When the headers are null or empty, we define headers_size = 0 and encode headers_bytes as zero bytes (not even the header count is written). Since empty headers are likely to be very common, this optimization can have a significant impact on overall storage and network usage.
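A self-contained sketch of the headers_bytes layout and the full value encoding described above. A list of key/value entries stands in for Kafka's Headers so the example has no Kafka dependency, the varints are assumed to be zigzag-encoded (so that -1 can mark a null header value), and all names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical codec; List<Map.Entry<String, byte[]>> stands in for Kafka's Headers.
final class HeadersValueCodec {

    // Zigzag varint writer (assumed encoding).
    static void writeVarint(final int value, final ByteArrayOutputStream out) {
        int v = (value << 1) ^ (value >> 31);
        while ((v & 0xFFFFFF80) != 0) {
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
    }

    static int readVarint(final ByteBuffer buf) {
        int raw = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            raw |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1);
    }

    // headers_bytes := count, then per header: key length, UTF-8 key, value length (-1 = null), value.
    static byte[] serializeHeaders(final List<Map.Entry<String, byte[]>> headers) {
        if (headers == null || headers.isEmpty()) {
            return new byte[0]; // null/empty headers: zero bytes, not even a count
        }
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(headers.size(), out);
        for (final Map.Entry<String, byte[]> header : headers) {
            final byte[] key = header.getKey().getBytes(StandardCharsets.UTF_8);
            writeVarint(key.length, out);
            out.write(key, 0, key.length);
            final byte[] value = header.getValue();
            writeVarint(value == null ? -1 : value.length, out);
            if (value != null) {
                out.write(value, 0, value.length);
            }
        }
        return out.toByteArray();
    }

    // Inverse of serializeHeaders; entries come back in their original order.
    static List<Map.Entry<String, byte[]>> deserializeHeaders(final byte[] headersBytes) {
        final List<Map.Entry<String, byte[]>> headers = new ArrayList<>();
        if (headersBytes.length == 0) {
            return headers;
        }
        final ByteBuffer buf = ByteBuffer.wrap(headersBytes);
        final int count = readVarint(buf);
        for (int i = 0; i < count; i++) {
            final byte[] key = new byte[readVarint(buf)];
            buf.get(key);
            final int valueLength = readVarint(buf);
            byte[] value = null;
            if (valueLength >= 0) {
                value = new byte[valueLength];
                buf.get(value);
            }
            headers.add(new SimpleImmutableEntry<>(new String(key, StandardCharsets.UTF_8), value));
        }
        return headers;
    }

    // value := [headers_size][headers_bytes][payload_bytes]
    static byte[] encodeValue(final List<Map.Entry<String, byte[]>> headers, final byte[] payload) {
        final byte[] headersBytes = serializeHeaders(headers);
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(headersBytes.length, out);
        out.write(headersBytes, 0, headersBytes.length);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }
}
```

With empty headers, encodeValue produces a single 0x00 byte followed by the payload, which is the same layout convertToHeaderFormat yields for a legacy value.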


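package org.apache.kafka.streams.state;

import java.time.Duration;
import java.util.Optional;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.StateStore;

/* classes and interfaces to be added */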

public final class ValueTimestampHeaders<V> {

    private final V value;
    private final long timestamp;
    private final Headers headers;
    public static <V> ValueTimestampHeaders<V> make(final V value, final long timestamp, final Headers headers) {}
    public static <V> ValueTimestampHeaders<V> makeAllowNullable(final V value, final long timestamp, final Headers headers) {}
    public static <V> V getValueOrNull(final ValueTimestampHeaders<V> valueTimestampHeaders) {} // returns `null` if `valueTimestampHeaders==null` 
}

public final class VersionedRecordWithHeaders<V> {

    private final V value;
    private final long timestamp;
    private final Optional<Long> validTo;
    private final Headers headers;

    // factory methods and accessors omitted for brevity
}

// Header-aware data type to be used in `SessionStoreWithHeaders`
public class AggregationWithHeaders<AGG> {
    private final AGG aggregation;
    private final Headers headers;
}


public interface TimestampedKeyValueStoreWithHeaders<K, V> extends KeyValueStore<K, ValueTimestampHeaders<V>> {}
public interface VersionedKeyValueStoreWithHeaders<K, V> extends StateStore {}
public interface TimestampedWindowStoreWithHeaders<K, V> extends WindowStore<K, ValueTimestampHeaders<V>> {}
public interface SessionStoreWithHeaders<K, AGG> extends StateStore {}


// New helper interface for upgrade: HeadersBytesStore
public interface HeadersBytesStore {
    static byte[] convertToHeaderFormat(final byte[] value) {}
}

/* Updates to existing classes: The new methods are additive. Existing methods without headers remain unchanged and continue to work exactly as today. */

// In the StateSerdes class, the current de/serialization methods without a headers argument are marked as @Deprecated and will be removed in a future release to make the class cleaner.
public final class StateSerdes<K, V> {

    public K keyFrom(final byte[] rawKey, final Headers headers) {}
    public V valueFrom(final byte[] rawValue, final Headers headers) {}
    public byte[] rawKey(final K key, final Headers headers) {}
    public byte[] rawValue(final V value, final Headers headers) {}
    @Deprecated
    public K keyFrom(final byte[] rawKey) {}
    @Deprecated
    public V valueFrom(final byte[] rawValue) {}
    @Deprecated
    public byte[] rawKey(final K key) {}
    @Deprecated
    public byte[] rawValue(final V value) {}
}

// We extend the Stores helper class with header-aware variants of the timestamped key-value, versioned key-value, timestamped window, and session store factories and builders.

public final class Stores {
    // Timestamped key-value stores
    public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStoreWithHeaders(final String name) {}
    // Versioned key-value stores
    public static VersionedBytesStoreSupplier persistentVersionedKeyValueStoreWithHeaders(final String name, final Duration historyRetention) {}
    public static VersionedBytesStoreSupplier persistentVersionedKeyValueStoreWithHeaders(final String name, final Duration historyRetention, final Duration segmentInterval) {}
    // Window stores
    public static WindowBytesStoreSupplier persistentTimestampedWindowStoreWithHeaders(final String name, final Duration retentionPeriod, final Duration windowSize,
                                                                                       final boolean retainDuplicates) throws IllegalArgumentException {}
    // Session stores
    public static SessionBytesStoreSupplier persistentSessionStoreWithHeaders(final String name, final Duration retentionPeriod) {}

    // Store builders
    public static <K, V> StoreBuilder<TimestampedKeyValueStoreWithHeaders<K, V>> timestampedKeyValueStoreWithHeadersBuilder(final KeyValueBytesStoreSupplier supplier, final Serde<K> keySerde, final Serde<V> valueSerde) {}
    public static <K, V> StoreBuilder<VersionedKeyValueStoreWithHeaders<K, V>> versionedKeyValueStoreWithHeadersBuilder(final VersionedBytesStoreSupplier supplier, final Serde<K> keySerde, final Serde<V> valueSerde) {}
    public static <K, V> StoreBuilder<TimestampedWindowStoreWithHeaders<K, V>> timestampedWindowStoreWithHeadersBuilder(final WindowBytesStoreSupplier supplier, final Serde<K> keySerde, final Serde<V> valueSerde) {}
    public static <K, V> StoreBuilder<SessionStoreWithHeaders<K, V>> sessionStoreWithHeadersBuilder(final SessionBytesStoreSupplier supplier, final Serde<K> keySerde, final Serde<V> valueSerde) {}
}

// To support testing applications that use header-aware state stores, we extend TopologyTestDriver with accessors for the new store types.
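package org.apache.kafka.streams;

import java.io.Closeable;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStoreWithHeaders;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.streams.state.VersionedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.WindowStore;

public class TopologyTestDriver implements Closeable {
    public <K, V> KeyValueStore<K, ValueTimestampHeaders<V>> getTimestampedKeyValueStoreWithHeaders(final String name) {}
    public <K, V> VersionedKeyValueStoreWithHeaders<K, V> getVersionedKeyValueStoreWithHeaders(final String name) {}
    public <K, V> WindowStore<K, ValueTimestampHeaders<V>> getTimestampedWindowStoreWithHeaders(final String name) {}
    public <K, V> SessionStoreWithHeaders<K, V> getSessionStoreWithHeaders(final String name) {}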
}
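The listing above gives only signatures for the ValueTimestampHeaders factories. Assuming they mirror the existing ValueAndTimestamp.make, makeAllowNullable, and getValueOrNull (an assumption; the KIP does not spell out their null handling), they could behave as in the sketch below. ValueTimestampHeadersSketch is a hypothetical name, and a list of key/byte[] entries stands in for Kafka's Headers.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the ValueTimestampHeaders factory semantics.
final class ValueTimestampHeadersSketch<V> {

    private final V value;
    private final long timestamp;
    private final List<Map.Entry<String, byte[]>> headers; // stand-in for Kafka's Headers

    private ValueTimestampHeadersSketch(final V value,
                                        final long timestamp,
                                        final List<Map.Entry<String, byte[]>> headers) {
        this.value = value;
        this.timestamp = timestamp;
        this.headers = headers;
    }

    // Like ValueAndTimestamp.make: a null value (e.g., a delete) yields no wrapper at all.
    static <V> ValueTimestampHeadersSketch<V> make(final V value,
                                                  final long timestamp,
                                                  final List<Map.Entry<String, byte[]>> headers) {
        return value == null ? null : new ValueTimestampHeadersSketch<V>(value, timestamp, headers);
    }

    // Like ValueAndTimestamp.makeAllowNullable: always returns a wrapper, even for a null value.
    static <V> ValueTimestampHeadersSketch<V> makeAllowNullable(final V value,
                                                               final long timestamp,
                                                               final List<Map.Entry<String, byte[]>> headers) {
        return new ValueTimestampHeadersSketch<V>(value, timestamp, headers);
    }

    // Returns null if the wrapper itself is null, otherwise the wrapped value.
    static <V> V getValueOrNull(final ValueTimestampHeadersSketch<V> valueTimestampHeaders) {
        return valueTimestampHeaders == null ? null : valueTimestampHeaders.value;
    }

    V value() {
        return value;
    }

    long timestamp() {
        return timestamp;
    }

    List<Map.Entry<String, byte[]>> headers() {
        return headers;
    }
}
```

Keeping make and makeAllowNullable separate lets store code distinguish "no record" from "record whose value is null", exactly as the existing timestamped stores do.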

Upgrade path

We strive to provide a single rolling-bounce upgrade path, similar in spirit to KIP-258:

  • DSL users:
    • Header-aware stores will not be wired into DSL operators. Defining header-combining semantics across operators (joins, aggregations, etc.) is left to a follow-up KIP.
  • PAPI users:
    • Existing applications that use the current store factories are unaffected.
    • Applications that need headers can explicitly switch to the new Stores.*WithHeaders factories and builders.

In-Memory Stores: Data is reloaded from the changelog on restart. Since the changelog format is unchanged, convertToHeaderFormat is applied during restoration to inject headers into the values.

Persistent Stores:

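  • Reads: Legacy records are served with an empty header set (header count = 0).

  • Writes: All new records use the new storage format.

  • RocksDB: Uses a dual-column-family approach, with one column family for legacy-format records and one for header-format records. A "dual put/get" mechanism lazily upgrades data as it is accessed: writes always go to the new-format column family, and a legacy record is converted and moved there when it is read.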
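A minimal sketch of the dual put/get idea, modeled on the column-family upgrade used for timestamped stores in KIP-258. Two HashMaps stand in for the RocksDB column families; the class and field names are hypothetical, range scans are omitted, and the real store will differ in detail (for example, batching and iterator merging).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a store that lazily migrates records between two column families.
final class DualColumnFamilySketch {

    final Map<String, byte[]> legacyCf = new HashMap<>();  // pre-upgrade values: payload only
    final Map<String, byte[]> headersCf = new HashMap<>(); // [headers_size][headers_bytes][payload]

    // Legacy values have no headers: prepend headers_size = varint(0), i.e. a single 0x00 byte.
    static byte[] convertToHeaderFormat(final byte[] legacy) {
        final byte[] converted = new byte[legacy.length + 1]; // index 0 is already 0x00
        System.arraycopy(legacy, 0, converted, 1, legacy.length);
        return converted;
    }

    // Writes always target the new column family; any stale legacy copy is removed.
    void put(final String key, final byte[] valueWithHeaders) {
        if (valueWithHeaders == null) {
            headersCf.remove(key);
        } else {
            headersCf.put(key, valueWithHeaders);
        }
        legacyCf.remove(key);
    }

    // Reads check the new column family first; a legacy hit is converted,
    // written to the new column family, and deleted from the old one.
    byte[] get(final String key) {
        final byte[] current = headersCf.get(key);
        if (current != null) {
            return current;
        }
        final byte[] legacy = legacyCf.get(key);
        if (legacy == null) {
            return null;
        }
        final byte[] converted = convertToHeaderFormat(legacy);
        put(key, converted);
        return converted;
    }
}
```

Because a migrated key lives only in the new column family afterwards, each record is converted at most once, and records that are never read stay in the legacy column family at no cost.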


Up/downgrade limitations and required steps

  • Note that we do not introduce separate header-enabled variants of the non-timestamped KeyValueStore and WindowStore. Instead, we rely on the existing timestamped and versioned stores, which already carry timestamps, and extend those to include headers.
  • There is no direct upgrade path from non-timestamped stores to header-aware stores. If you currently use non-timestamped state stores, first migrate them to timestamped stores, and only then upgrade to header-aware stores.
  • Downgrades are not supported directly; the only way to downgrade is to delete the local store and let the previous version rebuild it from the changelog.


Compatibility, Deprecation, and Migration Plan

This KIP introduces header-aware state stores, currently as an opt-in feature for Processor API (PAPI) users. It is designed to be backward compatible and supports a single rolling-bounce upgrade for existing applications.

Backward Compatibility

  • Existing Stores: All existing state store implementations (non-header-aware) remain unchanged. Applications using Stores.persistentTimestampedKeyValueStore() or similar methods will continue to function without any impact on their storage format or performance.

  • Changelog Compatibility: The changelog topic format is not modified. Headers are already a native part of the Kafka ConsumerRecord. During restoration/replay, the headers are retrieved from the record and integrated into the state store value using the convertToHeaderFormat utility.

  • Public API: The StateSerdes methods that do not take headers as an input parameter (as listed in the Proposed Changes section) are deprecated and will be removed in a future release.

Performance Considerations

  • Lazy Deserialization: To minimize the impact on iteration-heavy workloads (like range scans), the implementation uses lazy header deserialization. The headers are only parsed into objects if the user explicitly accesses them, avoiding unnecessary CPU and GC overhead during simple value-only scans.

  • Storage Overhead: Storing headers increases the storage footprint. Users should monitor disk usage after opting into header-aware stores, particularly if they store many small records with large headers.

Test Plan

- We will add unit and integration tests covering:

  • RocksDB-backed header-aware stores:
    • Read/write of key, value, timestamp, and headers.
    • Correct splitting and reconstruction of headers and payload.
  • In-memory header-aware stores:
    • Correct behavior across restart via changelog replay.
  • Wrapper layers:
    • Metered, caching, and change-logging layers for key-value, window, session, and versioned stores, verifying header propagation and metrics stability.
  • TopologyTestDriver:
    • New get*WithHeaders accessors correctly expose header-aware stores and behave as expected in unit tests (including window and session semantics).

- Either the existing system tests will be updated to cover the upgrade path, or new tests will be added.

