Table of Contents
Overview
Kafka Streams supports stateful stream processing, i.e., operators that maintain internal state. This state is managed in so-called state stores. A state store can be ephemeral (lost on failure) or fault-tolerant (restored after a failure). The default implementation used by the Kafka Streams DSL is a fault-tolerant state store that uses (1) an internally created, compacted changelog topic (for fault tolerance) and (2) one or more RocksDB instances (for cached key-value lookups). Consequently, this internal data needs to be managed correctly when applications are started or stopped and when input is rewound or reprocessed.
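To make the changelog idea concrete, here is a minimal toy model in plain Java. It is not the Kafka Streams implementation: the class names are hypothetical, an in-memory list stands in for the changelog topic, and a `HashMap` stands in for RocksDB. It only illustrates why replaying the changelog (where the latest record per key wins) is enough to rebuild a lost local store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogStoreSketch {
    // Stand-in for the changelog topic: an append-only list of key/value updates.
    static final class Changelog {
        final List<String[]> records = new ArrayList<>();

        void append(String key, String value) {
            records.add(new String[] {key, value});
        }
    }

    // Stand-in for the local store (RocksDB in the default DSL setup).
    static final class Store {
        final Map<String, String> local = new HashMap<>();
        final Changelog changelog;

        Store(Changelog changelog) {
            this.changelog = changelog;
        }

        // Every write goes to the local store and to the changelog.
        void put(String key, String value) {
            local.put(key, value);
            changelog.append(key, value);
        }

        String get(String key) {
            return local.get(key);
        }

        // Rebuild a fresh local store by replaying the changelog in order;
        // later records for the same key overwrite earlier ones.
        static Store restore(Changelog changelog) {
            Store restored = new Store(changelog);
            for (String[] record : changelog.records) {
                restored.local.put(record[0], record[1]);
            }
            return restored;
        }
    }

    public static void main(String[] args) {
        Changelog log = new Changelog();
        Store store = new Store(log);
        store.put("a", "1");
        store.put("a", "2");
        store.put("b", "7");

        // Simulate losing the local store and restoring it from the changelog.
        Store restored = Store.restore(log);
        System.out.println(restored.get("a") + " " + restored.get("b")); // prints "2 7"
    }
}
```

Because only the latest value per key matters for restoration, a compacted topic is sufficient as the backing log.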
Current State
We first give an overview of the current implementation details of Kafka Streams with regard to (internally) created topics and the usage of RocksDB. The available transformations for KStream and KTable can be categorized as shown below. All operators within a category use the same internal state management mechanism, so the categorization yields an overview of the state management strategy for each transformation.
...
State management details are given below.
KStream API
KStream currently offers the following methods, which have different implications with regard to (internally) created topics and RocksDB usage.
| single stream | | | | | multiple streams | |
|---|---|---|---|---|---|---|
| data transformation | | | | other | data transformation | |
| tuple-by-tuple | | aggregation | | | tuple-by-tuple (i.e., joins) | |
| same key | new key | non-windowed | windowed (KTable -> KTable<<W,K> V>) | | non-windowed KStream-KTable | windowed KStream-KStream |
| filter (1:[0,1]) | | aggregateByKey | aggregateByKey | | | join |
| filterNot (1:[0,1]) | | reduceByKey | reduceByKey | writeAsText | leftJoin | leftJoin |
| | selectKey (1:1) | countByKey | countByKey | foreach | | outerJoin |
| mapValues (1:1) | map (1:1) | | | through | | |
| flatMapValues (1:n) | flatMap (1:n) | | | to | | |
| transformValues (1:1 + state) | transform (1:1 + state) | | | branch | | |
| process (1:n + state) | | | | | | |
KTable API
KTable currently offers the following methods, which have different implications with regard to (internally) created topics and RocksDB usage (see KAFKA-3576).
| single stream | | | | multiple streams |
|---|---|---|---|---|
| data transformation | | | other | data transformation |
| tuple-by-tuple | | aggregation (KGroupedTable -> KTable) | | tuple-by-tuple (KTable-KTable joins) |
| same key (-> KTable) | new key (-> KGroupedTable) | | | |
| filter (1:[0,1]) | | aggregate | | join |
| filterNot (1:[0,1]) | | reduce | writeAsText | leftJoin |
| mapValues (1:1) | | count | foreach | outerJoin |
| | groupBy (1:1) [internally simple map] | | through | |
| | | | to | |
| | | | toStream | |
Data Management
Overview
There are four methods to explicitly deal with user topics:
...
Commits
Kafka Streams commits the current processing progress at regular intervals (parameter commit.interval.ms). When a commit is triggered, all state stores need to flush their data to disk, i.e., all internal topics need to be flushed to Kafka. All user topics are flushed as well. Finally, all current topic offsets are committed to Kafka. In case of failure and restart, the application can resume processing from its last commit point (providing at-least-once processing guarantees).
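The at-least-once guarantee follows directly from resuming at the last commit point: anything processed after that commit but before the failure is processed again. The following toy simulation illustrates this. It is a sketch under stated assumptions, not Kafka Streams code: the class and method names are hypothetical, and commits are triggered after a fixed number of records instead of a time interval (as commit.interval.ms would do) so that the run is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CommitResumeSketch {
    // Processes input[start..stopBefore) by appending each record to `output`
    // and commits after every `commitEvery` processed records.
    // Returns the last committed offset, i.e. where a restart would resume.
    static int run(List<String> input, int start, int stopBefore,
                   int commitEvery, List<String> output) {
        int committed = start;
        int sinceCommit = 0;
        for (int offset = start; offset < stopBefore; offset++) {
            output.add(input.get(offset));
            sinceCommit++;
            if (sinceCommit == commitEvery) {
                committed = offset + 1; // next offset to read after a restart
                sinceCommit = 0;
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "d", "e");
        List<String> output = new ArrayList<>();

        // First run "crashes" after processing offsets 0..2; only offsets 0..1 were committed.
        int committed = run(input, 0, 3, 2, output);

        // Restart from the last commit point and process the rest of the input.
        run(input, committed, input.size(), 2, output);

        System.out.println(output); // prints "[a, b, c, c, d, e]"
    }
}
```

Record `c` appears twice in the output because it was processed before the failure but after the last commit; no record is lost, which is what at-least-once means.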
Internal Topics and State Store Names
Currently, the Streams DSL tries to abstract the auto-generated internal topic and state store names as "KTable names" and "window names"; however, in a future release all state store names will be exposed to the user.
Internal topics follow the naming convention <application.id>-<operatorName>-<suffix> (see KAFKA-3870); this naming convention might change at any time.
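As a small illustration of the convention, the helper below assembles such a name from its three parts. The class, the method, and the example values "my-app" and "counts" are hypothetical; "changelog" is used as an example suffix for a state store's changelog topic. Since the convention may change, a name built this way should not be relied on in production code.

```java
public class InternalTopicNameSketch {
    // Builds a name of the form <application.id>-<operatorName>-<suffix>.
    static String internalTopicName(String applicationId, String operatorName, String suffix) {
        return applicationId + "-" + operatorName + "-" + suffix;
    }

    public static void main(String[] args) {
        // Hypothetical application id and operator (store) name.
        System.out.println(internalTopicName("my-app", "counts", "changelog"));
        // prints "my-app-counts-changelog"
    }
}
```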
...