Status

Current state: Accepted

Discussion thread: here 

JIRA: KAFKA-17893 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, KTable foreign-key joins only allow the foreign key to be extracted from the value of the source record. When the foreign key depends on both the key and the value, users must copy the relevant parts of the key into the value, even though that data already exists in the key. This leads to:

  1. Data duplication
  2. Additional storage overhead
  3. Potential data inconsistency if the duplicated data gets out of sync
  4. A less intuitive API when the foreign key is naturally derived from both the key and the value

Public Interfaces


Java API Changes

join is shown here as an example; the same change applies to other DSL methods such as leftJoin, and to all of their overloads.

File: streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java 

public interface KTable<K, V> {
    // Existing method
    <VR, KO, VO> KTable<K, VR> join(
        KTable<KO, VO> other,
        Function<V, KO> foreignKeyExtractor,  // OLD
        ValueJoiner<V, VO, VR> joiner);

    // New method
    <VR, KO, VO> KTable<K, VR> join(
        KTable<KO, VO> other,
        BiFunction<K, V, KO> foreignKeyExtractor,  // NEW
        ValueJoiner<V, VO, VR> joiner);
    ...
}
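To illustrate why the key matters, here is a minimal, Kafka-free sketch of the two extractor shapes. It assumes a hypothetical source table keyed by OrderKey (tenant id plus order id) whose Order value carries only a product id, and a hypothetical foreign key that is tenant-scoped (tenantId + "|" + productId). None of these types or formats come from the KIP; only the Function versus BiFunction shapes mirror the signatures above.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical types for illustration only; they are not part of Kafka Streams.
public class ForeignKeyExtractionExample {

    // Source table key: an order identified within a tenant.
    record OrderKey(String tenantId, String orderId) {}

    // Source table value: carries only the product id, not the tenant id.
    record Order(String productId, int quantity) {}

    // Existing shape: the extractor sees only the value, so it cannot reach
    // tenantId unless tenantId is duplicated into Order.
    static final Function<Order, String> VALUE_ONLY = order -> order.productId();

    // Proposed shape: the extractor also sees the key, so a tenant-scoped
    // foreign key can be built without duplicating tenantId into the value.
    static final BiFunction<OrderKey, Order, String> KEY_AND_VALUE =
        (key, order) -> key.tenantId() + "|" + order.productId();

    public static void main(String[] args) {
        OrderKey key = new OrderKey("tenant-a", "order-1");
        Order value = new Order("product-9", 3);
        System.out.println(VALUE_ONLY.apply(value));         // product-9
        System.out.println(KEY_AND_VALUE.apply(key, value)); // tenant-a|product-9
    }
}
```

With the proposed overload, a lambda of the KEY_AND_VALUE shape would be passed directly as the foreignKeyExtractor argument, e.g. orders.join(products, (key, order) -> key.tenantId() + "|" + order.productId(), joiner), where orders, products, and joiner are hypothetical names for a KTable<OrderKey, Order>, a KTable<String, Product>, and a ValueJoiner.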

Scala API Changes

join is shown here as an example; the same change applies to other DSL methods such as leftJoin, and to all of their overloads.

File: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala 

class KTable[K, V](val inner: KTableJ[K, V]) {
    // Existing methods
    def join[VR, KO, VO](
        other: KTable[KO, VO],
        keyExtractor: Function[V, KO],
        joiner: ValueJoiner[V, VO, VR],
        materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
    ): KTable[K, VR]  

    // New methods
    def join[VR, KO, VO](
        other: KTable[KO, VO],
        keyExtractor: (K, V) => KO,
        joiner: ValueJoiner[V, VO, VR],
        materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
    ): KTable[K, VR]

    ...
}

Compatibility, Deprecation, and Migration Plan

Impact on Existing Users

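 ✅ Maintained. The existing methods that take a value-only extractor remain available (deprecated, see below), so existing applications continue to compile and run without changes.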

Deprecation Plan

  1. Java API:

    // Existing method - mark as deprecated in 4.0
    @Deprecated(since = "4.0", forRemoval = true)
    <VR, KO, VO> KTable<K, VR> join(
        KTable<KO, VO> other,
        Function<V, KO> foreignKeyExtractor,
        ValueJoiner<V, VO, VR> joiner);
  2. Scala API:

    // Existing method - mark as deprecated in 4.0
    @deprecated("Use join with (K,V) => KO extractor instead", "4.0")
    def join[VR, KO, VO](
        other: KTable[KO, VO],
        keyExtractor: V => KO,
        joiner: ValueJoiner[V, VO, VR]): KTable[K, VR]
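Migrating off a deprecated overload should only require changing the extractor's shape: a value-only extractor can be wrapped in a two-argument lambda that ignores the key. The sketch below shows this with a hypothetical helper, ignoringKey, which is not part of Kafka Streams; in application code the inline lambda (key, value) -> oldExtractor.apply(value) is sufficient.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

public final class ExtractorMigration {

    private ExtractorMigration() {}

    // Hypothetical helper (not a Kafka Streams API): adapts a value-only
    // extractor to the (key, value) shape by ignoring the key.
    public static <K, V, KO> BiFunction<K, V, KO> ignoringKey(Function<V, KO> valueExtractor) {
        return (key, value) -> valueExtractor.apply(value);
    }

    public static void main(String[] args) {
        // Old-style extractor: foreign key is the prefix of the value before ':'.
        Function<String, String> oldExtractor = value -> value.split(":")[0];

        // Same logic in the new shape; the key is accepted but unused.
        BiFunction<Long, String, String> newExtractor = ignoringKey(oldExtractor);

        System.out.println(newExtractor.apply(1L, "customer-7:order-data")); // customer-7
    }
}
```

Because the adapted extractor ignores the key, it yields the same foreign key as the original for every record, which is what makes the migration behavior-preserving.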

Test Plan

  1. Unit tests:
    - Test foreign key extraction using only value (backward compatibility)
    - Test foreign key extraction using both key and value
    - Test null handling for both key and value
    - Test with various key and value types
  2. Integration tests:
    - End-to-end tests with actual topics and data
    - Performance comparison tests
    - Error handling and recovery scenarios
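The extractor-level unit tests listed above could start from something like the following plain-JDK sketch. It exercises only the extractor functions, not a Kafka topology, and the rule that the key-and-value extractor returns null when either input is null is an assumption made for this sketch, not behavior defined by this KIP.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical, Kafka-free sketch of extractor-level unit tests.
public class ForeignKeyExtractorTests {

    // Value-only extractor in the existing style: the value itself is the foreign key.
    static final Function<String, String> VALUE_ONLY = v -> v;

    // Key-and-value extractor in the proposed style. Returning null on a null
    // input is an assumption for this sketch, not behavior specified by the KIP.
    static final BiFunction<Integer, String, String> KEY_AND_VALUE =
        (k, v) -> (k == null || v == null) ? null : k + "-" + v;

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        // Backward compatibility: a wrapped value-only extractor ignores the key.
        BiFunction<Integer, String, String> wrapped = (k, v) -> VALUE_ONLY.apply(v);
        check("p1".equals(wrapped.apply(42, "p1")), "wrapped extractor must ignore the key");

        // Key and value both contribute to the foreign key.
        check("7-p1".equals(KEY_AND_VALUE.apply(7, "p1")), "key and value must be combined");

        // Null handling for both key and value.
        check(KEY_AND_VALUE.apply(null, "p1") == null, "null key must yield null");
        check(KEY_AND_VALUE.apply(7, null) == null, "null value must yield null");

        System.out.println("all extractor checks passed");
    }
}
```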

Rejected Alternatives

