...
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-15348
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The main goal is to support interactive queries in the presence of versioned state stores (KIP-889) in AK. This KIP is the successor of KIP-960 and KIP-968.
This KIP considers the following query types for implementation:
Range Queries
- key-range latest-value query
- key-range with lower bound latest-value query
- key-range with upper bound latest-value query
- all-keys (no bound) latest-value query
- key-range query with timestamp (upper) bound
- key-range with lower bound with timestamp (upper) bound
- key-range with upper bound with timestamp (upper) bound
- all-keys (no bound) with timestamp (upper) bound
- key-range query with timestamp range
- key-range query with lower bound with timestamp range
- key-range query with upper bound with timestamp range
- all-keys (no bound) with timestamp range
- key-range query all-versions
- key-range query with lower bound all-versions
- key-range query with upper bound all-versions
- all-keys query (no bound) all-versions (entire store)
...
In this KIP we propose the public class MultiVersionedRangeQuery, which is described in the next section. Moreover, a method will be added to the VersionedKeyValueStore interface.
Proposed Changes
...
- The methods are composable. The fromTime and toTime methods specify the time range, while the withLowerKeyBound and withUpperKeyBound methods specify the key bounds.
If a user applies the same time limit multiple times, such as MultiVersionedRangeQuery.withLowerKeyBound(k1).fromTime(t1).fromTime(t2), then the last one wins (it will be translated to MultiVersionedRangeQuery.withLowerKeyBound(k1).fromTime(t2)).
- Defining a query with time range (empty, t1] will be translated into [0, t1]
- Defining a query with time range (t1, empty) will be translated into [t1, MAX)
- A query with no specified time range will be translated into [0, MAX). This means that the query will return all versions of all records within the specified key range.
- As explained in the javadocs, the query returns all valid records within the specified time range.
- The fromTime specifies the starting point. There can be records which have been inserted before the fromTime and are valid in the time range.
- The toTime specifies the ending point. Records that have been inserted at toTime are returned by the query as well.
- No ordering is guaranteed for the returned records.
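The composition rules above (a repeated bound overwrites the earlier one, and a missing bound falls back to 0 or MAX) can be sketched with a small immutable holder. This is only an illustration under stated assumptions: TimeRangeSketch, unbounded(), effectiveFromMillis() and effectiveToMillis() are hypothetical names that are not part of the proposal, and the sketch does not depend on Kafka.

```java
import java.time.Instant;
import java.util.Optional;

/** Hypothetical stand-in for the time-range part of the proposed query; not the Kafka class. */
final class TimeRangeSketch {
    private final Optional<Instant> fromTime;
    private final Optional<Instant> toTime;

    private TimeRangeSketch(final Optional<Instant> fromTime, final Optional<Instant> toTime) {
        this.fromTime = fromTime;
        this.toTime = toTime;
    }

    /** No time bound set on either side. */
    static TimeRangeSketch unbounded() {
        return new TimeRangeSketch(Optional.empty(), Optional.empty());
    }

    // Each call returns a new immutable object, so a repeated call overwrites the earlier bound.
    TimeRangeSketch fromTime(final Instant t) {
        return new TimeRangeSketch(Optional.of(t), toTime);
    }

    TimeRangeSketch toTime(final Instant t) {
        return new TimeRangeSketch(fromTime, Optional.of(t));
    }

    // An unset lower bound is treated as timestamp 0.
    long effectiveFromMillis() {
        return fromTime.map(Instant::toEpochMilli).orElse(0L);
    }

    // An unset upper bound is treated as MAX.
    long effectiveToMillis() {
        return toTime.map(Instant::toEpochMilli).orElse(Long.MAX_VALUE);
    }

    public static void main(final String[] args) {
        final Instant t1 = Instant.parse("2023-01-10T10:00:00Z");
        final Instant t2 = Instant.parse("2023-01-20T10:00:00Z");

        // The last fromTime call wins.
        final TimeRangeSketch lastWins = unbounded().fromTime(t1).fromTime(t2);
        System.out.println(lastWins.effectiveFromMillis() == t2.toEpochMilli()); // true

        // Only an upper bound: the lower bound defaults to 0.
        System.out.println(unbounded().toTime(t1).effectiveFromMillis()); // 0
    }
}
```

Returning a new object from every setter keeps the query immutable, which is one simple way to get the "last one wins" behaviour without extra bookkeeping.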
package org.apache.kafka.streams.query;

/**
 * Interactive query for retrieving a set of records with keys within a specified key range and time
 * range.
 */
@Evolving
public final class MultiVersionedRangeQuery<K, V> implements Query<KeyValueIterator<K, VersionedRecord<V>>> {

    private final Optional<K> lower;
    private final Optional<K> upper;
    private final Optional<Instant> fromTime;
    private final Optional<Instant> toTime;

    private MultiVersionedRangeQuery(
        final Optional<K> lower,
        final Optional<K> upper,
        final Optional<Instant> fromTime,
        final Optional<Instant> toTime) {
        this.lower = lower;
        this.upper = upper;
        this.fromTime = fromTime;
        this.toTime = toTime;
    }

    /**
     * Interactive range query using a lower and upper bound to filter the keys returned.
     * For each key the records valid within the specified time range are returned.
     * In case the time range is not specified just the latest record for each key is returned.
     * @param lower The key that specifies the lower bound of the range
     * @param upper The key that specifies the upper bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> withKeyRange(final K lower, final K upper);

    /**
     * Interactive range query using a lower bound to filter the keys returned.
     * For each key the records valid within the specified time range are returned.
     * In case the time range is not specified just the latest record for each key is returned.
     * @param lower The key that specifies the lower bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> withLowerKeyBound(final K lower);

    /**
     * Interactive range query using an upper bound to filter the keys returned.
     * For each key the records valid within the specified time range are returned.
     * In case the time range is not specified just the latest record for each key is returned.
     * @param upper The key that specifies the upper bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> withUpperKeyBound(final K upper);

    /**
     * Interactive scan query that returns all records in the store.
     * For each key the records valid within the specified time range are returned.
     * In case the time range is not specified just the latest record for each key is returned.
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> allKeys();

    /**
     * Specifies the starting time point for the key query. The range query returns all the records
     * that are valid in the time range starting from the timestamp {@code fromTime}.
     * @param fromTime The starting time point
     */
    public MultiVersionedRangeQuery<K, V> fromTime(Instant fromTime);

    /**
     * Specifies the ending time point for the key query. The range query returns all the records that
     * have timestamp <= {@code toTime}.
     * @param toTime The ending time point
     */
    public MultiVersionedRangeQuery<K, V> toTime(Instant toTime);

    /**
     * The lower bound of the query, if specified.
     */
    public Optional<K> lowerKeyBound();

    /**
     * The upper bound of the query, if specified.
     */
    public Optional<K> upperKeyBound();

    /**
     * The starting time point of the query, if specified.
     */
    public Optional<Instant> fromTime();

    /**
     * The ending time point of the query, if specified.
     */
    public Optional<Instant> toTime();
}
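The class listing gives only signatures for the four static factory methods. A minimal sketch of how such factories could populate the two optional key bounds follows. KeyBoundsSketch is a hypothetical, Kafka-free class, and treating a null argument as "no bound" via Optional.ofNullable is an assumption of this sketch, not something the KIP specifies.

```java
import java.util.Optional;

/** Hypothetical illustration of the key-bound factories; not the proposed Kafka class. */
final class KeyBoundsSketch<K> {
    private final Optional<K> lower;
    private final Optional<K> upper;

    private KeyBoundsSketch(final Optional<K> lower, final Optional<K> upper) {
        this.lower = lower;
        this.upper = upper;
    }

    // Assumption: a null argument is treated as "no bound" rather than rejected.
    static <K> KeyBoundsSketch<K> withKeyRange(final K lower, final K upper) {
        return new KeyBoundsSketch<>(Optional.ofNullable(lower), Optional.ofNullable(upper));
    }

    static <K> KeyBoundsSketch<K> withLowerKeyBound(final K lower) {
        return new KeyBoundsSketch<>(Optional.ofNullable(lower), Optional.empty());
    }

    static <K> KeyBoundsSketch<K> withUpperKeyBound(final K upper) {
        return new KeyBoundsSketch<>(Optional.empty(), Optional.ofNullable(upper));
    }

    static <K> KeyBoundsSketch<K> allKeys() {
        return new KeyBoundsSketch<>(Optional.empty(), Optional.empty());
    }

    Optional<K> lowerKeyBound() {
        return lower;
    }

    Optional<K> upperKeyBound() {
        return upper;
    }

    public static void main(final String[] args) {
        // A lower-bounded query leaves the upper bound empty.
        System.out.println(KeyBoundsSketch.withLowerKeyBound(1).upperKeyBound().isPresent()); // false
        // A full key range sets both bounds.
        System.out.println(KeyBoundsSketch.withKeyRange(1, 2).upperKeyBound().get()); // 2
    }
}
```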
Another get method is added to the VersionedKeyValueStore interface.
package org.apache.kafka.streams.state;

public interface VersionedKeyValueStore<K, V> extends StateStore {

    /**
     * Get the record associated with this key as of the specified timestamp (i.e.,
     * the existing record with the largest timestamp not exceeding the provided
     * timestamp bound).
     *
     * @param lowerKeyBound The key that specifies the lower key bound of the range
     * @param upperKeyBound The key that specifies the upper key bound of the range
     * @param fromTimestamp The timestamp lower bound. The records that have been inserted at
     *                      or before this timestamp and did not become tombstone at or before
     *                      this timestamp will be retrieved and returned.
     * @param asOfTimestamp The timestamp bound. This bound is inclusive; if a record
     *                      (for the specified key) exists with this timestamp, then
     *                      this is the record that will be returned.
     * @return The value and timestamp (along with the validTo timestamp) of the records with keys
     *         within the specified key range as of the provided timestamp, or {@code null} if no
     *         such record exists (including if the provided timestamp bound is older than this
     *         store's history retention time, i.e., the store no longer contains data for the
     *         provided timestamp).
     * @throws NullPointerException       If null is used for lowerKeyBound or upperKeyBound.
     * @throws InvalidStateStoreException if the store is not initialized
     */
    VersionedRecord<V> get(K lowerKeyBound, K upperKeyBound, long fromTimestamp, long asOfTimestamp);
}
Examples
The following example illustrates the use of the MultiVersionedRangeQuery class to query a versioned state store.
Imagine we have the following records:
put(1, 1, time=2023-01-01T10:00:00.00Z)
put(1, null, time=2023-01-05T10:00:00.00Z)
put(2, 20, time=2023-01-10T10:00:00.00Z)
put(3, 30, time=2023-01-12T10:00:00.00Z)
put(1, 2, time=2023-01-15T10:00:00.00Z)
put(1, 3, time=2023-01-20T10:00:00.00Z)
put(2, 30, time=2023-01-25T10:00:00.00Z)

// example 1: MultiVersionedRangeQuery without specifying any time bound will be interpreted as all versions
final MultiVersionedRangeQuery<Integer, Integer> query1 =
        MultiVersionedRangeQuery.withKeyRange(1, 2);

final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request1 =
        inStore("my_store").withQuery(query1);

final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult1 = kafkaStreams.query(request1);

// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults1 = versionedRangeResult1.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults1.entrySet()) {
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
            Integer key = record.key;
            Integer value = record.value.value();
            Long timestamp = record.value.timestamp();
            Long validTo = record.value.validTo();
            // Record timestamps are in epoch milliseconds.
            System.out.println("key,value: " + key + "," + value + ", timestamp: " + Instant.ofEpochMilli(timestamp) + ", valid till: " + Instant.ofEpochMilli(validTo));
        }
    }
}
/* the printed output will be
    key,value: 1,1, timestamp: 2023-01-01T10:00:00.00Z, valid till: 2023-01-05T10:00:00.00Z
    key,value: 1,2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z
    key,value: 1,3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now
    key,value: 2,20, timestamp: 2023-01-10T10:00:00.00Z, valid till: 2023-01-25T10:00:00.00Z
    key,value: 2,30, timestamp: 2023-01-25T10:00:00.00Z, valid till: now
*/

// example 2: The value of the records with key range (1,2) from 2023-01-17T10:00:00.00Z till 2023-01-30T10:00:00.00Z
MultiVersionedRangeQuery<Integer, Integer> query2 = MultiVersionedRangeQuery.withKeyRange(1, 2);
query2 = query2.fromTime(Instant.parse("2023-01-17T10:00:00.00Z")).toTime(Instant.parse("2023-01-30T10:00:00.00Z"));

final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request2 =
        inStore("my_store").withQuery(query2);

final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult2 = kafkaStreams.query(request2);

// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults2 = versionedRangeResult2.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults2.entrySet()) {
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
            Integer key = record.key;
            Integer value = record.value.value();
            Long timestamp = record.value.timestamp();
            Long validTo = record.value.validTo();
            // Record timestamps are in epoch milliseconds.
            System.out.println("key,value: " + key + "," + value + ", timestamp: " + Instant.ofEpochMilli(timestamp) + ", valid till: " + Instant.ofEpochMilli(validTo));
        }
    }
}
/* the printed output will be
    key,value: 2,30, timestamp: 2023-01-25T10:00:00.00Z, valid till: now
    key,value: 1,3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now
    key,value: 1,2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z
    key,value: 2,20, timestamp: 2023-01-10T10:00:00.00Z, valid till: 2023-01-25T10:00:00.00Z
*/
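To make the validity rule behind these outputs concrete, the following self-contained sketch models the example records in memory and applies one plausible reading of the semantics: a version is returned if it is not a tombstone, was inserted at or before the upper time bound, and is still valid after the lower time bound. VersionedRangeSketch and all of its members are hypothetical and do not use the Kafka Streams API. The sketch happens to return results ordered by key and then timestamp, whereas the proposal itself guarantees no ordering.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical in-memory model of a versioned store; not the Kafka implementation. */
final class VersionedRangeSketch {

    static final class Version {
        final int key;
        final Integer value;           // null marks a tombstone
        final long timestamp;          // inclusive start of validity, epoch millis
        long validTo = Long.MAX_VALUE; // exclusive end; MAX means "still valid"

        Version(final int key, final Integer value, final long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return key + "=" + value;
        }
    }

    private final TreeMap<Integer, List<Version>> store = new TreeMap<>();

    // Assumes puts for a given key arrive in increasing timestamp order.
    void put(final int key, final Integer value, final String isoTime) {
        final long ts = Instant.parse(isoTime).toEpochMilli();
        final List<Version> versions = store.computeIfAbsent(key, k -> new ArrayList<>());
        if (!versions.isEmpty()) {
            versions.get(versions.size() - 1).validTo = ts; // close the previous version
        }
        versions.add(new Version(key, value, ts));
    }

    // Returns "key=value" for every non-tombstone version in [lower, upper]
    // whose validity interval overlaps [from, to].
    List<String> query(final int lower, final int upper, final long from, final long to) {
        final List<String> result = new ArrayList<>();
        for (final List<Version> versions : store.subMap(lower, true, upper, true).values()) {
            for (final Version v : versions) {
                if (v.value != null && v.timestamp <= to && v.validTo > from) {
                    result.add(v.toString());
                }
            }
        }
        return result;
    }

    // The seven puts from the example above.
    static VersionedRangeSketch exampleStore() {
        final VersionedRangeSketch s = new VersionedRangeSketch();
        s.put(1, 1, "2023-01-01T10:00:00.00Z");
        s.put(1, null, "2023-01-05T10:00:00.00Z");
        s.put(2, 20, "2023-01-10T10:00:00.00Z");
        s.put(3, 30, "2023-01-12T10:00:00.00Z");
        s.put(1, 2, "2023-01-15T10:00:00.00Z");
        s.put(1, 3, "2023-01-20T10:00:00.00Z");
        s.put(2, 30, "2023-01-25T10:00:00.00Z");
        return s;
    }

    public static void main(final String[] args) {
        final VersionedRangeSketch s = exampleStore();
        // Example 1: no time bound, i.e. [0, MAX].
        System.out.println(s.query(1, 2, 0L, Long.MAX_VALUE)); // [1=1, 1=2, 1=3, 2=20, 2=30]
        // Example 2: 2023-01-17 to 2023-01-30.
        final long from = Instant.parse("2023-01-17T10:00:00.00Z").toEpochMilli();
        final long to = Instant.parse("2023-01-30T10:00:00.00Z").toEpochMilli();
        System.out.println(s.query(1, 2, from, to)); // [1=2, 1=3, 2=20, 2=30]
    }
}
```

In example 2 the version (1, 2) is returned even though it was inserted on 2023-01-15, before the lower time bound, because it remained valid until 2023-01-20.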
Compatibility, Deprecation, and Migration Plan
...
The range interactive queries will be tested in the versioned-store IQv2 integration test (like non-versioned range queries). Moreover, there will be unit tests wherever needed.
Rejected Alternatives
The initial plan was to provide ordering based on key and/or timestamp. This was removed from the KIP and may be provided by subsequent KIPs based on user demand.