Child pages
  • KIP-617: Allow Kafka Streams State Stores to be iterated backwards

Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Proposed Changes

Introduce a enum type ReadDirection.BACKWARD|FORWARD to ReadOnlyKeyValueStore#range|all and ReadOnlyWindowStore#fetch|fetchAll|all:

public enum ReadDirection {
    FORWARD, BACKWARD
}

...

new StreamsConfig configuration to flag support for backwards iteration:



Code Block
languagejava
public class StreamsConfig extends AbstractConfig {
    public static final 

...

String ENABLE_BACKWARD_ITERATION_CONFIG = "enable.backward.iteration";
    

...

private static final 

...

String ENABLE_BACKWARD_ITERATION_DOC = "If true any range operations will accept (from, to) arguments to be from > to, returning recent records first";
}

If true, ReadOnlyKeyValueStore will support (from, to) argument pairs to be: from > to, returning iteration in reverse order.

To complete support for backwards iteration, `all` operations will be companioned by a `reverseAll`:



Code Block
languagejava
public interface ReadOnlyKeyValueStore<K, V> { KeyValueIterator<K, V> reverseAll(); } 
public interface ReadOnlyWindowStore<K, V> { KeyValueIterator<Windowed<K>, V> reverseAll(); } 



StreamConfig flag will be passed to Stores via `ProcessorContext`.

Internally, both implementations: persistent (RocksDB), and in-memory (TreeMap) support reverse/descending iteration:

Code Block
languagejava

...

final RocksIterator iter = db.newIterator(); 

...

iter.seekToFirst(); 

...

iter.next(); 
//

...

final RocksIterator reverse = db.newIterator(); 

...

reverse.seekToLast(); 

...

reverse.prev();

...

//
final 

...

TreeMap<String, 

...

String> map = new 

...

TreeMap<>(); 
final 

...

NavigableSet<String> nav = map.navigableKeySet(); 
final 

...

NavigableSet<String> rev = map.descendingKeySet();

...

Compatibility, Deprecation, and Migration Plan

Default methods would be in-place avoid affecting previous versions.

...

StreamsConfig will mitigate affecting users that are relying on current behaviour: if from>to, then return empty iterator. Only when users are enabling flag from>to will return a reversed iterator.

Therefore this change is backwards compatible.

Rejected Alternatives

  • Initially it was considered to have additional parameter on all readOnlyStore methods e.g. Store#fetch(keyFrom, keyTo, timeFrom, timeTo, ReadDirection.FORWARD|BACKWARD), but has been declines as passing arguments in inverse is more intuitive. As this could cause unexpected effects in future versions, a flag has been added to overcome this.