Status

Current state: Accepted

Discussion thread: Thread

JIRA: KAFKA-14412

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Dependencies

This KIP depends on KIP-1035: StateStore managed changelog offsets, which used to be part of this KIP, but was broken out to provide more focus and make it easier to contribute and review. Consequently, the changes outlined in this KIP assume that KIP-1035 has already been adopted.

Motivation

As described in KIP-844, under EOS, crash failures cause all Task state to be wiped out on restart. This is because data is currently written to the StateStore before the corresponding commit to its changelog has completed, so records may be written to disk that were never committed to the store's changelog.

Wiping state ensures consistency of local stores with their changelog topics, but can cause long processing delays while the local state is rebuilt from the changelog. These delays are proportional to the number of records in the changelog topic, which, for highly active tables or those with very high cardinality, can be very large. Real-world cases have been observed where these delays span multiple days, during which both processing and interactive queries are paused.

In KIP-844, it was proposed to create an alternative type of StateStore, which would enable users to opt in to "transactional" behaviour that ensured data was only persisted once the changelog commit had succeeded. However, the design and approach outlined in KIP-844 unfortunately did not perform well when tested (with a write throughput of only approximately 4% of the regular RocksDB StateStore!).

This KIP explores an alternative design that should have little or no performance impact (potentially even performing better than the status quo), and can thus be enabled for all stores. This should bound state restore under EOS to less than 1 second, irrespective of the size of the changelogs.

Public Interfaces

New configuration

enable.transactional.statestores (default: false)

Whether to enable transactional state stores. When enabled, state stores buffer writes in a transaction buffer (if supported by the state store implementation), and only commit them once the corresponding Kafka changelog transaction has committed.

Under EOS, state stores will no longer be wiped on-error and rebuilt from scratch. In the event of an error (under either EOS or ALOS), only the writes since the last successful commit will be lost and replayed through the topology.

statestore.uncommitted.max.bytes (default: 67108864, i.e. 64 MB)

Maximum number of bytes of memory each instance will use to buffer uncommitted state store records. If this limit is exceeded, a task commit will be requested. Set to -1 for no limit.

Note: if this is too high or unbounded, it's possible for RocksDB to trigger out-of-memory errors.

default.interactive.query.isolation.level (default: READ_UNCOMMITTED)

The default transaction isolation level to use for Interactive Queries to State Stores.

Has no effect unless enable.transactional.statestores is true.

When set to READ_UNCOMMITTED (the default, for backwards compatibility), writes to stores become visible to Interactive Query threads as soon as the put/delete method in the stream processor has returned. Interactive Query threads may observe a write being "undone" if the application crashes after the write was observed but before it was committed. If you need to guarantee consistency, use READ_COMMITTED.

When set to READ_COMMITTED, writes to state stores will only become visible to Interactive Query threads once they have been successfully committed. Writes may not be visible to Interactive Query threads for up to commit.interval.ms after the put/delete call has returned in the stream processor.
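For illustration, here is a minimal sketch of how an application might opt in to the settings described above, assuming they are supplied as plain string keys in a java.util.Properties object. The class and method names are hypothetical, and the 128 MB buffer limit is an arbitrary example value, not a recommendation.

```java
import java.util.Properties;

// Hypothetical example class; only the property names and values come from this KIP
// (plus the standard processing.guarantee setting).
class TransactionalStoreConfigExample {

    static Properties transactionalStreamsProps() {
        Properties props = new Properties();
        // Transactional stores matter most under EOS, where stores are otherwise wiped on error.
        props.put("processing.guarantee", "exactly_once_v2");
        // Opt in to transactional state stores (default: false).
        props.put("enable.transactional.statestores", "true");
        // Per-instance cap on buffered uncommitted bytes (default: 64 MB; -1 means no limit).
        props.put("statestore.uncommitted.max.bytes", String.valueOf(128L * 1024 * 1024));
        // Only expose committed writes to Interactive Queries unless a query overrides it.
        props.put("default.interactive.query.isolation.level", "READ_COMMITTED");
        return props;
    }

    public static void main(String[] args) {
        transactionalStreamsProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Choosing READ_COMMITTED alongside transactional stores follows the recommendation made later in this KIP, since it avoids the range-scan snapshot locking that READ_UNCOMMITTED requires.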

New Metrics

uncommitted-bytes (level: INFO, type: Gauge, scope: store): Approximate number of bytes currently buffered by the store's transaction buffer. If enable.transactional.statestores is false, this will always be 0.

Changed Interfaces

  • org.apache.kafka.streams.processor.StateStore
  • org.apache.kafka.streams.state.ReadOnlyKeyValueStore
  • org.apache.kafka.streams.state.ReadOnlySessionStore
  • org.apache.kafka.streams.state.ReadOnlyWindowStore
  • org.apache.kafka.streams.state.VersionedKeyValueStore
  • org.apache.kafka.streams.state.VersionedBytesStore
  • org.apache.kafka.streams.query.StateQueryRequest
  • org.apache.kafka.streams.query.QueryConfig

Changes:

org.apache.kafka.streams.processor.StateStore
    /**
     * Return an approximate count of memory used by records not yet committed to this StateStore.
     * <p>
     * This method will return an approximation of the memory that would be freed by the next call to {@link
     * #commit(Map)}.
     * <p>
     * If no records have been written to this store since {@link #init(StateStoreContext, StateStore) opening}, or
     * since the last {@link #commit(Map)}; or if this store does not support atomic transactions, it will return {@code
     * 0}, as no records are currently being buffered.
     *
     * @return The approximate size of all records awaiting {@link #commit(Map)}; or {@code 0} if this store does not
     *         support transactions, or has not been written to since {@link #init(StateStoreContext, StateStore)} or
     *         last {@link #commit(Map)}.
     */
    @Evolving
    default long approximateNumUncommittedBytes() {
        return 0;
    }
ReadOnlyKeyValueStore, ReadOnlySessionStore, ReadOnlyWindowStore, VersionedKeyValueStore, VersionedBytesStore
    /**
     * Return a read-only view of this store restricted by the given {@link IsolationLevel}.
     * <p>
     * If transactional stores are disabled (i.e. when {@code enable.transactional.statestores} is
     * {@code false}), or if this store does not support transactionality, the {@link IsolationLevel}
     * will be ignored, and the returned view will implicitly have {@code READ_UNCOMMITTED}
     * visibility.
     * <p>
     * If {@link IsolationLevel} is {@code READ_COMMITTED}, only records written before the last
     * task commit will be visible in the resulting view.
     * <p>
     * Otherwise, if {@link IsolationLevel} is {@code READ_UNCOMMITTED}, or transactionality is
     * not supported, all records written to this store will be immediately visible in the view.
     *
     * @param isolationLevel the isolation level the returned view should use for reads.
     * @return a read-only view of this store using the given isolation level.
     */
    default ReadOnlyKeyValueStore<K, V> readOnly(final IsolationLevel isolationLevel) {
        return this;
    }
org.apache.kafka.streams.query.StateQueryRequest
    /**
     * Overrides the {@link IsolationLevel} for this query. When absent, the effective isolation
     * level falls back to {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_INTERACTIVE_QUERY_ISOLATION_LEVEL_CONFIG}.
     */
    public StateQueryRequest<R> withIsolationLevel(final IsolationLevel isolationLevel) {
        return new StateQueryRequest<>(
            storeName,
            position,
            partitions,
            query,
            executionInfoEnabled,
            requireActive,
            Optional.of(isolationLevel)
        );
    }

    /**
     * The isolation level override for this query, if any. If empty, the effective isolation level
     * comes from {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_INTERACTIVE_QUERY_ISOLATION_LEVEL_CONFIG}.
     */
    public Optional<IsolationLevel> isolationLevel() {
        return isolationLevel;
    }
org.apache.kafka.streams.query.QueryConfig
    public IsolationLevel getIsolationLevel() {
        return isolationLevel;
    }

Proposed Changes

To ensure that data is not written to a state store until it has been committed to the changelog, we need to isolate writes from the underlying database until changelog commit.

We enable this transaction isolation with the new enable.transactional.statestores configuration.

enable.transactional.statestores = false

When transactional state stores are disabled, Streams behaves as it currently does, wiping state stores on-error when processing.guarantee is exactly_once_v2.

This provides no atomicity, consistency, isolation or durability guarantees.

enable.transactional.statestores = true

When transactional state stores are enabled, Streams will isolate writes from state stores until commit. This guarantees consistency of the on-disk data with the store changelog, so Streams will not need to wipe stores on-error.

The default value for enable.transactional.statestores will be false, to mirror the behaviour we have today, and ensure no unexpected change in behaviour for users of Interactive Queries.

Additionally, Interactive Queries can control the visibility of records, either by explicitly providing an IsolationLevel (IQv2) or by configuring default.interactive.query.isolation.level (IQv1 and IQv2):

default.interactive.query.isolation.level = READ_UNCOMMITTED

Writes are visible to Interactive Queries as soon as they are written to the transaction buffer. This provides low-latency visibility of newly written records, at the cost of consistency.

WARNING: To provide snapshot isolation for range scans, Interactive Queries temporarily block processing threads from committing when they begin a range scan (e.g. all(), range(), prefixScan()). How long this block lasts depends on the number of buffered records that fall within the requested range, so it's best to keep such ranges small: calling all() on a very active store could heavily impact throughput. If this is a problem, consider using READ_COMMITTED isolation instead, which is lock-free.

default.interactive.query.isolation.level = READ_COMMITTED

Writes are visible to Interactive Queries once they have been committed to the underlying store. This provides the most robust consistency guarantees, at the cost of some latency: in the worst-case, writes may take up to commit.interval.ms to become visible to Interactive Queries.

In-memory Transaction Buffers

Many StateStore implementations, including RocksDB, buffer the records written to a transaction entirely in memory, which could cause issues with either JVM heap or native memory. To mitigate this, we will automatically force a Task commit if the total memory an instance uses to buffer uncommitted records, as reported by StateStore#approximateNumUncommittedBytes(), exceeds the threshold configured by statestore.uncommitted.max.bytes. This roughly bounds the memory required for buffering uncommitted records, irrespective of commit.interval.ms, and effectively bounds the number of records that will need to be restored in the event of a failure.

Since StreamThreads operate independently of each other, this limit will be enforced by dividing statestore.uncommitted.max.bytes among the StreamThreads: each thread will independently force an early commit if its uncommitted memory usage exceeds (or will soon exceed) statestore.uncommitted.max.bytes / num.stream.threads.
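The per-thread check could look roughly like the following minimal sketch. It assumes the limit is split evenly by integer division and that -1 disables the check; UncommittedBytesSource is a hypothetical stand-in for StateStore#approximateNumUncommittedBytes(), UncommittedBudget is not a real Streams class, and the "will soon exceed" prediction is left out here.

```java
import java.util.List;

// Hypothetical stand-in for StateStore#approximateNumUncommittedBytes().
interface UncommittedBytesSource {
    long approximateNumUncommittedBytes();
}

// Hypothetical per-thread budget: the instance-wide limit is divided evenly among
// StreamThreads, and each thread checks only the stores of its own tasks.
class UncommittedBudget {
    private final long perThreadLimit;

    UncommittedBudget(long maxUncommittedBytes, int numStreamThreads) {
        // A negative limit (-1) means "no limit", so the check can never fire.
        this.perThreadLimit = maxUncommittedBytes < 0
            ? Long.MAX_VALUE
            : maxUncommittedBytes / numStreamThreads;
    }

    long perThreadLimit() {
        return perThreadLimit;
    }

    // Sum of uncommitted bytes across this thread's stores; non-transactional stores report 0.
    static long totalUncommittedBytes(List<? extends UncommittedBytesSource> stores) {
        long total = 0;
        for (UncommittedBytesSource store : stores) {
            total += store.approximateNumUncommittedBytes();
        }
        return total;
    }

    // True when this thread should request an early task commit.
    boolean shouldForceCommit(List<? extends UncommittedBytesSource> stores) {
        return totalUncommittedBytes(stores) > perThreadLimit;
    }

    public static void main(String[] args) {
        // 64 MB shared by 4 threads gives each thread a 16 MB budget.
        UncommittedBudget budget = new UncommittedBudget(64L * 1024 * 1024, 4);
        List<UncommittedBytesSource> stores =
            List.of(() -> 10L * 1024 * 1024, () -> 7L * 1024 * 1024);
        System.out.println(budget.shouldForceCommit(stores)); // true: 17 MB > 16 MB
    }
}
```

Splitting the budget per thread means no cross-thread coordination is needed at commit time, at the cost of one busy thread committing early while others still have headroom.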

Some Topologies can generate many more new StateStore entries than the records they process. Such a Topology could cross the configured memory threshold mid-processing, potentially causing an OOM error if the threshold is exceeded by a large margin. To mitigate this, the StreamThread will measure the increase in uncommitted bytes on each iteration, and pre-emptively commit if the next iteration is likely to cross the threshold.
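A minimal sketch of the pre-emptive check, assuming the estimate for the next iteration is simply the growth observed during the previous one (the KIP does not specify the estimator). PreemptiveCommitPredictor and its methods are hypothetical names.

```java
// Hypothetical sketch: commit early if repeating the last iteration's growth
// in uncommitted bytes would push the total over the limit.
class PreemptiveCommitPredictor {
    private final long limitBytes;
    private long previousUncommittedBytes = 0;

    PreemptiveCommitPredictor(long limitBytes) {
        this.limitBytes = limitBytes;
    }

    // Called after each processing iteration with the thread's current uncommitted bytes.
    // Returns true if the thread should commit before starting the next iteration.
    boolean afterIteration(long currentUncommittedBytes) {
        // Growth observed during the iteration that just finished (never negative).
        long lastIterationGrowth = Math.max(0, currentUncommittedBytes - previousUncommittedBytes);
        previousUncommittedBytes = currentUncommittedBytes;
        // Predict the next iteration grows by the same amount.
        return currentUncommittedBytes + lastIterationGrowth > limitBytes;
    }

    // A commit empties the transaction buffers, so the baseline resets to zero.
    void onCommit() {
        previousUncommittedBytes = 0;
    }

    public static void main(String[] args) {
        PreemptiveCommitPredictor predictor = new PreemptiveCommitPredictor(100);
        System.out.println(predictor.afterIteration(40)); // false: 40 + 40 <= 100
        System.out.println(predictor.afterIteration(80)); // true: 80 + 40 > 100
    }
}
```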

Note that StateStore#approximateNumUncommittedBytes() has a default implementation returning 0, which ensures that existing custom stores and future non-transactional stores do not force any early commits.

Transaction Buffer Thread Safety

To enable Interactive Queries to use READ_UNCOMMITTED isolation, we need to ensure that our TransactionBuffers are thread-safe for reads. To achieve this, we split our TransactionBuffers into two parts:

  1. A ConcurrentSkipListMap "read buffer", that services all reads.
  2. An implementation-dependent "write buffer", used for writes to the underlying data store. For RocksDBStore, this is a WriteBatch, which is committed atomically. InMemoryKeyValueStore does not have a write buffer, and just uses its read buffer instead.

Reads from the StreamThread and READ_UNCOMMITTED IQ threads are serviced by the read buffer. The write buffer is (optionally) used to buffer writes for commit to the database, if doing so would yield better performance than using the read buffer. More concretely:

  • RocksDBStore will use a RocksDB WriteBatch as its write buffer, building it incrementally on each put and delete. This ensures commits are fast, minimizing the time that IQ reads are blocked, as we do not have to iterate the read buffer and construct a WriteBatch at commit-time.
  • InMemory*Stores will not have a dedicated write buffer, as there is no structure more optimal than the ConcurrentSkipListMap read buffer. Consequently, InMemory stores will use less memory to buffer writes than RocksDB stores.
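To make the read/write buffer split concrete, here is a minimal single-store sketch. It assumes String keys and values, and uses a TreeMap as a stand-in for the underlying database and an ordered list of operations as a stand-in for a RocksDB WriteBatch; TransactionBufferSketch is a hypothetical name, and unlike a real WriteBatch, the loop in commit() is not atomic. Since ConcurrentSkipListMap does not permit null values, deletes must be buffered as explicit tombstones (here, Optional.empty()).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch of a transaction buffer split into a read buffer and a write buffer.
class TransactionBufferSketch {
    // Stand-in for the committed contents of the underlying database (e.g. RocksDB).
    final Map<String, String> database = new TreeMap<>();

    // Read buffer: sorted and thread-safe; serves StreamThread and READ_UNCOMMITTED IQ reads.
    // ConcurrentSkipListMap rejects null values, so a delete is buffered as Optional.empty().
    final ConcurrentSkipListMap<String, Optional<String>> readBuffer = new ConcurrentSkipListMap<>();

    // Write buffer: ordered pending operations, standing in for a RocksDB WriteBatch that is
    // built incrementally on each put/delete so commit never has to walk the read buffer.
    private final List<Map.Entry<String, Optional<String>>> writeBatch = new ArrayList<>();

    void put(String key, String value) {
        readBuffer.put(key, Optional.of(value));
        writeBatch.add(Map.entry(key, Optional.of(value)));
    }

    void delete(String key) {
        readBuffer.put(key, Optional.empty());
        writeBatch.add(Map.entry(key, Optional.empty()));
    }

    // Uncommitted read: a buffered write or tombstone shadows the committed value.
    Optional<String> get(String key) {
        Optional<String> buffered = readBuffer.get(key);
        return buffered != null ? buffered : Optional.ofNullable(database.get(key));
    }

    // Applies pending operations in order, then empties both buffers.
    // A real WriteBatch would be applied atomically; this loop is not.
    void commit() {
        for (Map.Entry<String, Optional<String>> op : writeBatch) {
            if (op.getValue().isPresent()) {
                database.put(op.getKey(), op.getValue().get());
            } else {
                database.remove(op.getKey());
            }
        }
        writeBatch.clear();
        readBuffer.clear();
    }

    public static void main(String[] args) {
        TransactionBufferSketch buffer = new TransactionBufferSketch();
        buffer.put("a", "1");
        System.out.println(buffer.get("a") + " committed=" + buffer.database); // Optional[1] committed={}
        buffer.commit();
        System.out.println(buffer.get("a") + " committed=" + buffer.database); // Optional[1] committed={a=1}
    }
}
```

An InMemory store, as described above, would drop writeBatch entirely and apply the read buffer itself on commit.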

On commit, the read buffer is emptied. This creates a problem: a READ_UNCOMMITTED Interactive Query thread may still have an open Iterator over a range of the read buffer. The solution is for READ_UNCOMMITTED Interactive Query threads to take a "snapshot" of the range they are scanning when they construct the iterator. During this snapshot, a ReentrantReadWriteLock prevents commit from clearing the read buffer. It is therefore possible for READ_UNCOMMITTED Interactive Queries to reduce StreamThread throughput by frequently requesting large range scans. This can be mitigated by keeping the TransactionBuffer small, via either commit.interval.ms or statestore.uncommitted.max.bytes.
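The snapshot-and-lock interplay can be sketched as follows, assuming String keys and a TreeMap copy as the snapshot. SnapshotReadBuffer is a hypothetical class: IQ threads hold the read lock only while copying their range, so many can snapshot concurrently, while commit takes the write lock and therefore waits for in-progress copies. For simplicity, only commit takes the lock in this sketch, and the snapshot covers only the buffer, not the underlying store.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: range scans snapshot the read buffer under a read lock,
// while commit clears it under the write lock.
class SnapshotReadBuffer {
    private final ConcurrentSkipListMap<String, String> readBuffer = new ConcurrentSkipListMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // StreamThread write into the buffer (not locked in this simplified sketch).
    void put(String key, String value) {
        readBuffer.put(key, value);
    }

    int size() {
        return readBuffer.size();
    }

    // READ_UNCOMMITTED IQ thread: copy the buffered records in [from, to) so the returned
    // iterator stays valid after a later commit. The copy cost grows with the number of
    // buffered records in the range, which is why large scans can delay commits.
    Iterator<Map.Entry<String, String>> rangeSnapshot(String from, String to) {
        lock.readLock().lock();
        try {
            return new TreeMap<>(readBuffer.subMap(from, to)).entrySet().iterator();
        } finally {
            lock.readLock().unlock();
        }
    }

    // StreamThread commit: waits for in-flight snapshots to finish, then empties the buffer.
    void commit() {
        lock.writeLock().lock();
        try {
            // A real store would apply its write buffer to the database here first.
            readBuffer.clear();
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        SnapshotReadBuffer buffer = new SnapshotReadBuffer();
        buffer.put("a", "1");
        buffer.put("b", "2");
        buffer.put("c", "3");
        Iterator<Map.Entry<String, String>> scan = buffer.rangeSnapshot("a", "c");
        buffer.commit(); // clears the buffer, but the snapshot is unaffected
        scan.forEachRemaining(e -> System.out.println(e.getKey() + "=" + e.getValue()));
    }
}
```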

For this reason, despite the default.interactive.query.isolation.level  being READ_UNCOMMITTED for backwards compatibility, we should encourage users who enable transactions to also set their default isolation level to READ_COMMITTED.

This buffering strategy enables us to provide a transaction buffer for both RocksDBStore and InMemory(KeyValue|Session|Window)Store, as well as to support other store implementations in the future. The interface and base implementation are available for use by custom store implementations, but are not (yet) part of the public API.

Interactive Queries

Interactive queries currently see every record as soon as it is written to a StateStore. This can cause consistency issues, as interactive queries can read records before they're committed to the Kafka changelog, and those records may later be rolled back. To address this, we provide a configuration for Interactive Queries to determine the IsolationLevel of their reads: default.interactive.query.isolation.level.

When the isolation level is READ_COMMITTED , the maximum time for records to become visible to interactive queries will be commit.interval.ms. Under EOS, this is by default a low value (100 ms), but under at-least-once, the default is 30 seconds. Users may need to adjust their commit.interval.ms to meet the visibility latency goals for their use-case.

When the isolation level is READ_UNCOMMITTED, which is the default, all written records will be immediately visible to interactive queries, so the high default commit.interval.ms of 30s under at-least-once will have no impact on interactive query latency. However, this may yield inconsistencies if transactions are rolled back or the application crashes. Additionally, if an Interactive Query begins a range scan (all(), range(), prefixScan(), etc.), we must take a snapshot of that range from the transaction buffer, to prevent the returned Iterator from becoming invalid mid-iteration when the transaction buffer is cleared after a commit. This snapshot process temporarily blocks the StreamThread from writing to or committing the store, so requesting large ranges with READ_UNCOMMITTED isolation can negatively impact throughput. If such scans are required, it's recommended to reduce commit.interval.ms or statestore.uncommitted.max.bytes to minimize the size of the transaction buffer that needs to be copied during the snapshot process.

State Store Caching

Kafka Streams today provides a write-through cache for StateStores, controlled by cache.max.bytes.buffering. This implements an LRU cache, improving throughput when the same keys are repeatedly read by processors. Since transactional stores introduce a new kind of record buffering, we must ensure that transaction buffering and record caching behave consistently, and minimize redundancy as much as possible.

There are two key differences between transaction buffering and record caching:

  • Transaction buffering only buffers writes; record caching caches both reads and writes.
  • Transaction buffering evicts all entries on-commit; record caching evicts entries according to an LRU policy. Consequently, cached records persist in the record cache across commits.

We therefore cannot replace record caching with transaction buffers, and cannot implement transaction buffers with the record cache, as they solve different problems.

It might seem that records present in both the record cache and the transaction buffer would occupy twice as much memory. However, since both structures reference the same on-heap objects, the actual overhead of a record being present in both is mostly just some extra object pointers and internal Map.Node machinery; the keys and values themselves are shared in the JVM heap. This does introduce a subtle issue: records in both the transaction buffer and the record cache will be counted against both cache.max.bytes.buffering and statestore.uncommitted.max.bytes, despite being present in the JVM heap only once. This may lead to suboptimal use of the memory allocated to record caching, transaction buffering, or both.

A more complete solution might be to combine the transaction buffering and record cache into a single structure, that can correctly "pin" uncommitted writes, and otherwise evict on an LRU policy. However, doing so would need to combine/replace the cache and buffer size configurations. We have decided not to implement this in this KIP, and instead revisit this issue in the future, if it is warranted.

However, to guarantee consistency, IQ READ_COMMITTED reads must ignore the record cache, because a record may be written, then read back (and consequently cached), before the write has been committed.
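A minimal sketch of how reads might be routed by isolation level, assuming a single store with String keys and no deletes. IsolatedReadPath and its nested IsolationLevel enum are hypothetical (the enum merely mirrors the two levels named in this KIP). The key point is that a READ_COMMITTED read bypasses both the record cache and the transaction buffer, since either may hold a value whose write has not yet been committed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of isolation-aware reads over a record cache, a transaction buffer
// and the committed store. Deletes are omitted for brevity.
class IsolatedReadPath {
    // Mirrors the two isolation levels named in this KIP; not the real Kafka enum.
    enum IsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

    final Map<String, String> recordCache = new HashMap<>();       // may hold uncommitted values
    final Map<String, String> transactionBuffer = new HashMap<>(); // uncommitted writes
    final Map<String, String> committedStore = new HashMap<>();    // committed data

    // A processor write is buffered and also ends up in the record cache.
    void write(String key, String value) {
        transactionBuffer.put(key, value);
        recordCache.put(key, value);
    }

    String read(String key, IsolationLevel isolation) {
        if (isolation == IsolationLevel.READ_COMMITTED) {
            // Bypass the cache and the buffer: either may contain a not-yet-committed write.
            return committedStore.get(key);
        }
        String cached = recordCache.get(key);
        if (cached != null) {
            return cached;
        }
        String buffered = transactionBuffer.get(key);
        return buffered != null ? buffered : committedStore.get(key);
    }

    // Commit moves buffered writes into the store; the record cache is left alone,
    // since it evicts by LRU rather than on commit.
    void commit() {
        committedStore.putAll(transactionBuffer);
        transactionBuffer.clear();
    }

    public static void main(String[] args) {
        IsolatedReadPath store = new IsolatedReadPath();
        store.write("k", "v");
        System.out.println(store.read("k", IsolationLevel.READ_UNCOMMITTED)); // v
        System.out.println(store.read("k", IsolationLevel.READ_COMMITTED));   // null
        store.commit();
        System.out.println(store.read("k", IsolationLevel.READ_COMMITTED));   // v
    }
}
```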

Error Handling

Kafka Streams currently generates a TaskCorruptedException when a Task needs to have its state wiped (under EOS) and be re-initialized. There are currently several different situations that generate this exception:

  1. No offsets for the store can be found when opening it under EOS.
  2. OutOfRangeException during restoration, usually caused by the changelog being wiped on application reset.
  3. TimeoutException under EOS, when writing to or committing a Kafka transaction.

The first two of these are extremely rare, and make sense to keep. However, timeouts are much more frequent. They currently require the store to be wiped under EOS because when a timeout occurs, the data in the local StateStore will have been written, but the data in the Kafka changelog will have failed to be written, causing a mismatch in consistency.

With transactional StateStores, we can guarantee that the local state is consistent with the changelog; it will therefore no longer be necessary to reset the local state on a TimeoutException when transactional state stores are enabled.
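The resulting decision can be summarised as a small function. This is a hypothetical sketch: StateWipePolicy and its Cause values are illustrative names corresponding to the three situations listed above, not how Streams actually structures its error handling.

```java
// Hypothetical sketch: when must a task's local state be wiped and restored from scratch?
class StateWipePolicy {
    // The three situations that currently raise TaskCorruptedException.
    enum Cause { NO_OFFSETS_FOUND, OFFSET_OUT_OF_RANGE, TRANSACTION_TIMEOUT }

    static boolean mustWipeState(Cause cause, boolean exactlyOnce, boolean transactionalStores) {
        switch (cause) {
            case NO_OFFSETS_FOUND:
                // Opening a store under EOS without offsets: local state cannot be trusted.
                return exactlyOnce;
            case OFFSET_OUT_OF_RANGE:
                // The changelog no longer covers the local state (e.g. after an application reset).
                return true;
            case TRANSACTION_TIMEOUT:
                // With transactional stores, the failed writes never left the transaction buffer,
                // so local state still matches the changelog and need not be wiped.
                return exactlyOnce && !transactionalStores;
            default:
                throw new IllegalArgumentException("Unknown cause: " + cause);
        }
    }

    public static void main(String[] args) {
        System.out.println(mustWipeState(Cause.TRANSACTION_TIMEOUT, true, false)); // true
        System.out.println(mustWipeState(Cause.TRANSACTION_TIMEOUT, true, true));  // false
    }
}
```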

Interactive Query .position Offsets

Input partition "Position" offsets, introduced by KIP-796: Interactive Query v2, have been stored by the RocksDBStore implementation in its offsets column family since KIP-1035. To ensure these offsets remain consistent with the data they correspond to, they will also need to be buffered in the same transaction buffer (i.e. WriteBatch) as that data.

When data is written to a RocksDBStore (via put, delete, etc.), the input partition offsets will be read from the current record's metadata (as before) and added to the current transaction's WriteBatch. When the StateStore is committed, the position offsets in the current WriteBatch will be written to RocksDB alongside the records they correspond to. In addition, RocksDBStore will maintain two Position maps in memory: one containing the offsets pending in the current transaction's WriteBatch, and the other containing committed offsets. On commit(Map), the uncommitted Position map will be merged into the committed Position map. In this sense, the two Position maps diverge during writes, and re-converge on commit.

When an interactive query is made with READ_COMMITTED isolation, the PositionBound will constrain the committed Position map, whereas under READ_UNCOMMITTED isolation, the PositionBound will constrain the uncommitted Position map.
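A minimal sketch of the two Position maps, assuming positions are represented as a map from a "topic-partition" string to the highest offset seen, and reading "re-converge on commit" as meaning the uncommitted map always holds the latest offsets (committed plus pending). DualPositionTracker and satisfies() are hypothetical simplifications of the real Position and PositionBound classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the two in-memory Position maps kept by a transactional store.
// A position maps a "topic-partition" string to the highest input offset written.
class DualPositionTracker {
    // Latest offsets, including those whose data is still pending in the WriteBatch.
    private final Map<String, Long> uncommitted = new HashMap<>();
    // Offsets whose data has been committed to the store.
    private final Map<String, Long> committed = new HashMap<>();

    // Called on each put/delete with the input record's partition and offset.
    void onWrite(String topicPartition, long offset) {
        uncommitted.merge(topicPartition, offset, Long::max);
    }

    // Called on commit(Map): merging the uncommitted map in makes the two maps equal again.
    void onCommit() {
        uncommitted.forEach((tp, offset) -> committed.merge(tp, offset, Long::max));
    }

    // Simplified PositionBound check against the map selected by the isolation level.
    boolean satisfies(boolean readCommitted, String topicPartition, long minOffset) {
        Map<String, Long> position = readCommitted ? committed : uncommitted;
        return position.getOrDefault(topicPartition, -1L) >= minOffset;
    }

    public static void main(String[] args) {
        DualPositionTracker tracker = new DualPositionTracker();
        tracker.onWrite("input-0", 5);
        System.out.println(tracker.satisfies(false, "input-0", 5)); // true
        System.out.println(tracker.satisfies(true, "input-0", 5));  // false
        tracker.onCommit();
        System.out.println(tracker.satisfies(true, "input-0", 5));  // true
    }
}
```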

RocksDB Transactions

When transactional state stores are enabled, we will use RocksDB's WriteBatch, which exists specifically for use-cases that implement transactions, as we seek to do in Kafka Streams. Combined with a ConcurrentSkipListMap that makes uncommitted records available for reads by any thread, we expect performance to actually be better than with the existing, non-batched write path. The main performance concern is that the WriteBatch+ConcurrentSkipListMap buffer must reside entirely in memory until it is committed, which is addressed by statestore.uncommitted.max.bytes (see above).

Compatibility, Deprecation, and Migration Plan

The above changes will retain compatibility for all existing StateStores, including user-defined custom implementations. Any StateStore that extends RocksDBStore will automatically inherit its behaviour, although its internals will change, potentially requiring users that depend on internal behaviour to update their code.

All new methods on existing classes will have defaults set to ensure compatibility.

Users may notice a change in the performance/behaviour of Kafka Streams. Most notably, under EOS Kafka Streams will now regularly "commit" StateStores, where it would have only done so when the store was closing in the past. The overall performance of this should be at least as good as before, but the profile will be different, with write latency being substantially faster, and commit latency being a bit higher.

Upgrading

When upgrading to a version of Kafka Streams that includes the changes outlined in this KIP, users will not be required to take any action. By default, transactional state stores will be disabled, resulting in no change in behaviour for users.

Users who wish to benefit from reduced restore times on-error will need to explicitly enable transactional state stores.

Downgrading

When downgrading from a version of Kafka Streams that includes the changes outlined in this KIP to a version that does not contain these changes, users will not be required to take any action. No changes to on-disk formats are made by this KIP, so downgrading to a previous version will not trigger state restore.

However, if the user has overridden the defaults for any of the new configuration properties (see above), warnings about unknown configuration properties will be logged on startup.

Test Plan

Testing will be accomplished using both the existing tests and new unit tests that verify the atomicity, durability and consistency guarantees that this KIP provides.

Rejected Alternatives

Dual-Store Approach (KIP-844)

The design outlined in KIP-844, sadly, does not perform well (as described above), and requires users to opt in to transactionality, instead of being a guarantee provided out-of-the-box.

Replacing RocksDB memtables with ThreadCache

It was pointed out on the mailing list that Kafka Streams fronts all RocksDB StateStores with a configurable record cache, and that this cache duplicates the function of RocksDB memtables in serving requests for recently written records. A suggestion was made to use this record cache (the ThreadCache class) as a replacement for memtables, by flushing cached records directly to SSTables using the RocksDB SstFileWriter.

This is out of scope of this KIP, as its goal would be reducing the duplication (and hence, memory usage) of RocksDB StateStores; whereas this KIP is tasked with improving the consistency of StateStores to reduce the frequency and impact of state restoration, improving their scalability.

It has been recommended to instead pursue this idea in a subsequent KIP, as the interface changes outlined in this KIP should be compatible with this idea.

Interactive Query v1 configurable IsolationLevel

While it is technically possible (and trivial) to make IsolationLevel configurable per-query for the Interactive Query v1 API, we decided against this, to provide increased incentive for users to migrate to the v2 API. The IsolationLevel of v1 queries can still be globally controlled via the default.interactive.query.isolation.level configuration.


