Status

Current state: Accepted

Discussion thread: here 

JIRA: KAFKA-20194, KAFKA-20420

Motivation

This KIP is the follow-up for KIP-1271 to wire headers-aware state stores into the Kafka Streams DSL via an explicit opt-in mechanism.

KIP-1271 introduced headers-aware state store interfaces and implementations (TimestampedKeyValueStoreWithHeaders, VersionedKeyValueStoreWithHeaders, TimestampedWindowStoreWithHeaders, SessionStoreWithHeaders, etc.), along with a rolling-upgrade path from timestamped stores. These stores preserve record headers alongside the value and timestamp, enabling use cases such as:

  • header-based Schema Registry formats (e.g., header schema-id serializer),

  • tracing metadata stored in headers,

  • other header-based semantics that must survive passage through stateful operators.

However, KIP-1271 explicitly did not wire headers-aware state stores into DSL operators. It limited usage to the Processor API and Stores.*WithHeaders factories, leaving DSL support and header-combining semantics to a follow-up KIP.

This leaves a gap:

  • DSL applications cannot easily opt into headers-aware state stores.

  • At the same time, most existing DSL applications do not need headers in state stores and should not pay additional storage or CPU overhead by default.

The goal of this KIP is therefore:

  • Provide a simple, explicit, and backwards-compatible way for DSL users to opt into headers-aware state stores, reusing the stores and upgrade paths defined in KIP-1271,

  • while keeping headers semantics of DSL operators out of scope for now (headers should be considered “empty” from a DSL semantics point of view in this KIP).

Public Interfaces

1. New StreamsConfig configuration

We add a DslStoreFormat enum that enumerates the DSL store formats: PLAIN (non-timestamped stores), TIMESTAMPED, and HEADERS (headers-aware stores). In addition, we introduce a new Streams configuration key, dsl.store.format, which accepts DEFAULT or HEADERS and defaults to DEFAULT. With DEFAULT, DSL operators keep using the existing plain or timestamped stores; setting it to HEADERS opts the application into the headers-aware stores.




package org.apache.kafka.streams;

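public enum DslStoreFormat {
    /** The non-timestamped state stores */
    PLAIN("PLAIN"),

    /** The timestamped state stores */
    TIMESTAMPED("TIMESTAMPED"),

    /** The headers-aware state stores */
    HEADERS("HEADERS");

    // String form of the format; the field name is illustrative
    private final String name;

    DslStoreFormat(final String name) {
        this.name = name;
    }
}
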
public class StreamsConfig extends AbstractConfig {
    // existing fields and methods
    public static final String DSL_STORE_FORMAT_CONFIG = "dsl.store.format";
    public static final String DSL_STORE_FORMAT_DEFAULT = "DEFAULT";
    public static final String DSL_STORE_FORMAT_HEADERS = "HEADERS";
}

This config is orthogonal to KIP-591’s “default store implementation” configuration (which selects RocksDB vs in-memory implementation). KIP-591 controls implementation, whereas this KIP controls the format (with or without headers) used by that implementation.

We intentionally use an enum-valued config, not a boolean, to keep the option extensible for future changes.
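As a usage illustration, opting in would amount to one extra property. The sketch below uses only java.util.Properties and string literals so that it runs without Kafka on the classpath. The class DslStoreFormatConfigExample and its helper methods are hypothetical; in a real application the key would presumably come from StreamsConfig.DSL_STORE_FORMAT_CONFIG once this KIP ships.

```java
import java.util.Properties;

final class DslStoreFormatConfigExample {
    // Key and values as proposed in this KIP; literals keep the sketch
    // independent of any Kafka dependency.
    static final String DSL_STORE_FORMAT_CONFIG = "dsl.store.format";
    static final String DSL_STORE_FORMAT_DEFAULT = "DEFAULT";
    static final String DSL_STORE_FORMAT_HEADERS = "HEADERS";

    /** Builds application properties that opt the DSL into headers-aware stores. */
    static Properties headersAwareProps(final String applicationId,
                                        final String bootstrapServers) {
        final Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        props.put(DSL_STORE_FORMAT_CONFIG, DSL_STORE_FORMAT_HEADERS);
        return props;
    }

    /** Returns the configured format, falling back to DEFAULT when the key is unset. */
    static String effectiveFormat(final Properties props) {
        return props.getProperty(DSL_STORE_FORMAT_CONFIG, DSL_STORE_FORMAT_DEFAULT);
    }
}
```

An application that never sets the key resolves to DEFAULT, which matches the backwards-compatible behavior described above.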

2. Other Public changes

KIP-1271 introduced headers-aware store formats and builders but did not change the DSL. Internal DSL wiring uses parameter objects (hereafter referred to as Dsl*Params) that carry store configuration from DSL operators to the underlying store factories. 

Concretely, we add:

package org.apache.kafka.streams.state;

public final class DslKeyValueParams {
    // existing fields and methods …

    @Deprecated
    public DslKeyValueParams(final String name, final boolean isTimestamped) {}

    public DslKeyValueParams(final String name, final DslStoreFormat storeFormat) {}

    @Deprecated
    public boolean isTimestamped();

    public DslStoreFormat storeFormat();
}

public final class DslWindowParams {
    // existing fields and methods …

    @Deprecated
    public DslWindowParams(final String name,
                           final Duration retentionPeriod,
                           final Duration windowSize,
                           final boolean retainDuplicates,
                           final EmitStrategy emitStrategy,
                           final boolean isSlidingWindow,
                           final boolean isTimestamped) {}

    public DslWindowParams(final String name,
                           final Duration retentionPeriod,
                           final Duration windowSize,
                           final boolean retainDuplicates,
                           final EmitStrategy emitStrategy,
                           final boolean isSlidingWindow,
                           final DslStoreFormat storeFormat /* newly added */) {}

    @Deprecated
    public boolean isTimestamped();

    public DslStoreFormat storeFormat();
}

public final class DslSessionParams {
    // existing fields and methods …

    @Deprecated
    public DslSessionParams(final String name,
                            final Duration retentionPeriod,
                            final EmitStrategy emitStrategy) {}

    public DslSessionParams(final String name,
                            final Duration retentionPeriod,
                            final EmitStrategy emitStrategy,
                            final DslStoreFormat storeFormat /* newly added */) {}

    public DslStoreFormat storeFormat();
}

/* The implementation of the methods of BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers and BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers will be updated to recognize the new Dsl*Params property and to create the appropriate store format based on its value. */
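
One plausible way for the deprecated boolean-based constructor to coexist with the new one is to map the flag onto the enum. The following self-contained model is an assumption for illustration only: this KIP does not specify the mapping (true to TIMESTAMPED, false to PLAIN) or how isTimestamped() is derived from the format, and the Sketch-suffixed names are local stand-ins rather than the Kafka classes.

```java
import java.util.Objects;

final class DslParamsSketch {
    /** Local stand-in for the proposed org.apache.kafka.streams.DslStoreFormat. */
    enum StoreFormat { PLAIN, TIMESTAMPED, HEADERS }

    /** Hypothetical model of DslKeyValueParams with both constructors. */
    static final class KeyValueParamsSketch {
        private final String name;
        private final StoreFormat storeFormat;

        /** Legacy constructor: assumed to translate the boolean into a format. */
        @Deprecated
        KeyValueParamsSketch(final String name, final boolean isTimestamped) {
            this(name, isTimestamped ? StoreFormat.TIMESTAMPED : StoreFormat.PLAIN);
        }

        KeyValueParamsSketch(final String name, final StoreFormat storeFormat) {
            this.name = Objects.requireNonNull(name, "name");
            this.storeFormat = Objects.requireNonNull(storeFormat, "storeFormat");
        }

        String name() {
            return name;
        }

        /** Assumption: every format except PLAIN carries a timestamp. */
        @Deprecated
        boolean isTimestamped() {
            return storeFormat != StoreFormat.PLAIN;
        }

        StoreFormat storeFormat() {
            return storeFormat;
        }
    }
}
```

Under these assumptions, a custom DslStoreSuppliers implementation that still calls isTimestamped() would keep working, while new code can branch on storeFormat() directly.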


Semantics:

  • By default, the store format carried by Dsl*Params (and therefore whether headers are enabled) is derived from the global config dsl.store.format.

  • No new public DSL builder methods are added; the common path is to rely solely on dsl.store.format.

  • This KIP deliberately does not introduce any new DSL API to read or manipulate headers from user code, nor does it define header-combining semantics across joins/aggregations. 

Exposing headers to DSL UDFs and defining header-combining semantics across operators is explicitly left to a future KIP.

Proposed Changes

High-level design

When a Streams application starts:

  1. StreamsConfig reads dsl.store.format 

  2. Each DSL store factory (AbstractConfigurableStoreFactory and subclasses) reads this config and sets the store format. 

  3. When a DSL operator materializes a store, it creates the appropriate Dsl*Params instance, which carries the DSL store format; the format defaults to the value derived from the global config.

  4. BuiltInDslStoreSuppliers reads params.storeFormat() and chooses between:

    • the existing timestamped store implementations (for session stores the default is the basic session store), or
    • the headers-aware implementations and builders introduced by KIP-1271.
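
The four steps above can be modelled roughly as follows. This is a simplified, self-contained sketch rather than the actual Kafka Streams internals: the class StoreSelectionSketch, its methods, and the returned description strings are hypothetical. Accepting the config value case-insensitively and rejecting unknown values with an exception are both assumptions, and the sketch ignores operators that use plain key-value or window stores by default.

```java
import java.util.Locale;
import java.util.Map;

final class StoreSelectionSketch {
    enum StoreFormat { PLAIN, TIMESTAMPED, HEADERS }
    enum StoreKind { KEY_VALUE, WINDOW, SESSION }

    /** Steps 1-2: read dsl.store.format; true means headers-aware stores were requested. */
    static boolean headersRequested(final Map<String, String> config) {
        final String raw = config.getOrDefault("dsl.store.format", "DEFAULT");
        switch (raw.toUpperCase(Locale.ROOT)) {
            case "DEFAULT":
                return false;
            case "HEADERS":
                return true;
            default:
                throw new IllegalArgumentException("Unknown dsl.store.format: " + raw);
        }
    }

    /** Step 3: resolve the format that the Dsl*Params instance would carry. */
    static StoreFormat formatFor(final StoreKind kind, final boolean headersRequested) {
        if (headersRequested) {
            return StoreFormat.HEADERS;
        }
        // Session stores default to the basic (non-timestamped) session store.
        return kind == StoreKind.SESSION ? StoreFormat.PLAIN : StoreFormat.TIMESTAMPED;
    }

    /** Step 4: the supplier picks an implementation family from the format. */
    static String describeStore(final StoreKind kind, final StoreFormat format) {
        switch (format) {
            case HEADERS:
                return "headers-aware " + kind;
            case TIMESTAMPED:
                return "timestamped " + kind;
            default:
                return "plain " + kind;
        }
    }
}
```

Keeping the format decision in one place (step 3) means the suppliers in step 4 only need to branch on the value they receive, which is the same division of labor the design describes.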

Interaction with KIP-1271 upgrade paths

KIP-1271 defined upgrade strategies and on-disk formats for headers-aware stores. This KIP reuses those stores and upgrade paths as-is:

  • When dsl.store.format = HEADERS, DSL operators will:

    • create headers-aware stores and corresponding changelog topics, and

    • rely on the KIP-1271 upgrade/downgrade behavior if they encounter existing state.

  • When dsl.store.format = DEFAULT, DSL operators continue to use the existing timestamped or plain stores with their established behavior.

Global vs per-store enabling of headers-aware state stores

Global: Setting dsl.store.format = HEADERS enables headers-aware stores for all stores created by DSL operators.

Per-store enablement: 

When using Materialized.as(...) with a user-supplied KeyValueBytesStoreSupplier (and analogously for window and session stores), it is also possible to enable or disable headers-aware stores on a per-store basis, independent of the global StreamsConfig setting.

Today, the DSL already "bridges" between plain key-value and timestamped stores based on the runtime type of the supplied store: it checks whether the store implementation passed to Materialized.as(...) implements the TimestampedBytesStore marker interface, so passing a plain key-value store supplier (for example via Stores.persistentKeyValueStore(...)) effectively disables timestamped stores for that particular operator.

KIP-1271 introduces a similar marker interface, HeadersBytesStore, and KIP-1285 extends the existing bridging logic to also consider this marker. Concretely, if a user passes a supplier such as:

Materialized.as(
    Stores.persistentTimestampedKeyValueStoreWithHeaders(...)
)

then the corresponding DSL operator will use a headers-aware store for that state store, overriding whatever is configured via StreamsConfig. If the supplied store does not implement HeadersBytesStore, the operator will continue to use a non-headers store, again independently of the global dsl.store.format setting.
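
The precedence rule described here can be sketched with a small stand-alone model. The marker interface and store classes below are local stand-ins, not Kafka types, and the method name resolveHeadersEnabled is hypothetical. The sketch only illustrates that the runtime type of a user-supplied store wins over the global setting.

```java
final class BridgingSketch {
    /** Stand-in for the headers marker interface introduced by KIP-1271. */
    interface HeadersMarker { }

    /** Stand-in for a user-supplied store without header support. */
    static final class PlainStore { }

    /** Stand-in for a user-supplied headers-aware store. */
    static final class HeadersStore implements HeadersMarker { }

    /**
     * Decides whether an operator ends up with a headers-aware store.
     *
     * @param suppliedStore        store created by the user's supplier, or null if none was supplied
     * @param globalHeadersEnabled whether dsl.store.format is set to HEADERS
     */
    static boolean resolveHeadersEnabled(final Object suppliedStore,
                                         final boolean globalHeadersEnabled) {
        if (suppliedStore == null) {
            // No user-supplied store: fall back to the global config.
            return globalHeadersEnabled;
        }
        // A user-supplied store overrides the global config in both directions.
        return suppliedStore instanceof HeadersMarker;
    }
}
```

For example, resolveHeadersEnabled(new PlainStore(), true) yields false: the operator keeps a non-headers store even though the application opted in globally.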

Scope: headers semantics and UDFs

This KIP focuses solely on enabling headers-aware storage from the DSL, not on defining how headers are combined or exposed.

  • We exclude operator-level header semantics (joins, aggregations, etc.).

  • For now, using headers-aware stores from the DSL should be understood primarily as enabling future semantics work, not as a user-facing header-combining model.

A separate future KIP will:

  • define how headers propagate through individual operators, and

  • potentially expose headers to UDFs for computing result headers.


Compatibility, Deprecation, and Migration Plan

Default behavior (dsl.store.format = DEFAULT) keeps all existing DSL behavior unchanged.

The new configuration and methods are additive and optional:

  • Applications that do not set dsl.store.format continue to run as before.

  • Processor API and existing headers-aware usage as defined in KIP-1271 remain unaffected.

From an upgrade perspective, if an existing application introduces this config and restarts:

  • For stores backed by headers-aware implementations, KIP-1271’s upgrade rules apply (e.g., using HeadersBytesStore.convertToHeaderFormat or dual CF/segment-level upgrades).

  • As in KIP-1271, downgrades are not supported; the only way to revert to the previous format is to delete the local store and rebuild it from the changelog.

  • For in-memory stores, state is reconstructed from the changelog using the new format, as specified in KIP-1271.

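We do not deprecate any existing configs or DSL operator APIs. The only deprecations are the boolean-based Dsl*Params constructors and isTimestamped() accessors listed under Public Interfaces.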

Test Plan

This KIP builds on the test strategy of KIP-1271 and adds DSL-specific coverage.

Unit tests

  • Config wiring: Verify that StreamsConfig correctly parses dsl.store.format and that AbstractConfigurableStoreFactory.headersEnabled() behaves as expected for both DEFAULT and HEADERS.

  • Dsl*Params

  • BuiltInDslStoreSuppliers

  • Materializers and join/internal stores

Integration tests

  • DSL + headers-aware stores:

  • Upgrade tests: Start a DSL application with dsl.store.format = DEFAULT, populate state, then:

      • upgrade code + config to dsl.store.format = HEADERS,

      • perform a rolling bounce,

      • verify that state is present and correct post-upgrade, using both DSL APIs and direct store access via TopologyTestDriver.get*WithHeaders(...).

