Status
Current state: Under Discussion
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Right now, a stream processor with EOS has to delete all data from its local state stores after a crash failure, because the state stores might be left in a partially updated state. A partial update can happen because changes to the local state are not atomic with respect to the Kafka Streams commit. If an application with EOS crashes between commits, it cannot reset the state to the previously committed state, so it wipes the state stores and replays the changelog from scratch.
This KIP proposes making writes to the state stores transactional, so that they commit atomically only after the corresponding changes are committed to the changelog topic. As a result, Streams applications configured with EOS will no longer need to wipe the state stores on crash failure.
Public Interfaces
Changed:
- org.apache.kafka.streams.processor.StateStore
- org.apache.kafka.streams.state.StoreSupplier
- org.apache.kafka.streams.state.Stores
- org.apache.kafka.streams.kstream.Materialized
- org.apache.kafka.streams.StreamsConfig
- org.apache.kafka.streams.StoreQueryParameters
- org.apache.kafka.streams.query.StateQueryRequest
Proposed Changes
Overview
This section gives an overview of the proposed changes. The following sections cover the changes in behavior, configuration, and interfaces in detail.
This KIP proposes introducing a feature flag, statestore.transactional.enabled (enabled by default), that makes writes to state stores transactional, and adding a new method, boolean transactional(), to the StateStore and StoreSupplier interfaces. If StateStore#transactional() returns true, then writes to that state store are transactional, meaning they apply atomically after the application commits to the changelog topic. Users can create persistent transactional state stores via the Stores factory.
Transactional state stores do not delete the checkpoint file and the underlying data in the case of EOS. Instead, they roll forward from the changelog topic on recovery. The transactional guarantees come with non-zero overhead, so the feature flag acts as a safety switch if Kafka Streams users decide to opt out. This feature presents a trade-off between increased read, write, and memory amplification on the one hand, and the time to recover from a crash failure in the case of EOS on the other.
When the feature flag is on and the state store supports transactions (checked via the new boolean method StateStore#transactional), the store applies changes atomically, only after they are committed to the changelog topic. When the feature flag is off or the store does not support transactions, EOS recovery behaves as it does now: the task deletes the checkpoint file and recovers from the changelog.
This KIP does not introduce separate methods to begin and commit transactions. Instead, a transactional state store commits the currently open transaction on StateStore#flush if the feature flag is true. The reasons for this behavior are the following:
- There is always a single writer in Kafka Streams workloads, and all writes must go to a single currently open transaction.
- There is always a single reader that queries dirty state from a single open transaction.
- The state stores already explicitly call flush before writing to the checkpoint file to make uncommitted changes durable. Adding separate methods would create room for error, such as a missing call to beginTxn or commitTxn, or flushing before committing.
...
There are multiple ways to implement state store transactions, each with different trade-offs. This KIP includes a single implementation based on RocksDB's built-in indexed batches and adds a configuration enum, RocksDBTransactionalMechanism, so that other implementations can be added in the future in a backward-compatible way.
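A minimal sketch of how the RocksDBTransactionalMechanism enum could map onto the statestore.transactional.mechanism config value. The enum name, the IndexBatches constant, and the rockdb_indexbatch string come from this KIP; the configValue field and the fromConfig helper are hypothetical and only illustrate how new mechanisms could be added without breaking existing configurations.

```java
import java.util.Locale;

/**
 * Sketch of the proposed RocksDBTransactionalMechanism enum.
 * The configValue field and fromConfig() helper are hypothetical.
 */
enum RocksDBTransactionalMechanism {
    // Transactions via RocksDB WriteBatchWithIndex, the only mechanism proposed in this KIP.
    IndexBatches("rockdb_indexbatch");

    private final String configValue;

    RocksDBTransactionalMechanism(final String configValue) {
        this.configValue = configValue;
    }

    String configValue() {
        return configValue;
    }

    /** Resolve a statestore.transactional.mechanism value (case-insensitive, surrounding whitespace ignored). */
    static RocksDBTransactionalMechanism fromConfig(final String value) {
        final String normalized = value.trim().toLowerCase(Locale.ROOT);
        for (final RocksDBTransactionalMechanism mechanism : values()) {
            if (mechanism.configValue.equals(normalized)) {
                return mechanism;
            }
        }
        throw new IllegalArgumentException("Unsupported statestore.transactional.mechanism: " + value);
    }
}
```

Adding another mechanism later would mean adding one constant with its own config string; existing configurations would keep resolving to IndexBatches.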
Interface Changes
StateStore
```java
/**
 * Return true if the storage supports transactions.
 *
 * @return {@code true} if the storage supports transactions, {@code false} otherwise
 */
default boolean transactional() {
    return false;
}

/**
 * Flush any cached data. When {@link #transactional()} returns true, flush commits dirty writes.
 */
void flush();
```
The flush signature does not change, but its expected behavior when transactional() returns true does, so it is included above as well.
StoreSupplier
```java
/**
 * Return true if a call to {@link StoreSupplier#get} returns a transactional state
 * store.
 *
 * @return {@code true} if a call to {@link StoreSupplier#get} returns a transactional state
 * store, {@code false} otherwise.
 */
boolean transactional();
```
Stores
```java
/**
 * Create a persistent {@link KeyValueBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link TimestampedKeyValueStore} you should use
 * {@link #persistentTimestampedKeyValueStore(String, boolean)} to create a store supplier instead.
 *
 * @param name name of the store (cannot be {@code null})
 * @param transactional whether or not the state store returned by the store supplier should be transactional
 * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
 * to build a persistent key-value store
 */
public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name, final boolean transactional);

/**
 * Create a persistent {@link KeyValueBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a
 * {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link KeyValueStore} you should use
 * {@link #persistentKeyValueStore(String, boolean)} to create a store supplier instead.
 *
 * @param name name of the store (cannot be {@code null})
 * @param transactional whether or not the state store returned by the store supplier should be transactional
 * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
 * to build a persistent key-(timestamp/value) store
 */
public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(final String name, final boolean transactional);

/**
 * Create a persistent {@link WindowBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a {@link #windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link TimestampedWindowStore} you should use
 * {@link #persistentTimestampedWindowStore(String, Duration, Duration, boolean, boolean)} to create a store supplier instead.
 *
 * @param name name of the store (cannot be {@code null})
 * @param retentionPeriod length of time to retain data in the store (cannot be negative)
 * (note that the retention period must be at least long enough to contain the
 * windowed data's entire life cycle, from window-start through window-end,
 * and for the entire grace period)
 * @param windowSize size of the windows (cannot be negative)
 * @param retainDuplicates whether or not to retain duplicates. Turning this on will automatically disable
 * caching and means that null values will be ignored.
 * @param transactional whether or not the state store returned by the store supplier should be transactional
 * @return an instance of {@link WindowBytesStoreSupplier}
 * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
 * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
 */
public static WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                             final Duration retentionPeriod,
                                                             final Duration windowSize,
                                                             final boolean retainDuplicates,
                                                             final boolean transactional) throws IllegalArgumentException;

/**
 * Create a persistent {@link WindowBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a
 * {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link WindowStore} you should use
 * {@link #persistentWindowStore(String, Duration, Duration, boolean, boolean)} to create a store supplier instead.
 *
 * @param name name of the store (cannot be {@code null})
 * @param retentionPeriod length of time to retain data in the store (cannot be negative)
 * (note that the retention period must be at least long enough to contain the
 * windowed data's entire life cycle, from window-start through window-end,
 * and for the entire grace period)
 * @param windowSize size of the windows (cannot be negative)
 * @param retainDuplicates whether or not to retain duplicates. Turning this on will automatically disable
 * caching and means that null values will be ignored.
 * @param transactional whether or not the state store returned by the store supplier should be transactional
 * @return an instance of {@link WindowBytesStoreSupplier}
 * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
 * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
 */
public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name,
                                                                        final Duration retentionPeriod,
                                                                        final Duration windowSize,
                                                                        final boolean retainDuplicates,
                                                                        final boolean transactional) throws IllegalArgumentException;

/**
 * Create a persistent {@link SessionBytesStoreSupplier}.
 *
 * @param name name of the store (cannot be {@code null})
 * @param retentionPeriod length of time to retain data in the store (cannot be negative)
 * (note that the retention period must be at least long enough to
 * contain the inactivity gap of the session and the entire grace period.)
 * @param transactional whether or not the state store returned by the store supplier should be transactional
 * @return an instance of a {@link SessionBytesStoreSupplier}
 */
public static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                               final Duration retentionPeriod,
                                                               final boolean transactional);
```
Materialized
```java
public Materialized<K, V, S> withTransactionalityEnabled();
public Materialized<K, V, S> withTransactionalityDisabled();
public StoreSupplier<S> storeSupplier();
public boolean transactional();
```
StoreQueryParameters
```java
/**
 * Enable reading only committed data.
 * @return StoreQueryParameters a new {@code StoreQueryParameters} instance configured with reading only committed data
 */
public StoreQueryParameters<T> enableReadCommitted();
```
StateQueryRequest
```java
/**
 * Specifies that this query should only read committed data.
 */
public StateQueryRequest<R> enableReadCommitted();
```
Behavior changes
If StateStore#transactional() returns true, then the store performs writes via the implementation-specific transactional mechanism. Reads via ReadOnlyKeyValueStore methods return uncommitted data from the ongoing transaction.
Configuration changes
StreamsConfig:
- statestore.transactional.enabled (true by default) - enables transactional behavior for the state stores. When false, the state stores behave as they do now.
- statestore.transactional.mechanism (rockdb_indexbatch by default) - specifies the mechanism used to implement transactional writes. rockdb_indexbatch is the only valid value for now.
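For illustration, a sketch of a Streams configuration that opts out of transactional state stores under EOS. The two statestore.* keys are the names proposed in this KIP and do not exist in released Kafka versions, so they are set as raw strings; the TransactionalStoreConfigExample class is hypothetical. application.id, bootstrap.servers, and processing.guarantee=exactly_once_v2 are existing Streams settings.

```java
import java.util.Properties;

/** Hypothetical helper that builds Streams properties using the configs proposed in this KIP. */
final class TransactionalStoreConfigExample {
    // Proposed config names (not part of any released StreamsConfig).
    static final String TRANSACTIONAL_ENABLED = "statestore.transactional.enabled";
    static final String TRANSACTIONAL_MECHANISM = "statestore.transactional.mechanism";

    private TransactionalStoreConfigExample() { }

    static Properties streamsProps(final boolean transactional) {
        final Properties props = new Properties();
        props.put("application.id", "transactional-stores-demo");
        props.put("bootstrap.servers", "localhost:9092");
        // Transactional stores only change recovery behavior when EOS is enabled.
        props.put("processing.guarantee", "exactly_once_v2");
        // Feature flag: true by default per this KIP; false opts out.
        props.put(TRANSACTIONAL_ENABLED, Boolean.toString(transactional));
        // The only valid mechanism value for now.
        props.put(TRANSACTIONAL_MECHANISM, "rockdb_indexbatch");
        return props;
    }
}
```

Passing false restores today's EOS behavior, where the task deletes the checkpoint file and restores from the changelog after a crash.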
StoreQueryParameters (IQv1):
- readCommitted (false by default) - controls whether IQv1 reads uncommitted or committed data.

StateQueryRequest (IQv2):
- readCommitted (false by default) - controls whether IQv2 reads uncommitted or committed data.
Behavior changes
If statestore.transactional.enabled is true and the underlying state store implements transactions (i.e., StateStore#transactional() returns true), then the store writes via the transactional mechanism specified by statestore.transactional.mechanism. Reads via ReadOnlyKeyValueStore methods return uncommitted data from the ongoing transaction.
If statestore.transactional.enabled is true but the store does not support transactions, the corresponding stream task falls back to the non-transactional behavior.
A transactional state store opens the first transaction during initialization. It commits on StateStore#flush: first, the store commits the transaction, then it flushes, and then it starts a new transaction.
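The commit-on-flush lifecycle can be modeled with a small in-memory store. This is a hypothetical sketch, not the RocksDB implementation: InMemoryTransactionalStore and getCommitted are illustrative names, HashMaps stand in for the open transaction and the committed data, and a crash corresponds to losing the open transaction.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical in-memory model: writes go to an open transaction that flush() commits. */
final class InMemoryTransactionalStore {
    private final Map<String, String> committed = new HashMap<>();
    // Uncommitted writes; Optional.empty() marks a delete (tombstone).
    private Map<String, Optional<String>> openTxn;

    InMemoryTransactionalStore() {
        // The first transaction is opened during initialization.
        openTxn = new HashMap<>();
    }

    boolean transactional() {
        return true;
    }

    /** A null value is treated as a delete, as with Kafka Streams key-value stores. */
    void put(final String key, final String value) {
        openTxn.put(key, Optional.ofNullable(value));
    }

    void delete(final String key) {
        openTxn.put(key, Optional.empty());
    }

    /** Read-uncommitted view: the open transaction shadows committed data. */
    String get(final String key) {
        final Optional<String> dirty = openTxn.get(key);
        return dirty != null ? dirty.orElse(null) : committed.get(key);
    }

    /** Read-committed view: ignores the open transaction. */
    String getCommitted(final String key) {
        return committed.get(key);
    }

    /** Commit the open transaction, flush, then begin a new transaction. */
    void flush() {
        for (final Map.Entry<String, Optional<String>> e : openTxn.entrySet()) {
            if (e.getValue().isPresent()) {
                committed.put(e.getKey(), e.getValue().get());
            } else {
                committed.remove(e.getKey());
            }
        }
        // A persistent store would make the committed state durable here.
        openTxn = new HashMap<>();
    }
}
```

Because flush() is the only commit point, there is no way to commit a transaction that was never begun, which is the class of error that explicit beginTxn/commitTxn methods would allow.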
There are several places where StreamTask, ProcessorStateManager, and TaskManager check whether EOS is enabled and, if so, delete the checkpoint file on crash failure, specifically when:
- StreamTask resumes processing (link)
- ProcessorStateManager initializes state stores from offsets in the checkpoint (link1, link2)
- StreamTask writes offsets to the checkpoint file after committing (link)
- TaskManager handles revocation (link)

In all these cases, the if-EOS condition should apply only if the feature flag is off or the state stores are not transactional.
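The revised if-EOS condition can be captured in one predicate. This is a sketch assuming a hypothetical EosRecoveryPolicy helper; in Streams, the three inputs would come from the processing guarantee, the statestore.transactional.enabled flag, and StateStore#transactional().

```java
/**
 * Hypothetical helper for the revised if-EOS condition: state is wiped
 * (checkpoint deleted, store restored from the changelog) only when EOS is on
 * and the store cannot provide transactional guarantees.
 */
final class EosRecoveryPolicy {
    private EosRecoveryPolicy() { }

    static boolean wipeStateOnCrash(final boolean eosEnabled,
                                    final boolean transactionalFeatureEnabled,
                                    final boolean storeTransactional) {
        if (!eosEnabled) {
            // Without EOS, the checkpoint is kept, as today.
            return false;
        }
        // With EOS, data survives only if the flag is on AND the store is transactional.
        return !(transactionalFeatureEnabled && storeTransactional);
    }
}
```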
...
If EOS is enabled, we will remove offset information for non-transactional state stores from the checkpoint file instead of just deleting the file.
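A sketch of filtering the checkpoint under EOS, assuming the checkpoint is represented as a map from changelog partition to offset. CheckpointFilter is hypothetical, and partitions are plain strings here rather than TopicPartition objects.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical helper: keep checkpointed offsets only for transactional stores' changelogs. */
final class CheckpointFilter {
    private CheckpointFilter() { }

    static Map<String, Long> retainTransactionalOffsets(final Map<String, Long> checkpointed,
                                                        final Set<String> nonTransactionalChangelogs) {
        // Copy so the caller's map is left untouched.
        final Map<String, Long> retained = new TreeMap<>(checkpointed);
        // Without an offset, non-transactional stores are restored from scratch;
        // transactional stores keep their offsets and roll forward from there.
        retained.keySet().removeAll(nonTransactionalChangelogs);
        return retained;
    }
}
```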
Interactive Queries (IQ)
We introduce new readCommitted parameters for IQv1 and IQv2 that control whether queries return committed or uncommitted results if the underlying state store is transactional. If the underlying store is not transactional, then IQs return the most recently written data regardless of the readCommitted value.

If readCommitted is false or the store is not transactional, interactive queries work exactly as they do now. If readCommitted is true and the state store is transactional, then the query reads directly from the store, ignoring records in RecordCache and the current uncommitted transaction.
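How an interactive query chooses its data source can be sketched as follows. IqLookup is hypothetical; the three maps stand in for the RecordCache, the open transaction, and the committed store contents, and deletes are not modeled. For a non-transactional store, the open-transaction map would simply be empty because writes go straight to the store.

```java
import java.util.Map;

/** Hypothetical model of which layers an IQ lookup consults. */
final class IqLookup {
    private IqLookup() { }

    static String get(final String key,
                      final boolean readCommitted,
                      final boolean storeTransactional,
                      final Map<String, String> recordCache,
                      final Map<String, String> openTxn,
                      final Map<String, String> committed) {
        if (readCommitted && storeTransactional) {
            // Skip the RecordCache and the open transaction: committed data only.
            return committed.get(key);
        }
        // Current behavior: the most recent write wins (cache, then store).
        if (recordCache.containsKey(key)) {
            return recordCache.get(key);
        }
        if (openTxn.containsKey(key)) {
            return openTxn.get(key);
        }
        return committed.get(key);
    }
}
```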
RocksDB
When statestore.transactional.mechanism=rockdb_indexbatch (RocksDBTransactionalMechanism=IndexBatches), Kafka Streams makes writes to the built-in RocksDB state store transactional by using WriteBatchWithIndex, which is similar to the WriteBatch already used by segmented stores, except that it also allows reading uncommitted data.
...
A considered alternative is OptimisticTransactionDB, which offers more guarantees than Kafka Streams needs; specifically, it ensures that there were no write conflicts between concurrent transactions before committing. There are no concurrent transactions in Kafka Streams, so there is no reason to pay for the associated overhead.
Compatibility, Deprecation, and Migration Plan
Applications using Streams DSL and a built-in RocksDB state store implementation will get transactional state stores enabled by default. They can opt out by disabling the feature flag.
Transactional state stores will be disabled by default. Both Streams DSL and Processor API users can enable transactional writes in RocksDB by passing a new boolean flag, transactional=true, to the Materialized constructor and the Stores factory methods. Applications using the Processor API will work as they do now, because the default implementation of StateStore#transactional returns false, so the relevant code in StreamTask, ProcessorStateManager, etc. will fall back to the non-transactional behavior.
Proposed changes are source compatible and binary incompatible with previous releases.
Test Plan
- Changes not committed to the changelog topic are discarded on crash failure.
- Changes committed to the changelog topic, but not committed to the state store are rolled forward.
Rejected Alternatives
Transactions via Secondary State Store for Uncommitted Changes
In this alternative, Kafka Streams opens two stores instead of one: a temporary store and a regular store. All uncommitted writes go to the temporary store. Once the task flushes, the temporary store creates a commit file (an empty file indicating that the corresponding store is ready to commit) and writes the stored records to the regular store. Once it is finished, it truncates the temporary store and deletes the commit file.
...
For completeness, below are the details on the changes required to support this implementation.
Interface changes
StoreSupplier:
- Add T get(final String suffix), which returns a state store whose name ends with the given suffix. We need this method to create temporary state stores.
...
- A constructor accepts a DB and a store supplier for the tmp store.
- On init, the state store configures itself depending on the state of the feature flag. If it is set to false, then TransactionalKeyValueByteStore just forwards all methods to inner. Otherwise, it creates a temporary store and implements the behavior changes described below.
Behavior changes
All writes and deletes go to the temporary store. Reads query the temporary store and, if the data is missing, query the regular store. Range reads query both stores and return a KeyValueIterator that merges the results.
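A sketch of the rejected two-store read path, with TreeMaps standing in for the temporary and regular stores. TwoStoreView is hypothetical; Optional.empty() in the temporary store marks a delete, range() materializes the merged result for brevity (a real KeyValueIterator would merge the two sorted iterators lazily), and the commit-file handling is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical model of the rejected design: a temporary store shadowing the regular store. */
final class TwoStoreView {
    // Uncommitted writes; Optional.empty() marks a delete.
    final NavigableMap<String, Optional<String>> tmp = new TreeMap<>();
    // Committed data.
    final NavigableMap<String, String> regular = new TreeMap<>();

    void put(final String key, final String value) {
        tmp.put(key, Optional.ofNullable(value));
    }

    void delete(final String key) {
        tmp.put(key, Optional.empty());
    }

    /** Point read: the temporary store first, then the regular store. */
    String get(final String key) {
        final Optional<String> uncommitted = tmp.get(key);
        return uncommitted != null ? uncommitted.orElse(null) : regular.get(key);
    }

    /** Inclusive range read: temporary entries win and deletes hide regular entries. */
    List<Map.Entry<String, String>> range(final String from, final String to) {
        final TreeMap<String, String> merged = new TreeMap<>(regular.subMap(from, true, to, true));
        for (final Map.Entry<String, Optional<String>> e : tmp.subMap(from, true, to, true).entrySet()) {
            if (e.getValue().isPresent()) {
                merged.put(e.getKey(), e.getValue().get());
            } else {
                merged.remove(e.getKey());
            }
        }
        return new ArrayList<>(merged.entrySet());
    }

    /** On flush: apply uncommitted records to the regular store, then truncate the temporary store. */
    void commit() {
        for (final Map.Entry<String, Optional<String>> e : tmp.entrySet()) {
            if (e.getValue().isPresent()) {
                regular.put(e.getKey(), e.getValue().get());
            } else {
                regular.remove(e.getKey());
            }
        }
        tmp.clear();
    }
}
```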
Similar to CachingKeyValueStore and ChangeLoggingKeyValueBytesStore, classes extending AbstractStoreBuilder will wrap created state stores with TransactionalKeyValueByteStore.
Method to control transaction lifecycle in StateStore
A considered alternative is to introduce methods like StateStore#beginTxn and StateStore#commitTxn to manage the transaction lifecycle. I don't think they are necessary given the specifics of stream workloads: there is always a single transaction for a given task, and that transaction commits only after the commit to the changelog. Moreover, explicit calls to begin and commit a transaction introduce possible invalid states, such as skipping beginTxn before committing, beginning a transaction multiple times, committing after flushing, etc.
...