

Current state: "Under Discussion"

Discussion thread: [DISCUSS] KIP-258: Allow to Store Record Timestamps in RocksDB

JIRA: KAFKA-3522

Released: 1.2

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


In order to improve the stream processing semantics of Kafka Streams, and to add new features to it, we want to be able to store record timestamps in KTables. This allows us to address multiple issues like

The challenging part of this KIP is to define a smooth upgrade path to the new RocksDB format. There are some initial thoughts on KAFKA-3522 already. The rest of this doc will first focus on the public interface additions for the upgrade path. Storing timestamps in RocksDB is just a special case, though, and we propose to allow for a generic upgrade path from any storage format A to any other storage format B. This includes local storage in RocksDB as well as the underlying changelog topic.

Public Interfaces

We will add a new configuration parameter upgrade.mode that is null by default and can take one of three values (null, "in_place", or "roll_over") with the following semantics:

  • null: no upgrade needed, run with latest formats
  • "in_place": prepare an in-place "standby" RocksDB with new format
  • "roll_over": prepare a roll-over "standby" RocksDB with new format
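The following is a minimal sketch of how the proposed upgrade.mode values could be set and validated. It assumes the key is the plain string "upgrade.mode" (StreamsConfig has no such constant today). The class UpgradeModeConfig and its enum are hypothetical names.

```java
import java.util.Properties;

// Hypothetical helper: parses and validates the proposed "upgrade.mode" config.
final class UpgradeModeConfig {
    // Config key proposed by this KIP; not an existing StreamsConfig constant.
    static final String UPGRADE_MODE_CONFIG = "upgrade.mode";

    enum UpgradeMode { NONE, IN_PLACE, ROLL_OVER }

    private UpgradeModeConfig() { }

    // Maps the raw config value to a mode; an absent value (null, the default) means "no upgrade".
    static UpgradeMode parse(final Properties props) {
        final String value = props.getProperty(UPGRADE_MODE_CONFIG);
        if (value == null) {
            return UpgradeMode.NONE;
        }
        switch (value) {
            case "in_place":
                return UpgradeMode.IN_PLACE;
            case "roll_over":
                return UpgradeMode.ROLL_OVER;
            default:
                throw new IllegalArgumentException(
                    "Invalid value for " + UPGRADE_MODE_CONFIG + ": " + value);
        }
    }
}
```

In the in-place online flow, the first rolling bounce would set the key to "in_place". The second bounce would remove it again, so `parse` falls back to NONE.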

We add a new store type KeyValueWithTimestampStore that extends the existing KeyValueStore.

We generalize the translation from a changelog ConsumerRecord into a store KeyValue pair using a new interface RecordConverter. There will be a default implementation that does a 1:1 mapping from key to key and value to value. For the new KeyValueWithTimestampStore, we implement a mapping from key to key and from `value plus timestamp` to value.

We introduce an interface RecordConverterStore that allows translating from the old storage format to the new storage format.

We introduce an interface StoreUpgradeBuilder that extends StoreBuilder. It can return a "store proxy" that maps from the new store API to an internally used old store, and it can return a RecordConverterStore to perform the actual store upgrade. Thus, StoreUpgradeBuilder can return a proxy store, an upgrade store, and a new store.

Proposed Changes

To make use of the new timestamp that can be stored, we need to add new interfaces to the existing store interfaces that allow reading/writing the timestamp and the value at once.

package org.apache.kafka.streams.state;

import org.apache.kafka.common.serialization.Serde;

// new interfaces

public interface ValueAndTimestamp<V> {
    V value();
    long timestamp();
}

public interface KeyValueWithTimestampStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {
    void put(K key, V value, long timestamp);
    ValueAndTimestamp<V> putIfAbsent(K key, V value, long timestamp);
}

// TODO: add missing WindowWithTimestampStore and SessionWithTimestampStore

// extend existing classes (omitting existing methods and method bodies)

public final class Stores {
    public static <K, V> StoreBuilder<KeyValueWithTimestampStore<K, V>> keyValueWithTimestampStoreBuilder(final KeyValueBytesStoreSupplier supplier,
                                                                                                          final Serde<K> keySerde,
                                                                                                          final Serde<V> valueSerde);

    public static <K, V> StoreBuilder<KeyValueWithTimestampStore<K, V>> keyValueToKeyValueWithTimestampUpgradeStoreBuilder(final KeyValueBytesStoreSupplier supplier,
                                                                                                                           final Serde<K> keySerde,
                                                                                                                           final Serde<V> valueSerde);

    // TODO: add missing xxxStoreBuilder() methods for WindowWithTimestampStore and SessionWithTimestampStore
}

public final class QueryableStoreTypes {
    // verify if we need this, or if we can reuse keyValueStore() directly with V==ValueAndTimestamp<V>
    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> keyValueWithTimestampStore();

    // TODO add missing methods for WindowWithTimestampStore and SessionWithTimestampStore (if required)
}

We extend our existing stores (RocksDB, InMemory) to implement the corresponding new interfaces. Note that we will keep the old stores and extend them, so that PAPI users can choose to use stores with or without the ability to store timestamps.
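The sketch below is dependency-free and shows what implementing the new interfaces could mean for an in-memory store. The nested ValueAndTimestamp and KeyValueWithTimestampStore classes only mirror the shapes of the proposed interfaces. They do not implement Kafka's StateStore or KeyValueStore, and all names here are illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: mirrors the proposed ValueAndTimestamp / KeyValueWithTimestampStore
// shapes without depending on Kafka Streams classes.
final class InMemoryTimestampedStoreSketch {

    private InMemoryTimestampedStoreSketch() { }

    // Simple immutable holder corresponding to the proposed ValueAndTimestamp<V> interface.
    static final class ValueAndTimestamp<V> {
        private final V value;
        private final long timestamp;

        ValueAndTimestamp(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        V value() { return value; }

        long timestamp() { return timestamp; }
    }

    // Minimal in-memory key-value store that keeps the record timestamp next to each value.
    static final class KeyValueWithTimestampStore<K, V> {
        private final Map<K, ValueAndTimestamp<V>> map = new HashMap<>();

        void put(final K key, final V value, final long timestamp) {
            Objects.requireNonNull(key, "key");
            map.put(key, new ValueAndTimestamp<>(value, timestamp));
        }

        // Returns the existing entry (leaving it untouched), or null if the key was absent.
        ValueAndTimestamp<V> putIfAbsent(final K key, final V value, final long timestamp) {
            Objects.requireNonNull(key, "key");
            return map.putIfAbsent(key, new ValueAndTimestamp<>(value, timestamp));
        }

        ValueAndTimestamp<V> get(final K key) {
            return map.get(key);
        }
    }
}
```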

The usage/implementation of upgrade stores is described in the next section.


For a clean upgrade path for RocksDB itself, we need to introduce the above configs and new interfaces to implement the actual upgrade code. We will provide implementations to upgrade from an existing KeyValueStore to a KeyValueWithTimestampStore. The defined interfaces are generic, though, and allow implementing "upgrade stores" from any store type A to any other store type B.

To make the upgrade possible, we generalize the changelog-to-store mapping from ConsumerRecords to KeyValue pairs. This allows customizing the translation during a store upgrade. To upgrade a store, a user updates her topology to use a `StoreUpgradeBuilder` that can return a proxy store, an upgrade store, and a regular store.

If the upgrade mode is enabled, the runtime will create a proxy store for StreamTasks. The proxy store implements the new store API but internally maps back to the old store (ie, the old on-disk format). Using the proxy store allows running new code against the already existing old store, which avoids any downtime. At the same time, the runtime will create StoreUpgradeTasks that use a `RecordConverterStore`. These StoreUpgradeTasks consume the changelog topic and update each record to the new format using the `RecordConverter` interface. Afterwards, the new format is written to the (new) RocksDB, and back to the changelog topic if required. Because the changelog topic may contain data in different formats, we encode the format in the record header and enhance all changelog readers to check the header for the correct version.

If the upgrade mode is disabled, `StoreUpgradeBuilder` is used as a regular store builder that returns the new store. Using this builder instead of the new store directly makes a difference in two ways:

(1) Using StoreUpgradeBuilder, we can perform a final check of whether the upgrade has finished. If not, we first create no active tasks, only StoreUpgradeTasks that use an upgrade store. Only after the upgrade is finished are the StoreUpgradeTasks destroyed and regular tasks using the new stores created.

(2) After a successful upgrade, users don't need to rewrite their code. They can still rewrite it afterwards and replace `StoreUpgradeBuilder` with a store builder for the new store only.
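The in-place store selection described above can be summarized as a small decision function. This is a hypothetical sketch. `StoreKind` and `plan()` are illustrative names. `upgradeFinished` stands in for whatever check the runtime uses to decide that the prepared new-format store has caught up. The roll-over case, where "roll over" instances only run StoreUpgradeTasks, is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: which stores the runtime would instantiate for one assigned task
// during an in-place upgrade. All names are illustrative only.
final class UpgradeTaskPlanner {

    enum StoreKind { PROXY_STORE, CONVERTER_STORE, NEW_STORE }

    private UpgradeTaskPlanner() { }

    static List<StoreKind> plan(final boolean upgradeModeEnabled, final boolean upgradeFinished) {
        final List<StoreKind> stores = new ArrayList<>();
        if (upgradeModeEnabled) {
            // Active processing keeps running on the old on-disk format through the proxy store,
            // while a StoreUpgradeTask builds the new format in parallel via the converter store.
            stores.add(StoreKind.PROXY_STORE);
            stores.add(StoreKind.CONVERTER_STORE);
        } else if (!upgradeFinished) {
            // Final check failed: finish the upgrade first, with no active task yet.
            stores.add(StoreKind.CONVERTER_STORE);
        } else {
            // Upgrade complete: the builder acts as a regular builder for the new store.
            stores.add(StoreKind.NEW_STORE);
        }
        return stores;
    }
}
```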

As mentioned, we generalize the changelog-to-store mapping from ConsumerRecords to KeyValue pairs. If a store (like KeyValueWithTimestampStore) requires a non-default mapping, the corresponding `StateRestoreCallback` must implement `RecordConverter`, too.

package org.apache.kafka.streams.processor;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * {@code RecordConverter} translates a {@link ConsumerRecord} into a serialized value.
 */
public interface RecordConverter {

    /**
     * Convert a given record into a value for local storage.
     * @param record the consumer record
     * @return the value for local storage
     */
    byte[] convert(final ConsumerRecord<byte[], byte[]> record);
}


package org.apache.kafka.streams.processor;

/**
 * {@code RecordConverterStore} translates a changelog record from old store format to new store format.
 */
public interface RecordConverterStore extends StateStore, RecordConverter {
}

package org.apache.kafka.streams.processor;

import org.apache.kafka.streams.state.StoreBuilder;

/**
 * {@code StoreUpgradeBuilder} that provides a store as well as the corresponding proxy store and converter store.
 * <p>
 * The proxy store maps from a new store API to an old store API.
 * It is responsible for mapping the new API calls to the old API during the store upgrade phase.
 * <p>
 * The converter store is a store for the new format.
 * Additionally, it implements a {@link RecordConverter} that is used to translate from the old storage format to the new storage format.
 * @param <S> store type (proxy store must implement this store type, too)
 * @param <C> new store type that additionally implements {@link RecordConverter}
 */
public interface StoreUpgradeBuilder<S extends StateStore, C extends RecordConverterStore> extends StoreBuilder<S> {

    /**
     * Return a new instance of the proxy store.
     * @return a new instance of the proxy store
     */
    S storeProxy();

    /**
     * Return a new instance of the converter store.
     * @return a new instance of the converter store
     */
    C converterStore();
}
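The following is a dependency-free sketch of the two mappings described earlier: the default 1:1 converter and the KeyValueWithTimestampStore converter. `ChangelogRecord` is a hypothetical stand-in for `ConsumerRecord<byte[], byte[]>`, used so the example compiles without kafka-clients. Treating a null value (tombstone) as "nothing to store" is an assumption; the KIP does not specify it.

```java
import java.nio.ByteBuffer;

// Hypothetical, dependency-free sketch of the proposed RecordConverter.
final class RecordConverterSketch {

    // Stand-in for ConsumerRecord<byte[], byte[]> (which exposes key(), value() and timestamp()).
    static final class ChangelogRecord {
        final byte[] key;
        final byte[] value;
        final long timestamp;

        ChangelogRecord(final byte[] key, final byte[] value, final long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    interface RecordConverter {
        byte[] convert(ChangelogRecord record);
    }

    // Default 1:1 mapping: the store value is the changelog value.
    static final RecordConverter IDENTITY = record -> record.value;

    // KeyValueWithTimestampStore mapping: store value = 8-byte record timestamp + changelog value.
    static final RecordConverter WITH_TIMESTAMP = record -> {
        if (record.value == null) {
            return null; // assumed: a tombstone stays a delete
        }
        return ByteBuffer.allocate(Long.BYTES + record.value.length)
            .putLong(record.timestamp)
            .put(record.value)
            .array();
    };

    private RecordConverterSketch() { }
}
```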

For the actual RocksDB storage format of KeyValueWithTimestampStore, we add the record timestamp as an 8-byte (long) prefix to the value; ie, we change the format from <key:value> to <key:timestamp+value>. We need to introduce a new value serde that wraps the original value serde as well as a long serde. One important detail is that the serde only changes for the store, not for the changelog topic: the underlying changelog topic already stores the timestamp in the record metadata timestamp field. We need to intercept the write to the changelog topic accordingly.
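The wrapping value serde could look roughly like the sketch below. It assumes a big-endian 8-byte prefix (ByteBuffer's default byte order). It uses `java.util.function.Function` in place of Kafka's Serializer/Deserializer, and `TimestampPrefixedValueSerde` is a hypothetical name. The `rawValue()` helper shows what the intercepted changelog write would send: the value without the prefix, with the timestamp carried in the record's timestamp field.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical sketch of a value serde wrapper for the store format <key:timestamp+value>.
// Function<V, byte[]> / Function<byte[], V> stand in for the wrapped value serde.
final class TimestampPrefixedValueSerde<V> {
    private static final int TIMESTAMP_SIZE = Long.BYTES; // 8-byte (long) prefix

    private final Function<V, byte[]> valueSerializer;
    private final Function<byte[], V> valueDeserializer;

    TimestampPrefixedValueSerde(final Function<V, byte[]> valueSerializer,
                                final Function<byte[], V> valueDeserializer) {
        this.valueSerializer = valueSerializer;
        this.valueDeserializer = valueDeserializer;
    }

    // Store format: big-endian timestamp followed by the original serialized value.
    byte[] serialize(final V value, final long timestamp) {
        final byte[] rawValue = valueSerializer.apply(value);
        return ByteBuffer.allocate(TIMESTAMP_SIZE + rawValue.length)
            .putLong(timestamp)
            .put(rawValue)
            .array();
    }

    // Reads the 8-byte timestamp prefix of a stored value.
    static long timestamp(final byte[] storeValue) {
        return ByteBuffer.wrap(storeValue).getLong();
    }

    // Strips the prefix; this is what the changelog topic would receive.
    static byte[] rawValue(final byte[] storeValue) {
        return Arrays.copyOfRange(storeValue, TIMESTAMP_SIZE, storeValue.length);
    }

    V deserializeValue(final byte[] storeValue) {
        return valueDeserializer.apply(rawValue(storeValue));
    }
}
```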

Because the upgrade might be done "in place", we need to make sure to use different directories. Thus, StoreUpgradeTasks create store directories with the suffix _prepare; ie, for each active task with task directory `X_Y`, a StoreUpgradeTask will use task directory `X_Y_prepare`. The directory isolation at task level ensures that we can reuse the same internal store directory structure for active and store-upgrade tasks. After the stores are restored, in a second rebalance, the old task directory is renamed, the "prepare" directory is renamed to act as the new active task directory, and finally the renamed original task directory is deleted to free up disk space.
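The directory swap at the end of an in-place upgrade could be sketched with java.nio.file as below. `TaskDirectorySwap` is hypothetical, and so is the "_old" suffix for the temporarily renamed original directory; the KIP only specifies the "_prepare" suffix. `Files.move` renames a non-empty directory only within the same file store, which is the usual case for a single state directory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical sketch of the task-directory swap after an in-place upgrade.
final class TaskDirectorySwap {

    private TaskDirectorySwap() { }

    // e.g. ".../0_1" -> ".../0_1_prepare"
    static Path prepareDir(final Path activeTaskDir) {
        return activeTaskDir.resolveSibling(activeTaskDir.getFileName() + "_prepare");
    }

    static void swap(final Path activeTaskDir) throws IOException {
        final Path prepare = prepareDir(activeTaskDir);
        final Path old = activeTaskDir.resolveSibling(activeTaskDir.getFileName() + "_old"); // assumed name
        // 1. move the original task directory out of the way
        Files.move(activeTaskDir, old);
        // 2. the prepared (new-format) directory becomes the active task directory
        Files.move(prepare, activeTaskDir);
        // 3. delete the renamed original directory, children before parents, to free disk space
        try (Stream<Path> paths = Files.walk(old)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (final IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }
}
```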

Compatibility, Deprecation, and Migration Plan

The storage format change requires applications to upgrade correctly. We plan to offer two upgrade paths: in-place (with an online and an offline variant) and roll-over.

  • In-place upgrade (online): this requires two rolling bounces of each application instance
    • advantage: simpler than roll-over upgrade
    • disadvantage: needs 2x local disk storage during upgrade
    • upgrade flow:
      1. prepare a jar hot swap from the old version; Kafka Streams needs to be configured with upgrade.mode="in_place" for startup
      2. do a rolling bounce to get the new jar and config in place for each instance
      3. because upgrade mode is "in_place", each instance will create a "store upgrade" task for each assigned active and standby task, which starts to rebuild the RocksDB stores in the new format in parallel to the existing RocksDB
        1. the existing RocksDB will be used for processing, so there is no downtime
        2. each instance builds up the new RocksDB state for all its assigned active and standby tasks
        3. during restore, users need to observe restore progress (as actual processing continues, restore will never be "finished", and it's up to the user when to trigger the second rolling bounce)
      4. after all stores are prepared, the user prepares a second rolling bounce; this time, the configuration parameter upgrade.mode must be removed for the new startup
      5. do a second rolling bounce for each instance to get the new config
        1. if prepare task directories are detected, we check if the upgrade is finished and, if not, finish the upgrade (ie, read the latest delta from the changelog); then the corresponding active directories are replaced by the prepare task directories
        2. afterwards regular processing begins
  • In-place upgrade (offline): requires a single rolling bounce of each application instance
    • advantage: simplest way to upgrade (only one rolling bounce)
    • disadvantage: needs 2x local disk storage during upgrade; application is offline during store upgrade
    • upgrade flow:
      1. prepare a jar hot swap from the old version; the Kafka Streams config does not need to be changed (ie, upgrade.mode=null for startup)
      2. do a rolling bounce to get the new jar in place for each instance
      3. because upgrade mode is disabled, each instance only finds the old store format but no new store (thus it behaves exactly the same as in the second rolling bounce of the "in_place" online upgrade)
        1. create a prepare task directory and replay the "latest delta" from the changelog topic (here, this delta happens to be the complete changelog topic)
        2. after the restore has finished (no processing during this time), the old store is replaced with the new store
        3. afterwards regular processing begins
  • Roll-over upgrade:
    • in-place upgrade might not be feasible because of its large local disk storage requirement; thus, roll-over is an alternative that enables upgrading if not enough local disk space is available for an in-place upgrade
    • if an application is running with N instances, the user starts N new instances in parallel (old and new instances form one consumer group); the new instances rebuild the RocksDB state with the new format; finally, the old instances are stopped
    • upgrade flow:
      1. prepare a jar hot swap from the old version for all existing (old) instances; old Kafka Streams instances need to be configured with upgrade.mode=null for startup
      2. do a rolling bounce to get the new jar for each old instance
        1. all old instances will just resume processing as usual
        2. because upgrade mode is not enabled, no store upgrade tasks are started (cf. "In-place" upgrade above)
      3. the user starts N new instances with config parameter upgrade.mode="roll_over"
        1. the "roll_over" upgrade mode will be encoded in the subscription metadata
        2. the leader can distinguish between old and new instances based on the information encoded in the Subscription and assigns tasks (active and standby) for restore to the "roll over" instances (ie, the assignment is "mirrored" to all "roll over" instances)
        3. "roll over" instances only create StoreUpgradeTasks and perform the restore
      4. all old instances are stopped
        1. as long as at least one old instance is alive, the leader assigns all active tasks to the old instances only
        2. however, as those are shut down in a row, ideally a rebalance will only trigger after all are down already (note that Kafka Streams does not send a "leave group request", and thus a rebalance only happens after session.timeout.ms passes)
        3. when all old instances are down, the leader only receives subscriptions that encode "roll over" and thus the leader knows that the upgrade is completed
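The leader-side rules of the roll-over upgrade (steps 3 and 4) can be sketched as follows. This is a hypothetical simplification. It ignores standby tasks, stickiness, and the actual subscription encoding, and it models each member's "roll_over" flag as a Boolean. It uses plain round-robin so that, with N old and N new instances, the upgrade assignment mirrors the active assignment. The real StreamsPartitionAssignor is considerably more involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the leader-side decision during a roll-over upgrade.
final class RollOverAssignmentSketch {

    static final class Assignment {
        final Map<String, List<String>> activeTasks = new HashMap<>();
        final Map<String, List<String>> upgradeTasks = new HashMap<>();
        boolean upgradeCompleted;
    }

    private RollOverAssignmentSketch() { }

    // rollOverByMember: member id -> whether its subscription encodes upgrade.mode="roll_over"
    static Assignment assign(final Map<String, Boolean> rollOverByMember, final List<String> tasks) {
        final List<String> oldMembers = new ArrayList<>();
        final List<String> rollOverMembers = new ArrayList<>();
        rollOverByMember.forEach((member, rollOver) ->
            (Boolean.TRUE.equals(rollOver) ? rollOverMembers : oldMembers).add(member));
        Collections.sort(oldMembers);      // deterministic order so the mirroring lines up
        Collections.sort(rollOverMembers);

        final Assignment result = new Assignment();
        if (oldMembers.isEmpty()) {
            // Only "roll over" subscriptions left: the upgrade is complete.
            result.upgradeCompleted = true;
            roundRobin(tasks, rollOverMembers, result.activeTasks);
            return result;
        }
        // While any old instance is alive, all active tasks stay on old instances...
        roundRobin(tasks, oldMembers, result.activeTasks);
        // ...and the assignment is mirrored to the roll-over instances as StoreUpgradeTasks.
        roundRobin(tasks, rollOverMembers, result.upgradeTasks);
        return result;
    }

    private static void roundRobin(final List<String> tasks, final List<String> members,
                                   final Map<String, List<String>> target) {
        if (members.isEmpty()) {
            return;
        }
        for (int i = 0; i < tasks.size(); i++) {
            target.computeIfAbsent(members.get(i % members.size()), m -> new ArrayList<>())
                .add(tasks.get(i));
        }
    }
}
```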

Upgrade scenarios:

Processor API:

  • using KeyValueStore/WindowStore/SessionStore and wants to keep it
    • nothing to do; regular single rolling bounce upgrade
  • using KeyValueStore/WindowStore/SessionStore and wants to upgrade to XxxWithTimestampStore
    • update code to use new store and provide corresponding `StoreUpgradeBuilder`
    • follow instructions as described above

DSL users (after KAFKA-6521 is implemented):

  • by default, users should consider the three available upgrade paths
    • if they don't pay attention, they would end up with an in-place offline upgrade
  • if users override all used stores with in-memory stores, an online in-place upgrade does not make sense
    • we recommend an in-place offline upgrade (we think this is fine, as users who choose in-memory stores already accept longer restore times)
    • roll-over upgrade would also be possible

Test Plan

  • unit and integration tests for the new embedded timestamp feature
    • tests that ensure the timestamps are written and read correctly and that records are (de)serialized correctly
    • this includes reading source KTables, aggregation operations that result in KTables, and all other operators that might force a KTable materialization
    • KTable recovery/restore must be tested.
  • unit and integration tests for StreamsPartitionAssignor to react correctly to configs and received subscriptions
  • system tests that perform rolling bounce upgrades as described above
    • this should include failure scenarios during the upgrade
    • this should include "simulated upgrades" to metadata version 4, to ensure that the implementation works correctly for future changes

Rejected Alternatives

  • change the RocksDB on-disk format and encode the used serialization version per record (this would simplify future upgrades). However, there are major disadvantages:
    • storage amplification for local stores
    • record version could get stored in record headers in changelog topics -> changelog topic might never overwrite record with older format
    • code needs to check all versions all the time for future releases: increases code complexity and runtime overhead
    • it's hard to change the key format
      • for value format, the version number can be a magic prefix byte
      • for key lookup, we would need to know the magic byte in advance for efficient point queries into RocksDB; if multiple versions exist in parallel, this is difficult (either issue multiple queries with different version bytes until the entry is found, or until all versions have been tried, which implies the key does not exist; or use range queries, but those are very expensive)
  • encode the storage format in the directory name not as "store version number" but as "AK release number"
    • might be confusing to users if the store format does not change ("I am running Kafka 1.4, but the store indicates it's running on 1.2").
  • use a simpler upgrade path without any configs or complex rolling bounce scenarios
    • requires application down-time for upgrading to new format
  • use consumer's built-in protocol upgrade mechanism (ie, register multiple "assignment strategies")
    • has the disadvantage that we need to implement two StreamsPartitionAssignor classes
    • increased network traffic during rebalance
    • encoding "supported version" in metadata subsumes this approach for future releases anyway
    • if we want to "disable" the old protocol, a second rebalance is required, too
    • cannot avoid the second rebalance that is required for the state store upgrade
  • only support in-place upgrade path instead of two to simplify the process for users (don't need to pick)
    • might be prohibitive if not enough disk space is available
  • allow DSL users to stay with the old format: the upgrade would be simpler as it's only one rolling bounce
    • unclear default behavior: should we stay on the 1.1 format by default or should we use the 1.2 format by default?
      • if 1.1 is the default, the upgrade is simple, but users who write a new application must turn on the 1.2 format explicitly
      • if 1.2 is the default, a simple upgrade requires a config that tells Streams to stay with the 1.1 format
      • conclusion: neither upgrading nor not upgrading is straightforward either way; thus, just force the upgrade
    • if no upgrade happens, new features (as listed above) would be useless
  • Only prepare stores for active task (but not standby tasks)
    • this would reduce the disk footprint during upgrade
    • disadvantage: when the switch to the new version happens, no hot standbys are available for some time
    • we could make it configurable, however, we try to keep the number of configs small; also, it's already complex enough and adding more options would make it worse
    • it's not an issue for roll-over upgrade, and not everybody configures standbys in the first place
      • people with standbys are willing to provide more disk, so it seems a fair assumption that they are fine with roll-over upgrade, too
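This hypothetical sketch makes the key-lookup argument against per-record versioning concrete. A HashMap stands in for RocksDB, and keys are prefixed with a version byte, rendered as a string prefix for readability. A point lookup that does not know the version must probe each known version, so a miss always costs one query per version.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the rejected alternative: a version magic byte prefixed to each key.
final class VersionedKeyLookupSketch {
    private final Map<String, byte[]> store = new HashMap<>(); // stands in for RocksDB
    private final byte[] knownVersions;
    int probes; // number of point queries issued by the last get()

    VersionedKeyLookupSketch(final byte... knownVersions) {
        this.knownVersions = knownVersions;
    }

    // e.g. version 1, key "k" -> "1:k"
    private static String prefixed(final byte version, final String key) {
        return version + ":" + key;
    }

    void put(final byte version, final String key, final byte[] value) {
        store.put(prefixed(version, key), value);
    }

    // Tries the newest version first; a miss costs one point query per known version.
    byte[] get(final String key) {
        probes = 0;
        for (int i = knownVersions.length - 1; i >= 0; i--) {
            probes++;
            final byte[] value = store.get(prefixed(knownVersions[i], key));
            if (value != null) {
                return value;
            }
        }
        return null;
    }
}
```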



1 Comment

  1. A few comments / questions:

    1. The value wrapper serde is not going to be used for the sink topic message format, nor for the changelog topic, right? For example, say {{"topic")}}, will the "topic" contain records in the timestamp-prefixed format?

    2. For the new config upgrade.mode, if we are going to use this config for a similar upgrade path for other format changes like rocksDB, changelog, and rebalance protocol etc in the future, then we would need new values whenever we do that. For example: say in 1.3 we changed the rocksDB format again, and then for people upgrading from 1.1 to 1.3 we would need a new value like "in_place_1.1.x" and for those upgrading from 1.2 to 1.3 we need "in_place_1.2.x" etc. This is a bit awkward and I feel we'd better have two configs. One as upgrade.mode only for the upgrade mode itself, and another config like the "" in Kafka broker, indicating which version to use during the rebalance phase, and default to none indicating the newest known version. So in the second rebalance users would change both configs, setting "none" for the first, and empty for the second indicating the newest version.

      Note that this would open the door for users to delay the upgrade (i.e. not set "" to the newest version right away), or to upgrade in multiple phases (e.g. after swapping in the 1.2 jar, first upgrade from 0.10.0 to 0.10.1, and then from 0.10.1 to 1.2, etc).


    3. Related to 2) above, should we consider using the same kind of value for RocksDB versions, for example "rocksdb-1.2", with "rocksdb" indicating the oldest version, so users can understand it easily? By doing this, we need to maintain a mapping from the Kafka version to the RocksDB version, the rebalance protocol version, etc.