You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 6 Next »

Status

Current state"Under Discussion"

Discussion thread: TODO

JIRA

Key Summary T Created Updated Due Assignee Reporter P Status Resolution
Loading...
Refresh

Released: 1.2

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In order to improve the provided stream processing semantics of and to add new feature to Kafka Streams, we want to be able to store record timestamps in KTables. This allows us to address multiple issues like

  • handling out-of-order data for source KTable (related to KAFKA-5533)
  • add TTL for KTables (KAFKA-4212 and KAFKA-4273)
  • return the timestamp of the latest update in Interactive Queries (KAFKA-4304)
  • improve timestamp propagation for DSL operator (KAFKA-6455)

 

The challenging part of this KIP is to define a smooth upgrade path with the upgraded rocksDB format. There are some initial thoughts on KAFKA-3522 already. The rest of this doc will first focus on the public interface additions for the upgrade path.

Public Interfaces

We need to add a new configuration parameter upgrade.mode that will be null by default and can take two values: "prepare_in_place_upgrade" and "prepare_roll_over_upgrade" with the following semantics:

  • null: no upgrade needed, run with latest formats
  • "in_place": prepare yourself for an upgrade of the rebalance user-metadata format and an in-place upgrade of RocksDB on-disk format
  • "roll_over": prepare yourself for an upgrade of the rebalance user-metadata format and an roll-over upgrade of RocksDB on-disk format

If we consider to include a fix for https://issues.apache.org/jira/browse/KAFKA-6054 in this KIP, to allow upgrading from 0.10.0 to 1.2, we need to allow two more values for config upgrade.mode:

  • "in_place_0.10.0.x"
  • "roll_over_0.10.0.x"

Both configs are basically the same as the two from above, but must be used for upgrading 0.10.0.x to 1.2.x. For upgrading from any of 0.10.1.x, ..., 1.1.x the two options from above would be used.

Note, that the above proposal only fixes KAFKA-6054 in 1.2. If we want to have fixes for versions 0.10.1.x, ...,1.1.x for KAFKA-6054, we would need to pack port the configuration parameter upgrade.mode into those older versions. For this case, upgrade.mode only needs to accept two values

  • null: for no upgrade required
  • "0.10.0.x": for upgrading from 0.10.0.x to any version of 0.10.1.x, ..., 1.1.x

As we only upgraded the rebalance metadata in 0.10.1.0, there is no RocksDB upgrade required and a single upgrade mode is sufficient for this case.

Upgrade fromUpgrade to

Set config upgrade.mode to

Side remark
0.10.0.x0.10.1.x, ..., 1.1.x"0.10.0.x"fixed KAFKA-6054 for releases 0.10.1.x, ..., 1.1.x
0.10.0.x1.2

"in_place_0.10.0.x" or "roll_over_0.10.0.x"

fixed KAFKA-6054 for 1.2 release
0.10.1.x, ..., 1.1.x1.2"in_place" or "roll_over"does not fix KAFKA-6054

Details about the different behavior for all types of upgrades are given below.

Proposed Changes

The main change we propose, is to change the value format within RocksDB to contain the record timestamp as a 8-byte (long) prefix; ie, we change the format from <key:value> to <key:timestamp+value>. We need to introduce a new value serde that wrapps the original value serde as well as a long serde. One important details is, that the serde only changes for the store, but not the changelog topic: the underlying changelog topic stores the timestamp in the record metadata timestamp field already. In order to know the format of the store, we propose to encode the version of the format in the RocksDB directory name. We currently organize the state directory as follows:

# current state directory

/<state.dir>/<application.id>/<task.id>/.checkpoint
/<state.dir>/<application.id>/<task.id>/.lock
/<state.dir>/<application.id>/<task.id>/storeName1/
/<state.dir>/<application.id>/<task.id>/storeName2/
/<state.dir>/<application.id>/<task.id>/rocksdb/storeName3/
/<state.dir>/<application.id>/<task.id>/rocksdb/storeName4/

For the new format we add directory rocksdb-v2:

# we add the sub-directory 'rocksdb-v2' and nest store directories there


/<state.dir>/<application.id>/<task.id>/rocksdb-v2/storeName3/
/<state.dir>/<application.id>/<task.id>/rocksdb-v2/storeName4/

This layout allows us to pick the correct serde when accessing a store.

To make use of the new timestamp that can be stored, we need to add new interfaces to the existing store interfaces that allow to read/write the timestamp and the value at once.

// new interface
public interface ValueAndTimestamp<V> {
    V value();
    long timestamp();
}
 
// existing interface
public interface ReadOnlyKeyValueStore<K, V> {
    V get(K key);
    KeyValueIterator<K, V> range(K from, K to);
    KeyValueIterator<K, V> all();
    long approximateNumEntries();
}
 
// new interface
public interface ReadOnlyKeyValueTimestampStore<K, V> {
    ValueAndTimestamp<V> getWithTimestamp(K key);
    KeyValueIterator<K, ValueAndTimestamp<V>> rangeWithTimestamp(K from, K to);
    KeyValueIterator<K, ValueAndTimestamp<V>> allWithTimestamp();
}
 
// existing interface
public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K, V> {
    void put(K key, V value);
    V putIfAbsent(K key, V value);
    void putAll(List<KeyValue<K, V>> entries);
    V delete(K key);
}


// new interface
public interface KeyValueTimestampStore<K, V> extends KeyValueStore<K, V>, ReadOnlyKeyValueTimestampStore<K, V> {
    void putWithTimestamp(K key, V value, long timestamp);
    void putWithTimestamp(K key, ValueAndTimestamp<V> valueWithTimestamp);
    ValueAndTimestamp<V> putIfAbsentWithTimestamp(K key, V value, long timestamp);
    ValueAndTimestamp<V> putIfAbsentWithTimestamp(K key, ValueAndTimestamp<V> valueWithTimestamp);
    void putAllWithTimestamp(List<KeyValue<K, ValueAndTimestamp<V>> entries);
    ValueAndTimestamp<V> deleteWithTimestamp(K key);
}


// existing interface
public interface ReadOnlyWindowStore<K, V> {
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, V> all();
    KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
}
 
// new interface
public interface ReadOnlyWindowTimestampStore<K, V> {
    WindowStoreIterator<ValueAndTimestamp<V>> fetchWithTimestamp(K key, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> fetchWithTimestamp(K from, K to, long timeFrom, long timeTo);
    KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> allWithTimestamp();
    KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> fetchAllWithTimestamp(long timeFrom, long timeTo);
}


// existing interface
public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V> {
    void put(K key, V value);
    void put(K key, V value, long windowTimestamp);
}
 
// new interface
public interface WindowTimestampStore<K, V> extends WindowStore<K, V>, ReadOnlyWindowTimestampStore<K, V> {
    void putWithTimestamp(K key, V value, long eventTimestamp);
    void putWithTimestamp(K key, ValueAndTimestamp<V> valueWithTimestamp);
    void putWithTimestamp(K key, V value, long eventTimestamp, long windowTimestamp);
    void putWithTimestamp(K key, ValueAndTimestamp<V> valueWithTimestamp, long windowTimestamp);
}
 
// existing interface
public interface ReadOnlySessionStore<K, V> {
    KeyValueIterator<Windowed<K>, V> fetch(K key);
    KeyValueIterator<Windowed<K>, V> fetch(K from, K to);
}
 
// new interface
public interface ReadOnlySessionTimestampStore<K, V> {
    KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> fetchWithTimestamp(K key);
    KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> fetchWithTimestamp(K from, K to, );
}
 
// existing interface
public interface SessionStore<K, V> extends StateStore, ReadOnlySessionStore<K, V> {
    KeyValueIterator<Windowed<K>, V> findSessions(K key, long earliestSessionEndTime, long latestSessionStartTime);
    KeyValueIterator<Windowed<K>, V> findSessions(K keyFrom, K keyTo, long earliestSessionEndTime, long latestSessionStartTime);
    void remove(Windowed<K> sessionKey);
    void put(Windowed<K> sessionKey, V aggregate);
}
 
// new interface
public interface SessionTimestampStore<K, V> extends SessionStore<K, V>, ReadOnlySessionTimestampStore<K, V> {
    KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> findSessionsWithTimestamp(K key, long earliestSessionEndTime, long latestSessionStartTime);
    KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> findSessionsWithTimestam(K keyFrom, K keyTo, long earliestSessionEndTime, long latestSessionStartTime);
    void putWithTimestamp(Windowed<K> sessionKey, V aggregate, long timestamp);
    void putWithTimestamp(Windowed<K> sessionKey, ValueAndTimestamp<V> valueWithTimestamp);
}

We extend our existing internal stores to implement the corresponding new interfaces.

Upgrading

For a clean upgrade path for RocksDB itself, we need to introduce the above configs and also change the user Subscription and Assigment rebalance metadata. For this change we bump the version number from 2 to 3 to distinguesh between old and new instances within a consumer group. New instances will encode a version number for there local stores. Becuase of this rebalance metadata change, we suggest to also fix KAFKA-6054 as mentioned above. Note that version 0.10.0.x used metadata version 1, while versions 0.10.1.x, ..., 1.1.x use metadata version 2.

Compatibility, Deprecation, and Migration Plan

The storage format change requires application to upgrade correctly. We plan to offer two upgrade pathes:

  • In-place upgrade: this requires two rolling bounces of each application instance
    • advantage: simpler than roll-over upgrade
    • disadvantage: need 2x local disk storage during upgrade
    • upgrade flow:
      1. prepare a jar hot swap from old version (not 0.10.0.x) to 1.2; Kafka Streams need to be configured with upgrade.mode="in_place" for startup
      2. do a rolling bounce to get the new jar and config in place for each instance
        1. config "in_place" tells the application to send Subscription in version 2 (to be compatible with potential version 2 leader in the group)
        2. instances will receive a version 2 Assigmentin in this stage
      3. each instance will create a "back groud" task, that start to rebuild RocksDB stores in new format in parallel to existing RocksDB
        1. existing RocksDB will be used for processing so there is no down time
        2. each instance builds up the new RocksDB state for all its assigned active and standby tasks
        3. when all stores are ready, we print a INFO message in the log (thus logs must be watched)
      4. after all stores are prepared, user prepares a second round of rebalance; this time, the configuration parameter upgrade.mode must be removed for new startup
      5. do a second rolling bounce for each instance to get new config
        1. bounced instance can send a version 3 Subscription; they encode each store twice (once with storage format 1 and once with storage format 2)
        2. the leader sends version 2 Assigment (based on reported old stores with format 1) back to indicate that upgrade is not completed yet as long as at least one version 2 Subsription is received
        3. as long as version 2 Assigment is received, the old store will be used for processing and the new store will be futher updated in the back ground
        4. if the leader receives only version 3 Subscirption, it send version 3 Assigment (based on reported new stores with format 2) back indicating that the upgrad is completed
        5. when Kafka Streams receives a version 3 Subscription is check for local RocksDB directories in old format 1 and deletes them to free disk space
    • upgrading from 0.10.0.x to 1.2 uses the same upgrage pattern, however config upgrade.mode="in_place_0.10.0.x" must be used
      • instead of old Subscription and Assigmentmet metadata verions 2, metadata version 1 is used
  • Roll-over upgrade:
    • in-place upgrade might not be feasible because of its large local disk storage requirement; thus, roll-over is an alternative to enable upgrading if not enough local disk space is availalbe for a in-place upgrad
    • if an application is running with N instances, user starts N new instances in parallel (the all form one consumer group); the new intances rebuild the RocksDB state with the new format; finally, the old instances are stopped
    • upgrade flow:
      1. prepare a jar hot swap from old version (not 0.10.0.x) to 1.2 for all existing (old) instances; Kafka Streams need to be configured with upgrade.mode="roll_over" for startup
      2. do a rolling bounce to get the new jar and config in place for each instance
        1. config "roll_over" tells the application to send Subscription in version 2 (tp be compatible with potential version 2 leader in the group)
        2. instances will receive a version 2 Assigmentin in this stage
        3. all old instances will just resume processing as usual
      3. users starts N new instances with config parameter upgrade.mode removed
        1. the new instances send Subscription with version 3
        2. the leader can distinguesh between old and new instances based on the used Subscription version and encoded information
          1. note, for the in-place upgrade case, new instances will report old and new store format for all their stores (as long as upgrade is not completed)
          2. new roll-over nodes, will only report new format stores (and never both)
          3. thus, the leader receives mixed version 2 and version 3 meta data, it can figure out if this is a in-place or rolling upgrade (if a 1.2 leader only receives version 2 metadata it cannot distingues between both upgrade modes, but there is also no need to disntinguesh both for this case as the leader behaves exacty the same for both upgrad protocols in this phase)
        3. the leader sends a version 2 Assignment to the old instances and "mirrors" the assignment for the new instances
          1. in the "mirror" assignment, aktive tasks are replaced by standby tasks and it's send as version 3 Assignment
          2. because the leader only assigns standby taks to the new instances, they only start to build up the stores in new storage format
          3. we need to report the progress of the StandbyTasks and write and INFO log message if they are ready to take over
      4. all old instances are stopped
        1. as long as at least one old instance is alive, the leader assign all aktive tasks to the old instances only
        2. however, as those are shut down in a row, idealy a rebalance will only trigger after all are down already (note, Kafka Streams does not send a "leave group request" and thus a rebalance only happens after session.timeout passes)
        3. when all old instances are down, the leader only receives version 3 Subscriptions and computes are version 3 Assigment base on the prepared stores and upgrade is finished
    • upgrading from 0.10.0.x to 1.2 uses the same upgrage pattern, however config upgrade.mode="roll_over_0.10.0.x" must be used
      • instead of old Subscription and Assigmentmet metadata verions 2, metadata version 1 is used
  • Offline upgrade:
    • this is the simplest form of upgrading but results in application downtime
    • stop all running instances
    • swap the jar and restart all instances (config upgrade.mode is not set)
      • the new instances will send and receive version 3 Subscription and Assignments
      • the will not find any store in new format and just recorver the stores using the new format from the changelog topic
      • as the receive version 3 Assignemnt they delete local RocksDB directories with old format
  • Upgrading from 0.10.0.x to 0.10.1.x, ..., 1.1.x
    • upgrade flow:
      1. prepare a jar hot swap from 0.10.0.x to 1.2; Kafka Streams need to be configured with upgrade.mode="0.10.0.x" for startup
      2. do a rolling bounce to get the new jar and config in place for each instance
        1. config "0.10.0.x" tells the application to send Subscription in version 1 (to be compatible with potential version 1 leader in the group)
        2. instances will receive a version 1 Assigmentin in this stage
      3. do a second rolling bounce for each instance with config parameter upgrade.mode removed
        1. bounced instance can send a version 2 Subscription
        2. the leader sends version 1 Assigment back as long as at least one version 1 Subsription is received
        3. if the leader receives only version 2 Subscirption, it send version 2 Assigment back

 

Test Plan

  • unit and integration tests for the new embedded timestamp feature
    • tests that insure the timestamps are written and read correctly and that records are (de)serialized correctly
    • this includes reading source KTables, aggregation operations that result in KTables, and all other operators that might force a KTable materialization
    • KTable recovery/restore must be tested.
  • unit and integration tests for StreamPartitionsAssigner to react correctly to configs and received subscription
  • system tests that perform rolling bounce upgrades as described above
    • this should include failure scenario during the upgrade

Rejected Alternatives

  • Change the RocksDB on-disk format and encode the used serialization version per record (this would simplify future upgrades). However there are main disadvantages:
    • storage amplification for local stores
    • record version could get stored in record headers in changelog topics -> changelog topic might never overwrite record with older format
    • code needs to check all versions all the time for future release: increases code complexity and runtime overhead
    • it's hard to change the key format
      • for value format, the version number can be a magic prefix byte
      • for key lookup, we would need to know the magic byte in advance for efficient point queries into RocksDB; if multiple versions exist in parallel, this is difficult (either do multiple queries with different versions bytes until entry is found or all versions are tried implying does not exist – or use range queries but those are very expensive)
  • use a simpler upgrade path without any configs or complex rolling bounce scenarios
    • requires application down-time for upgrading to new format
  • only support in-place upgrade path instead of two to simplify the process for users (don't need to pick)
    • might be prohibitive if not enough disk space is available
  • allow users to stay with old format: upgrade would be simpler as it's only one rolling bounce
    • unclear default behavior: should we stay on 1.1 format by default or should we use 1.2 format by default?
      • if 1.1 is default, upgrade is simple, but if one write a new application, users must turn on 1.2 format explicitly
      • if 1.2 is default, simple upgrade requires a config that tells Streams to stay with 1.1 format
      • conclusion: upgrading and not upgrading is not straight forward either way, thus, just force upgrade
    • if no upgrade happens, new features (as listed above) would be useless
  • Only prepare stores for active task (but not standby tasks)
    • this would reduce the disk footprint during upgrade
    • disadvantage: when switch to new version happens, there are not hot standby available for some time
    • we could make it configurable, however, we try to keep the number of configs small; also, it's already complex enough and adding more options would make it worse
    • it's not an issue for roll-over upgrade and not everybody configures Standbys in the frist place
      • people with standbys are willing to provide more disk, so it seem a fair assumption that they are fine with roll-over upgrade, too
  • Don't fix KAFKA-6054
    • it's a simple fix to include: just add two more accepted values to parameter upgrade.mode
    • it's s fair question, how many people will run with Streams 0.10.0 – note those, that if people are "stuck" with 0.10.0 broker, they can use 0.10.1 or newer as it's not backwards compatible to 0.10.0 – thus, might be more than expected
  • Fix KAFKA-6054 only for 1.2 release
    • it's a relativley simply fix for older releases (main desing work is already coverd and writing the code is not to complex becuase it's only the rebalance metadata version change)
    • upgrade path is also way simpler
    • it's unclear though if we will have bug-fix releases for older versions; thus nobody might ever be able to get this code (if they don't build from corresponding dev-branches themselves)

 

  • No labels