Status
Current state: Under Discussion
Discussion thread:
JIRA: KAFKA-15348
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The main goal is to support interactive queries against versioned state stores (KIP-889) in Apache Kafka (AK). This KIP is the successor of KIP-960 and KIP-969.
This KIP considers implementing the following query types.
Range Queries
- key-range latest-value query
- key-range with lower bound latest-value query
- key-range with upper bound latest-value query
- all-keys (no bound) latest-value query
- key-range query with timestamp (upper) bound
- key-range with lower bound with timestamp (upper) bound
- key-range with upper bound with timestamp (upper) bound
- all-keys (no bound) with timestamp (upper) bound
- key-range query with timestamp range
- key-range query with lower bound with timestamp range
- key-range query with upper bound with timestamp range
- all-keys (no bound) with timestamp range
- key-range query all-versions
- key-range query with lower bound all-versions
- key-range query with upper bound all-versions
- all-keys query (no bound) all-versions (entire store)
Public Interfaces
In this KIP we propose the public class MultiVersionedRangeQuery, which is described in the next section.
Proposed Changes
To support range queries, the MultiVersionedRangeQuery class is used.
- The methods are composable. Therefore, meaningless combinations such as withRange(k1, k2).asOf(t1).allVersions() throw a RuntimeException (for example NotSupportedException).
- Defining a query with time range (empty, t1] will be translated into [0, t1]
- Defining a query with time range (t1, empty) will be translated into [t1, MAX)
- A query with no specified time range will be interpreted as a normal range query that returns the records with the latest timestamp.
- As explained in the javadocs, the query returns all valid records within the specified time range.
- The fromTimestamp specifies the starting point. There can be records which have been inserted before the fromTimestamp and are valid in the time range.
- The asOfTimestamp specifies the ending point. Records that have been inserted at asOfTimestamp are returned by the query as well.
- By default, the returned records are ordered by key overall. Calling orderByTimestamp() orders them by timestamp overall instead.
- The order for both key and timestamp is by default ascending. They can be changed by the methods withDescendingKeys() and withDescendingTimestamps() respectively.
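The time-range rules in the list above can be sketched in plain Java. This is only an illustration under stated assumptions, not the proposed implementation: the class TimeRangeSketch and its method names are hypothetical, Long.MAX_VALUE stands in for MAX, and each record version is assumed to be valid over a half-open interval [validFrom, validTo).

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical helper for illustration only; not part of the proposed API.
final class TimeRangeSketch {

    // Assumed stand-in for the "MAX" bound mentioned in the list above.
    static final long MAX = Long.MAX_VALUE;

    // A missing lower time bound is treated as 0 (the epoch).
    static long fromMillis(final Optional<Instant> fromTimestamp) {
        return fromTimestamp.map(Instant::toEpochMilli).orElse(0L);
    }

    // A missing upper time bound is treated as MAX.
    static long asOfMillis(final Optional<Instant> asOfTimestamp) {
        return asOfTimestamp.map(Instant::toEpochMilli).orElse(MAX);
    }

    // Assumption: a version is valid during [validFrom, validTo).
    // It is returned if it was inserted at or before asOf and is still
    // valid at some point at or after from.
    static boolean isValidInRange(final long validFrom, final long validTo,
                                  final long from, final long asOf) {
        return validFrom <= asOf && validTo > from;
    }
}
```

Under these assumptions, a version inserted before fromTimestamp but not yet replaced at fromTimestamp is still returned, and so is a version inserted exactly at asOfTimestamp.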
package org.apache.kafka.streams.query;

/**
 * Interactive query for retrieving a set of records with keys within a specified key range and time
 * range.
 */
@Evolving
public final class MultiVersionedRangeQuery<K, V> implements Query<KeyValueIterator<K, VersionedRecord<V>>> {

    private final Optional<K> lower;
    private final Optional<K> upper;
    private final Optional<Instant> fromTimestamp;
    private final Optional<Instant> asOfTimestamp;
    private final boolean isKeyAscending;
    private final boolean isTimeAscending;
    private final boolean isOrderedByKey;

    private MultiVersionedRangeQuery(
        final Optional<K> lower,
        final Optional<K> upper,
        final Optional<Instant> fromTimestamp,
        final Optional<Instant> asOfTimestamp,
        final boolean isOrderedByKey,
        final boolean isKeyAscending,
        final boolean isTimeAscending) {
        this.lower = lower;
        this.upper = upper;
        this.fromTimestamp = fromTimestamp;
        this.asOfTimestamp = asOfTimestamp;
        this.isOrderedByKey = isOrderedByKey;
        this.isKeyAscending = isKeyAscending;
        this.isTimeAscending = isTimeAscending;
    }

    /**
     * Interactive range query using a lower bound to filter the keys returned.
     * For each key the records valid within the specified time range are returned.
     * In case the time range is not specified just the latest record for each key is returned.
     * @param lower The key that specifies the lower bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> withLowerKeyBound(final K lower);

    /**
     * Interactive range query using an upper bound to filter the keys returned.
     * For each key the records valid within the specified time range are returned.
     * In case the time range is not specified just the latest record for each key is returned.
     * @param upper The key that specifies the upper bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> withUpperKeyBound(final K upper);

    /**
     * Interactive scan query that returns all records in the store.
     * For each key the records valid within the specified time range are returned.
     * In case the time range is not specified just the latest record for each key is returned.
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> allKeys();

    /**
     * Specifies the starting time point for the key query. The range query returns all the records
     * that are valid in the time range starting from the timestamp {@code fromTimestamp}.
     * @param fromTimestamp The starting time point
     */
    public MultiVersionedRangeQuery<K, V> from(Instant fromTimestamp);

    /**
     * Specifies the ending time point for the key query. The range query returns all the records that
     * have timestamp <= {@code asOfTimestamp}.
     * @param asOfTimestamp The ending time point
     */
    public MultiVersionedRangeQuery<K, V> asOf(Instant asOfTimestamp);

    /**
     * Specifies the overall order of returned records by timestamp
     */
    public MultiVersionedRangeQuery<K, V> orderByTimestamp();

    /**
     * Specifies the order of keys as descending.
     */
    public MultiVersionedRangeQuery<K, V> withDescendingKeys();

    /**
     * Specifies the order of the timestamps as descending.
     */
    public MultiVersionedRangeQuery<K, V> withDescendingTimestamps();

    /**
     * The lower bound of the query, if specified.
     */
    public Optional<K> lowerKeyBound();

    /**
     * The upper bound of the query, if specified
     */
    public Optional<K> upperKeyBound();

    /**
     * The starting time point of the query, if specified
     */
    public Optional<Instant> fromTimestamp();

    /**
     * The ending time point of the query, if specified
     */
    public Optional<Instant> asOfTimestamp();

    /**
     * @return true if the query orders the returned records by key
     */
    public boolean isOrderedByKey();

    /**
     * @return true if the query returns records in ascending order of keys
     */
    public boolean isKeyAscending();

    /**
     * @return true if the query returns records in ascending order of timestamps
     */
    public boolean isRangeAscending();
}
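One way to read the three ordering flags is as a comparator over the returned records. The sketch below is a hypothetical illustration, not how the store is expected to implement ordering: ResultOrderSketch and Row are made-up names, keys are simplified to int, and the choice of the other field as a tie-breaker is an assumption the KIP text does not state.

```java
import java.util.Comparator;

// Hypothetical illustration of the ordering flags; not part of the proposed API.
final class ResultOrderSketch {

    // Simplified stand-in for one returned (key, versioned record) pair.
    static final class Row {
        final int key;
        final long timestamp;

        Row(final int key, final long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Builds the comparator implied by the flags. The primary field is the key
    // unless orderByTimestamp() was requested; the tie-breaker is an assumption.
    static Comparator<Row> comparator(final boolean orderedByKey,
                                      final boolean keyAscending,
                                      final boolean timeAscending) {
        Comparator<Row> byKey = Comparator.comparingInt(row -> row.key);
        Comparator<Row> byTime = Comparator.comparingLong(row -> row.timestamp);
        if (!keyAscending) {
            byKey = byKey.reversed();
        }
        if (!timeAscending) {
            byTime = byTime.reversed();
        }
        return orderedByKey ? byKey.thenComparing(byTime) : byTime.thenComparing(byKey);
    }
}
```

With all flags at their defaults (ordered by key, both ascending), a row with key 1 sorts before a row with key 2 regardless of timestamps; switching to timestamp order makes the older record come first instead.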
Examples
The following example illustrates the use of the MultiVersionedRangeQuery class to query a versioned state store.
// Query keys 1..2 for all records valid between the two timestamps.
final MultiVersionedRangeQuery<Integer, Integer> query =
    MultiVersionedRangeQuery.withKeyRange(1, 2)
        .from(Instant.parse("2023-08-03T10:37:30.00Z"))
        .asOf(Instant.parse("2023-09-04T10:37:30.00Z"));

// inStore is StateQueryRequest.inStore (static import).
final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request =
    inStore("my_store").withQuery(query);

final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult =
    kafkaStreams.query(request);

// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults =
    versionedRangeResult.getPartitionResults();

for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) {
    // The iterator must be closed, hence try-with-resources.
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
            Integer key = record.key;
            Integer value = record.value.value();
            Long timestamp = record.value.timestamp();
        }
    }
}
Compatibility, Deprecation, and Migration Plan
- Since this is a completely new set of APIs, no backward compatibility concerns are anticipated.
- Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.
Test Plan
The range interactive queries will be tested in the versioned store IQv2 integration tests (like non-versioned range queries).