Status
Current state: Accepted
Discussion thread: here
JIRA: KAFKA-17893
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, KTable foreign key joins only allow extracting the foreign key from the value of the source record. This forces users to duplicate data that might already exist in the key into the value when the foreign key needs to be derived from both the key and value. This leads to:
- Data duplication
- Additional storage overhead
- Potential data inconsistency if the duplicated data gets out of sync
- Less intuitive API when the foreign key is naturally derived from both key and value
Public Interfaces
Java API Changes
Taking join as an example here; other DSL functions such as leftJoin, and their overloads, change in the same way.
File: streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
public interface KTable<K, V> {

    // Existing method
    <VR, KO, VO> KTable<K, VR> join(
        KTable<KO, VO> other,
        Function<V, KO> foreignKeyExtractor, // OLD
        ValueJoiner<V, VO, VR> joiner);

    // New method
    <VR, KO, VO> KTable<K, VR> join(
        KTable<KO, VO> other,
        BiFunction<K, V, KO> foreignKeyExtractor, // NEW
        ValueJoiner<V, VO, VR> joiner);

    ...
}
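To illustrate the difference between the two extractor shapes, here is a minimal sketch that uses only java.util.function and does not depend on Kafka Streams. The Order type, the "region:orderId" key layout, and the "region-sku" foreign key format are assumptions made up for this example; they are not part of the proposal.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

public class ForeignKeyExtractorSketch {

    // Hypothetical value type. The region is NOT stored here; it lives only in the record key.
    static final class Order {
        final String productSku;
        final int quantity;

        Order(String productSku, int quantity) {
            this.productSku = productSku;
            this.quantity = quantity;
        }
    }

    // Existing style: the extractor sees only the value, so the region is out of reach.
    static final Function<Order, String> FROM_VALUE = order -> order.productSku;

    // Proposed style: the extractor sees key and value.
    // Assumed key layout for this sketch: "<region>:<orderId>".
    static final BiFunction<String, Order, String> FROM_KEY_AND_VALUE =
        (key, order) -> key.substring(0, key.indexOf(':')) + "-" + order.productSku;

    public static void main(String[] args) {
        Order order = new Order("sku42", 3);
        System.out.println(FROM_VALUE.apply(order));                    // sku42
        System.out.println(FROM_KEY_AND_VALUE.apply("eu:1001", order)); // eu-sku42
    }
}
```

With the value-only shape, the region would have to be copied into Order for the extractor to use it, which is the duplication described in the Motivation section.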
Scala API Changes
Taking join as an example here; other DSL functions such as leftJoin, and their overloads, change in the same way.
File: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
class KTable[K, V](val inner: KTableJ[K, V]) {

  // Existing methods
  def join[VR, KO, VO](
    other: KTable[KO, VO],
    keyExtractor: Function[V, KO],
    joiner: ValueJoiner[V, VO, VR],
    materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
  ): KTable[K, VR]

  // New methods
  def join[VR, KO, VO](
    other: KTable[KO, VO],
    keyExtractor: (K, V) => KO,
    joiner: ValueJoiner[V, VO, VR],
    materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
  ): KTable[K, VR]

  ...
}
Compatibility, Deprecation, and Migration Plan
Impact on Existing Users
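✅ Backward compatibility is maintained: the existing value-only overloads remain available (deprecated, as described below), so existing applications continue to compile and run until those overloads are removed.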
Deprecation Plan
Java API:
// Existing method - mark as deprecated in 4.0
@Deprecated(since = "4.0", forRemoval = true)
<VR, KO, VO> KTable<K, VR> join(
    KTable<KO, VO> other,
    Function<V, KO> foreignKeyExtractor,
    ValueJoiner<V, VO, VR> joiner);
Scala API:
// Existing method - mark as deprecated in 4.0
@deprecated("Use join with (K,V) => KO extractor instead", "4.0")
def join[VR, KO, VO](
  other: KTable[KO, VO],
  keyExtractor: V => KO,
  joiner: ValueJoiner[V, VO, VR]): KTable[K, VR]
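For callers moving off the deprecated overloads, a value-only extractor can be wrapped in a two-argument one that ignores the key. The sketch below is one possible way to do this with plain java.util.function types; the ignoringKey helper and the sample "customer|item" value layout are hypothetical and not part of Kafka Streams or this proposal.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

public class ExtractorMigrationSketch {

    // Hypothetical helper: adapts a value-only extractor to the (key, value) shape.
    // The key argument is accepted but deliberately unused.
    static <K, V, KO> BiFunction<K, V, KO> ignoringKey(Function<V, KO> valueOnly) {
        return (key, value) -> valueOnly.apply(value);
    }

    public static void main(String[] args) {
        // An extractor written against the old signature: foreign key taken from the value alone.
        // Assumed value layout for this sketch: "<customerId>|<item>".
        Function<String, String> oldExtractor = value -> value.split("\\|")[0];

        // The same logic exposed through the new two-argument shape.
        BiFunction<Long, String, String> newExtractor = ignoringKey(oldExtractor);

        System.out.println(newExtractor.apply(7L, "cust-9|widget")); // cust-9
    }
}
```

In many cases the wrapper is unnecessary: rewriting the lambda inline as (key, value) -> ... with the key unused has the same effect.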
Test Plan
- Unit tests:
  - Test foreign key extraction using only value (backward compatibility)
  - Test foreign key extraction using both key and value
  - Test null handling for both key and value
  - Test with various key and value types
- Integration tests:
  - End-to-end tests with actual topics and data
  - Performance comparison tests
  - Error handling and recovery scenarios
Rejected Alternatives