Status

Current state: "Under Discussion"

Discussion thread: here

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Fetching range of records from Kafka Streams state stores comes with an iterator to traverse elements from oldest to newest, e.g ReadOnlyWindowStore#fetch(K key, long fromTime, long toTime) mentions:

For each key, the iterator guarantees ordering of windows, starting from the oldest/earliest”

Similar guarantees are provided on other fetch and range operations.

This APIs constraint the usage of local state store for some use-cases:

When storing records on time windows, or records by key; and an operation wants to return the last N values inserted withing a time range containing M records: Currently there is no alternative other than iterating records from oldest to newest–traversing M records, where M » N.

If a backward read direction option becomes available, then we could start from the latest record within a time range and go backwards, returning the first N value more efficiently.

At Zipkin Kafka-based storage, we are planning to use this feature to replace two KeyValueStores (one for traces indexed by id, and another with trace_ids indexed by timestamp) for one WindowStore. A backward read direction will allow to support queries like: “within this time range, find the last traces that match this criteria”, and return latest values quickly.

Reference issues

Proposed Changes

Introduce a new StreamsConfig configuration to flag support for backwards iteration:



public class StreamsConfig extends AbstractConfig {
    public static final String ENABLE_BACKWARD_ITERATION_CONFIG = "enable.backward.iteration";
    private static final String ENABLE_BACKWARD_ITERATION_DOC = "If true any range operations will accept (from, to) arguments to be from > to, returning recent records first";
}

If true, ReadOnlyKeyValueStore will support (from, to) argument pairs to be: from > to, returning iteration in reverse order.

To complete support for backwards iteration, `all` operations will be companioned by a `reverseAll`:



public interface ReadOnlyKeyValueStore<K, V> { KeyValueIterator<K, V> reverseAll(); } 
public interface ReadOnlyWindowStore<K, V> { KeyValueIterator<Windowed<K>, V> reverseAll(); } 



StreamConfig flag will be passed to Stores via `ProcessorContext`.

Internally, both implementations: persistent (RocksDB), and in-memory (TreeMap) support reverse/descending iteration:

final RocksIterator iter = db.newIterator(); 
iter.seekToFirst(); 
iter.next(); 
//
final RocksIterator reverse = db.newIterator(); 
reverse.seekToLast(); 
reverse.prev();
//
final TreeMap<String, String> map = new TreeMap<>(); 
final NavigableSet<String> nav = map.navigableKeySet(); 
final NavigableSet<String> rev = map.descendingKeySet();

Compatibility, Deprecation, and Migration Plan

StreamsConfig will mitigate affecting users that are relying on current behaviour: if from>to, then return empty iterator. Only when users are enabling flag from>to will return a reversed iterator.

Therefore this change is backwards compatible.

Rejected Alternatives