...
- org.apache.kafka.streams.processor.StateStore
- org.apache.kafka.streams.state.StoreSupplier
- org.apache.kafka.streams.state.Stores
- org.apache.kafka.streams.kstream.Materialized
- org.apache.kafka.streams.kstream.StreamJoined
- org.apache.kafka.streams.StoreQueryParameters
- org.apache.kafka.streams.query.StateQueryRequest
Proposed Changes
Background
...
To sum up, StateStore#recover transitions the state store to the consistent, committed state and returns the changelog offset corresponding to that state. Given the Streams workload properties described above and the AK transaction commit order, the returned offset is always either the offset in the checkpoint file or the offset committed to the changelog.
Transactions via Secondary State Store for Uncommitted Changes
New Stores# factory methods described above provide an option to create transactional state stores. Transactionality is guaranteed by batching uncommitted (dirty) writes in a temporary RocksDB instance. Once the task flushes, the temporary store creates a commit file with a changelog offset, indicating that the transaction is ready to commit, and writes dirty records into the regular store. It truncates the temporary store and deletes the commit file once it is finished.
On crash failure, ProcessorStateManager calls StateStore#recover(offset). The state store checks if the temporary store and the commit file exist. If they do, the task repeats the commit process described above (i.e., rolls forward) and returns the new committed offset. Otherwise, it truncates the temporary store (rolls uncommitted changes back) and returns the previously committed offset.
The major advantage of this approach is that the temporary state store can optionally use the available disk space if the writes do not fit into the in-memory buffer.
The disadvantages are:
- It doubles the number of open state stores
- It potentially has higher write and read amplification due to uncontrolled flushes of the temporary state store.
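The commit and recovery steps described above can be sketched with a small in-memory model. This is only an illustration under stated assumptions, not the proposed implementation: the class `TwoStoreSketch`, its fields, and the helper methods `writeCommitFile` and `applyAndTruncate` are hypothetical names, both stores are plain maps rather than RocksDB instances, and the commit file is modeled as a nullable field holding the changelog offset.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical in-memory model of the two-store commit protocol; not Kafka Streams code. */
class TwoStoreSketch {
    // Regular store: holds only committed data.
    final Map<String, String> regular = new TreeMap<>();
    // Temporary store: holds dirty writes; an empty Optional marks a delete.
    final Map<String, Optional<String>> temporary = new TreeMap<>();
    // Stands in for the on-disk commit file; non-null means "ready to commit".
    Long commitFile = null;

    void put(String key, String value) { temporary.put(key, Optional.of(value)); }

    void delete(String key) { temporary.put(key, Optional.empty()); }

    /** Commit step 1: record the changelog offset in the commit file. */
    void writeCommitFile(long changelogOffset) { commitFile = changelogOffset; }

    /** Commit steps 2-4: copy dirty records, truncate the temporary store, delete the commit file. */
    Long applyAndTruncate() {
        temporary.forEach((key, value) -> {
            if (value.isPresent()) {
                regular.put(key, value.get());
            } else {
                regular.remove(key);
            }
        });
        Long committedOffset = commitFile;
        temporary.clear();
        commitFile = null;
        return committedOffset;
    }

    void commit(long changelogOffset) {
        writeCommitFile(changelogOffset);
        applyAndTruncate();
    }

    /** Rolls forward if the commit file exists, otherwise rolls back to the checkpointed offset. */
    Long recover(Long checkpointedOffset) {
        if (commitFile != null) {
            return applyAndTruncate();
        }
        temporary.clear();
        return checkpointedOffset;
    }

    public static void main(String[] args) {
        TwoStoreSketch store = new TwoStoreSketch();
        store.put("a", "1");
        store.commit(10L);
        store.put("a", "2");              // dirty write, never committed
        System.out.println(store.recover(10L) + " " + store.regular);
    }
}
```

Because copying the dirty records is idempotent in this model, repeating it after a crash that happened midway through the copy yields the same regular store as an uninterrupted commit.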
Behavior changes
If StateStore#transactional() returns true, then the store performs writes via the implementation-specific transactional mechanism. Reads via ReadOnlyKeyValueStore methods return uncommitted data from the ongoing transaction.
A transactional state store opens the first transaction during initialization. It commits on StateStore#commit: first, the store commits the transaction, then flushes, then starts a new transaction.
There are several places where StreamTask, ProcessorStateManager, and TaskManager check whether EOS is enabled and, if so, delete the checkpoint file on crash failure. Specifically, this happens when:
- StreamTask resumes processing (link)
- ProcessorStateManager initializes state stores from offsets from checkpoint (link1, link2)
- StreamTask writes offsets to the checkpoint file after committing (link)
- TaskManager handles revocation (link)
If EOS is enabled, we will remove offset information for non-transactional state stores from the checkpoint file instead of just deleting the file.
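As a rough illustration of the difference between deleting the checkpoint file and pruning it, the sketch below keeps offsets only for transactional stores. It assumes a simplified checkpoint keyed by store name; the class `CheckpointFilterSketch` and the method `retainTransactional` are hypothetical and do not correspond to existing Kafka Streams code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: prune a checkpoint instead of deleting it. */
class CheckpointFilterSketch {

    /**
     * Returns a copy of the checkpoint that keeps offsets only for transactional stores.
     * Keying the checkpoint by store name is a simplifying assumption.
     */
    static Map<String, Long> retainTransactional(Map<String, Long> checkpointed,
                                                 Set<String> transactionalStores) {
        Map<String, Long> kept = new HashMap<>();
        checkpointed.forEach((store, offset) -> {
            if (transactionalStores.contains(store)) {
                kept.put(store, offset);
            }
        });
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Long> checkpoint = Map.of("counts", 42L, "legacy", 7L);
        System.out.println(retainTransactional(checkpoint, Set.of("counts")));
    }
}
```

In this model, a non-transactional store loses its offset and would have to be restored from scratch, while a transactional store keeps an offset it can pass to recovery.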
All writes and deletes go to the temporary store. Reads query the temporary store; if the data is missing, they query the regular store. Range reads query both stores and return a KeyValueIterator that merges the results.
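The read path can be sketched as follows. This is a minimal model under stated assumptions: the class `MergedReadSketch` is hypothetical, deletes are assumed to be recorded in the temporary store as tombstones (an empty `Optional`) that hide the committed value, and the range read materializes a merged list for brevity where a real store would more likely merge two iterators lazily.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical sketch of reads that overlay a temporary store on a regular store. */
class MergedReadSketch {
    final NavigableMap<String, String> regular = new TreeMap<>();
    // Empty Optional = tombstone for a key deleted in the ongoing transaction (assumption).
    final NavigableMap<String, Optional<String>> temporary = new TreeMap<>();

    void put(String key, String value) { temporary.put(key, Optional.of(value)); }

    void delete(String key) { temporary.put(key, Optional.empty()); }

    /** Point read: the temporary store wins; fall back to the regular store if the key is absent. */
    String get(String key) {
        Optional<String> dirty = temporary.get(key);
        if (dirty != null) {
            return dirty.orElse(null); // a tombstone hides the committed value
        }
        return regular.get(key);
    }

    /** Range read over [from, to], with dirty entries overriding committed ones. */
    List<Map.Entry<String, String>> range(String from, String to) {
        NavigableMap<String, String> merged = new TreeMap<>(regular.subMap(from, true, to, true));
        temporary.subMap(from, true, to, true).forEach((key, value) -> {
            if (value.isPresent()) {
                merged.put(key, value.get());
            } else {
                merged.remove(key);
            }
        });
        return new ArrayList<>(merged.entrySet());
    }

    public static void main(String[] args) {
        MergedReadSketch store = new MergedReadSketch();
        store.regular.put("a", "1");
        store.regular.put("b", "2");
        store.put("b", "20");
        store.delete("a");
        System.out.println(store.range("a", "z"));
    }
}
```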
Interactive Queries (IQ)
We introduce new readCommitted parameters for IQv1 and IQv2 that control whether queries return committed or uncommitted results if the underlying state store is transactional. If the underlying store is not transactional, then IQs return the most recently written data regardless of the readCommitted value.
If readCommitted is false or the store is not transactional, interactive queries work exactly the same as they do now. If readCommitted is true and the state store is transactional, the query reads directly from the store, ignoring records in RecordCache and the current uncommitted transaction.
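The routing rule for readCommitted can be illustrated with a toy model. Everything here is an assumption made for the example: the class `ReadCommittedSketch` is hypothetical, and the record cache, the ongoing transaction, and the committed store are modeled as three plain maps consulted in that order when uncommitted reads are allowed.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of how a query could honor readCommitted. */
class ReadCommittedSketch {
    final boolean transactional;
    final Map<String, String> committed = new HashMap<>();    // data in the underlying store
    final Map<String, String> uncommitted = new HashMap<>();  // ongoing transaction
    final Map<String, String> recordCache = new HashMap<>();  // stands in for RecordCache

    ReadCommittedSketch(boolean transactional) {
        this.transactional = transactional;
    }

    String query(String key, boolean readCommitted) {
        if (readCommitted && transactional) {
            // Read directly from the store, skipping the cache and the open transaction.
            return committed.get(key);
        }
        // Otherwise behave as today: return the most recently written data.
        if (recordCache.containsKey(key)) {
            return recordCache.get(key);
        }
        if (uncommitted.containsKey(key)) {
            return uncommitted.get(key);
        }
        return committed.get(key);
    }

    public static void main(String[] args) {
        ReadCommittedSketch store = new ReadCommittedSketch(true);
        store.committed.put("k", "committed");
        store.uncommitted.put("k", "dirty");
        System.out.println(store.query("k", true));
        System.out.println(store.query("k", false));
    }
}
```

For a non-transactional store the flag has no effect in this model, which mirrors the rule that such stores always return the most recently written data.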
Interface Changes
StateStore
Code Block |
---|
/**
 * Return true if the storage supports transactions.
 *
 * @return {@code true} if the storage supports transactions, {@code false} otherwise
 */
default boolean transactional() {
    return false;
}

/**
 * Flush and commit any cached data
 * <p>
 * For transactional state store commit applies all changes atomically. In other words, either the
 * entire commit will be successful or none of the changes will be applied.
 * <p>
 * For non-transactional state store this method flushes cached data.
 *
 * @param changelogOffset the offset of the changelog topic this commit corresponds to. The
 *                        offset can be {@code null} if the state store does not have a changelog
 *                        (e.g. a global store).
 */
void commit(final Long changelogOffset);

/**
 * Recover a transactional state store
 * <p>
 * If a transactional state store shut down with a crash failure, this method can either
 * roll back or forward uncommitted changes. In any case, this method returns the changelog
 * offset it rolls to.
 *
 * @param changelogOffset the checkpointed changelog offset.
 * @return the changelog offset after recovery.
 */
Long recover(final Long changelogOffset); |
...
Code Block |
---|
public Materialized<K, V, S> withTransactionalityEnabled();
public Materialized<K, V, S> withTransactionalityDisabled();
public StoreSupplier<S> storeSupplier();
public boolean transactional(); |
StoreQueryParameters
Code Block | ||
---|---|---|
| ||
/**
* Enable reading only committed data
* @return StoreQueryParameters a new {@code StoreQueryParameters} instance configured with reading only committed data
*/
public StoreQueryParameters<T> enableReadCommitted() |
StateQueryRequest
Code Block |
---|
/**
* Specifies that this query should only read committed data
*/
public StateQueryRequest<R> enableReadCommitted() |
Compatibility, Deprecation, and Migration Plan
Transactional state stores will be disabled by default. Both Streams DSL and Processor API users can enable transactional writes in the built-in RocksDB state store by passing a new boolean flag transactional=true to the Materialized constructor and Stores factory methods. Custom state stores will have an option to enable transactionality by adjusting their implementation according to the StateStore#transactional() contract.
Proposed changes are source compatible and binary incompatible with previous releases.
Test Plan
- Changes not committed to the changelog topic are discarded on crash failure.
- Changes committed to the changelog topic, but not committed to the state store are rolled forward.
Rejected Alternatives
RocksDB in-memory Indexed Batches
A considered alternative is to make the built-in RocksDB state store transactional by using WriteBatchWithIndex, which is similar to the WriteBatch already used in segment stores, except it also allows reading uncommitted data.
The advantage of this approach is that it uses the RocksDB built-in mechanism to ensure transactionality and offers the smallest possible write amplification overhead.
The disadvantage of this approach is that all uncommitted writes must fit into memory. In practice, RocksDB developers recommend keeping batches no larger than 3-4 megabytes (link), which might be an issue when the uncommitted writes exceed that size.
RocksDB Optimistic Transactions
Another considered alternative is OptimisticTransactionDB. This alternative suffers from the same issues as in-memory indexed batches, but also has greater overhead. It offers more guarantees than Kafka Streams needs: specifically, it ensures that there are no write conflicts between concurrent transactions before committing. There are no concurrent transactions in Kafka Streams, so there is no reason to pay for the associated overhead.
Transactions via Secondary State Store for Uncommitted Changes
In this alternative, Kafka Streams opens two stores instead of one: a temporary store and a regular store. All uncommitted writes go to the temporary store. Once the task flushes, the temporary store creates a commit file, an empty file that indicates that the corresponding transaction is ready to commit, and writes stored records to the regular store. It truncates the temporary store and deletes the commit file once it is finished.
On crash failure, the task checks if the temporary store and the commit file exist. If they do, the task repeats the commit process described above. Otherwise, it truncates the temporary store.
The major advantage of this approach is that the temporary state store can optionally use the available disk space if the writes do not fit into the in-memory buffer.
The disadvantages are:
...
...
Method to control transaction lifecycle in StateStore
...