Versions Compared

Comment: Change default.state.isolation.level to enable.transactional.statestores

...

New configuration

Name | Default | Description
enable.transactional.statestores | false

Whether to enable transactional state stores. When enabled, state stores will buffer writes in a transaction buffer (if supported by the state store implementation), before committing them when the corresponding Kafka changelog transaction has committed. This has two observable effects:

  1. Under EOS, state stores will no longer be wiped on-error and rebuilt from scratch. In the event of an error (under either EOS or ALOS), only the writes since the last successful commit will be lost and replayed through the topology.
  2. Writes will not become visible to Interactive Queries until after the next commit. Consequently, when transactional state stores are enabled, commit.interval.ms dictates the maximum time that Interactive Queries will need to wait to observe a write.
statestore.uncommitted.max.bytes | 67108864 (64 MB)

Maximum number of memory bytes each instance will use to buffer uncommitted state-store records. If this limit is exceeded, a task commit will be requested. No limit: -1.

Note: if this is too high or unbounded, it's possible for RocksDB to trigger out-of-memory errors.
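As a rough illustration, the settings above might be supplied like any other Streams configuration. The sketch below uses plain string keys, because this KIP does not name StreamsConfig constants for them. The class and method names are hypothetical, and the values are examples rather than recommendations.

```java
import java.util.Properties;

public class TransactionalStoreConfigExample {

    // Builds a Properties object holding the settings proposed in this KIP.
    // The keys are copied from the table above; the values are illustrative.
    public static Properties buildConfig() {
        Properties props = new Properties();
        // Opt in to transactional state stores (the proposed default is false).
        props.setProperty("enable.transactional.statestores", "true");
        // Cap uncommitted state-store records at 64 MB per instance (the proposed default).
        props.setProperty("statestore.uncommitted.max.bytes", String.valueOf(64L * 1024 * 1024));
        // With transactional stores enabled, writes reach Interactive Queries only
        // after a commit, so the commit interval bounds the visibility delay.
        props.setProperty("commit.interval.ms", "100");
        return props;
    }

    public static void main(String[] args) {
        buildConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties would be passed to the Streams application alongside its other required settings, which are omitted here.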

...

To ensure that data is not written to a state store until it has been committed to the changelog, we need to isolate writes from the underlying database until changelog commit. To achieve this, we introduce the concept of transaction Isolation Levels, that dictate the visibility of records, written by processing threads, to Interactive Query threads.

We enable this transaction isolation with the new enable.transactional.statestores configuration.

enable.transactional.statestores | Description
false

Records written by the StreamThread are visible to all Interactive Query threads immediately. This setting provides no atomicity, consistency, isolation or durability guarantees.

When transactional state stores are disabled, Streams behaves as it currently does, wiping state stores on-error when the processing.guarantee is one of exactly_once, exactly_once_v2 or exactly_once_beta.

true

Records written by the StreamThread are only visible to Interactive Query threads once they have been committed.

When transactional state stores are enabled, Streams will isolate writes from state stores until commit. This guarantees consistency of the on-disk data with the store changelog, so Streams will not need to wipe stores on-error.

In Kafka Streams, all StateStores are written to by a single StreamThread (this is the Single Writer principle). However, multiple other threads may concurrently read from StateStores, principally to service Interactive Queries. In practice, this means that when transactional state stores are enabled, writes by the StreamThread that owns the StateStore will only become visible to Interactive Query threads once commit() has been called.
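The visibility rule described here can be modelled with a small, self-contained sketch. It is not the Kafka Streams implementation: the class BufferedStoreSketch and its methods are hypothetical, keys and values are plain strings, and deletes are not modelled. It only shows that the writer sees its own uncommitted writes, while query threads see committed records only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a transactional state store.
public class BufferedStoreSketch {
    // Records that have been committed; visible to every reader.
    private final Map<String, String> committed = new HashMap<>();
    // Writes made since the last commit; visible only to the writer.
    private final Map<String, String> uncommitted = new HashMap<>();

    // Called by the single writer (the StreamThread that owns the store).
    public synchronized void put(String key, String value) {
        uncommitted.put(key, value);
    }

    // Read path for the writer: uncommitted writes shadow committed ones.
    public synchronized String getForWriter(String key) {
        return uncommitted.containsKey(key) ? uncommitted.get(key) : committed.get(key);
    }

    // Read path for Interactive Queries: committed records only.
    public synchronized String getForQuery(String key) {
        return committed.get(key);
    }

    // Makes the buffered writes visible to queries.
    public synchronized void commit() {
        committed.putAll(uncommitted);
        uncommitted.clear();
    }

    // Models an error: only the writes since the last commit are discarded.
    public synchronized void rollback() {
        uncommitted.clear();
    }
}
```

After put("k", "v1"), getForWriter("k") returns "v1" while getForQuery("k") returns null until commit() is called. A rollback() leaves the previously committed records untouched, which mirrors why the store would not need to be wiped on error.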

The default value for enable.transactional.statestores will be false, to mirror the behaviour we have today, and ensure no unexpected change in behaviour for users of Interactive Queries.

In-memory Transaction Buffers

...

Interactive queries currently see every record as soon as it is written to a StateStore. This can cause some consistency issues, as interactive queries can read records before they're committed to the Kafka changelog, which may be rolled back. To address this, we make transactional state stores globally configurable via enable.transactional.statestores (see above).

When transactional state stores are enabled, the maximum time for records to become visible to interactive queries will be commit.interval.ms. Under EOS, this is by default a low value (100 ms), but under at-least-once, the default is 30 seconds. Users may need to adjust their commit.interval.ms to meet the visibility latency goals for their use-case.

When transactional state stores are disabled, which is the default, all written records will be immediately visible to interactive queries, so the high default commit.interval.ms of 30s will have no impact on interactive query latency.
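The relationship between these settings and query latency can be written as a one-line rule. The helper below is a hypothetical illustration, not part of any Kafka API, and it assumes that commits occur on schedule every commit.interval.ms.

```java
public class VisibilityDelaySketch {

    // Upper bound, in milliseconds, on how long a write may stay invisible
    // to interactive queries, assuming commits happen on schedule.
    public static long maxVisibilityDelayMs(boolean transactionalStores, long commitIntervalMs) {
        // Without transactional stores, writes are visible immediately.
        return transactionalStores ? commitIntervalMs : 0L;
    }

    public static void main(String[] args) {
        // Default commit intervals quoted in the text: 100 ms under EOS, 30 s under ALOS.
        System.out.println(maxVisibilityDelayMs(true, 100L));
        System.out.println(maxVisibilityDelayMs(true, 30_000L));
        System.out.println(maxVisibilityDelayMs(false, 30_000L));
    }
}
```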

...

With Transactional StateStores, we can guarantee that the local state is consistent with the changelog. Therefore, it will no longer be necessary to reset the local state on a TimeoutException when operating under EOS with transactional state stores enabled.


Interactive Query .position Offsets

...

When an interactive query is made with transactional state stores enabled, the PositionBound will constrain the committed Position map, whereas when transactional state stores are disabled, the PositionBound will constrain the uncommitted Position map.

RocksDB Transactions

When transactional state stores are enabled, we will use RocksDB's WriteBatchWithIndex, which exists specifically for use-cases that wish to implement transactions, as we seek to in Kafka Streams. When reading records from the StreamThread, we will use the WriteBatchWithIndex#getFromBatchAndDB and WriteBatchWithIndex#newIteratorWithBase utilities in order to ensure that uncommitted writes are available to query. When reading records from Interactive Queries, we will use the regular RocksDB#get and RocksDB#newIterator methods, to ensure we see only records that have been committed (see above). The performance of this is expected to actually be better than the existing, non-batched write path. The main performance concern is that the WriteBatch must reside completely in-memory until it is committed, which is addressed by statestore.uncommitted.max.bytes (see above).
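The memory bound could be enforced with simple byte accounting, sketched below. This is an assumption-laden illustration rather than the proposed implementation: UncommittedBytesTracker is a hypothetical class, and it counts only key and value lengths, ignoring any per-record overhead that the real write batch would add.

```java
// Hypothetical helper that tracks uncommitted bytes against a configured limit.
public class UncommittedBytesTracker {
    private final long maxBytes; // -1 means no limit
    private long uncommittedBytes = 0L;

    public UncommittedBytesTracker(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Records a buffered write and reports whether a commit should be requested,
    // i.e. whether the configured limit has been exceeded.
    public boolean recordWrite(byte[] key, byte[] value) {
        uncommittedBytes += key.length + value.length;
        return maxBytes >= 0 && uncommittedBytes > maxBytes;
    }

    // Called once the buffered writes have been committed.
    public void onCommit() {
        uncommittedBytes = 0L;
    }

    public long uncommittedBytes() {
        return uncommittedBytes;
    }
}
```

With a limit of 10 bytes, a first write of 8 bytes returns false and a second write of 8 bytes returns true, signalling that a commit should be requested. With a limit of -1, recordWrite never requests a commit.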

...

When upgrading to a version of Kafka Streams that includes the changes outlined in this KIP, users will not be required to take any action. By default, transactional state stores will be disabled, resulting in no change in behaviour for users.

Users who wish to benefit from reduced restore times on-error will need to explicitly enable transactional state stores, provided they are not dependent on uncommitted writes being immediately visible to Interactive Queries.

Downgrading

When downgrading from a version of Kafka Streams that includes the changes outlined in this KIP to a version that does not contain these changes, users will not be required to take any action. No changes to on-disk formats are made by this KIP, so downgrading to a previous version will not trigger state restore.

...