Comment: Improve downgrade behaviour when using upgrade.from

...

Kafka Streams will automatically upgrade any RocksDB stores to manage offsets directly in the RocksDB database, by importing the offsets from any existing .checkpoint and/or .position files.

Downgrade

RocksDBStore will not be compatible with older versions of Kafka Streams, due to the extra offsets column family that older versions of Kafka Streams do not recognize. If a user downgrades from a version containing this KIP to a version without it, by default the on-disk state for any Task containing a RocksDBStore will be wiped and restored from its changelogs, using the older RocksDB store format. Users are not required to take any action: the older Kafka Streams version is simply unable to open any RocksDB store that was upgraded to store offsets (see Upgrading), which triggers the wipe and restore. Since downgrading is a low-frequency event, and since restoring state from scratch is already an existing failure mode for older versions of Kafka Streams, we deem this an acceptable automatic downgrade strategy.

However, we will support using the upgrade.from config to safely downgrade without losing local RocksDB state. When upgrade.from is set to a version earlier than the version that introduced this KIP:

  • On RocksDBStore#close:
    • All changelog offsets stored in the RocksDB offsets column family will be written to the Task .checkpoint file.
    • All source topic offsets stored in the RocksDB offsets column family will be written to the store .position file.
    • The RocksDB offsets column family will be deleted from the RocksDB database, ensuring that older versions of Kafka Streams will be able to open the database successfully.
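
The close-time behaviour described above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions, not the RocksDBStore implementation: the class and method names (DowngradeOnCloseSketch, StoreModel, shouldExportOffsets) are hypothetical, the offsets column family and the .checkpoint and .position files are modelled as plain maps, and the version that introduces this KIP is passed in as a parameter because it is not stated here. Only the upgrade.from config key is taken from the text.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical, simplified model of the downgrade path; not Kafka Streams code.
class DowngradeOnCloseSketch {

    /** Compares dotted version strings numerically, so that "3.9" sorts before "3.10". */
    static int compareVersions(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
            int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    /**
     * True when upgrade.from is set to a version earlier than the one that
     * introduced the KIP. introducedIn is an assumption supplied by the caller.
     */
    static boolean shouldExportOffsets(Properties config, String introducedIn) {
        String upgradeFrom = config.getProperty("upgrade.from");
        return upgradeFrom != null && compareVersions(upgradeFrom, introducedIn) < 0;
    }

    /** Stand-in for a store: maps model the column family and the two files. */
    static class StoreModel {
        final Map<String, Long> changelogOffsets = new HashMap<>(); // offsets column family
        final Map<String, Long> sourceOffsets = new HashMap<>();    // offsets column family
        final Map<String, Long> checkpointFile = new HashMap<>();   // Task .checkpoint file
        final Map<String, Long> positionFile = new HashMap<>();     // store .position file
        boolean hasOffsetsColumnFamily = true;

        void close(boolean exportOffsets) {
            if (!exportOffsets) {
                return; // default path: offsets stay inside the database
            }
            // 1. Changelog offsets go to the .checkpoint file.
            checkpointFile.putAll(changelogOffsets);
            // 2. Source topic offsets go to the .position file.
            positionFile.putAll(sourceOffsets);
            // 3. Drop the column family so an older version can open the database.
            changelogOffsets.clear();
            sourceOffsets.clear();
            hasOffsetsColumnFamily = false;
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("upgrade.from", "3.9"); // example value only

        StoreModel store = new StoreModel();
        store.changelogOffsets.put("app-store-changelog-0", 120L);
        store.sourceOffsets.put("input-topic-0", 95L);

        // "4.1" is a placeholder for the release that introduces the KIP.
        store.close(shouldExportOffsets(config, "4.1"));

        System.out.println("checkpoint: " + store.checkpointFile);
        System.out.println("position: " + store.positionFile);
        System.out.println("offsets column family present: " + store.hasOffsetsColumnFamily);
    }
}
```

In this model, leaving upgrade.from unset keeps the offsets column family in place, which corresponds to the default case in which an older version cannot open the store and restores from the changelogs instead.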

Test Plan

The existing tests that depend on .checkpoint files will either be removed (if they are no longer relevant), or modified to instead call StateStore#committedOffset().

...