Status

Current state: Under Discussion

Discussion thread: here

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In KIP-149, we introduced the ValueJoinerWithKey interface for the Kafka Streams DSL. Its intent is to give the joiner access to the join key, so users can write joiners whose output depends on the key, not just the values. The underlying KStreamKTableJoinProcessor is shared by both stream-table and stream-globalTable joins.

For stream-table joins, the join key is the stream record's key, so the processor works correctly. For stream-globalTable joins, the join key is computed from the stream record via a user-supplied KeyValueMapper and is generally different from the stream record's key. However, the current KStreamKTableJoinProcessor#doJoin implementation passes the stream record's key into ValueJoinerWithKey.apply(), not the mapped join key.

The discrepancy is not surfaced at compile time: the DSL signature pins the joiner's first type parameter to the stream key type K, and the implementation passes record.key() (also K), so the types align and no error or warning is raised. The join still matches records correctly because matching is done by the KeyValueMapper, not the joiner. However, KIP-149 specified that readOnlyKey would be the join key, which for stream-globalTable joins is the mapped key, not the stream key. Any joiner that uses readOnlyKey therefore silently produces output values that encode the stream key where the join key is expected.
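To make the discrepancy concrete, here is a minimal sketch of the current behavior. It does not use Kafka Streams itself: the nested KeyValueMapper and ValueJoinerWithKey interfaces are hypothetical stand-ins for the library types of the same names, the global table is modelled as a plain Map, and the order/customer data is invented for illustration.

```java
import java.util.Map;

class StreamKeyVsJoinKeyDemo {
    // Hypothetical stand-ins for the Kafka Streams interfaces of the same names,
    // reduced to what this sketch needs. They are NOT the real library types.
    interface KeyValueMapper<K, V, R> { R apply(K key, V value); }
    interface ValueJoinerWithKey<K1, V1, V2, R> { R apply(K1 readOnlyKey, V1 value1, V2 value2); }

    /** Mimics today's behavior: look up by the mapped key, but hand the joiner the stream key. */
    static String joinAsToday(String streamKey, String streamValue,
                              Map<String, String> globalTable,
                              KeyValueMapper<String, String, String> keySelector,
                              ValueJoinerWithKey<String, String, String, String> joiner) {
        String mappedKey = keySelector.apply(streamKey, streamValue);
        String tableValue = globalTable.get(mappedKey);            // matching uses the mapped key
        return joiner.apply(streamKey, streamValue, tableValue);   // the joiner sees the stream key
    }

    public static void main(String[] args) {
        // Invented data: orders keyed by order id, carrying a customer id as the value.
        Map<String, String> customers = Map.of("customer-42", "Alice");
        String out = joinAsToday("order-1", "customer-42", customers,
            (orderId, customerId) -> customerId,                      // join key = customer id
            (readOnlyKey, customerId, name) -> readOnlyKey + " -> " + name);
        System.out.println(out); // prints "order-1 -> Alice"
    }
}
```

The lookup succeeds because it uses customer-42, yet the joiner's readOnlyKey is order-1, which is the mismatch described above.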

Public Interfaces

KStream

Four new methods are added to KStream. They mirror the four existing stream-globalTable join overloads that take a ValueJoinerWithKey.

/** As {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}, but the joiner's {@code readOnlyKey} is the mapped join key. */
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> joinByMappedKey(
    final GlobalKTable<GlobalKey, GlobalValue> globalTable,
    final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
    final ValueJoinerWithKey<? super GlobalKey, ? super V, ? super GlobalValue, ? extends VOut> joiner
)

/** As above, with a {@link Named} processor name. */
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> joinByMappedKey(
    final GlobalKTable<GlobalKey, GlobalValue> globalTable,
    final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
    final ValueJoinerWithKey<? super GlobalKey, ? super V, ? super GlobalValue, ? extends VOut> joiner,
    final Named named
)

/** As {@link #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)}, but the joiner's {@code readOnlyKey} is the mapped join key. */
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> leftJoinByMappedKey(
    final GlobalKTable<GlobalKey, GlobalValue> globalTable,
    final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
    final ValueJoinerWithKey<? super GlobalKey, ? super V, ? super GlobalValue, ? extends VOut> joiner
)

/** As above, with a {@link Named} processor name. */
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> leftJoinByMappedKey(
    final GlobalKTable<GlobalKey, GlobalValue> globalTable,
    final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
    final ValueJoinerWithKey<? super GlobalKey, ? super V, ? super GlobalValue, ? extends VOut> joiner,
    final Named named
)


The existing KStream.join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey) and KStream.leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey) overloads are retained for backwards compatibility. They continue to pass the stream record's key into the joiner, but are marked @Deprecated and targeted for removal in a future major release. In addition, the Javadoc of each deprecated method gains a warning stressing that readOnlyKey is currently the stream key, not the mapped join key.

/**
 * <p><b>Warning:</b> {@code readOnlyKey} is the {@code KStream} record's key, <b>not</b> the join key
 * produced by {@code keySelector}. Unlike {@link #join(KTable, ValueJoinerWithKey) KStream-KTable} and
 * {@link #join(KStream, ValueJoinerWithKey, JoinWindows) KStream-KStream} joins — where the stream key
 * <em>is</em> the join key — {@link GlobalKTable} joins derive the join key via {@code keySelector}, so
 * {@code readOnlyKey} does <b>not</b> necessarily match the key of the {@link GlobalKTable} record being joined.
 */
@Deprecated
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(
    final GlobalKTable<GlobalKey, GlobalValue> globalTable,
    final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
    final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner
)

@Deprecated
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(
    final GlobalKTable<GlobalKey, GlobalValue> globalTable,
    final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
    final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner,
    final Named named
)

@Deprecated
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> leftJoin(
    final GlobalKTable<GlobalKey, GlobalValue> globalTable,
    final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
    final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner
)

@Deprecated
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> leftJoin(
    final GlobalKTable<GlobalKey, GlobalValue> globalTable,
    final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
    final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner,
    final Named named
)


Proposed Changes

This KIP proposes new entry points that pass the join key (the result of the KeyValueMapper) to ValueJoinerWithKey, and deprecates the original overloads that pass the stream record's key, so that readOnlyKey aligns with the join key as described in KIP-149.

When a record arrives on the stream:

  1. The runtime computes mappedKey = keySelector.apply(record.key(), record.value()).

  2. The global table is looked up at mappedKey to obtain the matched value (or null for left-join with no match).

  3. The joiner is invoked as joiner.apply(mappedKey, record.value(), tableValue).

  4. The output record's key remains the stream record's key. The only behavioral change is the value of readOnlyKey passed into ValueJoinerWithKey#apply: it will be the mapped join key instead of the stream record's key.
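The four steps above can be sketched as a single method. This is an illustration under stated assumptions, not the proposed processor implementation: the nested interfaces are hypothetical stand-ins for the Kafka Streams types of the same names, the global table is modelled as a plain Map, the method name process and the leftJoin flag are invented for this sketch, and a null mapped key is simply treated as "no match".

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.Optional;

class MappedKeyJoinSketch {
    // Hypothetical stand-ins for the Kafka Streams interfaces of the same names.
    interface KeyValueMapper<K, V, R> { R apply(K key, V value); }
    interface ValueJoinerWithKey<K1, V1, V2, R> { R apply(K1 readOnlyKey, V1 value1, V2 value2); }

    /**
     * Processes one stream record against a global table modelled as a Map.
     * Returns the output record, or empty when an inner join finds no match.
     */
    static <K, V, GK, GV, VOut> Optional<Map.Entry<K, VOut>> process(
            K key, V value, Map<GK, GV> globalTable,
            KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
            ValueJoinerWithKey<? super GK, ? super V, ? super GV, ? extends VOut> joiner,
            boolean leftJoin) {
        // Step 1: compute the mapped join key from the stream record.
        GK mappedKey = keySelector.apply(key, value);

        // Step 2: look up the global table at the mapped key.
        // Assumption of this sketch: a null mapped key counts as "no match".
        GV tableValue = (mappedKey == null) ? null : globalTable.get(mappedKey);
        if (tableValue == null && !leftJoin) {
            return Optional.empty(); // inner join drops unmatched records
        }

        // Step 3: the joiner receives the mapped key, not the stream key.
        VOut joined = joiner.apply(mappedKey, value, tableValue);

        // Step 4: the output record keeps the stream record's key.
        Map.Entry<K, VOut> output = new AbstractMap.SimpleImmutableEntry<>(key, joined);
        return Optional.of(output);
    }
}
```

With a table containing customer-42 mapped to Alice, calling process("order-1", "customer-42", table, (k, v) -> v, (readOnlyKey, v, name) -> readOnlyKey + " -> " + name, false) yields an output record keyed by order-1 whose value is built from customer-42, in contrast to the current behavior.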

Compatibility, Deprecation, and Migration Plan

  • The existing join/leftJoin stream-globalTable overloads taking ValueJoinerWithKey are deprecated. Existing applications continue to compile and run unchanged, but the compiler will emit deprecation warnings, and the methods will be removed in a future major release.

  • The new joinByMappedKey and leftJoinByMappedKey methods are purely additive; migrating to them is required only before the deprecated overloads are removed.

  • Stream-globalTable overloads taking ValueJoiner (no key access) are unaffected.

  • Migration guidance for the new join methods will be added to the Streams upgrade guide.

Test Plan

Unit tests in KStreamGlobalKTableJoinTest, KStreamGlobalKTableLeftJoinTest, and other related test classes will cover the new join methods.

Rejected Alternatives

Change KStreamKTableJoinProcessor#doJoin in place

Passing mappedKey instead of record.key() to the existing joiner is rejected by the compiler: the public DSL binds the joiner's first type parameter to the stream key type K, so a value of the mapped key type (GlobalKey in the signatures above) cannot be passed there.

Even if this were worked around with an unsafe cast, it would silently change the observable value of readOnlyKey for every existing application using ValueJoinerWithKey with a stream-globalTable join, with no migration path.

Add an overload of the same join name for join and leftJoin

The new overloads would differ from the existing ones only in the generic parameterization of ValueJoinerWithKey (the first type parameter would bind to the mapped key type instead of the stream key type). After type erasure, both have identical signatures, and Java rejects them as duplicate method declarations.
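The erasure argument can be checked with reflection. The sketch below uses hypothetical stand-in interfaces (GlobalKTable, KeyValueMapper, ValueJoinerWithKey, and SketchStream are declared locally and are not the Kafka Streams types) and declares the two variants under different names, since declaring both as join would not compile: javac reports a name clash because the two methods have the same erasure.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

class ErasureClashDemo {
    // Hypothetical stand-ins; only the generic shapes matter here.
    interface GlobalKTable<K, V> {}
    interface KeyValueMapper<K, V, R> { R apply(K key, V value); }
    interface ValueJoinerWithKey<K1, V1, V2, R> { R apply(K1 readOnlyKey, V1 value1, V2 value2); }

    interface SketchStream<K, V> {
        // Existing shape: the joiner's first type parameter is bound to the stream key type K.
        <GK, GV, VOut> SketchStream<K, VOut> join(
            GlobalKTable<GK, GV> globalTable,
            KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
            ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends VOut> joiner);

        // Proposed shape: the first type parameter is bound to the mapped key type GK.
        // Only the distinct method name makes this second declaration legal.
        <GK, GV, VOut> SketchStream<K, VOut> joinByMappedKey(
            GlobalKTable<GK, GV> globalTable,
            KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
            ValueJoinerWithKey<? super GK, ? super V, ? super GV, ? extends VOut> joiner);
    }

    /** Returns the erased (runtime) parameter types of the named SketchStream method. */
    static Class<?>[] erasedParameters(String methodName) {
        for (Method m : SketchStream.class.getDeclaredMethods()) {
            if (m.getName().equals(methodName)) {
                return m.getParameterTypes();
            }
        }
        throw new IllegalArgumentException("no such method: " + methodName);
    }

    /** True when both variants have identical parameter lists after erasure. */
    static boolean sameErasure() {
        return Arrays.equals(erasedParameters("join"), erasedParameters("joinByMappedKey"));
    }

    public static void main(String[] args) {
        System.out.println(sameErasure()); // prints "true"
    }
}
```

Because the erased parameter lists are identical, the method name is the only thing the compiler can use to tell the two variants apart, which is why this KIP introduces new names rather than new overloads.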

Add a new functional interface exposing both the stream key and the mapped key

This would require introducing a new ValueJoinerWithKeys interface that accepts both keys. It is rejected for this KIP, whose scope is limited to honoring KIP-149's existing readOnlyKey contract. It can be revisited in a separate KIP if demand emerges.
