Status
Current state: "Under Discussion"
Discussion thread: here
JIRA: KAFKA-13492
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Provide an implementation of the Query interface, introduced in KIP-796: Interactive Query v2, to support range and scan queries.
Proposed Changes
The RangeQuery class will be used for both range and scan queries. A scan is performed when neither a lower nor an upper bound is specified.
import java.util.Optional;
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.streams.state.KeyValueIterator;

@Evolving
public class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {

    private final Optional<K> lower;
    private final Optional<K> upper;

    private RangeQuery(final Optional<K> lower, final Optional<K> upper) {
        this.lower = lower;
        this.upper = upper;
    }

    // Range query: both bounds are specified.
    public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
        return new RangeQuery<>(Optional.of(lower), Optional.of(upper));
    }

    // Only an upper bound is specified.
    public static <K, V> RangeQuery<K, V> withUpperBound(final K upper) {
        return new RangeQuery<>(Optional.empty(), Optional.of(upper));
    }

    // Only a lower bound is specified.
    public static <K, V> RangeQuery<K, V> withLowerBound(final K lower) {
        return new RangeQuery<>(Optional.of(lower), Optional.empty());
    }

    // Scan query: no bounds are specified.
    public static <K, V> RangeQuery<K, V> withNoBounds() {
        return new RangeQuery<>(Optional.empty(), Optional.empty());
    }

    public Optional<K> getLowerBound() {
        return lower;
    }

    public Optional<K> getUpperBound() {
        return upper;
    }
}

// ======================================
// Range query example usage in IQv2:

Integer key1 = 1;
Integer key2 = 2;

// create the query parameters
final StateSerdes<Integer, ValueAndTimestamp<Integer>> serdes =
    kafkaStreams.serdesForStore("mystore");
StateQueryRequest<KeyValueIterator<Integer, Integer>> query =
    inStore("mystore")
        .withQuery(RangeQuery.withRange(key1, key2));

// run the query
StateQueryResult<KeyValueIterator<Integer, Integer>> result = kafkaStreams.query(query);

// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, Integer>>> partitionResults =
    result.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, Integer>>> entry : partitionResults.entrySet()) {
    // The iterator must be closed, hence the try-with-resources block.
    try (final KeyValueIterator<Integer, Integer> keyValueIterator = entry.getValue().getResult()) {
        while (keyValueIterator.hasNext()) {
            final KeyValue<Integer, Integer> next = keyValueIterator.next();
            Integer key = next.key;
            Integer value = next.value;
        }
    }
}

// ======================================
// Scan query example usage in IQv2:

// create the query parameters
StateQueryRequest<KeyValueIterator<Integer, Integer>> query =
    inStore("mystore")
        .withQuery(RangeQuery.withNoBounds());

// run the query
StateQueryResult<KeyValueIterator<Integer, Integer>> result = kafkaStreams.query(query);
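To illustrate how the two optional bounds select between a range and a scan, the following sketch evaluates them against an in-memory sorted map. It is not the store implementation: RangeBoundsSketch and select are hypothetical names, a TreeMap stands in for the state store, and both bounds are assumed to be inclusive, which this KIP does not state explicitly.

```java
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical helper, not part of the KIP: shows how the optional bounds
// of a range query could be applied to a sorted key space.
final class RangeBoundsSketch {

    // Returns the view of `store` selected by the bounds.
    // Assumption: both bounds are inclusive.
    static <K, V> NavigableMap<K, V> select(final NavigableMap<K, V> store,
                                            final Optional<K> lower,
                                            final Optional<K> upper) {
        if (lower.isPresent() && upper.isPresent()) {
            // Both bounds present: a bounded range.
            return store.subMap(lower.get(), true, upper.get(), true);
        }
        if (lower.isPresent()) {
            // Only a lower bound: everything from `lower` onwards.
            return store.tailMap(lower.get(), true);
        }
        if (upper.isPresent()) {
            // Only an upper bound: everything up to `upper`.
            return store.headMap(upper.get(), true);
        }
        // No bounds: a full scan.
        return store;
    }

    public static void main(final String[] args) {
        final NavigableMap<Integer, String> store = new TreeMap<>();
        store.put(1, "a");
        store.put(2, "b");
        store.put(3, "c");
        store.put(4, "d");

        System.out.println(select(store, Optional.of(2), Optional.of(3)));
        System.out.println(select(store, Optional.empty(), Optional.of(2)));
        System.out.println(select(store, Optional.of(3), Optional.empty()));
        System.out.println(select(store, Optional.empty(), Optional.empty()));
    }
}
```

The four calls in main correspond to withRange, withUpperBound, withLowerBound, and withNoBounds respectively.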
There will also be an implementation of a "raw" version of the RangeQuery, which takes the key bounds in serialized form (Bytes) and returns the values as byte arrays.
import java.util.Optional;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;

public class RawRangeQuery implements Query<KeyValueIterator<Bytes, byte[]>> {

    private final Optional<Bytes> lower;
    private final Optional<Bytes> upper;

    private RawRangeQuery(final Optional<Bytes> lower, final Optional<Bytes> upper) {
        this.lower = lower;
        this.upper = upper;
    }

    // Range query: both bounds are specified.
    public static RawRangeQuery withRange(final Bytes lower, final Bytes upper) {
        return new RawRangeQuery(Optional.of(lower), Optional.of(upper));
    }

    // Only an upper bound is specified.
    public static RawRangeQuery withUpperBound(final Bytes upper) {
        return new RawRangeQuery(Optional.empty(), Optional.of(upper));
    }

    // Only a lower bound is specified.
    public static RawRangeQuery withLowerBound(final Bytes lower) {
        return new RawRangeQuery(Optional.of(lower), Optional.empty());
    }

    // Scan query: no bounds are specified.
    public static RawRangeQuery withNoBounds() {
        return new RawRangeQuery(Optional.empty(), Optional.empty());
    }

    public Optional<Bytes> getLowerBound() {
        return lower;
    }

    public Optional<Bytes> getUpperBound() {
        return upper;
    }
}
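Because a raw range query operates on serialized keys, its bounds are presumably compared in byte-lexicographic order, which can differ from the natural order of the typed keys. The sketch below illustrates the effect with plain JDK classes. It assumes Integer keys are serialized as 4 big-endian bytes and compared as unsigned bytes; RawOrderSketch, toBytes, and inRawRange are hypothetical names, not part of the proposed API.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration, not part of the KIP.
final class RawOrderSketch {

    // Assumption: an Integer key is serialized as 4 big-endian bytes
    // (ByteBuffer uses big-endian byte order by default).
    static byte[] toBytes(final int key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }

    // True if `key` lies within [lower, upper] under unsigned
    // lexicographic byte order (both bounds inclusive).
    static boolean inRawRange(final byte[] key, final byte[] lower, final byte[] upper) {
        return Arrays.compareUnsigned(lower, key) <= 0
            && Arrays.compareUnsigned(key, upper) <= 0;
    }

    public static void main(final String[] args) {
        // 2 lies within [1, 3] in both the typed and the byte order.
        System.out.println(inRawRange(toBytes(2), toBytes(1), toBytes(3)));

        // -1 serializes to 0xFFFFFFFF, which sorts after every non-negative
        // key, so 0 is not within the raw range [-1, 1].
        System.out.println(inRawRange(toBytes(0), toBytes(-1), toBytes(1)));
    }
}
```

Under these assumptions, callers of the raw variant would need to choose bounds that are meaningful in the serialized key space, for example by restricting keys to non-negative integers.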
Compatibility, Deprecation, and Migration Plan
- Since this is a completely new set of APIs, no backward compatibility concerns are anticipated.
- Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.