
Kafka Streams offers join operators of three different types:

  • sliding window KStream-KStream join
  • KStream-KTable join
  • KTable-KTable join

Furthermore, joins come in different "variants", namely inner, left, and outer join (not every join type offers every variant, though).

Join semantics are inspired by SQL join semantics. However, because Kafka Streams offers stream processing instead of batch processing, the semantics do not align completely. In the following, we give a detailed explanation of the join semantics offered by Kafka Streams.

Below, we describe the semantics of each operator on two input streams/tables. We assume that all messages in these examples have the same key, and thus omit the key to improve readability. For window joins, we assume that all records belong to a single window. Nevertheless, time (and processing order) is an important factor in stream joins. Thus, we also show the timestamp of each record and assume that all records are processed in timestamp order. Each record is written as ts:value, and null indicates a missing value.

  • STREAM_1:   1:null, 3:A, 5:B, 7:null, 9:C, 12:null, 15:D
  • STREAM_2:   2:null, 4:a, 6:b, 8:null, 10:c, 11:null, 13:null, 14:d

Note that both streams are used as examples for KStream (i.e., record stream) and KTable (i.e., changelog stream), which have different semantics. For KTable, so-called tombstone records with format key:null are of special interest, because they delete a key. Those records are shown as null in all examples to highlight tombstone semantics. Last but not least, in Kafka Streams each join is "customized" by the user with a ValueJoiner function that computes the actual result. Hence, we show output records as "X - Y", with X and Y being the left and right value, respectively, that are passed to the ValueJoiner. If the output is shown as null (i.e., a tombstone message), the ValueJoiner is not called, because the output deletes a previous result record.
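To make the notation concrete, the following Python sketch parses the ts:value lists above and interleaves both inputs in timestamp order. That is the processing order all examples on this page assume. The helper names (parse_stream, merge_by_timestamp, value_joiner) are hypothetical. value_joiner only stands in for a user-supplied ValueJoiner that renders "X - Y"; it is not the Kafka Streams API.

```python
import heapq


def parse_stream(text):
    """Parse '1:null, 3:A' into [(1, None), (3, 'A')]; null becomes None."""
    records = []
    for item in text.split(","):
        ts, value = item.strip().split(":")
        records.append((int(ts), None if value == "null" else value))
    return records


def merge_by_timestamp(left, right):
    """Interleave two timestamp-sorted streams into (ts, side, value) tuples."""
    tagged_left = [(ts, "left", v) for ts, v in left]
    tagged_right = [(ts, "right", v) for ts, v in right]
    return list(heapq.merge(tagged_left, tagged_right, key=lambda r: r[0]))


def value_joiner(left_value, right_value):
    """Hypothetical stand-in for a ValueJoiner: render the pair as 'X - Y'."""
    def show(v):
        return "null" if v is None else v
    return f"{show(left_value)} - {show(right_value)}"


STREAM_1 = parse_stream("1:null, 3:A, 5:B, 7:null, 9:C, 12:null, 15:D")
STREAM_2 = parse_stream("2:null, 4:a, 6:b, 8:null, 10:c, 11:null, 13:null, 14:d")

# Print the processing order assumed by all tables on this page.
for ts, side, value in merge_by_timestamp(STREAM_1, STREAM_2):
    print(f"{ts:>2} {side:<5} {'null' if value is None else value}")
```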

New Join Semantics (v0.10.2.x and newer)

This section describes the join semantics as of version 0.10.2.x. For the old join semantics (versions 0.10.0.x and 0.10.1.x), see Old Join Semantics below.

(See KIP-77: Improve Kafka Streams Join Semantics)

Kafka Streams 0.10.2.x and newer offers the following join operators. Compared to 0.10.1.x and older, the KStream-KTable inner join is new:

                | inner join | left join | outer join
----------------+------------+-----------+-----------
KStream-KStream | yes        | yes       | yes
KStream-KTable  | yes (new)  | yes       | no
KTable-KTable   | yes        | yes       | yes

KStream-KStream Join

This is a sliding window join, i.e., all tuples that are "close" to each other with regard to time (i.e., their time difference is at most the window size) are joined. The result is a KStream. Input records with a null value are ignored. The table below shows the output (for each processed input record) for all three join variants. Note that some input records do not produce output records.

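ts | STREAM_1 (left) | STREAM_2 (right) | innerJoin | leftJoin  | outerJoin
---+-----------------+------------------+-----------+-----------+----------
 1 | null            |                  |           |           |
 2 |                 | null             |           |           |
 3 | A               |                  |           | A - null  | A - null
 4 |                 | a                | A - a     | A - a     | A - a
 5 | B               |                  | B - a     | B - a     | B - a
 6 |                 | b                | A - b     | A - b     | A - b
   |                 |                  | B - b     | B - b     | B - b
 7 | null            |                  |           |           |
 8 |                 | null             |           |           |
 9 | C               |                  | C - a     | C - a     | C - a
   |                 |                  | C - b     | C - b     | C - b
10 |                 | c                | A - c     | A - c     | A - c
   |                 |                  | B - c     | B - c     | B - c
   |                 |                  | C - c     | C - c     | C - c
11 |                 | null             |           |           |
12 | null            |                  |           |           |
13 |                 | null             |           |           |
14 |                 | d                | A - d     | A - d     | A - d
   |                 |                  | B - d     | B - d     | B - d
   |                 |                  | C - d     | C - d     | C - d
15 | D               |                  | D - a     | D - a     | D - a
   |                 |                  | D - b     | D - b     | D - b
   |                 |                  | D - c     | D - c     | D - c
   |                 |                  | D - d     | D - d     | D - d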
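The KStream-KStream results can be reproduced with a small Python model. This is a sketch under stated assumptions, not the Kafka Streams implementation:

  • there is a single key and records are processed in timestamp order;
  • two records join if their timestamps differ by at most window_size (a symmetric window);
  • null-valued records are dropped, as the table shows.

The function and variable names are hypothetical. In an application you would use the DSL operators KStream#join, #leftJoin and #outerJoin with a JoinWindows specification instead.

```python
STREAM_1 = [(1, None), (3, "A"), (5, "B"), (7, None), (9, "C"), (12, None), (15, "D")]
STREAM_2 = [(2, None), (4, "a"), (6, "b"), (8, None), (10, "c"), (11, None),
            (13, None), (14, "d")]
# Process both inputs in timestamp order (all timestamps are distinct).
RECORDS = sorted(
    [(ts, "left", v) for ts, v in STREAM_1] + [(ts, "right", v) for ts, v in STREAM_2],
    key=lambda record: record[0],
)


def fmt(left, right):
    """Render a join result as 'X - Y'; a missing side is printed as null."""
    left_text = "null" if left is None else left
    right_text = "null" if right is None else right
    return f"{left_text} - {right_text}"


def stream_stream_join(records, window_size, variant):
    """Model of the 0.10.2+ sliding-window KStream-KStream join for one key.

    variant is 'inner', 'left' or 'outer'. Returns {ts: [results]} for the
    input records that produced output."""
    buffers = {"left": [], "right": []}  # (ts, value) seen so far, per side
    output = {}
    for ts, side, value in records:
        if value is None:
            continue  # null-valued records are dropped and never buffered
        other = "right" if side == "left" else "left"
        # Join partners: other-side records at most window_size apart in time.
        partners = [v for t, v in buffers[other] if abs(ts - t) <= window_size]
        if side == "left":
            results = [fmt(value, p) for p in partners]
        else:
            results = [fmt(p, value) for p in partners]
        if not partners:
            # No partner yet: left/outer emit eagerly for left records,
            # outer also for right records.
            if side == "left" and variant in ("left", "outer"):
                results = [fmt(value, None)]
            elif side == "right" and variant == "outer":
                results = [fmt(None, value)]
        if results:
            output[ts] = results
        buffers[side].append((ts, value))
    return output


# window_size=15 puts all records into one window, as assumed on this page.
for variant in ("inner", "left", "outer"):
    print(variant, stream_stream_join(RECORDS, 15, variant))
```

With window_size=15, all records fall into one window, which matches the table. A smaller window restricts the partners to nearby records. For example, with window_size=1 the record D (ts 15) only joins d (ts 14).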

KStream-KTable Join

This is an asymmetric non-window join. The basic semantics is a KTable lookup for each KStream record. Each KTable input record updates the current KTable view but never yields a result record, and KStream records with a null value do not produce any output. The result is a KStream. Note that the KTable lookup is done on the current KTable state, and thus out-of-order records can yield non-deterministic results. Furthermore, in practice Kafka Streams does not guarantee that all records will be processed in timestamp order (processing records in timestamp order is the goal, but it is only best effort). The table below shows the output (for each processed input record) for both offered join variants. Note that some input records do not produce output records.

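ts | STREAM_1 (left) | STREAM_2 (right) | innerJoin | leftJoin
---+-----------------+------------------+-----------+----------
 1 | null            |                  |           |
 2 |                 | null             |           |
 3 | A               |                  |           | A - null
 4 |                 | a                |           |
 5 | B               |                  | B - a     | B - a
 6 |                 | b                |           |
 7 | null            |                  |           |
 8 |                 | null             |           |
 9 | C               |                  |           | C - null
10 |                 | c                |           |
11 |                 | null             |           |
12 | null            |                  |           |
13 |                 | null             |           |
14 |                 | d                |           |
15 | D               |                  | D - d     | D - d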
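A minimal Python model of this KStream-KTable join follows. It assumes a single key and treats the KTable as just the latest value seen on STREAM_2. The names are hypothetical, and the model ignores caching and state stores; it only encodes the lookup rule described above.

```python
STREAM_1 = [(1, None), (3, "A"), (5, "B"), (7, None), (9, "C"), (12, None), (15, "D")]
STREAM_2 = [(2, None), (4, "a"), (6, "b"), (8, None), (10, "c"), (11, None),
            (13, None), (14, "d")]
# Process both inputs in timestamp order (all timestamps are distinct).
RECORDS = sorted(
    [(ts, "left", v) for ts, v in STREAM_1] + [(ts, "right", v) for ts, v in STREAM_2],
    key=lambda record: record[0],
)


def fmt(left, right):
    """Render a join result as 'X - Y'; a missing side is printed as null."""
    left_text = "null" if left is None else left
    right_text = "null" if right is None else right
    return f"{left_text} - {right_text}"


def stream_table_join(records, variant):
    """Model of the 0.10.2+ KStream-KTable join for one key.

    STREAM_1 is the KStream (left), STREAM_2 the KTable (right);
    variant is 'inner' or 'left'. Returns {ts: result}."""
    table_value = None  # current KTable value; None means no entry
    output = {}
    for ts, side, value in records:
        if side == "right":
            table_value = value  # update or delete (null); never emits
        elif value is not None:  # null-valued KStream records are ignored
            if table_value is not None:
                output[ts] = fmt(value, table_value)
            elif variant == "left":
                output[ts] = fmt(value, None)
    return output


print(stream_table_join(RECORDS, "inner"))
print(stream_table_join(RECORDS, "left"))

# Out-of-order processing: handle D (ts 15) before d (ts 14).
swapped = RECORDS[:13] + [RECORDS[14], RECORDS[13]]
print(stream_table_join(swapped, "left")[15])
```

The last lines illustrate the non-determinism mentioned above. If d (ts 14) is processed after D (ts 15), the lookup misses it, and the left join yields D - null instead of D - d.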

KTable-KTable Join

This is a symmetric non-window join. The basic semantics is a KTable lookup in the "other" input for each KTable update. The result is a continuously updating KTable, i.e., a changelog stream that can contain tombstone messages with format <key:null>. Those tombstones are shown as null in the result, in contrast to results such as "X - null", which indicate a valid join result with only one join partner. Note that the KTable lookup is done on the current KTable state, and thus out-of-order records can yield non-deterministic results. Furthermore, in practice Kafka Streams does not guarantee that all records will be processed in timestamp order (processing records in timestamp order is the goal, but it is only best effort).

KTable Cache

To observe the behavior described below, you will most likely need to disable the KTable deduplication cache by setting cache.max.bytes.buffering=0 in StreamsConfig. Otherwise, the deduplication cache will "swallow" many of the produced result records, which makes it hard to reason about the actual join behavior.

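ts | STREAM_1 (left) | STREAM_2 (right) | innerJoin | leftJoin  | outerJoin
---+-----------------+------------------+-----------+-----------+----------
 1 | null            |                  |           |           |
 2 |                 | null             |           |           |
 3 | A               |                  |           | A - null  | A - null
 4 |                 | a                | A - a     | A - a     | A - a
 5 | B               |                  | B - a     | B - a     | B - a
 6 |                 | b                | B - b     | B - b     | B - b
 7 | null            |                  | null      | null      | null - b
 8 |                 | null             |           |           | null
 9 | C               |                  |           | C - null  | C - null
10 |                 | c                | C - c     | C - c     | C - c
11 |                 | null             | null      | C - null  | C - null
12 | null            |                  |           | null      | null
13 |                 | null             |           |           |
14 |                 | d                |           |           | null - d
15 | D               |                  | D - d     | D - d     | D - d
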

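The KTable-KTable results can be modeled in Python by keeping the current value of each input table and recomputing the join result on every update. This sketch assumes a single key and a disabled cache, so every update is processed. One rule is read off the table above rather than taken from the Kafka Streams code: no tombstone is emitted when no previous result existed. All names are hypothetical.

```python
STREAM_1 = [(1, None), (3, "A"), (5, "B"), (7, None), (9, "C"), (12, None), (15, "D")]
STREAM_2 = [(2, None), (4, "a"), (6, "b"), (8, None), (10, "c"), (11, None),
            (13, None), (14, "d")]
# Process both inputs in timestamp order (all timestamps are distinct).
RECORDS = sorted(
    [(ts, "left", v) for ts, v in STREAM_1] + [(ts, "right", v) for ts, v in STREAM_2],
    key=lambda record: record[0],
)


def fmt(left, right):
    """Render a join result as 'X - Y'; a missing side is printed as null."""
    left_text = "null" if left is None else left
    right_text = "null" if right is None else right
    return f"{left_text} - {right_text}"


def table_table_join(records, variant):
    """Model of the 0.10.2+ KTable-KTable join for one key.

    Returns {ts: result}, where result is 'X - Y' or 'null' (tombstone)."""
    tables = {"left": None, "right": None}  # current value of each input table
    previous = None  # last emitted result; None means no result / deleted
    output = {}
    for ts, side, value in records:
        tables[side] = value  # a null value deletes the key from that table
        lv, rv = tables["left"], tables["right"]
        if variant == "inner":
            present = lv is not None and rv is not None
        elif variant == "left":
            present = lv is not None
        else:  # outer
            present = lv is not None or rv is not None
        result = (lv, rv) if present else None
        if result is None and previous is None:
            continue  # no tombstone for a result that does not exist
        output[ts] = "null" if result is None else fmt(*result)
        previous = result
    return output


for variant in ("inner", "left", "outer"):
    print(variant, table_table_join(RECORDS, variant))
```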

Old Join Semantics (v0.10.1.x and older)

Kafka Streams 0.10.1.x (and older) offers the following join operators:

                | inner join | left join | outer join
----------------+------------+-----------+-----------
KStream-KStream | yes        | yes       | yes
KStream-KTable  | no         | yes       | no
KTable-KTable   | yes        | yes       | yes

KStream-KStream Join

This is a sliding window join, i.e., all tuples that are "close" to each other with regard to time (i.e., their time difference is at most the window size) are joined. The result is a KStream. The table below shows the output (for each processed input record) for all three join variants. Note that some input records do not produce output records.

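ts | STREAM_1 (left) | STREAM_2 (right) | innerJoin   | leftJoin    | outerJoin
---+-----------------+------------------+-------------+-------------+------------
 1 | null            |                  |             | null - null | null - null
 2 |                 | null             |             |             | null - null
 3 | A               |                  |             | A - null    | A - null
 4 |                 | a                | A - a       |             | A - a
 5 | B               |                  | B - a       | B - a       | B - a
 6 |                 | b                | A - b       |             | A - b
   |                 |                  | B - b       |             | B - b
 7 | null            |                  | null - a    | null - a    | null - a
   |                 |                  | null - b    | null - b    | null - b
 8 |                 | null             | A - null    |             | A - null
   |                 |                  | B - null    |             | B - null
 9 | C               |                  | C - a       | C - a       | C - a
   |                 |                  | C - b       | C - b       | C - b
10 |                 | c                | A - c       |             | A - c
   |                 |                  | B - c       |             | B - c
   |                 |                  | C - c       |             | C - c
11 |                 | null             | A - null    |             | A - null
   |                 |                  | B - null    |             | B - null
   |                 |                  | C - null    |             | C - null
12 | null            |                  | null - a    | null - a    | null - a
   |                 |                  | null - b    | null - b    | null - b
   |                 |                  | null - c    | null - c    | null - c
13 |                 | null             | A - null    |             | A - null
   |                 |                  | B - null    |             | B - null
   |                 |                  | C - null    |             | C - null
14 |                 | d                | A - d       |             | A - d
   |                 |                  | B - d       |             | B - d
   |                 |                  | C - d       |             | C - d
15 | D               |                  | D - a       | D - a       | D - a
   |                 |                  | D - b       | D - b       | D - b
   |                 |                  | D - c       | D - c       | D - c
   |                 |                  | D - d       | D - d       | D - d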
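The old results can be reproduced by a slightly different Python model. This is our reading of the table rather than the actual 0.10.1.x code:

  • null-valued records look up join partners but are never buffered as partners themselves;
  • leftJoin only produces output for left-side records.

As before, we assume a single key, timestamp-order processing and a single window. All names are hypothetical.

```python
STREAM_1 = [(1, None), (3, "A"), (5, "B"), (7, None), (9, "C"), (12, None), (15, "D")]
STREAM_2 = [(2, None), (4, "a"), (6, "b"), (8, None), (10, "c"), (11, None),
            (13, None), (14, "d")]
# Process both inputs in timestamp order (all timestamps are distinct).
RECORDS = sorted(
    [(ts, "left", v) for ts, v in STREAM_1] + [(ts, "right", v) for ts, v in STREAM_2],
    key=lambda record: record[0],
)


def fmt(left, right):
    """Render a join result as 'X - Y'; a missing side is printed as null."""
    left_text = "null" if left is None else left
    right_text = "null" if right is None else right
    return f"{left_text} - {right_text}"


def old_stream_stream_join(records, variant):
    """Model reproducing the 0.10.1.x KStream-KStream table (one window)."""
    buffers = {"left": [], "right": []}  # non-null values seen so far, per side
    output = {}
    for ts, side, value in records:
        partners = buffers["right" if side == "left" else "left"]
        if side == "left":
            results = [fmt(value, p) for p in partners]
        elif variant == "left":
            results = []  # leftJoin is only triggered by left-side records
        else:
            results = [fmt(p, value) for p in partners]
        if not partners:
            if side == "left" and variant in ("left", "outer"):
                results = [fmt(value, None)]
            elif side == "right" and variant == "outer":
                results = [fmt(None, value)]
        if results:
            output[ts] = results
        if value is not None:
            buffers[side].append(value)  # null values probe, but are not buffered
    return output


for variant in ("inner", "left", "outer"):
    print(variant, old_stream_stream_join(RECORDS, variant))
```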

KStream-KTable Join

This is an asymmetric non-window join. The basic semantics is a KTable lookup for each KStream record. The result is a KStream. Note that the KTable lookup is done on the current KTable state, and thus out-of-order records can yield non-deterministic results. Furthermore, in practice Kafka Streams does not guarantee that all records will be processed in timestamp order (processing records in timestamp order is the goal, but it is only best effort). The table below shows the output (for each processed input record) for the only offered join variant, leftJoin. Note that some input records do not produce output records.

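ts | STREAM_1 (left) | STREAM_2 (right) | leftJoin
---+-----------------+------------------+------------
 1 | null            |                  | null - null
 2 |                 | null             |
 3 | A               |                  | A - null
 4 |                 | a                |
 5 | B               |                  | B - a
 6 |                 | b                |
 7 | null            |                  | null - b
 8 |                 | null             |
 9 | C               |                  | C - null
10 |                 | c                |
11 |                 | null             |
12 | null            |                  | null - null
13 |                 | null             |
14 |                 | d                |
15 | D               |                  | D - d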
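A minimal Python model of the old KStream-KTable left join follows, assuming a single key (names are hypothetical). It differs from the 0.10.2+ behavior in one point: KStream records with a null value are joined like any other record. Thus ts 1 and ts 12 yield null - null, and ts 7 yields null - b.

```python
STREAM_1 = [(1, None), (3, "A"), (5, "B"), (7, None), (9, "C"), (12, None), (15, "D")]
STREAM_2 = [(2, None), (4, "a"), (6, "b"), (8, None), (10, "c"), (11, None),
            (13, None), (14, "d")]
# Process both inputs in timestamp order (all timestamps are distinct).
RECORDS = sorted(
    [(ts, "left", v) for ts, v in STREAM_1] + [(ts, "right", v) for ts, v in STREAM_2],
    key=lambda record: record[0],
)


def fmt(left, right):
    """Render a join result as 'X - Y'; a missing side is printed as null."""
    left_text = "null" if left is None else left
    right_text = "null" if right is None else right
    return f"{left_text} - {right_text}"


def old_stream_table_left_join(records):
    """Model of the 0.10.1.x KStream-KTable left join for one key."""
    table_value = None  # current KTable value; None means no entry
    output = {}
    for ts, side, value in records:
        if side == "right":
            table_value = value  # update or delete (null); never emits
        else:
            output[ts] = fmt(value, table_value)  # null stream values join too
    return output


print(old_stream_table_left_join(RECORDS))
```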

KTable-KTable Join

This is a symmetric non-window join. The basic semantics is a KTable lookup in the "other" input for each KTable update. The result is a continuously updating KTable, i.e., a changelog stream that can contain tombstone messages with format <key:null>. Those tombstones are shown as null in the result, in contrast to results such as "X - null", which indicate a valid join result with only one join partner. Note that the KTable lookup is done on the current KTable state, and thus out-of-order records can yield non-deterministic results. Furthermore, in practice Kafka Streams does not guarantee that all records will be processed in timestamp order (processing records in timestamp order is the goal, but it is only best effort).

KTable Cache

To observe the behavior described below, you will most likely need to disable the KTable deduplication cache (Kafka 0.10.1.x) by setting cache.max.bytes.buffering=0 in StreamsConfig. Otherwise, the deduplication cache will "swallow" many of the produced result records, which makes it hard to reason about the actual join behavior.

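ts | STREAM_1 (left) | STREAM_2 (right) | innerJoin | leftJoin  | outerJoin
---+-----------------+------------------+-----------+-----------+----------
 1 | null            |                  | null      | null      | null
 2 |                 | null             | null      | null      | null
 3 | A               |                  | null      | A - null  | A - null
 4 |                 | a                | A - a     | A - a     | A - a
 5 | B               |                  | B - a     | B - a     | B - a
 6 |                 | b                | B - b     | B - b     | B - b
 7 | null            |                  | null      | null      | null - b
 8 |                 | null             | null      | null      | null
 9 | C               |                  | null      | C - null  | C - null
10 |                 | c                | C - c     | C - c     | C - c
11 |                 | null             | null      | C - null  | C - null
12 | null            |                  | null      | null      | null
13 |                 | null             | null      | null      | null
14 |                 | d                | null      | null      | null - d
15 | D               |                  | D - d     | D - d     | D - d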

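The old KTable-KTable behavior can be modeled like the new one, with one difference: every update emits a result. This includes tombstones (null) when no join result existed before. This Python sketch assumes a single key and a disabled cache; all names are hypothetical.

```python
STREAM_1 = [(1, None), (3, "A"), (5, "B"), (7, None), (9, "C"), (12, None), (15, "D")]
STREAM_2 = [(2, None), (4, "a"), (6, "b"), (8, None), (10, "c"), (11, None),
            (13, None), (14, "d")]
# Process both inputs in timestamp order (all timestamps are distinct).
RECORDS = sorted(
    [(ts, "left", v) for ts, v in STREAM_1] + [(ts, "right", v) for ts, v in STREAM_2],
    key=lambda record: record[0],
)


def fmt(left, right):
    """Render a join result as 'X - Y'; a missing side is printed as null."""
    left_text = "null" if left is None else left
    right_text = "null" if right is None else right
    return f"{left_text} - {right_text}"


def old_table_table_join(records, variant):
    """Model of the 0.10.1.x KTable-KTable join for one key."""
    tables = {"left": None, "right": None}  # current value of each input table
    output = {}
    for ts, side, value in records:
        tables[side] = value  # a null value deletes the key from that table
        lv, rv = tables["left"], tables["right"]
        if variant == "inner":
            present = lv is not None and rv is not None
        elif variant == "left":
            present = lv is not None
        else:  # outer
            present = lv is not None or rv is not None
        # Every update emits, even a tombstone for a result that never existed.
        output[ts] = fmt(lv, rv) if present else "null"
    return output


for variant in ("inner", "left", "outer"):
    print(variant, old_table_table_join(RECORDS, variant))
```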