...
- The task (StreamTask, StandbyTask) registers its state stores. The state stores load offset metadata from the checkpoint file (link). This step aims to establish a mapping between the data in the state store and the offset of the changelog topic.
- In the case of a crash failure, the state store might contain data even though the checkpoint file does not exist. ProcessorStateManager throws an exception in that case for EOS tasks. This is an indicator to throw away the local data and replay the changelog topic (link).
- The task processes data and writes its state locally.
- The task commits EOS transaction. TaskExecutor#commitOffsetsOrTransaction calls StreamsProducer#commitTransaction that sends new offsets and commits the transaction.
- The task runs a postCommit method (StreamTask, StandbyTask) that flushes the state store and writes the checkpoint file (for non-EOS tasks).
- The task shuts down. It stops processing data, then writes its current offset to the checkpoint file and halts.
In step 4, the task neither flushes the state store nor writes the checkpoint file in the case of EOS, to prevent a partial update on crash failure. Without the risk of partial updates, EOS could follow the same process as the other processing semantics and avoid replaying the changelog. If the failure happens at step 2 or 3, the state store might contain records that have not yet been committed by the EOS transaction. These uncommitted records violate the EOS guarantees and are the reason why Kafka Streams deletes the state store data if EOS is enabled.
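The decision described above can be sketched as a single predicate. This is an illustrative model only, with hypothetical names, not the actual Kafka Streams internals: under EOS, local state without a checkpoint file may contain uncommitted writes, so it must be wiped and replayed.

```java
// Hypothetical sketch of the pre-KIP recovery decision under EOS.
// The names are illustrative, not the real Kafka Streams classes.
public class EosRecoveryDecision {
    /**
     * Returns true if local state must be wiped and replayed from the
     * changelog topic. Under EOS, local data without a checkpoint file
     * may contain uncommitted writes, so it cannot be trusted.
     */
    public static boolean mustWipeAndReplay(boolean eosEnabled,
                                            boolean storeHasData,
                                            boolean checkpointExists) {
        return eosEnabled && storeHasData && !checkpointExists;
    }
}
```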
Overview
This section gives an overview of the proposed changes. The following sections cover the changes in behavior, configuration, and interfaces in detail.
This KIP introduces persistent transactional state stores that
- distinguish between uncommitted and committed data in the state store
- guarantee atomic commit, meaning that either all uncommitted (dirty) writes are applied together or none are.
These guarantees are sufficient to prevent the failure scenario described in the previous section.
This proposal deprecates the StateStore#flush method and introduces two methods instead: StateStore#commit(changelogOffset) and StateStore#recover(changelogOffset). The proposal changes public APIs: the StateStore and StoreSupplier interfaces and the Stores factory. StateStore and StoreSupplier gain a new transactional() method that returns true if the state store is transactional. In addition, the StateStore#flush method is replaced by StateStore#commit(Long) paired with StateStore#recover(long), allowing the store to commit its current state at a specified offset and to recover from a crash failure to a previously checkpointed offset. Users can create persistent transactional state stores via the Stores factory.
Transactional state stores do not delete the checkpoint file and the underlying data in the case of EOS. Instead, they roll back or forward from the changelog topic on recovery.
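To make the proposed contract concrete, here is a toy in-memory model of the new methods. This is a simplified sketch with illustrative names, not the real StateStore interface: dirty writes are buffered separately, commit() applies them at an offset, and recover() discards them.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the proposed additions (illustrative names only).
interface TxnStoreSketch {
    boolean transactional();
    void commit(Long changelogOffset);   // replaces flush()
    Long recover(long changelogOffset);  // discard dirty state after a crash
}

// Toy in-memory model: dirty writes are buffered separately and merged
// into the committed map only on commit(). Deletes/tombstones are omitted
// for brevity.
class InMemoryTxnStore implements TxnStoreSketch {
    final Map<String, String> committed = new HashMap<>();
    final Map<String, String> dirty = new HashMap<>();
    Long committedOffset = -1L;

    public void put(String k, String v) { dirty.put(k, v); }

    // Reads return uncommitted data from the open transaction first.
    public String get(String k) {
        return dirty.containsKey(k) ? dirty.get(k) : committed.get(k);
    }

    @Override public boolean transactional() { return true; }

    @Override public void commit(Long changelogOffset) {
        committed.putAll(dirty);   // all dirty writes applied together
        dirty.clear();
        committedOffset = changelogOffset;
    }

    @Override public Long recover(long changelogOffset) {
        dirty.clear();             // roll back uncommitted writes
        return committedOffset;    // the offset the state corresponds to
    }
}
```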
With these changes, the lifecycle of a stateful task with a transactional state store becomes:
- The task registers its state stores. The state stores call StateStore#recover, which discards uncommitted data.
- The task processes data and writes new records as uncommitted.
- The task commits the EOS transaction.
- The task runs a postCommit method that:
- commits dirty writes.
- updates the checkpoint file.
- The task shuts down.
Consider possible failure scenarios:
- The crash happens between steps 1 and 3. The uncommitted data is discarded. The input records were not committed via the EOS transaction, so the task will re-process them.
- The crash happens between steps 3 and 4a. The EOS transaction has already been committed, but the state store transaction has not. The state store will replay the uncommitted records from the changelog topic.
- The crash happens between steps 4a and 4b. The state store has already committed the new records, but they are not yet reflected in the checkpoint file. The state store will replay the last committed records from the changelog topic. This operation is idempotent and does not violate correctness.
- The crash happens after step 4b. The state store does nothing during recovery.
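The failure scenarios above can be summarized as a mapping from crash point to recovery action. The enum names below are illustrative shorthand for the steps in the lifecycle, not anything in the Kafka Streams codebase:

```java
// Hypothetical summary of the failure scenarios above: which recovery
// action applies at each crash point. Names are illustrative only.
public class CrashRecoveryAction {
    enum CrashPoint {
        BEFORE_EOS_COMMIT,                  // between steps 1 and 3
        AFTER_EOS_COMMIT_BEFORE_STORE_COMMIT, // between 3 and 4a
        AFTER_STORE_COMMIT_BEFORE_CHECKPOINT, // between 4a and 4b
        AFTER_CHECKPOINT                    // after 4b
    }

    enum Action {
        DISCARD_AND_REPROCESS,
        REPLAY_UNCOMMITTED_FROM_CHANGELOG,
        REPLAY_LAST_COMMITTED_IDEMPOTENTLY,
        NOTHING
    }

    static Action recoveryAction(CrashPoint p) {
        switch (p) {
            case BEFORE_EOS_COMMIT:
                return Action.DISCARD_AND_REPROCESS;
            case AFTER_EOS_COMMIT_BEFORE_STORE_COMMIT:
                return Action.REPLAY_UNCOMMITTED_FROM_CHANGELOG;
            case AFTER_STORE_COMMIT_BEFORE_CHECKPOINT:
                return Action.REPLAY_LAST_COMMITTED_IDEMPOTENTLY;
            default:
                return Action.NOTHING;
        }
    }
}
```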
There are multiple ways to implement state store transactions that present different trade-offs. This proposal includes a single reference implementation via a secondary RocksDB for uncommitted writes.
StateStore changes
This section covers multiple changes to the state store interfaces. This proposal replaces StateStore#flush with two new methods, StateStore#commit(Long) and StateStore#recover(long), and adds a boolean transactional() method to determine whether a state store is transactional.
The reasoning behind replacing flush with commit/recover is twofold. First, let's discuss why we don't need both flush and commit:
- There is always a single writer in Kafka Streams workloads, and all writes must go to a single currently open transaction.
- There is always a single reader that queries dirty state from a single open transaction.
- The state stores already explicitly call flush after the Kafka transaction commits and before writing to the checkpoint file, in order to make uncommitted changes durable. Adding a separate method would create room for error, such as a missing commit call or executing the two calls in the wrong order.
In this sense, flush and commit are semantically the same thing. The separation of concerns between the two methods would be ambiguous, and the correct call order would be unclear.
The purpose of the StateStore#recover(long changelogOffset) method is to transition the state store to a consistent state after a crash failure. This method discards any changes that are not yet committed to the changelog topic and ensures that the store's state corresponds to an offset greater than or equal to the checkpointed changelogOffset.
Behavior changes
If StateStore#transactional() returns true, the store performs writes via an implementation-specific transactional mechanism. Reads via ReadOnlyKeyValueStore methods return uncommitted data from the ongoing transaction.
A transactional state store opens its first transaction during initialization and commits it on StateStore#commit: first, the store commits the transaction, then it flushes, then it starts a new transaction.
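The commit sequence above can be sketched as follows. This is a toy model with an illustrative transaction abstraction, not the actual store implementation; it only records the order of operations:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the commit sequence: commit the open transaction, flush,
// then begin a new transaction. The Txn abstraction is illustrative.
class CommitSequenceSketch {
    interface Txn { void commit(); }

    final List<String> log = new ArrayList<>();
    Txn current;

    CommitSequenceSketch() {
        current = beginTxn();   // first transaction opens on initialization
    }

    Txn beginTxn() {
        log.add("begin");
        return () -> log.add("commit");
    }

    void flush() { log.add("flush"); }

    void commit(Long changelogOffset) {
        current.commit();       // 1. commit the open transaction
        flush();                // 2. make the result durable
        current = beginTxn();   // 3. start a new transaction
    }
}
```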
There are several places where StreamTask, ProcessorStateManager, and TaskManager check whether EOS is enabled and, if so, delete the checkpoint file on crash failure, specifically when:
- StreamTask resumes processing (link)
- ProcessorStateManager initializes state stores from checkpointed offsets (link1, link2)
- StreamTask writes offsets to the checkpoint file after committing (link)
- TaskManager handles revocation (link)
If EOS is enabled, we will remove the offset information for non-transactional state stores from the checkpoint file instead of deleting the entire file.
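A minimal sketch of that filtering step, assuming the checkpoint content is modeled as a map from store name to offset (the real checkpoint handling lives in the Kafka Streams internals; names here are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Sketch of the new checkpoint handling under EOS: instead of deleting
// the whole checkpoint file, drop only the offsets that belong to
// non-transactional stores. Illustrative, not the actual internals.
public class CheckpointFilter {
    public static Map<String, Long> retainTransactional(
            Map<String, Long> checkpointedOffsets,
            Predicate<String> isTransactionalStore) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : checkpointedOffsets.entrySet()) {
            if (isTransactionalStore.test(e.getKey())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }
}
```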
Interface Changes
StateStore
...
public StoreSupplier<S> storeSupplier();
public boolean transactional();
Compatibility, Deprecation, and Migration Plan
Transactional state stores will be disabled by default. Both Streams DSL and Processor API users can enable transactional writes in the built-in RocksDB state store by passing a new boolean flag transactional=true to the Materialized constructor and the Stores factory methods. Custom state stores will have an option to enable transactionality by adjusting their implementation according to the StateStore#transactional() contract.
The StateStore#flush() method is deprecated. The new StateStore#commit(changelogOffset) method will by default fall back to StateStore#flush() for non-transactional state stores.
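One plausible way to express that fallback is a default method, sketched below with hypothetical interface and class names (the real StateStore interface is much larger):

```java
// Sketch of the deprecation path: commit() falls back to flush() for
// non-transactional stores. Names are illustrative, not the real API.
interface StoreWithFallback {
    @Deprecated
    void flush();

    default boolean transactional() { return false; }

    // The replacement for flush(); non-transactional stores keep their
    // old durability behavior via the fallback.
    default void commit(Long changelogOffset) {
        flush();
    }
}

// A legacy (non-transactional) store only implements flush() and gets
// commit() for free.
class LegacyStore implements StoreWithFallback {
    boolean flushed = false;
    @Override public void flush() { flushed = true; }
}
```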
Proposed changes are source compatible and binary incompatible with previous releases.
Test Plan
- Changes not committed to the changelog topic are discarded on crash failure.
- Changes committed to the changelog topic, but not committed to the state store are rolled forward.
Transactions via Secondary State Store for Uncommitted Changes
This proposal comes with a reference implementation, used by the Stores factory methods that create transactional state stores. In this implementation, transactionality is guaranteed by batching uncommitted (dirty) writes in a temporary RocksDB instance. Once the task flushes, the temporary store creates a commit file with a changelog offset, indicating that the transaction is ready to commit, and writes the dirty records into the regular store. It truncates the temporary store and deletes the commit file once it is finished.
All writes and deletes go to the temporary store. Reads query the temporary store first and fall back to the regular store if the data is missing. Range reads query both stores and return a KeyValueIterator that merges the results.
On crash failure, ProcessorStateManager calls StateStore#recover(offset). The state store checks whether the temporary store and the commit file exist. If they do, the task repeats the commit process described above (i.e., rolls forward) and returns the new committed offset. Otherwise, it truncates the temporary store (rolls uncommitted changes back) and returns the previously committed offset.
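The mechanism above can be modeled with two in-memory maps standing in for the two RocksDB instances, and a field standing in for the commit file. This is a deliberately simplified sketch of the roll-forward/roll-back logic, not the reference implementation itself:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the secondary-store design: "temporary" buffers dirty
// writes; commit() writes a commit marker, copies dirty records into
// "regular", then truncates the buffer. recover() rolls forward if the
// marker survives a crash, otherwise rolls back. Illustrative only: the
// real implementation uses a secondary RocksDB instance and a file.
class SecondaryStoreSketch {
    final Map<String, String> regular = new HashMap<>();
    final Map<String, String> temporary = new HashMap<>();
    Long commitMarker = null;   // stands in for the commit file
    long committedOffset = -1L;

    void put(String k, String v) { temporary.put(k, v); }

    // Reads check the temporary store first, then the regular store.
    String get(String k) {
        return temporary.containsKey(k) ? temporary.get(k) : regular.get(k);
    }

    void commit(long changelogOffset) {
        commitMarker = changelogOffset;  // transaction is ready to commit
        regular.putAll(temporary);       // write dirty records to regular
        temporary.clear();               // truncate the temporary store
        committedOffset = changelogOffset;
        commitMarker = null;             // delete the commit file
    }

    long recover(long checkpointedOffset) {
        if (commitMarker != null) {      // crashed mid-commit: roll forward
            long offset = commitMarker;
            commit(offset);
            return offset;
        }
        temporary.clear();               // roll uncommitted changes back
        return committedOffset;
    }
}
```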
The major advantage of this approach is that the temporary state store can optionally use the available disk space if the writes do not fit into the in-memory buffer.
The disadvantages are:
- It doubles the number of open state stores.
- It potentially has higher write and read amplification due to uncontrolled flushes of the temporary state store.
...
Rejected Alternatives
RocksDB in-memory Indexed Batches
...