...
Motivation
The main goal is to support interactive queries in the presence of versioned state stores (KIP-889) in AK. This KIP is the successor of KIP-960 and KIP-968.
For this KIP, the following query types are to be implemented:
Range Queries
- key-range latest-value query
- key-range with lower bound latest-value query
- key-range with upper bound latest-value query
- all-keys (no bound) latest-value query
- key-range query with timestamp (upper) bound
- key-range with lower bound with timestamp (upper) bound
- key-range with upper bound with timestamp (upper) bound
- all-keys (no bound) with timestamp (upper) bound
- key-range query with timestamp range
- key-range query with lower bound with timestamp range
- key-range query with upper bound with timestamp range
- all-keys (no bound) with timestamp range
- key-range query all-versions
- key-range query with lower bound all-versions
- key-range query with upper bound all-versions
- all-keys query (no bound) all-versions (entire store)
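The sixteen query types above form a grid. Each one combines a key bound (key range, lower bound only, upper bound only, or all keys) with a time bound (latest value, timestamp upper bound, timestamp range, or all versions). The sketch below only enumerates that grid. `RangeQueryTypes`, `KeyBound`, `TimeBound` and `QueryType` are hypothetical names used for illustration and are not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Kafka Streams): every range query type in this
 * KIP is one combination of a key bound and a time bound.
 */
final class RangeQueryTypes {

    /** How the keys are bounded. */
    enum KeyBound { KEY_RANGE, LOWER_BOUND, UPPER_BOUND, ALL_KEYS }

    /** Which versions of each key are requested. */
    enum TimeBound { LATEST_VALUE, TIMESTAMP_UPPER_BOUND, TIMESTAMP_RANGE, ALL_VERSIONS }

    /** One query type, e.g. (LOWER_BOUND, TIMESTAMP_RANGE). */
    record QueryType(KeyBound keys, TimeBound time) { }

    /** Enumerates all key-bound / time-bound combinations. */
    static List<QueryType> all() {
        final List<QueryType> types = new ArrayList<>();
        for (final KeyBound keys : KeyBound.values()) {
            for (final TimeBound time : TimeBound.values()) {
                types.add(new QueryType(keys, time));
            }
        }
        return types;
    }

    public static void main(final String[] args) {
        // Prints the 16 combinations, one per line.
        all().forEach(System.out::println);
    }
}
```

The entry `(ALL_KEYS, ALL_VERSIONS)` corresponds to the last item in the list, the scan of the entire store.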
...
- The methods are composable. The fromTime and toTime methods specify the time range, while the withLowerKeyBound and withUpperKeyBound methods specify the key bounds.
If a user applies the same time limit multiple times, such as MultiVersionedRangeQuery.withLowerKeyBound(k1).fromTime(t1).fromTime(t2), the last one wins (it will be translated to MultiVersionedRangeQuery.withLowerKeyBound(k1).fromTime(t2)).
- Defining a query with time range (empty, t1] will be translated into [0, t1]
- Defining a query with time range (t1, empty) will be translated into [t1, MAX)
- A query with no specified time range will be translated into [0, MAX). This means that the query will return all the versions of all the records within the specified key range.
- As explained in the javadocs, the query returns all valid records within the specified time range.
- The fromTime specifies the starting point. Records inserted before fromTime are returned as well if they are still valid in the time range, i.e., they were not replaced or deleted at or before fromTime.
- The toTime specifies the ending point and is inclusive: records inserted exactly at toTime are returned by the query as well.
- No ordering is guaranteed for the returned records.
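A minimal sketch of the time-range rules above, assuming epoch-millisecond timestamps and a version that stays valid until the next version (or tombstone) of the same key. Missing bounds default to 0 and `Long.MAX_VALUE`, repeated calls overwrite the earlier bound, and a version is returned if it was inserted at or before `toTime` and had not yet been replaced or deleted at `fromTime`. `TimeRangeSketch` and `TimeBounds` are hypothetical helpers, not the proposed `MultiVersionedRangeQuery` implementation.

```java
import java.util.Optional;

/**
 * Hypothetical model (not the Kafka Streams implementation) of the time-range
 * rules of a multi-versioned range query, using epoch milliseconds.
 */
final class TimeRangeSketch {

    /** Immutable time bounds: each setter returns a new copy, so the last call wins. */
    record TimeBounds(Optional<Long> from, Optional<Long> to) {

        static TimeBounds unbounded() {
            return new TimeBounds(Optional.empty(), Optional.empty());
        }

        TimeBounds fromTime(final long fromTime) {
            return new TimeBounds(Optional.of(fromTime), to);
        }

        TimeBounds toTime(final long toTime) {
            return new TimeBounds(from, Optional.of(toTime));
        }

        /** (empty, t1] becomes [0, t1]. */
        long normalizedFrom() {
            return from.orElse(0L);
        }

        /** (t1, empty) becomes [t1, MAX). */
        long normalizedTo() {
            return to.orElse(Long.MAX_VALUE);
        }

        /**
         * A version inserted at {@code timestamp} and valid until {@code validTo}
         * (exclusive; empty for the latest version) is returned if it was inserted
         * at or before toTime and was still valid after fromTime.
         */
        boolean isValidIn(final long timestamp, final Optional<Long> validTo) {
            final boolean insertedByTo = timestamp <= normalizedTo();
            final boolean aliveAfterFrom = validTo.map(end -> end > normalizedFrom()).orElse(true);
            return insertedByTo && aliveAfterFrom;
        }
    }

    public static void main(final String[] args) {
        final TimeBounds bounds = TimeBounds.unbounded().fromTime(10L).fromTime(20L);
        // prints 20, 9223372036854775807
        System.out.println(bounds.normalizedFrom() + ", " + bounds.normalizedTo());
    }
}
```

Returning a new copy from every setter is one simple way to get the "last one wins" behaviour; whether the real query class works this way is an assumption of this sketch.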
```java
package org.apache.kafka.streams.query;

/**
 * Interactive query for retrieving a set of records with keys within a specified key range and time
 * range.
 */
@Evolving
public final class MultiVersionedRangeQuery<K, V> implements Query<KeyValueIterator<K, VersionedRecord<V>>> {

    private final Optional<K> lower;
    private final Optional<K> upper;
    private final Optional<Instant> fromTime;
    private final Optional<Instant> toTime;

    private MultiVersionedRangeQuery(final Optional<K> lower,
                                     final Optional<K> upper,
                                     final Optional<Instant> fromTime,
                                     final Optional<Instant> toTime) {
        this.lower = lower;
        this.upper = upper;
        this.fromTime = fromTime;
        this.toTime = toTime;
    }

    /**
     * Interactive range query using a lower and upper bound to filter the keys returned.
     * For each key the records valid within the specified time range are returned.
     * In case the time range is not specified just the latest record for each key is returned.
     * @param lower The key that specifies the lower bound of the range
     * @param upper The key that specifies the upper bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> withKeyRange(final K lower, final K upper);

    /**
     * Interactive range query using a lower bound to filter the keys returned.
     * For each key the records valid within the specified time range are returned.
     * In case the time range is not specified just the latest record for each key is returned.
     * @param lower The key that specifies the lower bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> withLowerKeyBound(final K lower);

    /**
     * Interactive range query using an upper bound to filter the keys returned.
     * For each key the records valid within the specified time range are returned.
     * In case the time range is not specified just the latest record for each key is returned.
     * @param upper The key that specifies the upper bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> withUpperKeyBound(final K upper);

    /**
     * Interactive scan query that returns all records in the store.
     * For each key the records valid within the specified time range are returned.
     * In case the time range is not specified just the latest record for each key is returned.
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> allKeys();

    /**
     * Specifies the starting time point for the key query. The range query returns all the records
     * that are valid in the time range starting from the timestamp {@code fromTime}.
     * @param fromTime The starting time point
     */
    public MultiVersionedRangeQuery<K, V> fromTime(Instant fromTime);

    /**
     * Specifies the ending time point for the key query. The range query returns all the records that
     * have timestamp <= {@code toTime}.
     * @param toTime The ending time point
     */
    public MultiVersionedRangeQuery<K, V> toTime(Instant toTime);

    /**
     * The lower bound of the query, if specified.
     */
    public Optional<K> lowerKeyBound();

    /**
     * The upper bound of the query, if specified.
     */
    public Optional<K> upperKeyBound();

    /**
     * The starting time point of the query, if specified.
     */
    public Optional<Instant> fromTime();

    /**
     * The ending time point of the query, if specified.
     */
    public Optional<Instant> toTime();
}
```
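The `Optional` key fields above decide which keys a query scans. A minimal sketch of that filtering, assuming that an empty bound means "unbounded" and that a present bound is inclusive (as with the existing non-versioned range queries). `KeyBoundsSketch`, `inRange` and `filter` are hypothetical helpers, and the mapping of each factory method to a pair of bounds in the comments is an assumption based on the field names.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Hypothetical helper (not part of the proposed API): filters keys by optional,
 * inclusive lower and upper bounds. An empty bound means "unbounded".
 */
final class KeyBoundsSketch {

    static <K extends Comparable<K>> boolean inRange(final Optional<K> lower, final Optional<K> upper, final K key) {
        final boolean aboveLower = lower.map(l -> key.compareTo(l) >= 0).orElse(true);
        final boolean belowUpper = upper.map(u -> key.compareTo(u) <= 0).orElse(true);
        return aboveLower && belowUpper;
    }

    static <K extends Comparable<K>> List<K> filter(final List<K> keys, final Optional<K> lower, final Optional<K> upper) {
        return keys.stream().filter(k -> inRange(lower, upper, k)).collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        final List<Integer> keys = List.of(1, 2, 3);
        System.out.println(filter(keys, Optional.of(1), Optional.of(2)));     // withKeyRange(1, 2): [1, 2]
        System.out.println(filter(keys, Optional.of(2), Optional.empty()));   // withLowerKeyBound(2): [2, 3]
        System.out.println(filter(keys, Optional.empty(), Optional.of(1)));   // withUpperKeyBound(1): [1]
        System.out.println(filter(keys, Optional.empty(), Optional.empty())); // allKeys(): [1, 2, 3]
    }
}
```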
Another get method is added to the VersionedKeyValueStore interface.

```java
package org.apache.kafka.streams.state;

public interface VersionedKeyValueStore<K, V> extends StateStore {

    /**
     * Get the records with keys within the specified key range that are valid within the
     * specified time range.
     *
     * @param lowerKeyBound The key that specifies the lower key bound of the range
     * @param upperKeyBound The key that specifies the upper key bound of the range
     * @param fromTime      The timestamp lower bound. The records that have been inserted at or before
     *                      this timestamp and did not become tombstone at or before this timestamp
     *                      will be retrieved and returned.
     * @param toTime        The timestamp bound. This bound is inclusive; if a record
     *                      (for the specified key) exists with this timestamp, then
     *                      this is the record that will be returned.
     * @return The value and timestamp (along with the validTo timestamp) of the records with keys
     *         within the specified key range as of the provided timestamp, or {@code null} if no
     *         such record exists (including if the provided timestamp bound is older than this
     *         store's history retention time, i.e., the store no longer contains data for the
     *         provided timestamp).
     * @throws NullPointerException       If null is used for lowerKeyBound or upperKeyBound.
     * @throws InvalidStateStoreException if the store is not initialized
     */
    VersionedRecord<V> get(K lowerKeyBound, K upperKeyBound, long fromTime, long toTime);
}
```

Examples
The following example illustrates the use of the MultiVersionedRangeQuery class to query a versioned state store.
Imagine the store contains the following records:
put(1, 1, time=2023-01-01T10:00:00.00Z)
put(1, null, time=2023-01-05T10:00:00.00Z)
put(2, 20, time=2023-01-10T10:00:00.00Z)
put(3, 30, time=2023-01-12T10:00:00.00Z)
put(1, 2, time=2023-01-15T10:00:00.00Z)
put(1, 3, time=2023-01-20T10:00:00.00Z)
put(2, 30, time=2023-01-25T10:00:00.00Z)

```java
import static org.apache.kafka.streams.query.StateQueryRequest.inStore;

import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.query.MultiVersionedRangeQuery;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.VersionedRecord;

// The snippets below assume a running KafkaStreams instance named kafkaStreams.

// example 1: MultiVersionedRangeQuery without specifying any time bound will be interpreted as all versions
final MultiVersionedRangeQuery<Integer, Integer> query1 =
    MultiVersionedRangeQuery.withKeyRange(1, 2);
final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request1 =
    inStore("my_store").withQuery(query1);
final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult1 =
    kafkaStreams.query(request1);
// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults1 =
    versionedRangeResult1.getPartitionResults();
for (final Map.Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults1.entrySet()) {
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
            final Integer key = record.key;
            final Integer value = record.value.value();
            // Record timestamps are epoch milliseconds.
            final long timestamp = record.value.timestamp();
            // validTo is empty for the latest (still valid) version of a key.
            final Optional<Long> validTo = record.value.validTo();
            System.out.println("key,value: " + key + "," + value
                + ", timestamp: " + Instant.ofEpochMilli(timestamp)
                + ", valid till: " + validTo.map(t -> Instant.ofEpochMilli(t).toString()).orElse("now"));
        }
    }
}
/* the printed output will be
key,value: 1,1, timestamp: 2023-01-01T10:00:00Z, valid till: 2023-01-05T10:00:00Z
key,value: 1,2, timestamp: 2023-01-15T10:00:00Z, valid till: 2023-01-20T10:00:00Z
key,value: 1,3, timestamp: 2023-01-20T10:00:00Z, valid till: now
key,value: 2,20, timestamp: 2023-01-10T10:00:00Z, valid till: 2023-01-25T10:00:00Z
key,value: 2,30, timestamp: 2023-01-25T10:00:00Z, valid till: now
*/

// example 2: the records with keys in the range [1, 2] that are valid between
// 2023-01-17T10:00:00Z and 2023-01-30T10:00:00Z
MultiVersionedRangeQuery<Integer, Integer> query2 = MultiVersionedRangeQuery.withKeyRange(1, 2);
query2 = query2.fromTime(Instant.parse("2023-01-17T10:00:00.00Z")).toTime(Instant.parse("2023-01-30T10:00:00.00Z"));
final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request2 =
    inStore("my_store").withQuery(query2);
final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult2 =
    kafkaStreams.query(request2);
// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults2 =
    versionedRangeResult2.getPartitionResults();
for (final Map.Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults2.entrySet()) {
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
            final Integer key = record.key;
            final Integer value = record.value.value();
            final long timestamp = record.value.timestamp();
            final Optional<Long> validTo = record.value.validTo();
            System.out.println("key,value: " + key + "," + value
                + ", timestamp: " + Instant.ofEpochMilli(timestamp)
                + ", valid till: " + validTo.map(t -> Instant.ofEpochMilli(t).toString()).orElse("now"));
        }
    }
}
/* the printed output will be (in no guaranteed order)
key,value: 2,30, timestamp: 2023-01-25T10:00:00Z, valid till: now
key,value: 1,3, timestamp: 2023-01-20T10:00:00Z, valid till: now
key,value: 1,2, timestamp: 2023-01-15T10:00:00Z, valid till: 2023-01-20T10:00:00Z
key,value: 2,20, timestamp: 2023-01-10T10:00:00Z, valid till: 2023-01-25T10:00:00Z
*/
```
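To check the expected outputs without a running Kafka Streams application, the put(...) history from the examples can be replayed against a small in-memory model. This is a hypothetical sketch: `VersionedRangeExample`, `Version`, `query` and `sampleStore` are illustrative names, not Kafka APIs. It assumes epoch-millisecond timestamps, treats a version as valid until the next put for the same key (tombstones included), skips tombstones in the result, and applies inclusive key bounds together with the [fromTime, toTime] validity rule described in this KIP.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical in-memory model (not a Kafka versioned store) of a multi-versioned range query. */
final class VersionedRangeExample {

    /** One version of a key; validTo is exclusive and empty for the latest version. */
    record Version(int key, int value, long timestamp, Optional<Long> validTo) { }

    /** key -> (timestamp -> value); a null value marks a tombstone. */
    private final Map<Integer, TreeMap<Long, Integer>> history = new TreeMap<>();

    void put(final int key, final Integer value, final String time) {
        history.computeIfAbsent(key, k -> new TreeMap<>()).put(Instant.parse(time).toEpochMilli(), value);
    }

    /** Non-tombstone versions of keys in [lower, upper] that are valid somewhere in [from, to]. */
    List<Version> query(final int lower, final int upper, final long from, final long to) {
        final List<Version> result = new ArrayList<>();
        for (final Map.Entry<Integer, TreeMap<Long, Integer>> keyEntry : history.entrySet()) {
            final int key = keyEntry.getKey();
            if (key < lower || key > upper) {
                continue;
            }
            final TreeMap<Long, Integer> versions = keyEntry.getValue();
            for (final Map.Entry<Long, Integer> version : versions.entrySet()) {
                if (version.getValue() == null) {
                    continue; // tombstones end a validity interval but are not returned
                }
                // The next put for this key (value or tombstone) ends this version's validity.
                final Optional<Long> validTo = Optional.ofNullable(versions.higherKey(version.getKey()));
                final boolean insertedByTo = version.getKey() <= to;
                final boolean aliveAfterFrom = validTo.map(end -> end > from).orElse(true);
                if (insertedByTo && aliveAfterFrom) {
                    result.add(new Version(key, version.getValue(), version.getKey(), validTo));
                }
            }
        }
        return result;
    }

    static String format(final Version v) {
        return "key,value: " + v.key() + "," + v.value()
            + ", timestamp: " + Instant.ofEpochMilli(v.timestamp())
            + ", valid till: " + v.validTo().map(t -> Instant.ofEpochMilli(t).toString()).orElse("now");
    }

    static VersionedRangeExample sampleStore() {
        final VersionedRangeExample store = new VersionedRangeExample();
        store.put(1, 1, "2023-01-01T10:00:00.00Z");
        store.put(1, null, "2023-01-05T10:00:00.00Z");
        store.put(2, 20, "2023-01-10T10:00:00.00Z");
        store.put(3, 30, "2023-01-12T10:00:00.00Z");
        store.put(1, 2, "2023-01-15T10:00:00.00Z");
        store.put(1, 3, "2023-01-20T10:00:00.00Z");
        store.put(2, 30, "2023-01-25T10:00:00.00Z");
        return store;
    }

    public static void main(final String[] args) {
        final VersionedRangeExample store = sampleStore();
        // example 1: no time bound, i.e. [0, MAX)
        store.query(1, 2, 0L, Long.MAX_VALUE).forEach(v -> System.out.println(format(v)));
        // example 2: [2023-01-17T10:00:00Z, 2023-01-30T10:00:00Z]
        store.query(1, 2, Instant.parse("2023-01-17T10:00:00Z").toEpochMilli(),
                Instant.parse("2023-01-30T10:00:00Z").toEpochMilli())
            .forEach(v -> System.out.println(format(v)));
    }
}
```

The model returns the same records as both examples. Because it walks keys and timestamps in ascending order, example 2 comes out in a different order than the one printed in the KIP, which is consistent with no ordering being guaranteed.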
Compatibility, Deprecation, and Migration Plan
...
The range interactive queries will be tested in the versioned store IQv2 integration test (like the non-versioned range queries). Moreover, there will be unit tests wherever needed.
Rejected Alternatives
The initial plan was to provide ordering based on key and/or timestamp. This was removed from the KIP and may be provided by subsequent KIPs based on user demand.