KIP-258: Allow to Store Record Timestamps in RocksDB


Current state: "Under Discussion"

Discussion thread: [DISCUSS] KIP-258: Allow to Store Record Timestamps in RocksDB



Released: 1.2

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


In order to improve the stream processing semantics provided by Kafka Streams and to add new features, we want to be able to store record timestamps in KTables. This allows us to address multiple issues, such as:

  • handling out-of-order data for source KTables (related to KAFKA-5533)
  • adding TTL for KTables (KAFKA-4212 and KAFKA-4273)
  • returning the timestamp of the latest update in Interactive Queries (KAFKA-4304)
  • improving timestamp propagation for DSL operators (KAFKA-6455)


The challenging part of this KIP is to define a smooth upgrade path to the upgraded RocksDB format. There are some initial thoughts on this in KAFKA-3522 already. The rest of this doc first focuses on the public interface additions for the upgrade path.

Public Interfaces

We need to add a new configuration parameter upgrade.mode that will be null by default and can take two values, "in_place" and "roll_over", with the following semantics:

  • null: no upgrade needed, run with latest formats
  • "in_place": prepare yourself for an upgrade of the rebalance user-metadata format and an in-place upgrade of RocksDB on-disk format
  • "roll_over": prepare yourself for an upgrade of the rebalance user-metadata format and a roll-over upgrade of the RocksDB on-disk format
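As a sketch, the new parameter would be set like any other Streams config during the first rolling bounce. The string key "upgrade.mode" is the parameter proposed above; the application id and bootstrap servers are placeholder values for a hypothetical application, since the KIP does not define config constants yet:

```java
import java.util.Properties;

public class UpgradeConfigExample {
    // Sketch only: builds the config used during the first rolling bounce.
    public static Properties prepareUpgradeConfig(String mode) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");  // hypothetical
        props.put("bootstrap.servers", "broker:9092");  // hypothetical
        props.put("upgrade.mode", mode);                // "in_place" or "roll_over"
        return props;
    }

    public static void main(String[] args) {
        System.out.println(prepareUpgradeConfig("in_place").getProperty("upgrade.mode"));
    }
}
```

After the upgrade completes, the parameter would simply be removed from the config again (i.e., back to null).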

We also suggest to include a fix for KAFKA-6054 in this KIP, to allow upgrading from 0.10.0 to 1.2. We want to introduce a second parameter upgrade.from that can take the following values:

  • "0.10.0.x": for upgrading from 0.10.0.x to 1.2
  • "0.10.1.x", "0.10.2.x", "0.11.0.x", "1.0.x", and "1.1.x": for upgrading from 0.10.1.x,..., 1.1.x to 1.2
  • new values will be added as required for later releases

Note that the above proposal only fixes KAFKA-6054 in 1.2. If we want to have fixes for versions 0.10.1.x, ..., 1.1.x for KAFKA-6054, we would need to back port only one of the two configuration parameters, namely upgrade.from, into those older versions. For this case, upgrade.from only needs to accept two values:

  • null: for no upgrade required
  • "0.10.0.x": for upgrading from 0.10.0.x to any version of 0.10.1.x, ..., 1.1.x

As only the rebalance metadata was upgraded in those releases, there is no RocksDB upgrade required, a single upgrade mode is sufficient for this case, and thus parameter upgrade.mode is not required. Note that there would be a dependency between upgrade.from and upgrade.mode: if we include the fix for KAFKA-6054 and also back port the fix to older versions, the upgrade.from parameter would be the "main" parameter enabling upgrade behavior. For this case, it's required to set upgrade.mode, too (if available). If upgrade.from is not set, the value of upgrade.mode is ignored and does not have any effect. If we don't include a fix for KAFKA-6054, there will be only one parameter upgrade.mode though. The upgrade steps below target the case that we include a fix for KAFKA-6054, and that upgrade.from is the main parameter that requires upgrade.mode to be set, too. Thus, if upgrade.from=null the second parameter is ignored.
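The dependency described above boils down to a simple precedence rule; the following sketch illustrates it (class and method names are made up for illustration, not part of the proposal):

```java
public class UpgradeFlags {
    // upgrade.from is the "main" switch: if it is null, no upgrade is
    // performed and upgrade.mode is ignored; otherwise upgrade.mode
    // selects "in_place" or "roll_over" (it may be null for older
    // releases that only carry the backported KAFKA-6054 fix).
    public static String effectiveUpgradeMode(String upgradeFrom, String upgradeMode) {
        if (upgradeFrom == null) {
            return null; // no upgrade: upgrade.mode has no effect
        }
        return upgradeMode;
    }
}
```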

Upgrade from         | Upgrade to           | Set config upgrade.from to                              | Set config upgrade.mode to | Side remark
0.10.0.x             | 0.10.1.x, ..., 1.1.x | "0.10.0.x"                                              | N/A                        | fixed KAFKA-6054 for releases 0.10.1.x, ..., 1.1.x
0.10.0.x             | 1.2                  | "0.10.0.x"                                              | "in_place" or "roll_over"  | fixed KAFKA-6054 for 1.2 release
0.10.1.x, ..., 1.1.x | 1.2                  | "0.10.1.x", "0.10.2.x", "0.11.0.x", "1.0.x", or "1.1.x" | "in_place" or "roll_over"  |
Details about the different behavior for all types of upgrades are given below.

Proposed Changes

The main change we propose is to change the value format within RocksDB to contain the record timestamp as an 8-byte (long) prefix; ie, we change the format from <key : value> to <key : timestamp+value>. We need to introduce a new value serde that wraps the original value serde as well as a long serde. One important detail is that the serde only changes for the store, but not for the changelog topic: the underlying changelog topic stores the timestamp in the record metadata timestamp field already. Thus, the new wrapped Serde would only be applied for reads/writes to RocksDB, but not for the changelog topic, which uses the original Serde. In order to know the format of the store, we propose to encode the version of the format in the RocksDB directory name. We currently organize the state directory as follows:

# current state directory


For the new format we add the suffix "-v2" to the store subdirectories:

# we add the sub-directory with suffix '-v2' within rocksdb directory


This layout allows us to pick the correct serde when accessing a store.
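For illustration, the value-format change itself is a plain 8-byte prefix; a minimal sketch of the wrapping (de)serialization logic could look as follows (class and method names are made up here, not the proposed API):

```java
import java.nio.ByteBuffer;

// Sketch of the <key : timestamp+value> format: the record timestamp is
// stored as an 8-byte long prefix in front of the plain serialized value.
public class TimestampPrefixedValue {
    public static byte[] serialize(long timestamp, byte[] plainValue) {
        return ByteBuffer.allocate(8 + plainValue.length)
                         .putLong(timestamp)
                         .put(plainValue)
                         .array();
    }

    public static long timestampOf(byte[] storedValue) {
        return ByteBuffer.wrap(storedValue).getLong();
    }

    public static byte[] valueOf(byte[] storedValue) {
        byte[] plain = new byte[storedValue.length - 8];
        ByteBuffer.wrap(storedValue, 8, plain.length).get(plain);
        return plain;
    }
}
```

Note that this wrapping only happens on the RocksDB read/write path; records written to the changelog topic keep the plain value bytes and carry the timestamp in the record metadata, as described above.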

To make use of the new timestamp that can be stored, we need to add new interfaces to the existing store interfaces that allow reading/writing the timestamp and the value at once.

// new interface
public interface ValueAndTimestamp<V> {
    V value();
    long timestamp();
}

// existing interface
public interface ReadOnlyKeyValueStore<K, V> {
    V get(K key);
    KeyValueIterator<K, V> range(K from, K to);
    KeyValueIterator<K, V> all();
    long approximateNumEntries();
}

// new interface
public interface ReadOnlyKeyValueWithTimestampStore<K, V> {
    ValueAndTimestamp<V> getWithTimestamp(K key);
    KeyValueIterator<K, ValueAndTimestamp<V>> rangeWithTimestamp(K from, K to);
    KeyValueIterator<K, ValueAndTimestamp<V>> allWithTimestamp();
}

// existing interface
public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K, V> {
    void put(K key, V value);
    V putIfAbsent(K key, V value);
    void putAll(List<KeyValue<K, V>> entries);
    V delete(K key);
}

// new interface
public interface KeyValueWithTimestampStore<K, V> extends KeyValueStore<K, V>, ReadOnlyKeyValueWithTimestampStore<K, V> {
    void putWithTimestamp(K key, V value, long timestamp);
    void putWithTimestamp(K key, ValueAndTimestamp<V> valueWithTimestamp);
    ValueAndTimestamp<V> putIfAbsentWithTimestamp(K key, V value, long timestamp);
    ValueAndTimestamp<V> putIfAbsentWithTimestamp(K key, ValueAndTimestamp<V> valueWithTimestamp);
    void putAllWithTimestamp(List<KeyValue<K, ValueAndTimestamp<V>>> entries);
    ValueAndTimestamp<V> deleteWithTimestamp(K key);
}

// existing interface
public interface ReadOnlyWindowStore<K, V> {
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> all();
    KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
}

// new interface
public interface ReadOnlyWindowWithTimestampStore<K, V> {
    WindowStoreIterator<ValueAndTimestamp<V>> fetchWithTimestamp(K key, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> fetchWithTimestamp(K from, K to, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> allWithTimestamp();
    KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> fetchAllWithTimestamp(long timeFrom, long timeTo);
}

// existing interface
public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V> {
    void put(K key, V value);
    void put(K key, V value, long windowTimestamp);
}

// new interface
public interface WindowWithTimestampStore<K, V> extends WindowStore<K, V>, ReadOnlyWindowWithTimestampStore<K, V> {
    void putWithTimestamp(K key, V value, long eventTimestamp);
    void putWithTimestamp(K key, ValueAndTimestamp<V> valueWithTimestamp);
    void putWithTimestamp(K key, V value, long eventTimestamp, long windowTimestamp);
    void putWithTimestamp(K key, ValueAndTimestamp<V> valueWithTimestamp, long windowTimestamp);
}

// existing interface
public interface ReadOnlySessionStore<K, V> {
    KeyValueIterator<Windowed<K>, V> fetch(K key);
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to);
}

// new interface
public interface ReadOnlySessionWithTimestampStore<K, V> {
    KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> fetchWithTimestamp(K key);
    KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> fetchWithTimestamp(K from, K to);
}

// existing interface
public interface SessionStore<K, V> extends StateStore, ReadOnlySessionStore<K, V> {
    KeyValueIterator<Windowed<K>, V> findSessions(K key, long earliestSessionEndTime, long latestSessionStartTime);
    KeyValueIterator<Windowed<K>, V> findSessions(K keyFrom, K keyTo, long earliestSessionEndTime, long latestSessionStartTime);
    void remove(Windowed<K> sessionKey);
    void put(Windowed<K> sessionKey, V aggregate);
}

// new interface
public interface SessionWithTimestampStore<K, V> extends SessionStore<K, V>, ReadOnlySessionWithTimestampStore<K, V> {
    KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> findSessionsWithTimestamp(K key, long earliestSessionEndTime, long latestSessionStartTime);
    KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> findSessionsWithTimestamp(K keyFrom, K keyTo, long earliestSessionEndTime, long latestSessionStartTime);
    void putWithTimestamp(Windowed<K> sessionKey, V aggregate, long timestamp);
    void putWithTimestamp(Windowed<K> sessionKey, ValueAndTimestamp<V> valueWithTimestamp);
}

We extend our existing stores (RocksDB, InMemory) to implement the corresponding new interfaces. Note, we will keep the old stores and extend them to give PAPI users the choice to use stores with or without the ability to store timestamps.


For a clean upgrade path for RocksDB itself, we need to introduce the above configs and also change the user Subscription and Assignment rebalance metadata. For this change we bump the version number from 2 to 3 to distinguish between old and new instances within a consumer group. New instances will encode a version number for their local stores as well as a "supported version" number for the rebalance protocol. Because of this rebalance metadata change, we suggest to also fix KAFKA-6054 as mentioned above. Note that version 0.10.0.x used metadata version 1, while versions 0.10.1.x, ..., 1.1.x use metadata version 2.
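To make the metadata change concrete, a sketch of the new Subscription header could look as below; the exact field layout is not specified in this section, so the byte layout here is an assumption for illustration only:

```java
import java.nio.ByteBuffer;

// Illustrative only: the Subscription metadata now carries both the version
// it is encoded with ("used version") and the highest version the instance
// understands ("supported version").
public class SubscriptionInfoSketch {
    public static ByteBuffer encodeHeader(int usedVersion, int supportedVersion) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putInt(usedVersion);       // e.g. 2 during the first rolling bounce
        buf.putInt(supportedVersion);  // e.g. 3, the new metadata version
        buf.rewind();
        return buf;
    }

    public static int usedVersion(ByteBuffer header) {
        return header.getInt(0);
    }

    public static int supportedVersion(ByteBuffer header) {
        return header.getInt(4);
    }
}
```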

The newly added "supported version" number in the rebalance metadata will allow us to implement a "version probing" step that simplifies future metadata upgrades. We will change the group leader code to not fail if it receives a Subscription with a larger version than it supports: instead, the leader will send an empty Assignment to the corresponding consumer, including the leader's own "supported version". This allows the client to do an unsubscribe()/subscribe()/poll() pattern to trigger a second rebalance; this time, the client knows the supported version of the leader and can send a correspondingly downgraded Subscription. The upgrade path to 1.2 will not be simplified by this change, but we want to add it to make future upgrades beyond 1.2 simpler.
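The version-probing handshake described above reduces to a simple rule on the leader side; the following sketch (with made-up names) shows the intended logic:

```java
// Sketch of "version probing" (names are illustrative): if a member's
// Subscription version is newer than what the leader supports, the leader
// answers with an empty Assignment carrying its own supported version, and
// the member re-joins with a downgraded Subscription.
public class VersionProbingSketch {
    public static final int LEADER_SUPPORTED_VERSION = 3;

    // Returns the Subscription version the member should use when re-joining.
    public static int versionForRejoin(int memberUsedVersion) {
        if (memberUsedVersion > LEADER_SUPPORTED_VERSION) {
            // leader cannot parse this Subscription: it sends an empty
            // Assignment with its supported version so the member can downgrade
            return LEADER_SUPPORTED_VERSION;
        }
        return memberUsedVersion; // leader understands it: no downgrade needed
    }
}
```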

Compatibility, Deprecation, and Migration Plan

The storage format change requires applications to upgrade correctly. We plan to offer two upgrade paths.

  • In-place upgrade: this requires two rolling bounces of each application instance
    • advantage: simpler than roll-over upgrade
    • disadvantage: need 2x local disk storage during upgrade
    • upgrade flow:
      1. prepare a jar hot swap from the old version (not 0.10.0.x) to 1.2; Kafka Streams needs to be configured with upgrade.from="<old.version>" and upgrade.mode="in_place" for startup
      2. do a rolling bounce to get the new jar and config in place for each instance
        1. config "upgrade.from" tells the application to send Subscriptions in version 2 (to be compatible with a potential version 2 leader in the group)
        2. instances will receive a corresponding version 2 Assignment in this stage
      3. because upgrade mode is "in_place", each instance will create a "background" task that starts to rebuild the RocksDB stores in the new format in parallel to the existing RocksDB stores
        1. the existing RocksDB stores will be used for processing so there is no downtime
        2. each instance builds up the new RocksDB state for all its assigned active and standby tasks
        3. when all stores are ready, we print an INFO message in the log (thus, logs must be watched)
      4. after all stores are prepared, the user prepares a second round of rebalances; this time, the configuration parameter upgrade.from must be removed for the new startup (this implies that upgrade.mode is ignored)
      5. do a second rolling bounce for each instance to get the new config
        1. bounced instances can send a version 3 Subscription; they encode each store twice (once with storage format 1 and once with storage format 2)
        2. the leader sends a version 2 Assignment (based on the reported old stores with format 1) back to indicate that the upgrade is not completed yet, as long as at least one version 2 Subscription is received
        3. as long as a version 2 Assignment is received, the old stores will be used for processing and the new stores will be further updated in the background
        4. if the leader receives only version 3 Subscriptions, it sends a version 3 Assignment (based on the reported new stores with format 2) back, indicating that the upgrade is completed
        5. when Kafka Streams receives a version 3 Assignment, it checks for local RocksDB directories in the old format 1 and deletes them to free disk space
    • upgrading from 0.10.0.x to 1.2 uses the same upgrade pattern with config upgrade.from="0.10.0.x" and upgrade.mode="in_place"
      • instead of the old Subscription and Assignment metadata version 2, metadata version 1 is used
  • Roll-over upgrade:
    • in-place upgrade might not be feasible because of its large local disk storage requirement; thus, roll-over is an alternative to enable upgrading if not enough local disk space is available for an in-place upgrade
    • if an application is running with N instances, the user starts N new instances in parallel (they all form one consumer group); the new instances rebuild the RocksDB state in the new format; finally, the old instances are stopped
    • upgrade flow:
      1. prepare a jar hot swap from the old version (not 0.10.0.x) to 1.2 for all existing (old) instances; Kafka Streams needs to be configured with upgrade.from="<old.version>" and upgrade.mode="roll_over" for startup
      2. do a rolling bounce to get the new jar and config in place for each instance
        1. config "upgrade.from" tells the application to send Subscriptions in version 2 (to be compatible with a potential version 2 leader in the group)
        2. instances will receive a version 2 Assignment in this stage
        3. all old instances will just resume processing as usual
        4. because upgrade mode is "roll_over", no background tasks are started (cf. "In-place" upgrade above)
      3. the user starts N new instances with config parameter upgrade.from removed
        1. the new instances send Subscriptions with version 3
        2. the leader can distinguish between old and new instances based on the used Subscription version and the encoded information
          1. note, for the in-place upgrade case, new instances will report the old and new store format for all their stores (as long as the upgrade is not completed)
          2. new roll-over nodes will only report new-format stores (and never both)
          3. thus, if the leader receives mixed version 2 and version 3 metadata, it can figure out whether this is an in-place or a roll-over upgrade (if a 1.2 leader only receives version 2 metadata, it cannot distinguish between both upgrade modes, but there is also no need to distinguish them in this case, as the leader behaves exactly the same for both upgrade protocols in this phase)
        3. the leader sends a version 2 Assignment to the old instances and "mirrors" the assignment for the new instances
          1. in the "mirror" assignment, active tasks are replaced by standby tasks, and it is sent as a version 3 Assignment
          2. because the leader only assigns standby tasks to the new instances, they only start to build up the stores in the new storage format
          3. we need to report the progress of the StandbyTasks and write an INFO log message when they are ready to take over
      4. all old instances are stopped
        1. as long as at least one old instance is alive, the leader assigns all active tasks to the old instances only
        2. however, as those are shut down one after another, ideally a rebalance will only trigger after all are down already (note, Kafka Streams does not send a "leave group request" and thus a rebalance only happens after session.timeout passes)
        3. when all old instances are down, the leader only receives version 3 Subscriptions and computes a version 3 Assignment based on the prepared stores, and the upgrade is finished
    • upgrading from 0.10.0.x to 1.2 uses the same upgrade pattern; config upgrade.from="0.10.0.x" and upgrade.mode="roll_over" must be used
      • instead of the old Subscription and Assignment metadata version 2, metadata version 1 is used
  • Offline upgrade:
    • this is the simplest form of upgrading but results in application downtime
    • stop all running instances
    • swap the jar and restart all instances (config upgrade.from is not set)
      • the new instances will send and receive version 3 Subscription and Assignments
      • they will not find any stores in the new format and just recover the stores in the new format from the changelog topic
      • as they receive a version 3 Assignment, they delete local RocksDB directories with the old format
  • Upgrading from 0.10.0.x to ..., 1.0.2, 1.1.1 (note, those upgrade target versions are not released yet)
    • upgrade flow:
      1. prepare a jar hot swap from 0.10.0.x to <new.version>; Kafka Streams needs to be configured with upgrade.from="0.10.0.x" for startup
      2. do a rolling bounce to get the new jar and config in place for each instance
        1. config upgrade.from="0.10.0.x" tells the application to send Subscriptions in version 1 (to be compatible with a potential version 1 leader in the group)
        2. instances will receive a version 1 Assignment in this stage
      3. do a second rolling bounce for each instance with config parameter upgrade.from removed
        1. bounced instances can send a version 2 Subscription
        2. the leader sends a version 1 Assignment back as long as at least one version 1 Subscription is received
        3. if the leader receives only version 2 Subscriptions, it sends a version 2 Assignment back


Test Plan

  • unit and integration tests for the new embedded timestamp feature
    • tests that ensure the timestamps are written and read correctly and that records are (de)serialized correctly
    • this includes reading source KTables, aggregation operations that result in KTables, and all other operators that might force a KTable materialization
    • KTable recovery/restore must be tested.
  • unit and integration tests for StreamsPartitionAssignor to react correctly to configs and received Subscriptions
  • system tests that perform rolling bounce upgrades as described above
    • this should include failure scenarios during the upgrade
    • this should include "simulated upgrades" to metadata version 4, to ensure that the implementation works correctly for future changes

Rejected Alternatives

  • change the RocksDB on-disk format and encode the used serialization version per record (this would simplify future upgrades). However, there are major disadvantages:
    • storage amplification for local stores
    • record version could get stored in record headers in changelog topics -> changelog topic might never overwrite record with older format
    • code needs to check all versions all the time for future release: increases code complexity and runtime overhead
    • it's hard to change the key format
      • for value format, the version number can be a magic prefix byte
      • for key lookups, we would need to know the magic byte in advance for efficient point queries into RocksDB; if multiple versions exist in parallel, this is difficult (either do multiple queries with different version bytes until the entry is found, or all versions are tried, implying the entry does not exist – or use range queries, but those are very expensive)
  • encode the storage format in the directory name not as a "store version number" but as an "AK release number"
    • might be confusing to users if the store format does not change ("I am running Kafka 1.4, but the store indicates it's running on 1.2").
  • use a simpler upgrade path without any configs or complex rolling bounce scenarios
    • requires application down-time for upgrading to new format
  • use the consumer's built-in protocol upgrade mechanism (ie, register multiple "assignment strategies")
    • has the disadvantage that we need to implement two StreamsPartitionAssignor classes
    • increased network traffic during rebalance
    • encoding "supported version" in metadata subsumes this approach for future releases anyway
    • if we want to "disable" the old protocol, a second rebalance is required, too
    • cannot avoid a second rebalance that is required for the state store upgrade
  • only support in-place upgrade path instead of two to simplify the process for users (don't need to pick)
    • might be prohibitive if not enough disk space is available
  • allow users to stay with old format: upgrade would be simpler as it's only one rolling bounce
    • unclear default behavior: should we stay on 1.1 format by default or should we use 1.2 format by default?
      • if 1.1 is the default, upgrade is simple, but for a new application, users must turn on the 1.2 format explicitly
      • if 1.2 is the default, a simple upgrade requires a config that tells Streams to stay with the 1.1 format
      • conclusion: upgrading and not upgrading is not straightforward either way; thus, just force the upgrade
    • if no upgrade happens, new features (as listed above) would be useless
  • Only prepare stores for active task (but not standby tasks)
    • this would reduce the disk footprint during upgrade
    • disadvantage: when the switch to the new version happens, there are no hot standbys available for some time
    • we could make it configurable, however, we try to keep the number of configs small; also, it's already complex enough and adding more options would make it worse
    • it's not an issue for the roll-over upgrade, and not everybody configures standbys in the first place
      • people with standbys are willing to provide more disk space, so it seems a fair assumption that they are fine with the roll-over upgrade, too
  • Don't fix KAFKA-6054
    • it's a simple fix to include: just add two more accepted values to parameter upgrade.mode
    • it's a fair question how many people will run with Streams 0.10.0 – note that if people are "stuck" with a 0.10.0 broker, they cannot use Streams 0.10.1 or newer, as it's not backward compatible to 0.10.0 – thus, it might be more than expected
  • Fix KAFKA-6054 only for 1.2 release
    • it's a relatively simple fix for older releases (the main design work is already covered, and writing the code is not too complex because it's only the rebalance metadata version change)
    • upgrade path is also way simpler
    • it's unclear though if we will have bug-fix releases for older versions; thus nobody might ever be able to get this code (if they don't build from corresponding dev-branches themselves)



1 Comment

  1. A few comments / questions:

    1. For the value wrapper serde, it is not going to be used for the sink topic message format or the changelog topic, right? For example, say to("topic") – will the "topic" contain records with the timestamp-prefixed format?

    2. For the new config upgrade.mode, if we are going to use this config for a similar upgrade path for other format changes like rocksDB, changelog, and rebalance protocol etc in the future, then we would need new values whenever we do that. For example: say in 1.3 we changed the rocksDB format again, and then for people upgrading from 1.1 to 1.3 we would need a new value like "in_place_1.1.x" and for those upgrading from 1.2 to 1.3 we need "in_place_1.2.x" etc. This is a bit awkward and I feel we'd better have two configs. One as upgrade.mode only for the upgrade mode itself, and another config like the "" in Kafka broker, indicating which version to use during the rebalance phase, and default to none indicating the newest known version. So in the second rebalance users would change both configs, setting "none" for the first, and empty for the second indicating the newest version.

      Note, this would open the door for users to delay upgrade (i.e. do not set "" to newest version right away), or to upgrade in multiple phases (e.g. after swapping the jar of 1.2, first upgrade from 0.10.0 to 0.10.1, and then to 0.10.1 to 1.2 etc).


    3. Related to 2) above, should we consider using the same value for rocksDB versions, for example "rocksdb-1.2" with "rocksdb" indicating oldest version for users to understand easily. By doing this we need to maintain the map of the Kafka version to the rocksDB version and the rebalance protocol version, etc.