Table of Contents
Kafka Streams allows for stateful stream processing, i.e. operators that have an internal state. This internal state is managed in so-called state stores. A state store can be ephemeral (lost on failure) or fault-tolerant (restored after the failure). The default implementation used by Kafka Streams DSL is a fault-tolerant state store using 1. an internally created and compacted changelog topic (for fault-tolerance) and 2. one (or multiple) RocksDB instances (for cached key-value lookups). Thus, in case of starting/stopping applications and rewinding/reprocessing, this internal data needs to get managed correctly.
We first want to give an overview about the current implementation details of Kafka Streams with regard to (internally) created topics and the usage of RocksDB. We can categorize available transformations for
KTable as shown below. All operators within a category use the same internal state management mechanism. Therefore, we get an overview of the state management strategy for each transformation.
- single stream tuple-by-tuple: those transformations do not use an internal state — however, if they change the key ("new key") data re-partitioning might be required after the transformation (i.e., writing to and reading from a topic - KAFKA-3561Getting issue details... STATUS - KAFKA-3576Getting issue details... STATUS )
- aggregation and joins: those transformations do use internal state (RocksDB plus changelog topic)
- operations marked with "+ state" allows the usage of user defined state
State management details are given below.
KStream currently offers the following methods which do have different implication with regard to (internally) created topics and RocksDB usage.
tuple-by-tuple (i.e., joins)
(KTable -> KTable<<W,K> V>
|join (as of 0.10.2)||join|
|merge (as of 1.0)|
(1:1 + state)
(1:n + state)
(1:0 + state)
|peek (as of 0.11.0)|
(KGroupedTable -> KTable)
(1:1) [internally simple map]
There are four methods to explicitly deal with user topics:
KStream/KTable#through()for writing and reading again.
User topics are required to be created by the user before Kafka Streams application is started. Furthermore, an interal topic is created each time re-partitioning is required (via an ingested
KAFKA-3561Getting issue details...
); this happens when the key is changed before an aggregation is performed (not necessarily directly after each other):
Here's an illustration of the above pseudo-code topology:
This implies that after “new key” there was no
through() performed. The aggregation itself uses a RocksDB instance as key-value state store that also persists to local disk. Flushing to disk happens asynchronously. Furthermore, an internal compacted changelog topic is created. The state store sends changes to the changelog topic in a batch, either when a default batch size has been reached or when the commit interval (see "Commits" below) is reached.
RocksDB is just used as an internal lookup table (that is able to flush to disk if the state does not fit into memory
KAFKA-3776Getting issue details...
) and the internal changelog topic is created for fault-tolerance reasons. Thus, the changelog topic is the source of truth for the state (= the log of the state), while RocksDB is used as (non-fault tolerant) cache. RocksDB cannot be used for fault-tolerance because flushing happens to local disk, and it cannot be controlled when flushing happens. RocksDB flushing is only required because state could be larger than available main-memory. Thus, the internal changelog topic is used for fault-tolerance: If a task crashes and get restarted on different machine, this internal changelog topic is used to recover the state store. Currently, the default replication factor of internal topics is 1.
There are two main differences between non-windowed and windowed aggregation with regard to key-design. For window aggregation the key is <K,W>, i.e., for each window a new key is used.
Thus, the memory usage grows over time (
KAFKA-4015Getting issue details...
), even if the key-space is bounded (i.e., the number of unique keys). This implies that log-compaction cannot purge any old data. The second difference is about RocksDB instances: instead of using a single instance, Streams uses multiple instances (called “segments”) for different time periods. After the window retention time has passed old segments can be dropped. Thus, RocksDB memory requirement does not grow infinitely (in contrast to changelog topic). (KAFKA-4015 was fixed in 0.10.1 release, and windowed changelog topics don't grow unbounded as they apply an additional retention time parameter).
For KTable a similar behavior applies. Using groupBy().someAgg() results in internal topic and RocksDB creation.
For stateful KStream transformation (transform, transformValue, and process) an explicit state store is used. Depending on the use state store, a changelog topic might get created.
For joins, one or two internal state stores (RocksDB plus internal changelog topic) are used. Behavior is same as for aggregates. Joins can also be windowed (see window aggregates).
Kafka Streams commit the current processing progress in regular intervals (parameter commit.interval.ms). If a commit is triggered, all state stores need to flush data to disk, i.e., all internal topics needs to get flushed to Kafka. Furthermore, all user topics get flushed, too. Finally, all current topic offsets are committed to Kafka. In case of failure and restart, the application can resume processing from its last commit point (providing at-least-once processing guarantees).
Internal Topics and State Store Names
Currently in the Streams DSL we are trying to abstract the auto generated internal topics and state store names as "KTable names" and "window names"; however, in future release all state store name will be exposed to the user.
KAFKA-3870Getting issue details...
Internal topics follow the naming convention
<application.id>-<operatorName>-<suffix>; this naming convention might change any time in.