
...

 

KStream API

Data transformations are tuple-by-tuple operations on a single stream; aggregations turn a KStream into a KTable; joins are tuple-by-tuple operations over multiple streams. Cardinalities such as (1:n) give the number of output records per input record.

| data transformation, same key (KStream -> KStream) | data transformation, new key (KStream -> KStream) | aggregation, non-windowed (KStream -> KTable) | aggregation, windowed (KStream -> KTable<Windowed<K>, V>) | other | join, non-windowed (KStream-KTable) | join, windowed (KStream-KStream) |
|---|---|---|---|---|---|---|
| filter (1:[0,1]) |  | aggregateByKey | aggregateByKey | print | join (as of 0.10.2) | join |
| filterNot (1:[0,1]) |  | reduceByKey | reduceByKey | writeAsText (deprecated as of 1.0) | leftJoin | leftJoin |
|  | selectKey (1:1) | countByKey | countByKey | foreach |  | outerJoin |
| mapValues (1:1) | map (1:1) |  |  | through | merge (as of 1.0) |  |
| flatMapValues (1:n) | flatMap (1:n) |  |  | to |  |  |
| transformValues (1:1 + state) | transform (1:n + state) |  |  |  |  |  |
| branch |  |  |  |  |  |  |
| process (1:0 + state) |  |  |  | peek (as of 0.11.0) |  |  |
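The cardinality annotations in the table describe how many output records a single input record can produce: 1:[0,1] for filter, 1:1 for map, 1:n for flatMap, and so on. A minimal plain-Java sketch of these shapes (no Kafka dependency; the record representation and data are made up for illustration):

```java
import java.util.*;
import java.util.stream.*;

public class Cardinalities {
    // filter is 1:[0,1]: each input record yields zero or one output record
    static List<Map.Entry<String, Integer>> filter(List<Map.Entry<String, Integer>> in) {
        return in.stream().filter(e -> e.getValue() > 10).collect(Collectors.toList());
    }

    // mapValues is 1:1 and keeps the key untouched
    static List<Map.Entry<String, Integer>> mapValues(List<Map.Entry<String, Integer>> in) {
        return in.stream()
                 .map(e -> Map.entry(e.getKey(), e.getValue() * 2))
                 .collect(Collectors.toList());
    }

    // flatMapValues is 1:n: each input record can yield any number of output records
    static List<Map.Entry<String, Integer>> flatMapValues(List<Map.Entry<String, Integer>> in) {
        return in.stream()
                 .flatMap(e -> IntStream.range(0, e.getValue())
                                        .mapToObj(i -> Map.entry(e.getKey(), i)))
                 .collect(Collectors.toList());
    }
}
```

The "same key" operations (mapValues, flatMapValues, transformValues) never change the record key, which is why they fall into a different column than key-changing operations such as map or selectKey.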

 

KTable API

KTable currently offers the following methods, which have different implications with regard to (internally) created topics and RocksDB usage (KAFKA-3576).

 

Data transformations are tuple-by-tuple operations on a single table; aggregations operate on a KGroupedTable; joins combine multiple tables.

| data transformation, same key (KTable -> KTable) | data transformation, new key (KTable -> KGroupedTable) | aggregation (KGroupedTable -> KTable) | other | join (KTable-KTable) |
|---|---|---|---|---|
| filter (1:[0,1]) |  | aggregate | print (deprecated as of 1.0) | join |
| filterNot (1:[0,1]) |  | reduce | writeAsText (deprecated as of 1.0) | leftJoin |
| mapValues (1:1) |  | count | foreach (deprecated as of 1.0) | outerJoin |
|  | groupBy (1:1) [internally a simple map] |  | through (deprecated as of 1.0) |  |
|  |  |  | to (deprecated as of 1.0) |  |
|  |  |  | toStream |  |
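The KTable operations above follow changelog semantics: each input record is an upsert for its key, and a null value is a tombstone that deletes the key. A minimal plain-Java sketch of these update semantics (the record representation and data are made up for illustration):

```java
import java.util.*;

public class TableSemantics {
    // Apply one changelog record to the table: non-null value upserts, null deletes
    static void apply(Map<String, String> table, String key, String value) {
        if (value == null) {
            table.remove(key);   // tombstone
        } else {
            table.put(key, value);
        }
    }

    // Materialize the table by applying the changelog records in order
    static Map<String, String> materialize(List<String[]> changelog) {
        Map<String, String> table = new HashMap<>();
        for (String[] record : changelog) {
            apply(table, record[0], record[1]);
        }
        return table;
    }
}
```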

 

 

 

Data Management

...

RocksDB is just used as an internal lookup table (that is able to flush to disk if the state does not fit into memory; KAFKA-3776), and the internal changelog topic is created for fault-tolerance reasons. Thus, the changelog topic is the source of truth for the state (i.e., the log of the state), while RocksDB is used as a (non-fault-tolerant) cache. RocksDB cannot be used for fault tolerance because flushing happens to local disk only, and it cannot be controlled when flushing happens; RocksDB flushing is only required because state could be larger than available main memory. Thus, the internal changelog topic is used for fault tolerance: if a task crashes and gets restarted on a different machine, this internal changelog topic is used to recover the state store. Currently, the default replication factor of internal topics is 1.
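The recovery path can be sketched in plain Java (no Kafka involved; class and method names are made up): every store update is also appended to the changelog, the local store can be lost at any time, and a restarted task rebuilds it by replaying the changelog.

```java
import java.util.*;

public class StateRecovery {
    final Map<String, Long> store = new HashMap<>();                    // RocksDB stand-in (non-fault-tolerant cache)
    final List<Map.Entry<String, Long>> changelog = new ArrayList<>();  // changelog topic stand-in (source of truth)

    void put(String key, long value) {
        store.put(key, value);
        changelog.add(Map.entry(key, value)); // every update is also written to the changelog
    }

    // Simulate a crash: the local store is gone, the changelog survives in Kafka
    void crash() {
        store.clear();
    }

    // On restart (possibly on a different machine), replay the changelog to rebuild the store
    void restore() {
        for (Map.Entry<String, Long> record : changelog) {
            store.put(record.getKey(), record.getValue());
        }
    }
}
```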

There are two main differences between non-windowed and windowed aggregation with regard to key design. For windowed aggregation the key is <K,W>, i.e., for each window a new key is used. Thus, memory usage grows over time (KAFKA-4015), even if the key space is bounded (i.e., the number of unique keys is finite); this also implies that log compaction cannot purge any old data. The second difference concerns RocksDB instances: instead of using a single instance, Streams uses multiple instances (called "segments") for different time periods. After the window retention time has passed, old segments can be dropped. Thus, the RocksDB memory requirement does not grow infinitely (in contrast to the changelog topic). (KAFKA-4015 was fixed in the 0.10.1 release; windowed changelog topics no longer grow unbounded, as they apply an additional retention time parameter.)
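Both differences can be sketched in plain Java (segment size and retention values are made up, and Streams' actual segment handling differs in detail): windowed keys are <K,W> pairs, so each new window adds new keys even for a fixed key space, and expiration drops whole segments rather than individual entries.

```java
import java.util.*;

public class SegmentedWindowStore {
    static final long SEGMENT_MS = 60_000;    // one segment per minute (illustrative)
    static final long RETENTION_MS = 300_000; // keep five minutes of windows (illustrative)

    // segment start -> ("<K,W>" key -> aggregate value)
    final TreeMap<Long, Map<String, Long>> segments = new TreeMap<>();

    void add(String key, long windowStart, long count) {
        long segment = (windowStart / SEGMENT_MS) * SEGMENT_MS;
        segments.computeIfAbsent(segment, s -> new HashMap<>())
                .merge(key + "@" + windowStart, count, Long::sum); // windowed key is <K,W>
    }

    // Drop whole segments older than the retention time: memory does not grow forever
    void expire(long streamTimeMs) {
        segments.headMap(streamTimeMs - RETENTION_MS).clear();
    }
}
```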

For KTable, a similar behavior applies: using groupBy().someAgg() results in the creation of an internal topic and a RocksDB instance.
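Why groupBy() needs an internal (repartitioning) topic can be sketched as follows: selecting a new key can move a record to a different partition, so records sharing the same new key must be redistributed before they can be aggregated together. A plain-Java sketch with a simple hash partitioner (partition count, partitioner, and data are made up for illustration):

```java
import java.util.*;
import java.util.function.Function;

public class Repartition {
    static final int NUM_PARTITIONS = 4; // illustrative

    static int partition(String key) {
        return Math.floorMod(key.hashCode(), NUM_PARTITIONS);
    }

    // Re-key every record and assign it to the partition of its NEW key,
    // as writing to an internal repartitioning topic would do.
    static Map<Integer, List<String>> repartition(Map<String, String> recordsByOldKey,
                                                  Function<String, String> newKey) {
        Map<Integer, List<String>> partitions = new HashMap<>();
        for (Map.Entry<String, String> rec : recordsByOldKey.entrySet()) {
            String k = newKey.apply(rec.getKey());
            partitions.computeIfAbsent(partition(k), p -> new ArrayList<>())
                      .add(k + "=" + rec.getValue());
        }
        return partitions;
    }
}
```

After repartitioning, all records with the same new key sit in the same partition, so the downstream aggregation can be computed locally.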

...

Kafka Streams commits the current processing progress at regular intervals (parameter commit.interval.ms). If a commit is triggered, all state stores need to flush data to disk, i.e., all internal topics need to get flushed to Kafka. Furthermore, all user topics get flushed, too. Finally, all current topic offsets are committed to Kafka. In case of failure and restart, the application can resume processing from its last commit point (providing at-least-once processing guarantees).
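The commit/restart behavior yields at-least-once semantics: records processed after the last commit are processed a second time after a restart. A plain-Java sketch (the commit interval is counted in records rather than milliseconds for simplicity; all names are made up):

```java
import java.util.*;

public class AtLeastOnce {
    // Process records from a topic, committing the offset every commitInterval records;
    // crash after crashAt records, then resume from the last committed offset.
    static List<String> processWithCrash(List<String> topic, int commitInterval, int crashAt) {
        List<String> processed = new ArrayList<>();
        int committedOffset = 0;
        for (int offset = 0; offset < crashAt; offset++) {
            processed.add(topic.get(offset));
            if ((offset + 1) % commitInterval == 0) {
                committedOffset = offset + 1; // flush stores and topics, then commit the offset
            }
        }
        // Restart: resume from the last commit point, re-processing uncommitted records
        for (int offset = committedOffset; offset < topic.size(); offset++) {
            processed.add(topic.get(offset));
        }
        return processed;
    }
}
```

Records between the last commit point and the crash are processed twice, which is exactly the at-least-once guarantee.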

...