...

Code Block (Java)
package org.apache.kafka.streams.state;

// new interfaces

public interface ValueAndTimestamp<V> {
    V value();
    long timestamp();
}


public interface KeyValueWithTimestampStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {
    void put(K key, V value, long timestamp);
    ValueAndTimestamp<V> putIfAbsent(K key, V value, long timestamp);
}

// TODO: add missing WindowWithTimestampStore and SessionWithTimestampStore

// extend existing classes (omitting existing method)

public final class Stores {
    public static <K, V> StoreBuilder<KeyValueWithTimestampStore<K, V>> keyValueWithTimestampStoreBuilder(final KeyValueBytesStoreSupplier supplier,
                                                                                                          final Serde<K> keySerde,
                                                                                                          final Serde<V> valueSerde);

    public static <K, V> StoreBuilder<KeyValueWithTimestampStore<K, V>> keyValueToKeyValueWithTimestampUpgradeStoreBuilder(final KeyValueBytesStoreSupplier supplier,
                                                                                                                           final Serde<K> keySerde,
                                                                                                                           final Serde<V> valueSerde);

    // TODO: add missing xxxStoreBuilder() methods for WindowWithTimestampStore and SessionWithTimestampStore
}

public final class QueryableStoreTypes {
    // verify if we need this, or if we can reuse keyValueStore() directly with V==ValueAndTimestamp<V>
    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> keyValueWithTimestampStore();

    // TODO: add missing methods for WindowWithTimestampStore and SessionWithTimestampStore (if required)
}
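
To show how this queryable store type would be consumed, here is a minimal interactive-query sketch against the proposed API (the store name "my-store" and the helper method are placeholders for illustration, not part of the KIP):

Code Block (Java)
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

public final class TimestampedStoreQuery {

    // Looks up a key via interactive queries and prints the stored value plus its timestamp.
    static void printLatest(final KafkaStreams streams, final String key) {
        final ReadOnlyKeyValueStore<String, ValueAndTimestamp<Long>> store =
            streams.store("my-store", QueryableStoreTypes.<String, Long>keyValueWithTimestampStore());

        final ValueAndTimestamp<Long> result = store.get(key);
        if (result != null) {
            System.out.println("value=" + result.value() + ", timestamp=" + result.timestamp());
        }
    }
}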

We extend our existing stores (RocksDB, InMemory) to implement the corresponding new interfaces. Note that we will keep the old stores and extend them, to give PAPI users the choice to use stores with or without the ability to store timestamps.
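
To illustrate the Processor API side, the following is a minimal sketch of a processor that uses the proposed KeyValueWithTimestampStore to keep only the latest value per key; the processor class, store name, and the out-of-order check are illustrative assumptions, not part of the KIP:

Code Block (Java)
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueWithTimestampStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

// Illustrative only: the processor class, store name "my-store", and the out-of-order
// handling are placeholders to show how the proposed interfaces would be used.
public class LatestValueProcessor extends AbstractProcessor<String, Long> {
    private KeyValueWithTimestampStore<String, Long> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        store = (KeyValueWithTimestampStore<String, Long>) context.getStateStore("my-store");
    }

    @Override
    public void process(final String key, final Long value) {
        // the store now returns the value together with the timestamp it was written with
        final ValueAndTimestamp<Long> previous = store.get(key);

        // keep only the latest value per key: out-of-order records do not overwrite newer ones
        if (previous == null || previous.timestamp() <= context().timestamp()) {
            // proposed three-argument put(): value and timestamp are stored together
            store.put(key, value, context().timestamp());
        }
    }
}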

The usage/implementation of upgrade stores is described in the next section.

Upgrading

For a clean upgrade path for RocksDB itself, we need to introduce the above configs and new interfaces to implement the actual upgrade code. We will provide implementations to upgrade from an existing KeyValueStore to a KeyValueWithTimestampStore. The defined interfaces are generic though, allowing "upgrade stores" to be implemented from any store type A to any other store type B.
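
This page later refers to a `StoreUpgradeBuilder` that users provide; since this section does not spell out its shape, the following is a purely hypothetical sketch of what a generic "store type A to store type B" upgrade builder could look like (type parameters and method names are assumptions, not the proposed API):

Code Block (Java)
// Purely hypothetical sketch -- the name StoreUpgradeBuilder appears later on this page, but
// the type parameters and methods shown here are assumptions, not the proposed API. The idea:
// a builder that knows both the old store type S and the new store type T, so that an upgrade
// task can read the old on-disk format while writing the new format in parallel.
public interface StoreUpgradeBuilder<S extends StateStore, T extends StateStore> extends StoreBuilder<T> {

    /** Builder for a read-only view of the store in the old format (source of the upgrade). */
    StoreBuilder<S> oldStoreBuilder();

    /** Builder for the store in the new format that is populated during the upgrade (target). */
    StoreBuilder<T> newStoreBuilder();
}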

...

  • In-place upgrade (online): this requires two rolling bounces of each application instance
    • advantage: simpler than roll-over upgrade
    • disadvantage: need 2x local disk storage during upgrade
    • upgrade flow:
      1. prepare a jar hot swap from the old version; Kafka Streams needs to be configured with upgrade.mode="in_place" for startup (see the config sketch after this list)
      2. do a rolling bounce to get the new jar and config in place for each instance
      3. because the upgrade mode is "in_place", each instance will create a "store upgrade" task for each assigned active and standby task; these tasks start to rebuild the RocksDB stores in the new format in parallel to the existing RocksDB stores
        1. the existing RocksDB stores are used for processing, so there is no downtime
        2. each instance builds up the new RocksDB state for all its assigned active and standby tasks
        3. during restore, users need to observe the restore progress (because actual processing continues in parallel, the restore will never be "finished", and it is up to the user to decide when to trigger the second rolling bounce)
      4. after all stores are prepared, the user prepares the second rolling bounce; this time, the configuration parameter upgrade.mode must be removed for the new startup
      5. do a second rolling bounce for each instance to get the new config
        1. if prepare task directories are detected, we check if the upgrade is finished and, if not, finish the upgrade (ie, read the latest delta from the changelog); the corresponding active task directories are then replaced by the prepare task directories
        2. afterwards regular processing begins
  • In-place upgrade (offline): requires a single rolling bounce of each application instance
    • advantage: simplest way to upgrade (only one rolling bounce)
    • disadvantage: needs 2x local disk storage during upgrade; application is offline during store upgrade
    • upgrade flow:
      1. prepare a jar hot swap from the old version; the Kafka Streams configuration does not need to be changed (ie, upgrade.mode=null for startup)
      2. do a rolling bounce to get the new jar in place for each instance
      3. because upgrade mode is disabled, each instance only finds the old store format but no new store (thus it behaves exactly the same as in the second rolling bounce of the "in_place" online upgrade)
        1. create the prepare task directory and replay the "latest delta" from the changelog topic (in this case, the delta happens to be the complete changelog topic)
        2. after the restore is finished (no processing during this time), the old store is replaced with the new store
        3. afterwards regular processing begins
  • Roll-over upgrade:
    • in-place upgrade might not be feasible because of its large local disk storage requirement; thus, roll-over is an alternative to enable upgrading if not enough local disk space is available for an in-place upgrade
    • if an application is running with N instances, the user starts N new instances in parallel (old and new instances form one consumer group); the new instances rebuild the RocksDB state in the new format; finally, the old instances are stopped
    • upgrade flow:
      1. prepare a jar hot swap from the old version for all existing (old) instances; the old Kafka Streams instances need to be configured with upgrade.mode=null for startup
      2. do a rolling bounce to get the new jar for each old instance
        1. all old instances will just resume processing as usual
        2. because the upgrade mode is not enabled, no store upgrade tasks are started (cf. "In-place" upgrade above)
      3. the user starts N new instances with config parameter upgrade.mode="roll_over"
        1. the `roll_over_new` config will be encoded in the subscription metadata
        2. the leader can distinguish between old and new instances based on the encoded Subscription information and assigns tasks (active and standby) for restore to the "roll over" instances (ie, the assignment is "mirrored" to all "roll over" instances)
        3. "roll over" instances only create StoreUpgradeTasks and perform the restore
      4. all old instances are stopped
        1. as long as at least one old instance is alive, the leader assigns all active tasks to the old instances only
        2. however, as those are shut down in a row, ideally a rebalance will only trigger after all of them are down already (note, Kafka Streams does not send a "leave group request" and thus a rebalance only happens after session.timeout passes)
        3. when all old instances are down, the leader only receives subscriptions that encode "roll over" and thus the leader knows that the upgrade is completed
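
To make the config-driven flow concrete, here is a minimal sketch of how the upgrade.mode parameter would be set for the two rolling bounces of the online in-place upgrade (the config key is written as a plain string because no StreamsConfig constant is named in this section; application id and bootstrap servers are placeholders):

Code Block (Java)
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public final class UpgradeModeConfigExample {

    // First rolling bounce: new jar plus upgrade.mode="in_place", so that each instance creates
    // store-upgrade tasks that rebuild the RocksDB stores in the new format in the background.
    static Properties firstBounceConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");          // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
        props.put("upgrade.mode", "in_place");  // config name and value taken from this KIP
        return props;
    }

    // Second rolling bounce: same jar, but upgrade.mode is removed (ie, null), so the prepared
    // stores replace the old ones and regular processing resumes on the new format.
    static Properties secondBounceConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        // no upgrade.mode entry for the second bounce
        return props;
    }
}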

Upgrade scenarios:

Processor API:

  • using KeyValueStore/WindowStore/SessionStore and wants to keep it
    • nothing to do; regular single rolling bounce upgrade
  • using KeyValueStore/WindowStore/SessionStore and wants to upgrade to XxxWithTimestampStore
    • update the code to use the new store and provide the corresponding `StoreUpgradeBuilder` (see the sketch after this list)
    • follow instructions as described above
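
As an illustration of such a code change, the sketch below switches between the upgrade builder and the regular timestamped builder declared in the Stores class above; the class name, store name, serdes, and the duringUpgrade flag are assumptions for illustration, not part of the KIP:

Code Block (Java)
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.KeyValueWithTimestampStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public final class UpgradeTopologyExample {

    // duringUpgrade = true while the upgrade is in progress, false once it is completed.
    static Topology buildTopology(final boolean duringUpgrade) {
        // Old code (before the upgrade) would have used:
        //   Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("my-store"), Serdes.String(), Serdes.Long())

        final StoreBuilder<KeyValueWithTimestampStore<String, Long>> storeBuilder;
        if (duringUpgrade) {
            // upgrade builder: reads the old store format and writes the new timestamped format
            storeBuilder = Stores.keyValueToKeyValueWithTimestampUpgradeStoreBuilder(
                Stores.persistentKeyValueStore("my-store"), Serdes.String(), Serdes.Long());
        } else {
            // regular builder for the new timestamped store, used after the upgrade is completed
            storeBuilder = Stores.keyValueWithTimestampStoreBuilder(
                Stores.persistentKeyValueStore("my-store"), Serdes.String(), Serdes.Long());
        }

        final Topology topology = new Topology();
        topology.addStateStore(storeBuilder);  // connect to processors via addProcessor()/connectProcessorAndStateStores()
        return topology;
    }
}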

DSL users (after KAFKA-6521 is implemented):

  • by default, users should consider the three available upgrade paths
    • if they don't pay attention, they would end up with an in-place offline upgrade
  • if users overwrite all used stores to in-memory, an online in-place upgrade does not make sense
    • we recommend an in-place offline upgrade (we think this is fine, as users are fine with larger restore time anyway, as they use in-memory stores)
    • roll-over upgrade would also be possible
  • users running with default stores only
    • no code change required
    • all three upgrade modes as described above are possible – the user must set the config accordingly
  • users replacing all RocksDBs with In-Memory stores
    • code change to In-Memory-with-Timestamp store recommended
    • to not break compatibility, we would also support an upgrade without code change (for this case, the current/old behavior would be preserved, and an upgrade is effectively suppressed)
    • a single rolling bounce upgrade with no upgrade config is sufficient
    • roll-over upgrade possible
  • users providing `Materialized` parameters for RocksDB (note, if we leverage RocksDB with timestamps, the API changes from KeyValueStore to KeyValueWithTimestampStore)
    • code change to XXX-with-Timestamp store recommended
    • to not break compatibility, we would also support an upgrade without code change (for this case, the current/old behavior would be preserved, and an upgrade is effectively suppressed)
    • if no code change is made, a single rolling bounce upgrade with no upgrade config is sufficient
    • if the code is changed, all three upgrade modes are available as described above
  • note: users with "mixed" code usage that have at least one default store should consider following a two-bounce upgrade
    • if not, in the worst case they perform an in-place offline upgrade

Test Plan

  • unit and integration tests for the new embedded timestamp feature
    • tests that ensure the timestamps are written and read correctly and that records are (de)serialized correctly
    • this includes reading source KTables, aggregation operations that result in KTables, and all other operators that might force a KTable materialization
    • KTable recovery/restore must be tested.
  • unit and integration tests for the StreamsPartitionAssignor to react correctly to configs and received subscriptions
  • system tests that perform rolling bounce upgrades as described above
    • this should include failure scenario during the upgrade
    • this should include "simulated upgrades" to metadata version 4, to ensure that the implementation work correct for future changes

...