Status

Current state: Draft

Discussion thread:

JIRA: KAFKA-20334

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-1271 introduced header-aware state stores. During its design, the decision was made not to introduce a plain value-with-headers window store, in order to limit the number of store types.

When KIP-1285 wired header-aware stores into the DSL, it became clear that the KStream-KStream join would benefit from such a store. As a compromise, the join was wired to use TimestampedWindowStoreWithHeaders, which serializes each record value as: [headersSize(varint)][headersBytes][timestamp(8 bytes)][value].

This wastes 8 bytes per record on the timestamp field. In a KStream-KStream join, the record's timestamp is already encoded in the window store key, where it is used for time-range lookups. When the join processor fetches records from the other side's store, it uses the window key's timestamp, not the timestamp embedded inside ValueTimestampHeaders, so the timestamp stored in the value is never read. For high-throughput join workloads with many small records, this overhead is non-trivial. For example, with 100-byte average record values and empty headers, the wasted 8 bytes represent about 7.3% storage overhead per record in the window store: 8 / (1 + 0 + 8 + 100) ≈ 0.073, where the terms are the 1-byte headersSize, 0 header bytes, the 8-byte timestamp, and the 100-byte value.
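The arithmetic above can be reproduced for other record sizes. The following is a small sketch, assuming empty headers (so headersSize takes a single byte); the class and method names are hypothetical and are not part of this KIP.

```java
final class TimestampOverhead {

    /** Size in bytes of the redundant timestamp field. */
    static final int TIMESTAMP_BYTES = 8;

    /**
     * Fraction of the serialized value occupied by the timestamp for a record
     * with empty headers: [headersSize = 1 byte][timestamp = 8 bytes][value].
     */
    static double overheadFraction(final int valueBytes) {
        final int headersSizeBytes = 1; // a single 0x00 byte when headers are empty
        final int total = headersSizeBytes + TIMESTAMP_BYTES + valueBytes;
        return (double) TIMESTAMP_BYTES / total;
    }

    public static void main(final String[] args) {
        // Print the overhead for a few illustrative value sizes.
        for (final int valueBytes : new int[] {20, 100, 1000}) {
            System.out.printf("value=%d bytes -> overhead=%.1f%%%n",
                valueBytes, 100.0 * overheadFraction(valueBytes));
        }
    }
}
```

The smaller the record value, the larger the share of the stored bytes taken by the unused timestamp.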

This KIP proposes introducing a WindowStoreWithHeaders that stores only [headersSize(varint)][headersBytes][value], eliminating the redundant timestamp and saving 8 bytes per record.

Public Interfaces

ValueAndHeaders<V> value wrapper

A new general-purpose value wrapper that combines a value with its associated record headers, without a timestamp.

Note: This is a new class, separate from the existing AggregationWithHeaders<AGG> used by SessionStoreWithHeaders. While the serialization format is identical ([headersSize(varint)][headersBytes][value]), AggregationWithHeaders is semantically tied to session aggregation and uses different naming conventions (.aggregation() vs .value()). Introducing ValueAndHeaders avoids a breaking rename and provides clearer semantics for the window store use case.

ValueAndHeaders
package org.apache.kafka.streams.state;

/* classes and interfaces to be added */

public final class ValueAndHeaders<V> {

    private final V value;
    private final Headers headers;

    public static <V> ValueAndHeaders<V> make(final V value, final Headers headers)
    public static <V> ValueAndHeaders<V> makeAllowNullable(final V value, final Headers headers)
    public static <V> V getValueOrNull(final ValueAndHeaders<V> valueAndHeaders)

    // factory methods and accessors omitted for brevity
}

WindowStoreWithHeaders<K, V> interface

WindowStoreWithHeaders
package org.apache.kafka.streams.state;

public interface WindowStoreWithHeaders<K, V> extends WindowStore<K, ValueAndHeaders<V>> {}

New store builder and supplier method in Stores

Stores
package org.apache.kafka.streams.state;

public final class Stores {

    public static <K, V> StoreBuilder<WindowStoreWithHeaders<K, V>> windowStoreWithHeadersBuilder(
            final WindowBytesStoreSupplier supplier,
            final Serde<K> keySerde,
            final Serde<V> valueSerde)

    public static WindowBytesStoreSupplier persistentWindowStoreWithHeaders(
            final String name,
            final Duration retentionPeriod,
            final Duration windowSize,
            final boolean retainDuplicates)
}

New Enum Value: DslStoreFormat.PLAIN_HEADERS


DslStoreFormat
public enum DslStoreFormat {
    PLAIN("PLAIN"),
    TIMESTAMPED("TIMESTAMPED"),
    HEADERS("HEADERS"),
    PLAIN_HEADERS("PLAIN_HEADERS");  // NEW
}

PLAIN_HEADERS indicates a store that preserves record headers but does not embed a timestamp in the value. This is used by KStream-KStream join window stores, where the timestamp is already in the window key and does not need to be duplicated in the value.


Proposed Changes

Change format in KStream-KStream Join

The core change is to eliminate the redundant 8-byte timestamp from the serialized value in KStream-KStream join window stores.

Current format (with TimestampedWindowStoreWithHeaders): [headersSize(varint)][headersBytes][timestamp(8 bytes)][value]

New format (with WindowStoreWithHeaders): [headersSize(varint)][headersBytes][value]


The headersSize/headersBytes encoding follows the same rules as KIP-1271:

  • When headers are null or empty: headersSize = 0, headersBytes is omitted (0 bytes). So the format is simply [0x00][value].

  • When headers are non-empty: headersSize > 0, followed by the serialized headersBytes. The headersBytes is an array of headers: first a varint header count, then for each header a varint key length + UTF-8 key bytes, followed by a varint value length (-1 for null) + raw value bytes. Header order is preserved during both serialization and deserialization.
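The encoding rules above can be sketched as follows. This is an illustration only, not the KIP's serializer: the class name is hypothetical, headers are modeled as an ordered Map (so duplicate header keys are not represented), and the varint is assumed to be the zigzag-encoded variant, under which 0 encodes as 0x00 and -1 as 0x01.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

final class ValueAndHeadersFormatSketch {

    /** Writes a zigzag-encoded varint (assumption: matches the KIP-1271 varint). */
    static void writeVarint(final int value, final ByteArrayOutputStream out) {
        int v = (value << 1) ^ (value >> 31); // zigzag keeps small negatives small
        while ((v & 0xFFFFFF80) != 0) {
            out.write((v & 0x7F) | 0x80);     // low 7 bits plus continuation bit
            v >>>= 7;
        }
        out.write(v);
    }

    /** Serializes headers as [count] then, per header, [keyLen][key][valueLen or -1][value]. */
    static byte[] serializeHeaders(final Map<String, byte[]> headers) {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(headers.size(), out);
        for (final Map.Entry<String, byte[]> header : headers.entrySet()) {
            final byte[] key = header.getKey().getBytes(StandardCharsets.UTF_8);
            writeVarint(key.length, out);
            out.write(key, 0, key.length);
            final byte[] headerValue = header.getValue();
            if (headerValue == null) {
                writeVarint(-1, out);         // null header value
            } else {
                writeVarint(headerValue.length, out);
                out.write(headerValue, 0, headerValue.length);
            }
        }
        return out.toByteArray();
    }

    /** Produces [headersSize(varint)][headersBytes][value]. */
    static byte[] serialize(final Map<String, byte[]> headers, final byte[] value) {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (headers == null || headers.isEmpty()) {
            writeVarint(0, out);              // headersSize = 0, headersBytes omitted
        } else {
            final byte[] headersBytes = serializeHeaders(headers);
            writeVarint(headersBytes.length, out);
            out.write(headersBytes, 0, headersBytes.length);
        }
        out.write(value, 0, value.length);
        return out.toByteArray();
    }

    public static void main(final String[] args) {
        final Map<String, byte[]> headers = new LinkedHashMap<>(); // preserves insertion order
        headers.put("k", "v".getBytes(StandardCharsets.UTF_8));
        final byte[] value = "x".getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.toString(serialize(null, value)));
        System.out.println(Arrays.toString(serialize(headers, value)));
    }
}
```

With null or empty headers the output is a single zero byte followed by the value bytes, which is the [0x00][value] case described in the first bullet.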

New Store Wrapper Stack

The following internal classes are added or changed to form the wrapper stack for WindowStoreWithHeaders:

  • WindowStoreWithHeadersBuilder

  • MeteredWindowStoreWithHeaders

  • ChangeLoggingWindowBytesStoreWithHeaders

  • RocksDBWindowStoreWithHeaders: A new class extending RocksDBWindowStore and implementing HeadersBytesStore (but NOT TimestampedBytesStore).

  • RocksDbWindowBytesStoreSupplier: Add WINDOW_STORE_WITH_HEADERS to the WindowStoreTypes enum. The get() method instantiates RocksDBWindowStoreWithHeaders for this type.

  • BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.windowStore(): Add a case for PLAIN_HEADERS that returns Stores.persistentWindowStoreWithHeaders(...).

Join Process Changes

  • StreamJoinedStoreFactory: Change to use Stores.windowStoreWithHeadersBuilder().
  • KStreamImplJoin: Update joinWindowStoreBuilderFromSupplier() to use Stores.windowStoreWithHeadersBuilder().
  • KStreamJoinWindow: Change from storing ValueTimestampHeaders.make(value, timestamp, headers) to storing ValueAndHeaders.make(value, headers).
  • KStreamKStreamJoin and KStreamKStreamSelfJoin: Change the store type from TimestampedWindowStoreWithHeaders<K, V> to WindowStoreWithHeaders<K, V>. Fetched values are ValueAndHeaders<V>.

Adapter Classes

| Adapter | Source Store Type | Conversion |
|---|---|---|
| PlainToPlainHeadersWindowStoreAdapter | Plain WindowStore | Write: [headers][value] → [value] (strip headers). Read: [value] → [0x00][value] (add empty headers). |
| TimestampedToPlainHeadersWindowStoreAdapter | TimestampedBytesStore | Write: [headers][value] → [timestamp=-1][value] (strip headers, add dummy timestamp). Read: [timestamp][value] → [0x00][value] (strip timestamp, add empty headers). |
| TimestampedHeadersToPlainHeadersWindowStoreAdapter | TimestampedBytesStore and HeadersBytesStore | Write: [headers][value] → [headers][timestamp=-1][value] (inject dummy timestamp). Read: [headers][timestamp][value] → [headers][value] (strip timestamp). |
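The byte-level conversions performed by these adapters can be sketched as follows. This is a hedged illustration rather than the adapters' actual code: the class and method names are hypothetical, and the headersSize varint is assumed to be zigzag-encoded.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class PlainHeadersAdapterSketch {

    static final int TIMESTAMP_BYTES = 8;

    /** Plain store read: [value] -> [0x00][value]. */
    static byte[] addEmptyHeaders(final byte[] plainValue) {
        final byte[] out = new byte[1 + plainValue.length];
        // out[0] stays 0x00, which encodes headersSize = 0
        System.arraycopy(plainValue, 0, out, 1, plainValue.length);
        return out;
    }

    /** Timestamped store read: [timestamp][value] -> [0x00][value]. */
    static byte[] timestampedToPlainHeaders(final byte[] stored) {
        return addEmptyHeaders(Arrays.copyOfRange(stored, TIMESTAMP_BYTES, stored.length));
    }

    /** Timestamped store write: [headers][value] -> [timestamp=-1][value]. */
    static byte[] plainHeadersToTimestamped(final byte[] withHeaders) {
        final int valueStart = headersEnd(withHeaders);
        final int valueLength = withHeaders.length - valueStart;
        final ByteBuffer out = ByteBuffer.allocate(TIMESTAMP_BYTES + valueLength);
        out.putLong(-1L);                                  // dummy timestamp
        out.put(withHeaders, valueStart, valueLength);     // value without headers
        return out.array();
    }

    /** Timestamped-with-headers store read: [headers][timestamp][value] -> [headers][value]. */
    static byte[] timestampedHeadersToPlainHeaders(final byte[] stored) {
        final int timestampStart = headersEnd(stored);
        final byte[] out = new byte[stored.length - TIMESTAMP_BYTES];
        System.arraycopy(stored, 0, out, 0, timestampStart);
        System.arraycopy(stored, timestampStart + TIMESTAMP_BYTES, out, timestampStart,
            stored.length - timestampStart - TIMESTAMP_BYTES);
        return out;
    }

    /** Returns the index of the first byte after [headersSize(varint)][headersBytes]. */
    static int headersEnd(final byte[] bytes) {
        int raw = 0;
        int shift = 0;
        int pos = 0;
        byte b;
        do {
            b = bytes[pos++];
            raw |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        final int headersSize = (raw >>> 1) ^ -(raw & 1);  // undo zigzag encoding
        return pos + headersSize;
    }
}
```

Only the conversions that touch headers need to parse the headersSize varint; adding empty headers is a one-byte prefix.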

Compatibility, Deprecation, and Migration Plan

Migration is column-family-based and uses DualColumnFamilyAccessor, the standard mechanism used by all previous format migrations in Kafka Streams (KIP-258 for timestamped stores, KIP-1271 for header-aware stores). As in KIP-1271 and KIP-1285, downgrades are not supported; the only way to downgrade is to delete the local store and rebuild it from the changelog.
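The idea behind a dual-column-family migration can be sketched as follows, assuming the lazy upgrade-on-read behavior introduced with KIP-258. In-memory maps stand in for the old and new RocksDB column families, and all names are hypothetical; the real accessor operates on RocksDB handles and byte keys.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

final class DualColumnFamilySketch {

    private final Map<String, byte[]> oldFormat = new HashMap<>(); // stand-in for the old column family
    private final Map<String, byte[]> newFormat = new HashMap<>(); // stand-in for the new column family
    private final UnaryOperator<byte[]> upgrade;                   // converts old-format bytes to the new format

    DualColumnFamilySketch(final UnaryOperator<byte[]> upgrade) {
        this.upgrade = upgrade;
    }

    /** Simulates data written before the upgrade. */
    void seedOld(final String key, final byte[] value) {
        oldFormat.put(key, value);
    }

    /** Reads prefer the new format; old entries are converted and moved on first access. */
    byte[] get(final String key) {
        final byte[] current = newFormat.get(key);
        if (current != null) {
            return current;
        }
        final byte[] old = oldFormat.remove(key);
        if (old == null) {
            return null;
        }
        final byte[] converted = upgrade.apply(old);
        newFormat.put(key, converted);
        return converted;
    }

    /** Writes always go to the new format and drop any stale old-format entry. */
    void put(final String key, final byte[] value) {
        oldFormat.remove(key);
        newFormat.put(key, value);
    }

    int oldEntryCount() {
        return oldFormat.size();
    }
}
```

Because entries move only forward from the old format to the new one, a downgraded application would not find them in the layout it expects, which is consistent with downgrades requiring a rebuild from the changelog.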

No deprecation in this KIP.

Test Plan

This feature can be tested mainly through unit and integration tests:

  • Unit and integration tests for the newly added stores.
  • Tests for both migration paths: [value] → [headers][value] and [headers][timestamp][value] → [headers][value].

Additionally, existing system tests are extended to perform rolling bounce upgrades.

Rejected Alternatives

N/A
