...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Return true the storage supports transactions. * * @return {@code true} if the storage supports transactions, {@code false} otherwise */ default boolean transactional() { return false; } /** * Flush and commit any cached data * <p> * For transactional state store commit applies all changes atomically. In other words, either the * entire commit will be successful or none of the changes will be applied. * <p> * For non-transactional state store this method flushes cached data. * * @param changelogOffset the offset of the changelog topic this commit corresponds to. The * offset can be null if the state store does not have a changelog * (e.g. a global store). * @code null} */ default void commit(final Long changelogOffset); /** { * Recover aif (transactional state store()) { * <p> * Ifthrow a transactionalnew UnsupportedOperationException("Transactional state store shutmust down with a crash failure, this method can eitherimplement StateStore#commit"); } else { * roll back or forwardflush(); uncommitted changes. In any case, this method returns the changelog } } /** * Recover a transactional state store * offset it rolls to.<p> * If *a @paramtransactional changelogOffsetstate thestore checkpointedshut changelogdown offset. with a *crash @returnfailure, thethis changelogmethod offset after recovery.can either */ Long recover(final Long changelogOffset) roll back or forward uncommitted changes. In any case, this method returns the changelog * offset it rolls to. * * @param changelogOffset the checkpointed changelog offset. * @return the changelog offset after recovery or {@code null} if recovery failed. */ default Long recover(final Long changelogOffset) { if (transactional()) { throw new UnsupportedOperationException("Transactional state store must implement StateStore#recover"); } return changelogOffset; } |
StoreSupplier
Code Block | ||
---|---|---|
| ||
/** * Return true if a call to {@link StoreSupplier#get} returns a transactional state * store. * * @return {@code true} if a call to {@link StoreSupplier#get} returns a transactional state * store, {@code false} otherwise. */ boolean transactional(); |
...
Code Block | ||
---|---|---|
| ||
/** * Create a persistent transactional {@link KeyValueBytesStoreSupplier}. * <p> * This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}. * If you want to create a {@link TimestampedKeyValueStore} you should use * {@link #persistentTransactionalTimestampedKeyValueStore(String)} to create a store supplier instead. * * @param name name of the store (cannot be {@code null}) * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used * to build a persistent key-value store */ public static KeyValueBytesStoreSupplier persistentTransactionalKeyValueStore(final String name) /** * Create a persistent transactional {@link KeyValueBytesStoreSupplier}. * <p> * This store supplier can be passed into a * {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}. * If you want to create a {@link KeyValueStore} you should use * {@link #persistentKeyValueStore(String)} to create a store supplier instead. * * @param name name of the store (cannot be {@code null}) * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used * to build a persistent key-(timestamp/value) store */ public static KeyValueBytesStoreSupplier persistentTransactionalTimestampedKeyValueStore(final String name) /** * Create a persistent transactional {@link WindowBytesStoreSupplier}. * <p> * This store supplier can be passed into a {@link #windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}. * If you want to create a {@link TimestampedWindowStore} you should use * {@link #persistentTimestampedWindowStore(String, Duration, Duration, boolean, boolean)} to create a store supplier instead. * * @param name name of the store (cannot be {@code null}) * @param retentionPeriod length of time to retain data in the store (cannot be negative) * (note that the retention period must be at least long enough to contain the * windowed data's entire life cycle, from window-start through window-end, * and for the entire grace period) * @param windowSize size of the windows (cannot be negative) * @param retainDuplicates whether or not to retain duplicates. Turning this on will automatically disable * caching and means that null values will be ignored. * @return an instance of {@link WindowBytesStoreSupplier} * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize} */ public static WindowBytesStoreSupplier persistentTransactionalWindowStore(final String name, final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates) throws IllegalArgumentException /** * Create a persistent transactional {@link WindowBytesStoreSupplier}. * <p> * This store supplier can be passed into a * {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}. * If you want to create a {@link WindowStore} you should use * {@link #persistentWindowStore(String, Duration, Duration, boolean, boolean)} to create a store supplier instead. * * @param name name of the store (cannot be {@code null}) * @param retentionPeriod length of time to retain data in the store (cannot be negative) * (note that the retention period must be at least long enough to contain the * windowed data's entire life cycle, from window-start through window-end, * and for the entire grace period) * @param windowSize size of the windows (cannot be negative) * @param retainDuplicates whether or not to retain duplicates. Turning this on will automatically disable * caching and means that null values will be ignored. * @return an instance of {@link WindowBytesStoreSupplier} * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize} */ public static WindowBytesStoreSupplier persistentTransactionalTimestampedWindowStore(final String name, final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates) throws IllegalArgumentException /** * Create a persistent transactional {@link SessionBytesStoreSupplier}. * * @param name name of the store (cannot be {@code null}) * @param retentionPeriod length of time to retain data in the store (cannot be negative) * (note that the retention period must be at least as long enough to * contain the inactivity gap of the session and the entire grace period.) * @return an instance of a {@link SessionBytesStoreSupplier} */ public static SessionBytesStoreSupplier persistentTransactionalSessionStore(final String name, final Duration retentionPeriod) |
Materialized
Code Block |
---|
public Materialized<K, V, S> withTransactionalityEnabled();
public Materialized<K, V, S> withTransactionalityDisabled();
public StoreSupplier<S> storeSupplier();
public boolean transactional(); |
...
Transactional state stores will be disabled by default. Both Streams DSL and Processor API users can enable transactional writes in the built-in RocksDB state store by passing a new boolean flag transactional=true
to Materialized
constructor and Stores
factory methods. Custom state stores will have an option to enable transactionality by adjusting their implementation according to the contract StateStore#transactional()
contract.
StateStore#flush()
method is deprecated. New StateStore#commit(offset) method will fall back to StateStoreNon-transactional state stores it will call the new
Proposed changes are source compatible and binary incompatible with previous releases.
...