...
There are multiple ways to implement state store transactions that present different trade-offs. This proposal includes a single reference implementation via a secondary RocksDB for uncommitted writes. In addition, it provides a RocksDBTransactionalMechanism enum to control the specific implementation used in Stores#
and Materialized
APIs.
StateStore changes
This section covers multiple changes to the state store interfaces. This proposal replaces StateStore#flush
with 2 new methods - StateStore#commit(Long)
and StateStore#recover(long)
and adds a boolean transactional()
method to determine if a state store is transactional.
...
Code Block | ||
---|---|---|
| ||
/** * Return true if a call to {@link StoreSupplier#get} returns a transactional state * store. * * @return {@code true} if a call to {@link StoreSupplier#get} returns a transactional state * store, {@code false} otherwise. */ boolean transactional(); |
RocksDBTransactionalMechanism
Code Block | ||
---|---|---|
| ||
public enum RocksDBTransactionalMechanism {
// RocksDB will use a secondary store for the uncommitted writes
SECONDARY_STORE,
} |
Stores
Code Block | ||
---|---|---|
| ||
/** * Create a persistent transactional {@link KeyValueBytesStoreSupplier}. * <p> * This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}. * If you want to create a transactional {@link TimestampedKeyValueStore} you should use * {@link #persistentTransactionalTimestampedKeyValueStore(String, RocksDBTransactionalMechanism)} to create a store supplier instead. * * @param name name of the store (cannot be {@code null}) * @param txnMechanism the transactional mechanism to use for the store (cannot be {@code null}) * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used * to build a persistent transactional key-value store */ public static KeyValueBytesStoreSupplier persistentTransactionalKeyValueStore(final String name) /** * Create a persistent transactional {@link KeyValueBytesStoreSupplier}. * <p> * This store supplier can be passed into a * {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}. * If you want to create a {@link KeyValueStore} you should use * {@link #persistentKeyValueStore(String)} to create a store supplier instead. * * @param name name of the store (cannot be {@code null}) * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used * to build a persistent key-(timestamp/value) store */ public static KeyValueBytesStoreSupplier persistentTransactionalTimestampedKeyValueStore(final String name) /** , final RocksDBTransactionalMechanism txnMechanism) /** * Create a persistent transactional {@link WindowBytesStoreSupplierKeyValueBytesStoreSupplier}. * <p> * This store supplier can be passed into a {@link #windowStoreBuilder#keyValueStoreBuilder(WindowBytesStoreSupplierKeyValueBytesStoreSupplier, Serde, Serde)}. * If you want to create a transactional {@link TimestampedWindowStoreTimestampedKeyValueStore} you should use * {@link #persistentTimestampedWindowStore#persistentTransactionalTimestampedKeyValueStore(String, Duration, Duration, boolean, booleanRocksDBTransactionalMechanism)} to create a store supplier instead. * * @param name name of the store (cannot be {@code null}) * @param txnMechanism the transactional mechanism to nameuse offor the store (cannot be {@code null}) * @param@return retentionPeriodan instance of a {@link KeyValueBytesStoreSupplier} that lengthcan ofbe timeused to retain* datato inbuild thea storepersistent (cannot be negative)transactional key-value store */ public static KeyValueBytesStoreSupplier persistentTransactionalKeyValueStore(final String name, (note that the retention period must be at least long enough to contain the * windowed data's entire life cycle, from window-start through window-end, *final RocksDBTransactionalMechanism txnMechanism) /** * Create a persistent transactional {@link KeyValueBytesStoreSupplier}. * <p> * This store supplier can be passed into a * {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}. and for* theIf entireyou gracewant period) to *create @parama windowSize{@link KeyValueStore} you should use * {@link #persistentTransactionalKeyValueStore(String)} to create a sizestore ofsupplier theinstead. windows (cannot* be negative) * @param retainDuplicatesname name of the store whether(cannot orbe not to retain duplicates. Turning this on will automatically disable * caching and means that null values will be ignored. * @return an instance of {@link WindowBytesStoreSupplier} * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize} */ public static WindowBytesStoreSupplier persistentTransactionalWindowStore(final String name, {@code null}) * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used * to build a persistent transactional key-(timestamp/value) store */ public static KeyValueBytesStoreSupplier persistentTransactionalTimestampedKeyValueStore(final String name) /** * Create a persistent transactional {@link KeyValueBytesStoreSupplier}. * <p> * This store supplier can be passed into a * {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}. * If you want to create a {@link KeyValueStore} you should use * {@link #persistentTransactionalKeyValueStore(String, RocksDBTransactionalMechanism)} to create a store supplier instead. * * @param name name of the store (cannot be {@code null}) * @param txnMechanism the transactional mechanism to use for the store (cannot be {@code null}) * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used * to build a persistent key-(timestamp/value) store */ public static KeyValueBytesStoreSupplier persistentTransactionalTimestampedKeyValueStore(final String name, final Duration retentionPeriod, final RocksDBTransactionalMechanism txnMechanism) final Duration windowSize, final boolean retainDuplicates) throws IllegalArgumentException /** * Create a persistent transactional {@link WindowBytesStoreSupplier}. * <p> * This store supplier can be passed into a * {@link #timestampedWindowStoreBuilder#windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}. * If you want to create a {@link WindowStoreTimestampedWindowStore} you should use * {@link #persistentWindowStore#persistentTransactionalTimestampedWindowStore(String, Duration, Duration, boolean, boolean)} to create a store supplier instead. * * @param name name of the store (cannot be {@code null}) * @param retentionPeriod length of time to retain data in the store (cannot be negative) * (note that the retention period must be at least long enough to contain the * windowed data's entire life cycle, from window-start through window-end, * and for the entire grace period) * @param windowSize size of the windows (cannot be negative) * @param retainDuplicates whether or not to retain duplicates. Turning this on will automatically disable * caching and means that null values will be ignored. * @return an instance of {@link WindowBytesStoreSupplier} * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize} */ public static WindowBytesStoreSupplier persistentTransactionalTimestampedWindowStorepersistentTransactionalWindowStore(final String name, final Duration retentionPeriod, final Duration retentionPeriod, final Duration windowSize, final Duration windowSize, final boolean retainDuplicates) throws IllegalArgumentException final boolean retainDuplicates) throws IllegalArgumentException /** * Create a persistent transactional {@link SessionBytesStoreSupplier}. * * @param name name of the store (cannot be {@code null}) * @param retentionPeriod length of time to retain data in the store (cannot be negative) * /** * Create a persistent transactional {@link WindowBytesStoreSupplier}. * <p> * This store supplier can be passed into a {@link #windowStoreBuilder(noteWindowBytesStoreSupplier, that the retention period must be at least as long enough to * Serde, Serde)}. * If you want to create a {@link TimestampedWindowStore} you should use * {@link #persistentTransactionalTimestampedWindowStore(String, Duration, Duration, boolean, RocksDBTransactionalMechanism)} to create a store supplier instead. * * @param name contain the inactivity gapname of the sessionstore and(cannot thebe entire grace period.{@code null}) * @return@param anretentionPeriod instance of a {@link SessionBytesStoreSupplier} */ public static SessionBytesStoreSupplier persistentTransactionalSessionStore(final String name, length of time to retain data in the store (cannot be negative) * (note that the retention period must be at least long enough to contain the * finalwindowed Duration retentionPeriod) |
Materialized
data's entire life cycle, from window-start through window-end,
* and for the entire grace period)
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates. Turning this on will automatically disable
* caching and means that null values will be ignored.
* @param txnMechanism the transactional mechanism to use for the store (cannot be {@code null})
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
* @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
*/
public static WindowBytesStoreSupplier persistentTransactionalWindowStore(final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates,
final RocksDBTransactionalMechanism txnMechanism) throws IllegalArgumentException
/**
* Create a persistent transactional {@link WindowBytesStoreSupplier}.
* <p>
* This store supplier can be passed into a
* {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
* If you want to create a {@link WindowStore} you should use
* {@link #persistentTransactionalWindowStore(String, Duration, Duration, boolean)} to create a store supplier instead.
*
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* (note that the retention period must be at least long enough to contain the
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period)
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates. Turning this on will automatically disable
* caching and means that null values will be ignored.
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
* @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
*/
public static WindowBytesStoreSupplier persistentTransactionalTimestampedWindowStore(final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates) throws IllegalArgumentException
/**
* Create a persistent transactional {@link WindowBytesStoreSupplier}.
* <p>
* This store supplier can be passed into a
* {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
* If you want to create a {@link WindowStore} you should use
* {@link #persistentTransactionalWindowStore(String, Duration, Duration, boolean, RocksDBTransactionalMechanism)} to create a store supplier instead.
*
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* (note that the retention period must be at least long enough to contain the
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period)
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates. Turning this on will automatically disable
* caching and means that null values will be ignored.
* @param txnMechanism the transactional mechanism to use for the store (cannot be {@code null})
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
* @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
*/
public static WindowBytesStoreSupplier persistentTransactionalTimestampedWindowStore(final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates,
final RocksDBTransactionalMechanism txnMechanism) throws IllegalArgumentException
/**
* Create a persistent transactional {@link SessionBytesStoreSupplier}.
*
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* (note that the retention period must be at least as long enough to
* contain the inactivity gap of the session and the entire grace period.)
* @return an instance of a {@link SessionBytesStoreSupplier}
*/
public static SessionBytesStoreSupplier persistentTransactionalSessionStore(final String name,
final Duration retentionPeriod)
/**
* Create a persistent transactional {@link SessionBytesStoreSupplier}.
*
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* (note that the retention period must be at least as long enough to
* contain the inactivity gap of the session and the entire grace period.)
* @param txnMechanism the transactional mechanism to use for the store (cannot be {@code null})
* @return an instance of a {@link SessionBytesStoreSupplier}
*/
public static SessionBytesStoreSupplier persistentTransactionalSessionStore(final String name,
final Duration retentionPeriod,
final RocksDBTransactionalMechanism txnMechanism) |
Materialized
Code Block |
---|
public StoreSupplier<S> storeSupplier();
public boolean transactional();
/**
* Enable transactionality of the materialized {@link StateStore}.
*
* @return itself
* @throws IllegalArgumentException if store supplier is also pre-configured
*/
public Materialized<K, V, S> withTransactionsEnabled()
/**
* Disable transactionality of the materialized {@link StateStore}.
*
* @return itself
* @throws IllegalArgumentException if store supplier is also pre-configured
*/
public Materialized<K, V, S> withTransactionsDisabled() |
Code Block |
public StoreSupplier<S> storeSupplier();
public boolean transactional(); |
Compatibility, Deprecation, and Migration Plan
...
- It doubles the number of open state stores
- It potentially has higher write and read amplification due to uncontrolled flushes of the temporary state store.
Rejected Alternatives
RocksDB in-memory Indexed Batches
...