...
| single stream: tuple-by-tuple, same key | single stream: tuple-by-tuple, new key | single stream: aggregation, non-windowed | single stream: aggregation, windowed (KTable -> KTable<<W,K>,V>) | single stream: other | multiple streams: non-windowed KStream-KTable join | multiple streams: windowed KStream-KStream join |
|---|---|---|---|---|---|---|
| filter (1:[0,1]) | | aggregateByKey | aggregateByKey | | join (as of 0.10.2) | join |
| filterNot (1:[0,1]) | | reduceByKey | reduceByKey | | leftJoin | leftJoin |
| | selectKey (1:1) | countByKey | countByKey | foreach | | outerJoin |
| mapValues (1:1) | map (1:1) | | | through | merge (as of 1.0) | |
| flatMapValues (1:n) | flatMap (1:n) | | | to | | |
| transformValues (1:1 + state) | transform (1:n + state) | | | branch | | |
| | process (1:0 + state) | | | peek (as of 0.11.0) | | |
KTable API
KTable currently offers the following methods, which have different implications with regard to (internally) created topics and RocksDB usage (see KAFKA-3576).
| single stream: tuple-by-tuple, same key (-> KTable) | single stream: tuple-by-tuple, new key (-> KGroupedTable) | single stream: aggregation (KGroupedTable -> KTable) | single stream: other | multiple streams: tuple-by-tuple (KTable-KTable joins) |
|---|---|---|---|---|
| filter (1:[0,1]) | | aggregate | | join |
| filterNot (1:[0,1]) | | reduce | | leftJoin |
| mapValues (1:1) | | count | | outerJoin |
| | groupBy (1:1) [internally simple map] | | | |
| | | | toStream | |
Data Management
...
RocksDB is just used as an internal lookup table (that is able to flush to disk if the state does not fit into memory), and the internal changelog topic is created for fault-tolerance reasons. Thus, the changelog topic is the source of truth for the state (= the log of the state), while RocksDB is used as a (non-fault-tolerant) cache. RocksDB cannot be used for fault tolerance because flushing happens to local disk, and it cannot be controlled when flushing happens. RocksDB flushing is only required because the state could be larger than the available main memory. Thus, the internal changelog topic is used for fault tolerance: if a task crashes and gets restarted on a different machine, this internal changelog topic is used to recover the state store. Currently, the default replication factor of internal topics is 1 (see KAFKA-3776).
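The relationship between the local store and its changelog can be illustrated with a toy model. This is a minimal sketch and not Kafka Streams code: the class `ChangelogBackedStore` and its methods are hypothetical, an in-memory list stands in for the changelog topic, and a `HashMap` stands in for RocksDB.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Kafka Streams code): every update goes to an append-only
// changelog (the source of truth) and to a local lookup table (the cache).
class ChangelogBackedStore {
    // Stand-in for the internal changelog topic.
    private final List<Map.Entry<String, Long>> changelog;
    // Stand-in for the local RocksDB instance; lost when the task crashes.
    private final Map<String, Long> localCache = new HashMap<>();

    ChangelogBackedStore(List<Map.Entry<String, Long>> changelog) {
        this.changelog = changelog;
        // Recovery: replay the changelog in order to rebuild the local cache.
        for (Map.Entry<String, Long> update : changelog) {
            localCache.put(update.getKey(), update.getValue());
        }
    }

    void put(String key, long value) {
        changelog.add(Map.entry(key, value)); // durable record of the update
        localCache.put(key, value);           // fast local lookup
    }

    Long get(String key) {
        return localCache.get(key);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> topic = new ArrayList<>();
        ChangelogBackedStore store = new ChangelogBackedStore(topic);
        store.put("a", 1L);
        store.put("b", 5L);
        store.put("a", 2L);

        // Simulated restart on another machine: only the changelog survives.
        ChangelogBackedStore restored = new ChangelogBackedStore(topic);
        System.out.println(restored.get("a")); // 2
        System.out.println(restored.get("b")); // 5
    }
}
```

The restored instance never sees the first store's local cache, which is the point: only the changelog has to survive a crash.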
There are two main differences between non-windowed and windowed aggregation with regard to key-design. For window aggregation the key is <K,W>, i.e., for each window a new key is used. Thus, the memory usage grows over time, even if the key-space is bounded (i.e., the number of unique keys). This implies that log-compaction cannot purge any old data (see KAFKA-4015). The second difference is about RocksDB instances: instead of using a single instance, Streams uses multiple instances (called “segments”) for different time periods. After the window retention time has passed, old segments can be dropped. Thus, the RocksDB memory requirement does not grow infinitely (in contrast to the changelog topic). (KAFKA-4015 was fixed in the 0.10.1 release, and windowed changelog topics no longer grow unbounded because they apply an additional retention time parameter.)
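Both effects can be seen in a toy model. The sketch below is not Kafka Streams code: `WindowedStoreSketch`, its string encoding of the <K,W> key, and the segment arithmetic are assumptions made for illustration, and it assumes timestamps arrive in non-decreasing order. The "ever written" counter corresponds to a log that never purges, i.e., the behavior before the KAFKA-4015 fix.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeMap;

// Toy model (not Kafka Streams code) of a windowed store split into segments.
class WindowedStoreSketch {
    private final long windowSizeMs;
    private final long segmentIntervalMs;
    private final long retentionMs;
    // Segment id -> composite keys "<key>@<windowStart>" held by that segment.
    private final TreeMap<Long, Set<String>> segments = new TreeMap<>();
    // Every composite key ever written, i.e., what a log that never purges holds.
    private final Set<String> allKeysEverWritten = new HashSet<>();

    WindowedStoreSketch(long windowSizeMs, long segmentIntervalMs, long retentionMs) {
        this.windowSizeMs = windowSizeMs;
        this.segmentIntervalMs = segmentIntervalMs;
        this.retentionMs = retentionMs;
    }

    // Assumes timestamps are non-negative and arrive in non-decreasing order.
    void put(String key, long timestampMs) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        String compositeKey = key + "@" + windowStart; // one new key per window
        allKeysEverWritten.add(compositeKey);
        long segmentId = windowStart / segmentIntervalMs;
        segments.computeIfAbsent(segmentId, id -> new HashSet<>()).add(compositeKey);
        // Drop every segment that ends at or before (timestamp - retention).
        long firstLiveSegment = (timestampMs - retentionMs) / segmentIntervalMs;
        segments.headMap(firstLiveSegment).clear();
    }

    int liveKeys() {
        int count = 0;
        for (Set<String> keys : segments.values()) {
            count += keys.size();
        }
        return count;
    }

    int keysEverWritten() {
        return allKeysEverWritten.size();
    }

    public static void main(String[] args) {
        // 10 ms windows, 10 ms segments, 30 ms retention (illustrative values).
        WindowedStoreSketch store = new WindowedStoreSketch(10, 10, 30);
        for (long ts = 0; ts < 100; ts += 10) {
            store.put("k", ts); // a single, bounded key
        }
        System.out.println(store.keysEverWritten()); // 10: grows with every window
        System.out.println(store.liveKeys());        // 4: bounded by retention
    }
}
```

Even with a single record key, the number of composite keys grows with every new window, while the number of keys held in live segments stays bounded by the retention time.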
For KTable a similar behavior applies. Using groupBy().someAgg() results in internal topic and RocksDB creation.
...
Kafka Streams commits the current processing progress at regular intervals (parameter commit.interval.ms). If a commit is triggered, all state stores need to flush data to disk, i.e., all internal topics need to get flushed to Kafka. Furthermore, all user topics get flushed, too. Finally, all current topic offsets are committed to Kafka. In case of failure and restart, the application can resume processing from its last commit point (providing at-least-once processing guarantees).
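Why resuming from the last commit point yields at-least-once semantics can be shown with a toy model. This is a sketch, not Kafka Streams code: `CommitSketch.run` is a hypothetical helper, and it commits after a fixed number of records rather than after a time interval, which is a simplification of commit.interval.ms.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model (not Kafka Streams code) of periodic commits and replay after a crash.
class CommitSketch {
    // Processes input[from, until) and commits the position after every
    // commitEvery records. Returns the last committed offset.
    static int run(List<String> input, int from, int until, int commitEvery,
                   List<String> output) {
        int committed = from;
        for (int offset = from; offset < until; offset++) {
            output.add(input.get(offset)); // "process" the record
            if ((offset + 1 - from) % commitEvery == 0) {
                // A real commit would flush stores and topics before this point.
                committed = offset + 1;
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "d", "e");
        List<String> output = new ArrayList<>();

        // First run crashes after processing offsets 0..3; the last commit was at 3.
        int committed = run(input, 0, 4, 3, output);

        // Restart resumes from the committed offset, so "d" is processed again.
        run(input, committed, input.size(), 3, output);
        System.out.println(output); // [a, b, c, d, d, e]
    }
}
```

Record "d" was processed before the crash but after the last commit, so it shows up twice in the output: nothing is lost, but duplicates are possible.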
...