Status
Current state: Adopted
Discussion thread: Thread
JIRA: KAFKA-14412
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Dependencies
This KIP depends on KIP-1035: StateStore managed changelog offsets, which used to be part of this KIP, but was broken out to provide more focus and make it easier to contribute and review. Consequently, the changes outlined in this KIP assume that KIP-1035 has already been adopted.
Motivation
As described in KIP-844, under EOS, crash failures cause all Task state to be wiped out on restart. This is because, currently, data is written to the StateStore before the commit to its changelog has completed, so it's possible that records are written to disk that were not committed to the store changelog.
This ensures consistency of local stores with their changelog topics, but can cause long delays in processing while it rebuilds the local state from the changelog. These delays are proportional to the number of records in the changelog topic, which for highly active tables, or those with a very high cardinality, can be very large. Real-world use-cases have been observed where these delays span multiple days, during which both processing and interactive queries are paused.
In KIP-844, it was proposed to create an alternative type of StateStore, which would enable users to opt in to "transactional" behaviour, ensuring that data is only persisted once the changelog commit has succeeded. However, the design and approach outlined in KIP-844 unfortunately did not perform well when tested (with a write throughput that was approximately only 4% of the regular RocksDB StateStore!).
This KIP explores an alternative design that should have little/no performance impact, potentially performing better than the status quo, and can thus be enabled for all stores. This should bound state restore under EOS to less than 1 second, irrespective of the size of the changelogs.
Public Interfaces
New configuration
Name | Default | Description |
---|---|---|
enable.transactional.statestores | false | Whether to enable transactional state stores. When enabled, state stores will buffer writes in a transaction buffer (if supported by the state store implementation), before committing them when the corresponding Kafka changelog transaction has committed. This has two observable effects:
|
statestore.uncommitted.max.bytes | 67108864 (64 MB) | Maximum number of memory bytes each instance will use to buffer uncommitted state-store records. If this limit is exceeded, a task commit will be requested. No limit: -1. Note: if this is too high or unbounded, it's possible for RocksDB to trigger out-of-memory errors. |
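As a minimal sketch, an application might opt in by setting the two properties above on the Properties object it uses to configure Streams. The keys are passed as plain strings because this KIP does not name any configuration constants. The class name and the 128 MiB override are purely illustrative.

```java
import java.util.Properties;

final class TransactionalStoreConfigExample {
    // Property names as listed in the table above.
    static final String ENABLE_TRANSACTIONAL = "enable.transactional.statestores";
    static final String UNCOMMITTED_MAX_BYTES = "statestore.uncommitted.max.bytes";

    // Builds only the two new settings; a real application would add its usual Streams configuration.
    static Properties transactionalProps() {
        Properties props = new Properties();
        props.setProperty(ENABLE_TRANSACTIONAL, "true");
        // Illustrative override: 128 MiB instead of the 64 MiB default.
        props.setProperty(UNCOMMITTED_MAX_BYTES, Long.toString(128L * 1024 * 1024));
        return props;
    }

    public static void main(String[] args) {
        transactionalProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```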
Changed Interfaces
- org.apache.kafka.streams.processor.StateStore
Changes:
/**
 * Return an approximate count of memory used by records not yet committed to this StateStore.
 * <p>
 * This method will return an approximation of the memory that would be freed by the next call to {@link
 * #commit(Map)}.
 * <p>
 * If no records have been written to this store since {@link #init(StateStoreContext, StateStore) opening}, or
 * since the last {@link #commit(Map)}; or if this store does not support atomic transactions, it will return {@code
 * 0}, as no records are currently being buffered.
 *
 * @return The approximate size of all records awaiting {@link #commit(Map)}; or {@code 0} if this store does not
 *         support transactions, or has not been written to since {@link #init(StateStoreContext, StateStore)} or
 *         last {@link #commit(Map)}.
 */
@Evolving
default long approximateNumUncommittedBytes() {
    return 0;
}
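To illustrate the contract, the following is a rough sketch of how a buffering store might keep the reported size up to date. BufferingStoreSketch is a hypothetical stand-in that does not implement the real StateStore interface, and counting key plus value bytes is an assumed approximation, not the accounting any real store uses.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a transactional store; not the real StateStore interface.
final class BufferingStoreSketch {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> uncommitted = new HashMap<>();
    private long uncommittedBytes = 0;

    void put(String key, String value) {
        uncommitted.put(key, value);
        // Assumed approximation: key bytes plus value bytes. Overwrites of the same key are
        // counted again, as they would be in an append-only write batch.
        uncommittedBytes += key.getBytes(StandardCharsets.UTF_8).length
                + value.getBytes(StandardCharsets.UTF_8).length;
    }

    // Moves buffered records into the committed view and releases the buffer.
    void commit() {
        committed.putAll(uncommitted);
        uncommitted.clear();
        uncommittedBytes = 0;
    }

    // Returns 0 when nothing has been written since opening or since the last commit.
    long approximateNumUncommittedBytes() {
        return uncommittedBytes;
    }
}
```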
Proposed Changes
To ensure that data is not written to a state store until it has been committed to the changelog, we need to isolate writes from the underlying database until changelog commit.
We enable this transaction isolation with the new enable.transactional.statestores configuration.
enable.transactional.statestores | Description |
---|---|
false | Records written by the StreamThread are visible to all Interactive Query threads immediately. This level provides no atomicity, consistency, isolation or durability guarantees. When transactional state stores are disabled, Streams behaves as it currently does, wiping state stores on-error when the |
true | Records written by the StreamThread are only visible to Interactive Query threads once they have been committed. When transactional state stores are enabled, Streams will isolate writes from state stores until commit. This guarantees consistency of the on-disk data with the store changelog, so Streams will not need to wipe stores on-error. |
In Kafka Streams, all StateStores are written to by a single StreamThread (this is the Single Writer principle). However, multiple other threads may concurrently read from StateStores, principally to service Interactive Queries. In practice, this means that when transactional state stores are enabled, writes by the StreamThread that owns the StateStore will only become visible to Interactive Query threads once commit() has been called.
The default value for enable.transactional.statestores will be false, to mirror the behaviour we have today, and ensure no unexpected change in behaviour for users of Interactive Queries.
In-memory Transaction Buffers
Many StateStore implementations, including RocksDB, will buffer records written to a transaction entirely in-memory, which could cause issues, either with JVM heap or native memory. To mitigate this, we will automatically force a Task commit if the total memory used by an instance for buffering uncommitted records, as reported by StateStore#approximateNumUncommittedBytes(), exceeds the threshold configured by statestore.uncommitted.max.bytes. This will roughly bound the memory required for buffering uncommitted records, irrespective of the commit.interval.ms, and will effectively bound the number of records that will need to be restored in the event of a failure.
Since stream threads operate independently of each other, this behaviour will be enforced by dividing the limit defined by statestore.uncommitted.max.bytes among the StreamThreads. Each thread will independently force an early commit if its uncommitted memory usage exceeds (or will soon exceed) statestore.uncommitted.max.bytes/num.stream.threads.
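The per-thread budget can be sketched as follows. With the default of 67108864 bytes and four stream threads, each thread would get 16777216 bytes (16 MiB). The class and method names are hypothetical, integer division is an assumption, and treating a negative limit as "no limit" follows the -1 convention in the configuration table.

```java
final class UncommittedBytesBudget {
    // Splits the instance-wide limit evenly across stream threads.
    // Assumes numStreamThreads >= 1; a negative limit means "no limit" and is passed through.
    static long perThreadLimit(long maxUncommittedBytes, int numStreamThreads) {
        if (maxUncommittedBytes < 0) {
            return -1L;
        }
        return maxUncommittedBytes / numStreamThreads;
    }

    // A thread forces an early commit once its own uncommitted bytes exceed its share.
    static boolean shouldForceCommit(long threadUncommittedBytes, long perThreadLimit) {
        return perThreadLimit >= 0 && threadUncommittedBytes > perThreadLimit;
    }

    public static void main(String[] args) {
        long limit = perThreadLimit(67108864L, 4);
        System.out.println("per-thread limit = " + limit);
        System.out.println("force commit at limit + 1 = " + shouldForceCommit(limit + 1, limit));
    }
}
```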
It's possible that some Topologies can generate many more new StateStore entries than the records they process, in which case, it would be possible for such a Topology to cross the configured record/memory thresholds mid-processing, potentially causing an OOM error if these thresholds are exceeded by a lot. To mitigate this, the StreamThread will measure the increase in records/bytes written on each iteration, and pre-emptively commit if the next iteration is likely to cross the threshold.
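One possible form of this heuristic, offered only as an assumption about how "likely to cross the threshold" could be estimated, is to project the growth seen in the last iteration onto the next one. The class below is hypothetical and tracks bytes only.

```java
// Hypothetical sketch: predicts the next iteration's usage from the last observed growth.
final class PreemptiveCommitSketch {
    private final long limitBytes;
    private long previousBytes = 0;

    PreemptiveCommitSketch(long limitBytes) {
        this.limitBytes = limitBytes;
    }

    // Called once per processing iteration with the thread's current uncommitted bytes.
    // Returns true if another iteration of similar size would push usage past the limit.
    boolean shouldCommit(long currentBytes) {
        long growth = Math.max(0, currentBytes - previousBytes);
        previousBytes = currentBytes;
        return currentBytes + growth > limitBytes;
    }

    // Called after a commit, when the buffers have been emptied.
    void onCommit() {
        previousBytes = 0;
    }
}
```

With a limit of 100 bytes, observing 40 bytes does not trigger a commit (projected 80), but observing 80 bytes on the next iteration does (projected 120), even though the limit itself has not yet been crossed.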
Note that this new method provides a default implementation that ensures existing custom stores and non-transactional stores (e.g. InMemoryKeyValueStore) do not force any early commits.
Interactive Queries
Interactive queries currently see every record as soon as it is written to a StateStore. This can cause some consistency issues, as interactive queries can read records before they're committed to the Kafka changelog, and those records may later be rolled back. To address this, we make transactional state stores globally configurable via enable.transactional.statestores (see above).
When transactional state stores are enabled, the maximum time for records to become visible to interactive queries will be commit.interval.ms. Under EOS, this is by default a low value (100 ms), but under at-least-once, the default is 30 seconds. Users may need to adjust their commit.interval.ms to meet the visibility latency goals for their use-case.
When transactional state stores are disabled, which is the default, all written records will be immediately visible to interactive queries, so the high default commit.interval.ms of 30s will have no impact on interactive query latency.
Error Handling
Kafka Streams currently generates a TaskCorruptedException when a Task needs to have its state wiped (under EOS) and be re-initialized. There are currently several different situations that generate this exception:
- No offsets for the store can be found when opening it under EOS.
- OutOfRangeException during restoration, usually caused by the changelog being wiped on application reset.
- TimeoutException under EOS, when writing to or committing a Kafka transaction.
The first two of these are extremely rare, and make sense to keep. However, timeouts are much more frequent. They currently require the store to be wiped under EOS because when a timeout occurs, the data in the local StateStore will have been written, but the data in the Kafka changelog will have failed to be written, causing a mismatch in consistency.
With Transactional StateStores, we can guarantee that the local state is consistent with the changelog. It will therefore no longer be necessary to reset the local state on a TimeoutException when transactional state stores are enabled.
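The resulting rule under EOS can be summarised in a small sketch. The enum and method are hypothetical and only restate the three cases listed above; they are not part of the Streams API.

```java
// Hypothetical summary of when local state must still be wiped under EOS.
final class CorruptionPolicySketch {
    enum Cause { MISSING_OFFSETS, OFFSET_OUT_OF_RANGE, TIMEOUT }

    // Missing offsets and out-of-range offsets always require a wipe.
    // A timeout only requires one when transactional state stores are disabled.
    static boolean mustWipeState(Cause cause, boolean transactionalStoresEnabled) {
        return cause != Cause.TIMEOUT || !transactionalStoresEnabled;
    }

    public static void main(String[] args) {
        for (Cause cause : Cause.values()) {
            System.out.println(cause + ": disabled=" + mustWipeState(cause, false)
                    + ", enabled=" + mustWipeState(cause, true));
        }
    }
}
```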
Interactive Query .position Offsets
Input partition "Position" offsets, introduced by KIP-796: Interactive Query v2, have been stored in the offsets column family by the RocksDBStore implementation since KIP-1035. To ensure these offsets remain consistent with the data they correspond to, they will also need to be buffered in the same transaction buffer (i.e. the WriteBatch) as the data they came from.
When writing data to a RocksDBStore (via put, delete, etc.), the input partition offsets will be read from the changelog record metadata (as before), and these offsets will be added to the current transaction's WriteBatch. When the StateStore is committed, the position offsets in the current WriteBatch will be written to RocksDB, alongside the records they correspond to. Alongside this, RocksDBStore will maintain two Position maps in-memory, one containing the offsets pending in the current transaction's WriteBatch, and the other containing committed offsets. On commit(Map), the uncommitted Position map will be merged into the committed Position map. In this sense, the two Position maps will diverge during writes, and re-converge on commit.
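The diverge-then-converge behaviour of the two maps can be sketched with plain Java maps. This is a simplified model: the real Position class is not used, positions are keyed by an illustrative "topic-partition" string, and merging by keeping the larger offset is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the committed and pending Position maps; not the real Position class.
final class PositionTrackerSketch {
    private final Map<String, Long> committed = new HashMap<>();
    private final Map<String, Long> pending = new HashMap<>();

    // Called on each write: only the pending map advances, so the two maps diverge.
    void recordWrite(String topicPartition, long offset) {
        pending.merge(topicPartition, offset, Long::max);
    }

    // Called on commit: pending offsets are merged into the committed map, so the maps re-converge.
    void commit() {
        pending.forEach((tp, offset) -> committed.merge(tp, offset, Long::max));
        pending.clear();
    }

    Map<String, Long> committedPosition() {
        return new HashMap<>(committed);
    }

    Map<String, Long> pendingPosition() {
        return new HashMap<>(pending);
    }
}
```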
When an interactive query is made with transactional state stores enabled, the PositionBound will constrain the committed Position map, whereas when transactional state stores are disabled, the PositionBound will constrain the uncommitted Position map.
RocksDB Transactions
When transactional state stores are enabled, we will use RocksDB's WriteBatchWithIndex, which exists specifically for use-cases that wish to implement transactions, as we seek to in Kafka Streams. When reading records from the StreamThread, we will use the WriteBatchWithIndex#getFromBatchAndDB and WriteBatchWithIndex#newIteratorWithBase utilities in order to ensure that uncommitted writes are available to query. When reading records from Interactive Queries, we will use the regular RocksDB#get and RocksDB#newIterator methods, to ensure we see only records that have been committed (see above). The performance of this is expected to actually be better than the existing, non-batched write path. The main performance concern is that the WriteBatch must reside completely in-memory until it is committed, which is addressed by statestore.uncommitted.max.bytes (see above).
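The two read paths can be modelled without RocksDB to show the intended visibility rules. In this sketch a plain map stands in for the database and another for the WriteBatchWithIndex; all names are hypothetical, and unlike a real RocksDB batch write, the commit below is not atomic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Model of the read paths only; plain maps stand in for RocksDB and WriteBatchWithIndex.
final class WriteBufferOverlaySketch {
    private final Map<String, String> base = new ConcurrentHashMap<>();
    // An empty Optional marks a buffered delete (tombstone).
    private final Map<String, Optional<String>> batch = new HashMap<>();

    void put(String key, String value) {
        batch.put(key, Optional.of(value));
    }

    void delete(String key) {
        batch.put(key, Optional.empty());
    }

    // StreamThread read: consult the batch first, then the base (the role of getFromBatchAndDB).
    String getAsStreamThread(String key) {
        Optional<String> buffered = batch.get(key);
        return buffered != null ? buffered.orElse(null) : base.get(key);
    }

    // Interactive Query read: base only, so uncommitted writes stay invisible.
    String getAsInteractiveQuery(String key) {
        return base.get(key);
    }

    // Applies buffered writes to the base and clears the batch.
    void commit() {
        batch.forEach((key, value) -> {
            if (value.isPresent()) {
                base.put(key, value.get());
            } else {
                base.remove(key);
            }
        });
        batch.clear();
    }
}
```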
Compatibility, Deprecation, and Migration Plan
The above changes will retain compatibility for all existing StateStores, including user-defined custom implementations. Any StateStore that extends RocksDBStore will automatically inherit its behaviour, although its internals will change, potentially requiring users that depend on internal behaviour to update their code.
All new methods on existing classes will have defaults set to ensure compatibility.
Users may notice a change in the performance/behaviour of Kafka Streams. Most notably, under EOS Kafka Streams will now regularly "commit" StateStores, where previously it would only have done so when the store was closing. The overall performance of this should be at least as good as before, but the profile will be different, with write latency being substantially lower, and commit latency being a bit higher.
Upgrading
When upgrading to a version of Kafka Streams that includes the changes outlined in this KIP, users will not be required to take any action. By default, transactional state stores will be disabled, resulting in no change in behaviour for users.
Users who wish to benefit from reduced restore times on-error will need to explicitly enable transactional state stores, provided they are not dependent on uncommitted writes being immediately visible to Interactive Queries.
Downgrading
When downgrading from a version of Kafka Streams that includes the changes outlined in this KIP to a version that does not contain these changes, users will not be required to take any action. No changes to on-disk formats are made by this KIP, so downgrading to a previous version will not trigger state restore.
However, if the user has overridden the defaults for either of the new configuration properties (see above), warnings will be logged on startup about unknown configuration properties.
Test Plan
Testing will be accomplished by both the existing tests and by writing some new unit tests that verify the atomicity, durability and consistency guarantees that this KIP provides.
Rejected Alternatives
Dual-Store Approach (KIP-844)
The design outlined in KIP-844, sadly, does not perform well (as described above), and requires users to opt-in to transactionality, instead of being a guarantee provided out-of-the-box.
Replacing RocksDB memtables with ThreadCache
It was pointed out on the mailing list that Kafka Streams fronts all RocksDB StateStores with a configurable record cache, and that this cache duplicates the function provided by RocksDB memtables of serving requests for recently written records. A suggestion was made to utilize this record cache (the ThreadCache class) as a replacement for memtables, by flushing its contents directly to SSTables using the RocksDB SstFileWriter.
This is out of scope of this KIP, as its goal would be reducing the duplication (and hence, memory usage) of RocksDB StateStores; whereas this KIP is tasked with improving the consistency of StateStores to reduce the frequency and impact of state restoration, improving their scalability.
It has been recommended to instead pursue this idea in a subsequent KIP, as the interface changes outlined in this KIP should be compatible with this idea.
Transactional support under READ_UNCOMMITTED
When the query isolation level is READ_UNCOMMITTED, Interactive Query threads need to read records from the ongoing transaction buffer. Unfortunately, the RocksDB WriteBatch is not thread-safe, causing Iterators created by Interactive Query threads to produce invalid results or throw unexpected errors as the WriteBatch is modified or closed during iteration.
Ideally, we would build an implementation of a transaction buffer that is thread-safe, enabling Interactive Query threads to query it safely. One approach would be to "chain together" WriteBatches, creating a new WriteBatch every time a new Iterator is created by an Interactive Query thread and "freezing" the previous WriteBatch.
It was decided to defer tackling this problem to a later KIP, in order to realise the benefits of transactional state stores to users as quickly as possible.
Query-time Isolation Levels
It was requested that users be able to select the isolation level of queries on a per-query basis. This would require some additional API changes (to the Interactive Query APIs). Such an API would require that state stores are always transactional, and that the transaction buffers can be read from by READ_UNCOMMITTED queries. Due to the problems outlined in the previous section, it was decided to also defer this to a subsequent KIP.
The new configuration option default.state.isolation.level was deliberately named to enable query-time isolation levels in the future, whereby any query that didn't explicitly choose an isolation level would use the configured default. Until then, this configuration option will globally control the isolation level of all queries, with no way to override it per-query.