Status
Current state: Discarded (in favor of KIP-892)
Discussion thread: here
JIRA: here
...
- org.apache.kafka.streams.processor.StateStore
- org.apache.kafka.streams.state.Stores
- org.apache.kafka.streams.kstream.Materialized
- org.apache.kafka.streams.kstream.StreamJoined.StoreType
Proposed Changes
Background
...
- The task (StreamTask, StandbyTask) registers its state stores. State stores load offset metadata from the checkpoint file (link). That step aims to establish a mapping between data in the state store and the offset of the changelog topic.
- In case of crash failure, if the state store has data but the checkpoint file does not exist, ProcessorStateManager throws an exception for EOS tasks. This is an indicator to throw away local data and replay the changelog topic (link).
- The task processes data and writes its state locally.
- The task commits the EOS transaction. TaskExecutor#commitOffsetsOrTransaction calls StreamsProducer#commitTransaction, which sends the new offsets and commits the transaction.
- The task runs a postCommit method (StreamTask, StandbyTask) that:
- Go to step 2 until the task shuts down. During shutdown, the task stops processing data, then writes its current offset to the checkpoint file and halts.
...
- The crash happens between steps 1 and 3. The uncommitted data will be discarded. The input records were not committed via the EOS transaction, so the task will re-process them.
- The crash happens between steps 3 and 4a. The EOS transaction has already been committed, but the state store hasn't. The state store will replay the uncommitted records from the changelog topic.
- The crash happens between steps 4a and 4b. The state store has already committed the new records, but they are not yet reflected in the checkpoint file. The state store will replay the last committed records from the changelog topic. This operation is idempotent and does not violate correctness.
- The crash happens after step 4b. The state store does nothing during recovery.
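The four crash cases above can be summarized as a small lookup. The sketch below is only an illustration of that list: the enum and class names are hypothetical, and the step labels (3, 4a, 4b) follow the numbering used in the text.

```java
// Hypothetical names; this only restates the crash cases listed above.
enum CrashPoint {
    BEFORE_EOS_COMMIT,    // crash between steps 1 and 3
    AFTER_EOS_COMMIT,     // crash between steps 3 and 4a
    AFTER_STORE_COMMIT,   // crash between steps 4a and 4b
    AFTER_CHECKPOINT      // crash after step 4b
}

enum RecoveryAction {
    DISCARD_UNCOMMITTED_AND_REPROCESS_INPUT,
    REPLAY_UNCOMMITTED_FROM_CHANGELOG,
    REPLAY_LAST_COMMITTED_FROM_CHANGELOG, // idempotent re-application
    NOTHING
}

final class CrashRecoverySketch {
    // Maps the point at which the task crashed to what the state store does on restart.
    static RecoveryAction actionFor(final CrashPoint point) {
        switch (point) {
            case BEFORE_EOS_COMMIT:
                return RecoveryAction.DISCARD_UNCOMMITTED_AND_REPROCESS_INPUT;
            case AFTER_EOS_COMMIT:
                return RecoveryAction.REPLAY_UNCOMMITTED_FROM_CHANGELOG;
            case AFTER_STORE_COMMIT:
                return RecoveryAction.REPLAY_LAST_COMMITTED_FROM_CHANGELOG;
            case AFTER_CHECKPOINT:
                return RecoveryAction.NOTHING;
            default:
                throw new IllegalArgumentException("Unknown crash point: " + point);
        }
    }
}
```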
There are multiple ways to implement state store transactions that present different trade-offs. This proposal includes a single reference implementation via a secondary RocksDB for uncommitted writes.
StateStore changes
This section covers multiple changes to the state store interfaces. This proposal replaces StateStore#flush with two new methods, StateStore#commit(Long) and StateStore#recover(long), and adds a boolean transactional() method that indicates whether a state store is transactional.
...
If EOS is enabled, we will remove offset information for non-transactional state stores from the checkpoint file instead of just deleting the file.
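As a rough illustration of the checkpoint handling described above, the sketch below keeps only the offsets that belong to transactional stores. It is an assumption-laden simplification: the checkpoint is modeled as a map keyed by a hypothetical store name, whereas the real checkpoint file is not necessarily keyed this way, and the class and method names are invented for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class CheckpointFilterSketch {
    // Returns a copy of the checkpointed offsets with entries for
    // non-transactional stores removed (hypothetical store-name keys).
    static Map<String, Long> retainTransactional(final Map<String, Long> checkpointedOffsets,
                                                 final Set<String> transactionalStores) {
        final Map<String, Long> kept = new HashMap<>();
        for (final Map.Entry<String, Long> entry : checkpointedOffsets.entrySet()) {
            if (transactionalStores.contains(entry.getKey())) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }
}
```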
...
Configuration changes
StreamsConfig
default.dsl.store has a new valid value, txn_rocksDB, that enables the transactional RocksDB state store.
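A minimal sketch of how an application might opt in, assuming the proposed value were accepted. The `txn_rocksDB` value exists only in this proposal, so a released Kafka Streams version is not expected to recognize it; the application id and bootstrap address are placeholders.

```java
import java.util.Properties;

final class TxnStoreConfigSketch {
    // Builds Streams properties that select the proposed transactional RocksDB store.
    static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put("application.id", "txn-store-demo");     // placeholder value
        props.put("bootstrap.servers", "localhost:9092");  // placeholder value
        props.put("processing.guarantee", "exactly_once_v2");
        props.put("default.dsl.store", "txn_rocksDB");     // value proposed by this KIP only
        return props;
    }
}
```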
Interface Changes
StateStore
```java
/**
 * Return true if the storage supports transactions.
 *
 * @return {@code true} if the storage supports transactions, {@code false} otherwise
 */
@Evolving
default boolean transactional() {
    return false;
}

/**
 * Flush any cached data
 */
@Deprecated
default void flush() {}

/**
 * Flush and commit any cached data
 * <p>
 * For transactional state store commit applies all changes atomically. In other words, either the
 * entire commit will be successful or none of the changes will be applied.
 * <p>
 * For non-transactional state store this method flushes cached data.
 *
 * @param changelogOffset the offset of the changelog topic this commit corresponds to. The
 *                        offset can be null if the state store does not have a changelog
 *                        (e.g. a global store).
 */
@Evolving
default void commit(final Long changelogOffset) {
    if (transactional()) {
        throw new UnsupportedOperationException("Transactional state store must implement StateStore#commit");
    } else {
        flush();
    }
}

/**
 * Recovers the state store after crash failure.
 * <p>
 * The state store recovers by discarding any writes that are not committed to the changelog
 * and rolling to the state that corresponds to {@code changelogOffset} or greater offset of
 * the changelog topic.
 *
 * @param changelogOffset the checkpointed changelog offset.
 * @return {@code true} if the state store recovered, {@code false} otherwise.
 */
@Evolving
default boolean recover(final long changelogOffset) {
    if (transactional()) {
        throw new UnsupportedOperationException("Transactional state store must implement StateStore#recover");
    }
    return false;
}
```
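To make the default-method contract concrete, here is a small self-contained sketch. `SketchStateStore` is a hypothetical stand-in that copies only the defaults shown above; it is not the real `org.apache.kafka.streams.processor.StateStore` interface, and the two implementing classes are invented for illustration.

```java
// Hypothetical stand-in mirroring the proposed default methods.
interface SketchStateStore {
    default boolean transactional() {
        return false;
    }

    default void flush() {}

    default void commit(final Long changelogOffset) {
        if (transactional()) {
            throw new UnsupportedOperationException("Transactional state store must implement commit");
        } else {
            flush();
        }
    }

    default boolean recover(final long changelogOffset) {
        if (transactional()) {
            throw new UnsupportedOperationException("Transactional state store must implement recover");
        }
        return false;
    }
}

// An existing, non-transactional store only overrides flush();
// the default commit() keeps working by delegating to it.
final class LegacyStoreSketch implements SketchStateStore {
    int flushCalls = 0;

    @Override
    public void flush() {
        flushCalls++;
    }
}

// A store that claims to be transactional but forgets to override
// commit()/recover() fails fast instead of silently flushing.
final class IncompleteTxnStoreSketch implements SketchStateStore {
    @Override
    public boolean transactional() {
        return true;
    }
}
```

Under these assumptions, calling `commit(null)` on `LegacyStoreSketch` increments `flushCalls`, while calling `commit` on `IncompleteTxnStoreSketch` throws `UnsupportedOperationException`.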
Stores
```java
/**
 * Create a persistent {@link KeyValueBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link TimestampedKeyValueStore} you should use
 * {@link #persistentTimestampedKeyValueStore(String)} to create a store supplier instead.
 *
 * @param name          name of the store (cannot be {@code null})
 * @param transactional whether the store should be transactional
 * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
 * to build a persistent key-value store
 */
public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name,
                                                                 final boolean transactional)

/**
 * Create a persistent {@link KeyValueBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a
 * {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link KeyValueStore} you should use
 * {@link #persistentKeyValueStore(String)} to create a store supplier instead.
 *
 * @param name          name of the store (cannot be {@code null})
 * @param transactional whether the store should be transactional
 * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
 * to build a persistent key-(timestamp/value) store
 */
public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(final String name,
                                                                            final boolean transactional)

/**
 * Create a persistent {@link WindowBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a {@link #windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link TimestampedWindowStore} you should use
 * {@link #persistentTimestampedWindowStore(String, Duration, Duration, boolean)} to create a store supplier instead.
 *
 * @param name             name of the store (cannot be {@code null})
 * @param retentionPeriod  length of time to retain data in the store (cannot be negative)
 *                         (note that the retention period must be at least long enough to contain the
 *                         windowed data's entire life cycle, from window-start through window-end,
 *                         and for the entire grace period)
 * @param windowSize       size of the windows (cannot be negative)
 * @param retainDuplicates whether or not to retain duplicates. Turning this on will automatically disable
 *                         caching and means that null values will be ignored.
 * @param transactional    whether the store should be transactional
 * @return an instance of {@link WindowBytesStoreSupplier}
 * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
 * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
 */
public static WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                             final Duration retentionPeriod,
                                                             final Duration windowSize,
                                                             final boolean retainDuplicates,
                                                             final boolean transactional) throws IllegalArgumentException

/**
 * Create a persistent {@link WindowBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a
 * {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link WindowStore} you should use
 * {@link #persistentWindowStore(String, Duration, Duration, boolean)} to create a store supplier instead.
 *
 * @param name             name of the store (cannot be {@code null})
 * @param retentionPeriod  length of time to retain data in the store (cannot be negative)
 *                         (note that the retention period must be at least long enough to contain the
 *                         windowed data's entire life cycle, from window-start through window-end,
 *                         and for the entire grace period)
 * @param windowSize       size of the windows (cannot be negative)
 * @param retainDuplicates whether or not to retain duplicates. Turning this on will automatically disable
 *                         caching and means that null values will be ignored.
 * @param transactional    whether the store should be transactional
 * @return an instance of {@link WindowBytesStoreSupplier}
 * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
 * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
 */
public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name,
                                                                        final Duration retentionPeriod,
                                                                        final Duration windowSize,
                                                                        final boolean retainDuplicates,
                                                                        final boolean transactional) throws IllegalArgumentException

/**
 * Create a persistent {@link SessionBytesStoreSupplier}.
 *
 * @param name            name of the store (cannot be {@code null})
 * @param retentionPeriod length of time to retain data in the store (cannot be negative)
 *                        (note that the retention period must be at least long enough to
 *                        contain the inactivity gap of the session and the entire grace period.)
 * @param transactional   whether the store should be transactional
 * @return an instance of a {@link SessionBytesStoreSupplier}
 */
public static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                               final Duration retentionPeriod,
                                                               final boolean transactional)
```
Materialized
```java
public StoreSupplier<S> storeSupplier();

public final boolean transactional();
```
Materialized.StoreType
The Materialized.StoreType enum has a new value, TXN_ROCKS_DB, that corresponds to a transactional state store implementation based on RocksDB.
Compatibility, Deprecation, and Migration Plan
...
Proposed changes are source compatible and binary incompatible with previous releases.
Test Plan
- Add a variation for all existing stateful tests to run with enabled transactional state stores.
- Add tests to ensure that transactional state stores discard uncommitted changes after crash failure.
- Add tests to ensure that transactional state stores replay missing changes from the changelog topic on recovery.
Transactions via Secondary State Store for Uncommitted Changes
This proposal comes with a reference implementation used in the Stores# factory methods that create transactional state stores. In this implementation, transactionality is guaranteed by batching uncommitted (dirty) writes in a temporary RocksDB instance. On commit, such a state store copies uncommitted writes from the temporary store to the main store, then truncates the temporary store.
All writes and deletes go to the temporary store. Reads query the temporary store; if the data is missing, they query the regular store. Range reads query both stores and return a KeyValueIterator that merges the results. On crash failure, ProcessorStateManager calls StateStore#recover(offset), which truncates the temporary store.
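The read and write paths described above can be sketched with two in-memory sorted maps standing in for the temporary and regular RocksDB instances. This is a simplified illustration, not the reference implementation: the class and method names are hypothetical, deletions are modeled as empty `Optional` markers in the temporary map, the copy in `commit()` is not crash-atomic, and `recover()` simply truncates the temporary map.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

final class TwoStoreSketch {
    // Stand-in for the regular (committed) store.
    private final TreeMap<String, String> main = new TreeMap<>();
    // Stand-in for the temporary store; Optional.empty() marks a pending delete.
    private final TreeMap<String, Optional<String>> temp = new TreeMap<>();

    void put(final String key, final String value) {
        temp.put(key, Optional.of(value));
    }

    void delete(final String key) {
        temp.put(key, Optional.empty());
    }

    // Point read: the temporary store wins; fall back to the regular store.
    String get(final String key) {
        final Optional<String> dirty = temp.get(key);
        if (dirty != null) {
            return dirty.orElse(null);
        }
        return main.get(key);
    }

    // Range read over [from, to]: merge both stores, letting dirty entries override.
    TreeMap<String, String> range(final String from, final String to) {
        final TreeMap<String, String> merged = new TreeMap<>(main.subMap(from, true, to, true));
        for (final Map.Entry<String, Optional<String>> e : temp.subMap(from, true, to, true).entrySet()) {
            if (e.getValue().isPresent()) {
                merged.put(e.getKey(), e.getValue().get());
            } else {
                merged.remove(e.getKey());
            }
        }
        return merged;
    }

    // Commit: copy dirty writes into the regular store, then truncate the temporary store.
    void commit() {
        for (final Map.Entry<String, Optional<String>> e : temp.entrySet()) {
            if (e.getValue().isPresent()) {
                main.put(e.getKey(), e.getValue().get());
            } else {
                main.remove(e.getKey());
            }
        }
        temp.clear();
    }

    // Recover: roll back by dropping everything that was never committed.
    boolean recover() {
        temp.clear();
        return true;
    }
}
```

The delete marker is the in-memory analogue of the extra value copy needed to model deletions in the temporary store.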
The major advantage of this approach is that the temporary state store can optionally use the available disk space if the writes do not fit into the in-memory buffer.
...
- It doubles the number of open state stores.
- It potentially has higher write and read amplification due to uncontrolled flushes of the temporary state store.
- It requires an additional value copy per write to model deletions.
Rejected Alternatives
RocksDB in-memory Indexed Batches
...