...
Motivation
The main goal is to support interactive queries in the presence of versioned state stores (KIP-889) in AK. This KIP is the successor of KIP-960 and KIP-968.
This KIP covers the following query types:
Range Queries
- key-range latest-value query
- key-range with lower bound latest-value query
- key-range with upper bound latest-value query
- all-keys (no bound) latest-value query
- key-range query with timestamp (upper) bound
- key-range with lower bound with timestamp (upper) bound
- key-range with upper bound with timestamp (upper) bound
- all-keys (no bound) with timestamp (upper) bound
- key-range query with timestamp range
- key-range query with lower bound with timestamp range
- key-range query with upper bound with timestamp range
- all-keys (no bound) with timestamp range
- key-range query all-versions
- key-range query with lower bound all-versions
- key-range query with upper bound all-versions
- all-keys query (no bound) all-versions (entire store)
...
- The methods are composable. The fromTime and toTime methods specify the time range, while the withLowerKeyBound and withUpperKeyBound methods specify the key bounds.
If a user applies the same time limit multiple times, such as MultiVersionedRangeQuery.withLowerKeyBound(k1).fromTime(t1).fromTime(t2), the last one wins (it will be translated to MultiVersionedRangeQuery.withLowerKeyBound(k1).fromTime(t2)).
- Defining a query with time range (empty, t1] will be translated into [0, t1]
- Defining a query with time range (t1, empty) will be translated into [t1, MAX)
- A query with no specified time range will be translated into [0, MAX). It means that the query will return all the versions of all the records with specified key range.
- As explained in the javadocs, the query returns all valid records within the specified time range.
- The fromTime specifies the starting point. Records inserted before fromTime are returned as well if they are still valid at fromTime.
- The toTime specifies the ending point and is inclusive: records inserted exactly at toTime are returned by the query as well.
- No ordering is guaranteed for the returned records.
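The time-range rules above can be modelled in a few lines. The sketch below is a hypothetical, self-contained illustration: the `TimeRange` record and its methods are not part of the KIP. It fills in missing bounds as described and tests whether a version, written at `timestamp` and superseded at `validTo`, falls inside the range. It assumes epoch-millisecond timestamps and treats `validTo` as exclusive, with `Long.MAX_VALUE` standing in for a version that is still current.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical model of the time-range rules; not the KIP's API.
record TimeRange(long from, long to) {

    // Missing bounds are filled in: (empty, t] -> [0, t], (t, empty) -> [t, MAX),
    // and no bounds at all -> [0, MAX), i.e. every version.
    static TimeRange of(Optional<Instant> fromTime, Optional<Instant> toTime) {
        long from = fromTime.map(Instant::toEpochMilli).orElse(0L);
        long to = toTime.map(Instant::toEpochMilli).orElse(Long.MAX_VALUE);
        return new TimeRange(from, to);
    }

    // A version written at `timestamp` stays valid until `validTo` (assumed exclusive;
    // Long.MAX_VALUE while it is still the latest version). It matches if it was
    // inserted at or before `to` (inclusive) and had not been superseded by `from`,
    // so a version inserted before `from` can still be returned.
    boolean contains(long timestamp, long validTo) {
        return timestamp <= to && validTo > from;
    }
}
```

For instance, with the example records further down, the version (1, 2) written at 2023-01-15 and superseded at 2023-01-20 matches a query over [2023-01-17, 2023-01-30] even though it was inserted before fromTime.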
```java
package org.apache.kafka.streams.query;

import java.time.Instant;
import java.util.Optional;
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.VersionedRecord;

/**
 * Interactive query for retrieving a set of records with keys within a specified key range and time
 * range.
 */
@Evolving
public final class MultiVersionedRangeQuery<K, V> implements Query<KeyValueIterator<K, VersionedRecord<V>>> {

    private final Optional<K> lower;
    private final Optional<K> upper;
    private final Optional<Instant> fromTime;
    private final Optional<Instant> toTime;

    private MultiVersionedRangeQuery(
            final Optional<K> lower,
            final Optional<K> upper,
            final Optional<Instant> fromTime,
            final Optional<Instant> toTime) {
        this.lower = lower;
        this.upper = upper;
        this.fromTime = fromTime;
        this.toTime = toTime;
    }

    /**
     * Interactive range query using a lower and upper bound to filter the keys returned.
     * For each key the records valid within the specified time range are returned.
     * If no time range is specified, all versions of each key are returned.
     * @param lower The key that specifies the lower bound of the range
     * @param upper The key that specifies the upper bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> withKeyRange(final K lower, final K upper);

    /**
     * Interactive range query using a lower bound to filter the keys returned.
     * For each key the records valid within the specified time range are returned.
     * If no time range is specified, all versions of each key are returned.
     * @param lower The key that specifies the lower bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> withLowerKeyBound(final K lower);

    /**
     * Interactive range query using an upper bound to filter the keys returned.
     * For each key the records valid within the specified time range are returned.
     * If no time range is specified, all versions of each key are returned.
     * @param upper The key that specifies the upper bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> withUpperKeyBound(final K upper);

    /**
     * Interactive scan query that returns all records in the store.
     * For each key the records valid within the specified time range are returned.
     * If no time range is specified, all versions of each key are returned.
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> allKeys();

    /**
     * Specifies the starting time point for the range query. The range query returns all the records
     * that are valid in the time range starting from the timestamp {@code fromTime}.
     * @param fromTime The starting time point
     */
    public MultiVersionedRangeQuery<K, V> fromTime(final Instant fromTime);

    /**
     * Specifies the ending time point for the range query. The range query returns all the records that
     * have timestamp <= {@code toTime}.
     * @param toTime The ending time point
     */
    public MultiVersionedRangeQuery<K, V> toTime(final Instant toTime);

    /**
     * The lower bound of the query, if specified.
     */
    public Optional<K> lowerKeyBound();

    /**
     * The upper bound of the query, if specified.
     */
    public Optional<K> upperKeyBound();

    /**
     * The starting time point of the query, if specified.
     */
    public Optional<Instant> fromTime();

    /**
     * The ending time point of the query, if specified.
     */
    public Optional<Instant> toTime();
}
```
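To make the composable, last-one-wins behaviour concrete, here is a hypothetical, stripped-down stand-in for the class. `RangeQuerySketch` is an illustrative name; it does not implement Kafka's `Query` interface, and it assumes the real class is immutable (as its private constructor and `final` fields suggest), so every builder call returns a new object.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical stand-in for MultiVersionedRangeQuery, showing only the builder style.
final class RangeQuerySketch<K> {
    private final Optional<K> lower;
    private final Optional<K> upper;
    private final Optional<Instant> fromTime;
    private final Optional<Instant> toTime;

    private RangeQuerySketch(Optional<K> lower, Optional<K> upper,
                             Optional<Instant> fromTime, Optional<Instant> toTime) {
        this.lower = lower;
        this.upper = upper;
        this.fromTime = fromTime;
        this.toTime = toTime;
    }

    // Optional.of rejects null, so a null key bound fails fast with a NullPointerException.
    static <K> RangeQuerySketch<K> withKeyRange(K lower, K upper) {
        return new RangeQuerySketch<>(Optional.of(lower), Optional.of(upper), Optional.empty(), Optional.empty());
    }

    static <K> RangeQuerySketch<K> withLowerKeyBound(K lower) {
        return new RangeQuerySketch<>(Optional.of(lower), Optional.empty(), Optional.empty(), Optional.empty());
    }

    static <K> RangeQuerySketch<K> allKeys() {
        return new RangeQuerySketch<>(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
    }

    // Each call copies the other bounds and replaces only its own, so a repeated
    // fromTime/toTime call overwrites the earlier value (last one wins).
    RangeQuerySketch<K> fromTime(Instant fromTime) {
        return new RangeQuerySketch<>(lower, upper, Optional.of(fromTime), toTime);
    }

    RangeQuerySketch<K> toTime(Instant toTime) {
        return new RangeQuerySketch<>(lower, upper, fromTime, Optional.of(toTime));
    }

    Optional<K> lowerKeyBound() { return lower; }
    Optional<K> upperKeyBound() { return upper; }
    Optional<Instant> fromTime() { return fromTime; }
    Optional<Instant> toTime() { return toTime; }
}
```

Because the builder never mutates an existing query, a base query can be shared and specialised with different time ranges without interference.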
Another get method is added to the VersionedKeyValueStore interface.
```java
package org.apache.kafka.streams.state;

public interface VersionedKeyValueStore<K, V> extends StateStore {

    /**
     * Get the records with keys within the specified key range that are valid within the
     * specified time range.
     *
     * @param lowerKeyBound The key that specifies the lower key bound of the range
     * @param upperKeyBound The key that specifies the upper key bound of the range
     * @param fromTimestamp The timestamp lower bound. The records that have been inserted at or before
     *                      this timestamp and did not become a tombstone at or before this timestamp
     *                      will be retrieved and returned.
     * @param asOfTimestamp The timestamp bound. This bound is inclusive; if a record
     *                      (for the specified key) exists with this timestamp, then
     *                      this is the record that will be returned.
     * @return The value and timestamp (along with the validTo timestamp) of the records with keys
     *         within the specified key range as of the provided timestamp, or {@code null} if no such
     *         record exists (including if the provided timestamp bound is older than this store's
     *         history retention time, i.e., the store no longer contains data for the provided
     *         timestamp).
     * @throws NullPointerException If null is used for lowerKeyBound or upperKeyBound.
     * @throws InvalidStateStoreException if the store is not initialized
     */
    VersionedRecord<V> get(K lowerKeyBound, K upperKeyBound, long fromTimestamp, long asOfTimestamp);
}
```
Examples
The following examples illustrate the use of the MultiVersionedRangeQuery class to query a versioned state store.
Assume the store contains the following records:
put(1, 1, time=2023-01-01T10:00:00.00Z)
put(1, null, time=2023-01-05T10:00:00.00Z)
put(2, 20, time=2023-01-10T10:00:00.00Z)
put(3, 30, time=2023-01-12T10:00:00.00Z)
put(1, 2, time=2023-01-15T10:00:00.00Z)
put(1, 3, time=2023-01-20T10:00:00.00Z)
put(2, 30, time=2023-01-25T10:00:00.00Z)
```java
import static org.apache.kafka.streams.query.StateQueryRequest.inStore;

import java.time.Instant;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.query.MultiVersionedRangeQuery;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.VersionedRecord;

// `kafkaStreams` is a running KafkaStreams instance with a versioned store named "my_store".

// example 1: MultiVersionedRangeQuery without specifying any time bound will be interpreted as all versions
final MultiVersionedRangeQuery<Integer, Integer> query1 =
        MultiVersionedRangeQuery.withKeyRange(1, 2);
final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request1 =
        inStore("my_store").withQuery(query1);
final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult1 =
        kafkaStreams.query(request1);
// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults1 =
        versionedRangeResult1.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults1.entrySet()) {
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
            final Integer key = record.key;
            final Integer value = record.value.value();
            final long timestamp = record.value.timestamp();
            // validTo() is empty while the record is still the latest version of its key
            final Optional<Long> validTo = record.value.validTo();
            System.out.println("key,value: " + key + "," + value
                    + ", timestamp: " + Instant.ofEpochMilli(timestamp)
                    + ", valid till: " + validTo.map(t -> Instant.ofEpochMilli(t).toString()).orElse("now"));
        }
    }
}
/* the printed output will contain the following lines (in no guaranteed order):
key,value: 1,1, timestamp: 2023-01-01T10:00:00Z, valid till: 2023-01-05T10:00:00Z
key,value: 1,2, timestamp: 2023-01-15T10:00:00Z, valid till: 2023-01-20T10:00:00Z
key,value: 1,3, timestamp: 2023-01-20T10:00:00Z, valid till: now
key,value: 2,20, timestamp: 2023-01-10T10:00:00Z, valid till: 2023-01-25T10:00:00Z
key,value: 2,30, timestamp: 2023-01-25T10:00:00Z, valid till: now
*/

// example 2: the versions of the records with keys in [1, 2] that are valid
// between 2023-01-17T10:00:00Z and 2023-01-30T10:00:00Z
MultiVersionedRangeQuery<Integer, Integer> query2 = MultiVersionedRangeQuery.withKeyRange(1, 2);
query2 = query2.fromTime(Instant.parse("2023-01-17T10:00:00.00Z")).toTime(Instant.parse("2023-01-30T10:00:00.00Z"));
final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request2 =
        inStore("my_store").withQuery(query2);
final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult2 =
        kafkaStreams.query(request2);
// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults2 =
        versionedRangeResult2.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults2.entrySet()) {
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
            final Integer key = record.key;
            final Integer value = record.value.value();
            final long timestamp = record.value.timestamp();
            final Optional<Long> validTo = record.value.validTo();
            System.out.println("key,value: " + key + "," + value
                    + ", timestamp: " + Instant.ofEpochMilli(timestamp)
                    + ", valid till: " + validTo.map(t -> Instant.ofEpochMilli(t).toString()).orElse("now"));
        }
    }
}
/* the printed output will contain the following lines (in no guaranteed order):
key,value: 2,30, timestamp: 2023-01-25T10:00:00Z, valid till: now
key,value: 1,3, timestamp: 2023-01-20T10:00:00Z, valid till: now
key,value: 1,2, timestamp: 2023-01-15T10:00:00Z, valid till: 2023-01-20T10:00:00Z
key,value: 2,20, timestamp: 2023-01-10T10:00:00Z, valid till: 2023-01-25T10:00:00Z
*/
```
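The expected outputs of both examples can be checked without a Kafka cluster. The hypothetical in-memory model below is not Kafka Streams code, and `VersionedRangeModel` and its methods are illustrative names. It derives each version's `validTo` from the next put for the same key, drops tombstones, and keeps versions whose key lies in the key range and whose validity interval overlaps [fromTime, toTime]. Inclusive key bounds and an exclusive `validTo` are assumptions consistent with the outputs listed for the examples.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the range-query semantics; not Kafka Streams code.
final class VersionedRangeModel {
    record Put(int key, Integer value, long timestamp) {}              // value == null is a tombstone
    record Version(int key, int value, long timestamp, long validTo) {} // validTo == Long.MAX_VALUE means "now"

    // Each version is valid until the next put (value or tombstone) for the same key.
    // Tombstones are never returned. Quadratic, which is fine for a toy data set.
    static List<Version> versions(List<Put> puts) {
        List<Version> out = new ArrayList<>();
        for (Put p : puts) {
            if (p.value() == null) continue;
            long validTo = Long.MAX_VALUE;
            for (Put q : puts) {
                if (q.key() == p.key() && q.timestamp() > p.timestamp() && q.timestamp() < validTo) {
                    validTo = q.timestamp();
                }
            }
            out.add(new Version(p.key(), p.value(), p.timestamp(), validTo));
        }
        return out;
    }

    // Keeps versions with lower <= key <= upper that are valid at some point in [from, to].
    static List<Version> query(List<Put> puts, int lower, int upper, long from, long to) {
        List<Version> result = new ArrayList<>();
        for (Version v : versions(puts)) {
            boolean inKeyRange = v.key() >= lower && v.key() <= upper;
            boolean inTimeRange = v.timestamp() <= to && v.validTo() > from;
            if (inKeyRange && inTimeRange) result.add(v);
        }
        return result;
    }

    static long t(String iso) {
        return Instant.parse(iso).toEpochMilli();
    }

    // The puts from the example above, in epoch milliseconds.
    static List<Put> examplePuts() {
        return List.of(
                new Put(1, 1, t("2023-01-01T10:00:00Z")),
                new Put(1, null, t("2023-01-05T10:00:00Z")),
                new Put(2, 20, t("2023-01-10T10:00:00Z")),
                new Put(3, 30, t("2023-01-12T10:00:00Z")),
                new Put(1, 2, t("2023-01-15T10:00:00Z")),
                new Put(1, 3, t("2023-01-20T10:00:00Z")),
                new Put(2, 30, t("2023-01-25T10:00:00Z")));
    }
}
```

Querying keys 1 to 2 over [0, Long.MAX_VALUE] yields the five versions of example 1, and the window [2023-01-17T10:00:00Z, 2023-01-30T10:00:00Z] yields the four versions of example 2; the version (1, 1) drops out because it was superseded by the tombstone on 2023-01-05.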
Compatibility, Deprecation, and Migration Plan
...
The range interactive queries will be tested in the versioned store IQv2 integration tests (like the non-versioned range queries). Moreover, there will be unit tests wherever needed.
Rejected Alternatives
The initial plan was to provide ordering based on key and/or timestamp. This was removed from the KIP and may be provided by subsequent KIPs based on user demand.
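Because the query gives no ordering guarantee, a caller that needs a deterministic order has to sort on the client side. A minimal sketch, assuming the rows from all partitions have already been collected into a list; `ClientSideOrdering` and `Row` are hypothetical names, and key-ascending with newest-version-first is just one possible order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical client-side helper; the KIP itself returns records in no guaranteed order.
final class ClientSideOrdering {
    record Row(int key, int value, long timestamp) {}

    // Key ascending; for equal keys, the newest version (largest timestamp) first.
    static final Comparator<Row> BY_KEY_THEN_NEWEST =
            Comparator.comparingInt(Row::key)
                    .thenComparing(Comparator.comparingLong(Row::timestamp).reversed());

    // Sorts a copy so the caller's (possibly immutable) list is left untouched.
    static List<Row> sorted(List<Row> rows) {
        List<Row> copy = new ArrayList<>(rows);
        copy.sort(BY_KEY_THEN_NEWEST);
        return copy;
    }
}
```

Swapping the comparator (for example, dropping `.reversed()` for oldest-first) covers the other orderings that were originally planned as query options.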