...
- The methods are composable. The fromTime and toTime methods specify the time range while the withLowerKeyBound and withUpperKeyBound methods specify the key bounds.
- If a user applies the same time limit multiple times, such as MultiVersionedRangeQuery.withLowerKeyBound(k1).fromTime(t1).fromTime(t2), the last one wins (it is translated to MultiVersionedRangeQuery.withLowerKeyBound(k1).fromTime(t2)).
- Defining a query with the time range (empty, t1] will be translated into [0, t1].
- Defining a query with the time range (t1, empty) will be translated into [t1, MAX).
- A query with no specified time range will be translated into [0, MAX). This means that the query will return all versions of all records within the specified key range.
- As explained in the javadocs, the query returns all valid records within the specified time range.
- The fromTime specifies the starting point. There can be records which have been inserted before the fromTime and are valid in the time range.
- The toTime specifies the ending point. Records that have been inserted at toTime are returned by the query as well.
- No ordering is guaranteed for the returned records.
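The time-bound rules above can be sketched in plain Java. This is a minimal illustration, not the KIP's implementation: `TimeRangeSketch` and its methods are hypothetical names, timestamps are assumed to be epoch milliseconds, MAX is modeled as `Long.MAX_VALUE`, and `validTo` is assumed to be an exclusive bound.

```java
import java.time.Instant;
import java.util.Optional;

/** Hypothetical helper (not a Kafka Streams class) that models the time-bound rules. */
final class TimeRangeSketch {
    // Assumption: timestamps are epoch milliseconds and MAX is modeled as Long.MAX_VALUE.
    static final long MIN = 0L;
    static final long MAX = Long.MAX_VALUE;

    private final Optional<Instant> fromTime;
    private final Optional<Instant> toTime;

    private TimeRangeSketch(final Optional<Instant> fromTime, final Optional<Instant> toTime) {
        this.fromTime = fromTime;
        this.toTime = toTime;
    }

    static TimeRangeSketch unbounded() {
        return new TimeRangeSketch(Optional.empty(), Optional.empty());
    }

    // Each call returns a new object that overwrites the previous bound: the last one wins.
    TimeRangeSketch fromTime(final Instant from) {
        return new TimeRangeSketch(Optional.of(from), toTime);
    }

    TimeRangeSketch toTime(final Instant to) {
        return new TimeRangeSketch(fromTime, Optional.of(to));
    }

    // A missing lower bound becomes 0; a missing upper bound becomes MAX.
    long effectiveFrom() {
        return fromTime.map(Instant::toEpochMilli).orElse(MIN);
    }

    long effectiveTo() {
        return toTime.map(Instant::toEpochMilli).orElse(MAX);
    }

    /**
     * A version is valid on [timestamp, validTo) (assumption: validTo is exclusive).
     * It matches when it was inserted at or before toTime and is still valid after fromTime,
     * so a version inserted before fromTime can still be returned.
     */
    boolean matches(final long timestamp, final long validTo) {
        return timestamp <= effectiveTo() && validTo > effectiveFrom();
    }

    public static void main(final String[] args) {
        final TimeRangeSketch range = unbounded()
            .fromTime(Instant.parse("2023-01-17T10:00:00Z"))
            .toTime(Instant.parse("2023-01-30T10:00:00Z"));

        // Inserted on 2023-01-15, replaced on 2023-01-20: still valid inside the range.
        final long inserted = Instant.parse("2023-01-15T10:00:00Z").toEpochMilli();
        final long replaced = Instant.parse("2023-01-20T10:00:00Z").toEpochMilli();
        System.out.println("valid in range: " + range.matches(inserted, replaced));
    }
}
```

Returning a new object from each setter is one simple way to get the "last one wins" behavior; the actual query class may achieve it differently.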
    package org.apache.kafka.streams.query;

    /**
     * Interactive query for retrieving a set of records with keys within a specified key range and time
     * range.
     */
    @Evolving
    public final class MultiVersionedRangeQuery<K, V> implements Query<KeyValueIterator<K, VersionedRecord<V>>> {

        private final Optional<K> lower;
        private final Optional<K> upper;
        private final Optional<Instant> fromTime;
        private final Optional<Instant> toTime;

        private MultiVersionedRangeQuery(
            final Optional<K> lower,
            final Optional<K> upper,
            final Optional<Instant> fromTime,
            final Optional<Instant> toTime) {
            this.lower = lower;
            this.upper = upper;
            this.fromTime = fromTime;
            this.toTime = toTime;
        }

        /**
         * Interactive range query using a lower and upper bound to filter the keys returned.
         * For each key the records valid within the specified time range are returned.
         * In case the time range is not specified just the latest record for each key is returned.
         * @param lower The key that specifies the lower bound of the range
         * @param upper The key that specifies the upper bound of the range
         * @param <K> The key type
         * @param <V> The value type
         */
        public static <K, V> MultiVersionedRangeQuery<K, V> withKeyRange(final K lower, final K upper);

        /**
         * Interactive range query using a lower bound to filter the keys returned.
         * For each key the records valid within the specified time range are returned.
         * In case the time range is not specified just the latest record for each key is returned.
         * @param lower The key that specifies the lower bound of the range
         * @param <K> The key type
         * @param <V> The value type
         */
        public static <K, V> MultiVersionedRangeQuery<K, V> withLowerKeyBound(final K lower);

        /**
         * Interactive range query using an upper bound to filter the keys returned.
         * For each key the records valid within the specified time range are returned.
         * In case the time range is not specified just the latest record for each key is returned.
         * @param upper The key that specifies the upper bound of the range
         * @param <K> The key type
         * @param <V> The value type
         */
        public static <K, V> MultiVersionedRangeQuery<K, V> withUpperKeyBound(final K upper);

        /**
         * Interactive scan query that returns all records in the store.
         * For each key the records valid within the specified time range are returned.
         * In case the time range is not specified just the latest record for each key is returned.
         * @param <K> The key type
         * @param <V> The value type
         */
        public static <K, V> MultiVersionedRangeQuery<K, V> allKeys();

        /**
         * Specifies the starting time point for the range query. The range query returns all the records
         * that are valid in the time range starting from the timestamp {@code fromTime}.
         * @param fromTime The starting time point
         */
        public MultiVersionedRangeQuery<K, V> fromTime(Instant fromTime);

        /**
         * Specifies the ending time point for the range query. The range query returns all the records that
         * have timestamp <= {@code toTime}.
         * @param toTime The ending time point
         */
        public MultiVersionedRangeQuery<K, V> toTime(Instant toTime);

        /**
         * The lower bound of the query, if specified.
         */
        public Optional<K> lowerKeyBound();

        /**
         * The upper bound of the query, if specified.
         */
        public Optional<K> upperKeyBound();

        /**
         * The starting time point of the query, if specified.
         */
        public Optional<Instant> fromTime();

        /**
         * The ending time point of the query, if specified.
         */
        public Optional<Instant> toTime();
    }
Another get method is added to the VersionedKeyValueStore interface.
    package org.apache.kafka.streams.state;

    public interface VersionedKeyValueStore<K, V> extends StateStore {

        /**
         * Get the record associated with this key as of the specified timestamp (i.e.,
         * the existing record with the largest timestamp not exceeding the provided
         * timestamp bound).
         *
         * @param lowerKeyBound The key that specifies the lower key bound of the range
         * @param upperKeyBound The key that specifies the upper key bound of the range
         * @param fromTime The timestamp lower bound. The records that have been inserted at or before this
         *                 timestamp and did not become tombstone at or before this timestamp will be
         *                 retrieved and returned.
         * @param toTime The timestamp bound. This bound is inclusive; if a record
         *               (for the specified key) exists with this timestamp, then
         *               this is the record that will be returned.
         * @return The value and timestamp (along with the validTo timestamp) of the records with keys
         *         within the specified key range as of the provided timestamp, or {@code null} if no
         *         such record exists (including if the provided timestamp bound is older than this
         *         store's history retention time, i.e., the store no longer contains data for the
         *         provided timestamp).
         * @throws NullPointerException If null is used for lowerKeyBound or upperKeyBound.
         * @throws InvalidStateStoreException if the store is not initialized
         */
        KeyValueIterator<K, VersionedRecord<V>> get(K lowerKeyBound, K upperKeyBound, long fromTime, long toTime);
    }
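How optional key bounds select part of a sorted store can be sketched with a `TreeMap`. This is only an illustration under stated assumptions: `KeyRangeSketch` and `select` are hypothetical names, the store is modeled as an in-memory sorted map, and both bounds are treated as inclusive (as the `withKeyRange(1, 2)` example on this page suggests).

```java
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical helper (not a Kafka Streams class) that resolves optional key bounds. */
final class KeyRangeSketch {

    /**
     * Returns the part of the sorted store selected by the optional bounds.
     * Assumption: both bounds are inclusive; an absent bound leaves that side open.
     */
    static <K, V> NavigableMap<K, V> select(final NavigableMap<K, V> store,
                                            final Optional<K> lower,
                                            final Optional<K> upper) {
        if (lower.isPresent() && upper.isPresent()) {
            return store.subMap(lower.get(), true, upper.get(), true); // like withKeyRange
        }
        if (lower.isPresent()) {
            return store.tailMap(lower.get(), true);                   // like withLowerKeyBound
        }
        if (upper.isPresent()) {
            return store.headMap(upper.get(), true);                   // like withUpperKeyBound
        }
        return store;                                                  // like allKeys
    }

    public static void main(final String[] args) {
        final NavigableMap<Integer, String> store = new TreeMap<>();
        store.put(1, "a");
        store.put(2, "b");
        store.put(3, "c");

        System.out.println(select(store, Optional.of(1), Optional.of(2)).keySet());   // [1, 2]
        System.out.println(select(store, Optional.of(2), Optional.empty()).keySet()); // [2, 3]
        System.out.println(select(store, Optional.empty(), Optional.of(2)).keySet()); // [1, 2]
        System.out.println(select(store, Optional.empty(), Optional.empty()).keySet()); // [1, 2, 3]
    }
}
```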
Examples
The following example illustrates the use of the MultiVersionedRangeQuery class to query a versioned state store.
...
    // example 1: a MultiVersionedRangeQuery without any time bound is interpreted as all versions
    final MultiVersionedRangeQuery<Integer, Integer> query1 = MultiVersionedRangeQuery.withKeyRange(1, 2);

    final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request1 =
        inStore("my_store").withQuery(query1);

    final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult1 =
        kafkaStreams.query(request1);

    // Get the results from all partitions.
    final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults =
        versionedRangeResult1.getPartitionResults();
    for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) {
        try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
            while (iterator.hasNext()) {
                final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
                Integer key = record.key;
                Integer value = record.value.value();
                Long timestamp = record.value.timestamp();
                Long validTo = record.value.validTo();
                // Kafka timestamps are epoch milliseconds.
                System.out.println("key,value: " + key + "," + value
                    + ", timestamp: " + Instant.ofEpochMilli(timestamp)
                    + ", valid till: " + Instant.ofEpochMilli(validTo));
            }
        }
    }
    /* the printed output will be (record order is not guaranteed)
       key,value: 1,1, timestamp: 2023-01-01T10:00:00.00Z, valid till: 2023-01-05T10:00:00.00Z
       key,value: 1,2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z
       key,value: 1,3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now
       key,value: 2,20, timestamp: 2023-01-10T10:00:00.00Z, valid till: 2023-01-25T10:00:00.00Z
       key,value: 2,30, timestamp: 2023-01-25T10:00:00.00Z, valid till: now
    */

    // example 2: the records with keys in the range (1, 2) from 2023-01-17T10:00:00.00Z till 2023-01-30T10:00:00.00Z
    MultiVersionedRangeQuery<Integer, Integer> query2 = MultiVersionedRangeQuery.withKeyRange(1, 2);
    query2 = query2.fromTime(Instant.parse("2023-01-17T10:00:00.00Z")).toTime(Instant.parse("2023-01-30T10:00:00.00Z"));

    final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request2 =
        inStore("my_store").withQuery(query2);

    final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult2 =
        kafkaStreams.query(request2);

    // Get the results from all partitions.
    final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults2 =
        versionedRangeResult2.getPartitionResults();
    for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults2.entrySet()) {
        try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
            while (iterator.hasNext()) {
                final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
                Integer key = record.key;
                Integer value = record.value.value();
                Long timestamp = record.value.timestamp();
                Long validTo = record.value.validTo();
                System.out.println("key,value: " + key + "," + value
                    + ", timestamp: " + Instant.ofEpochMilli(timestamp)
                    + ", valid till: " + Instant.ofEpochMilli(validTo));
            }
        }
    }
    /* the printed output will be (record order is not guaranteed)
       key,value: 2,30, timestamp: 2023-01-25T10:00:00.00Z, valid till: now
       key,value: 1,3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now
       key,value: 1,2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z
       key,value: 2,20, timestamp: 2023-01-10T10:00:00.00Z, valid till: 2023-01-25T10:00:00.00Z
    */
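Since no ordering is guaranteed for the returned records, a caller that needs a particular order can impose it after collecting the per-partition results. The sketch below is one possible approach rather than part of the KIP: `Rec` is a hypothetical stand-in for a key plus its versioned record, and the timestamps are simplified to the day of the month from the sample data.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical client-side helper; not part of Kafka Streams. */
final class ClientSideOrderingSketch {

    /** Hypothetical stand-in for one (key, versioned record) pair. */
    static final class Rec {
        final int key;
        final int value;
        final long timestamp;

        Rec(final int key, final int value, final long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "key=" + key + " value=" + value + " timestamp=" + timestamp;
        }
    }

    /** Flattens all partitions, then sorts by timestamp descending (ties broken by ascending key). */
    static List<Rec> newestFirst(final Map<Integer, List<Rec>> partitionResults) {
        final List<Rec> all = new ArrayList<>();
        for (final List<Rec> partition : partitionResults.values()) {
            all.addAll(partition);
        }
        all.sort(Comparator.comparingLong((Rec r) -> r.timestamp)
            .reversed()
            .thenComparingInt(r -> r.key));
        return all;
    }

    public static void main(final String[] args) {
        // Assumption: key 2 lives in partition 0 and key 1 in partition 1; timestamps are days in January.
        final Map<Integer, List<Rec>> partitionResults = new TreeMap<>();
        partitionResults.put(0, List.of(new Rec(2, 20, 10), new Rec(2, 30, 25)));
        partitionResults.put(1, List.of(new Rec(1, 2, 15), new Rec(1, 3, 20)));

        for (final Rec rec : newestFirst(partitionResults)) {
            System.out.println(rec);
        }
    }
}
```

Sorting on the client keeps the query itself simple, at the cost of materializing all results in memory before they can be ordered.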
Compatibility, Deprecation, and Migration Plan
...
The range interactive queries will be tested in the versioned store IQv2 integration tests (like non-versioned range queries). Moreover, there will be unit tests wherever needed.
Rejected Alternatives
The initial plan was to provide ordering based on key and/or timestamp, which is removed from the KIP and may be provided by subsequent KIPs based on user demand.