...
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-15348
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The main goal is to support interactive queries in the presence of versioned state stores (KIP-889) in AK. This KIP is the successor of KIP-960 and KIP-968.
This KIP considers the following query types for implementation:
Range Queries
- key-range latest-value query
- key-range with lower bound latest-value query
- key-range with upper bound latest-value query
- all-keys (no bound) latest-value query
- key-range query with timestamp (upper) bound
- key-range with lower bound with timestamp (upper) bound
- key-range with upper bound with timestamp (upper) bound
- all-keys (no bound) with timestamp (upper) bound
- key-range query with timestamp range
- key-range query with lower bound with timestamp range
- key-range query with upper bound with timestamp range
- all-keys (no bound) with timestamp range
- key-range query all-versions
- key-range query with lower bound all-versions
- key-range query with upper bound all-versions
- all-keys query (no bound) all-versions (entire store)
...
In this KIP we propose the public class MultiVersionedRangeQuery, which is described in the next section. Moreover, a method will be added to the VersionedKeyValueStore interface.
Proposed Changes
To support range queries, the MultiVersionedRangeQuery class is used.
- The methods are composable. The fromTime and toTime methods specify the time range while the withLowerKeyBound and withUpperKeyBound methods specify the key bounds.
If a user applies the same time limit multiple times, such as MultiVersionedRangeQuery.withLowerKeyBound(k1).fromTime(t1).fromTime(t2), then the last one wins (it will be translated to MultiVersionedRangeQuery.withLowerKeyBound(k1).fromTime(t2)).
- Defining a query with time range (empty, t1] will be translated into [0, t1]
- Defining a query with time range (t1, empty) will be translated into [t1, MAX)
- A query with no specified time range will be translated into [0, MAX). This means that the query will return all the versions of all the records within the specified key range.
- As explained in the javadocs, the query returns all valid records within the specified time range.
- The fromTime specifies the starting point. There can be records which were inserted before fromTime and are still valid in the time range.
- The toTime specifies the ending point. Records that were inserted at toTime are returned by the query as well.
- No ordering is guaranteed for the returned records.
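The translation rules above (composable methods, last call wins, unset time bounds defaulting to 0 and MAX) can be sketched with a small immutable builder. This is only a minimal sketch under stated assumptions, not the Kafka implementation: the class name RangeQuerySketch and the helpers effectiveFromMillis and effectiveToMillis are hypothetical, and representing MAX as Long.MAX_VALUE is an assumption.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical stand-in for the proposed query class; NOT the Kafka implementation.
final class RangeQuerySketch<K> {
    private final Optional<K> lower;
    private final Optional<K> upper;
    private final Optional<Instant> fromTime;
    private final Optional<Instant> toTime;

    private RangeQuerySketch(final Optional<K> lower, final Optional<K> upper,
                             final Optional<Instant> fromTime, final Optional<Instant> toTime) {
        this.lower = lower;
        this.upper = upper;
        this.fromTime = fromTime;
        this.toTime = toTime;
    }

    static <K> RangeQuerySketch<K> withKeyRange(final K lower, final K upper) {
        return new RangeQuerySketch<>(Optional.of(lower), Optional.of(upper), Optional.empty(), Optional.empty());
    }

    static <K> RangeQuerySketch<K> withLowerKeyBound(final K lower) {
        return new RangeQuerySketch<>(Optional.of(lower), Optional.empty(), Optional.empty(), Optional.empty());
    }

    static <K> RangeQuerySketch<K> withUpperKeyBound(final K upper) {
        return new RangeQuerySketch<>(Optional.empty(), Optional.of(upper), Optional.empty(), Optional.empty());
    }

    static <K> RangeQuerySketch<K> allKeys() {
        return new RangeQuerySketch<>(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
    }

    // Each call returns a new instance, so a repeated call simply replaces the earlier value.
    RangeQuerySketch<K> fromTime(final Instant t) {
        return new RangeQuerySketch<>(lower, upper, Optional.of(t), toTime);
    }

    RangeQuerySketch<K> toTime(final Instant t) {
        return new RangeQuerySketch<>(lower, upper, fromTime, Optional.of(t));
    }

    Optional<K> lowerKeyBound() { return lower; }

    Optional<K> upperKeyBound() { return upper; }

    // Assumption: an unset starting point is translated into 0.
    long effectiveFromMillis() {
        return fromTime.map(Instant::toEpochMilli).orElse(0L);
    }

    // Assumption: an unset ending point is translated into MAX, modelled here as Long.MAX_VALUE.
    long effectiveToMillis() {
        return toTime.map(Instant::toEpochMilli).orElse(Long.MAX_VALUE);
    }

    public static void main(final String[] args) {
        final Instant t1 = Instant.parse("2023-01-01T10:00:00Z");
        final Instant t2 = Instant.parse("2023-01-15T10:00:00Z");
        final RangeQuerySketch<Integer> q = RangeQuerySketch.withLowerKeyBound(1).fromTime(t1).fromTime(t2);
        System.out.println(q.effectiveFromMillis() == t2.toEpochMilli()); // true: the last fromTime wins
        System.out.println(q.effectiveToMillis() == Long.MAX_VALUE);      // true: unset ending point becomes MAX
    }
}
```

Returning a fresh instance from every method keeps the query object immutable, which is one simple way to obtain the "last one wins" behaviour without any extra bookkeeping.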
package org.apache.kafka.streams.query;

import java.time.Instant;
import java.util.Optional;
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.VersionedRecord;

/**
 * Interactive query for retrieving a set of records with keys within a specified key range and time
 * range.
 */
@Evolving
public final class MultiVersionedRangeQuery<K, V> implements Query<KeyValueIterator<K, VersionedRecord<V>>> {

    private final Optional<K> lower;
    private final Optional<K> upper;
    private final Optional<Instant> fromTime;
    private final Optional<Instant> toTime;

    private MultiVersionedRangeQuery(
        final Optional<K> lower,
        final Optional<K> upper,
        final Optional<Instant> fromTime,
        final Optional<Instant> toTime) {
        this.lower = lower;
        this.upper = upper;
        this.fromTime = fromTime;
        this.toTime = toTime;
    }

    /**
     * Interactive range query using a lower and upper bound to filter the keys returned.
     * For each key the records valid within the specified time range are returned.
     * In case the time range is not specified, all versions of the records for each key are returned.
     * @param lower The key that specifies the lower bound of the range
     * @param upper The key that specifies the upper bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> withKeyRange(final K lower, final K upper);

    /**
     * Interactive range query using a lower bound to filter the keys returned.
     * For each key the records valid within the specified time range are returned.
     * In case the time range is not specified, all versions of the records for each key are returned.
     * @param lower The key that specifies the lower bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> withLowerKeyBound(final K lower);

    /**
     * Interactive range query using an upper bound to filter the keys returned.
     * For each key the records valid within the specified time range are returned.
     * In case the time range is not specified, all versions of the records for each key are returned.
     * @param upper The key that specifies the upper bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> withUpperKeyBound(final K upper);

    /**
     * Interactive scan query that returns all records in the store.
     * For each key the records valid within the specified time range are returned.
     * In case the time range is not specified, all versions of the records for each key are returned.
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> MultiVersionedRangeQuery<K, V> allKeys();

    /**
     * Specifies the starting time point for the key query. The range query returns all the records
     * that are valid in the time range starting from the timestamp {@code fromTime}.
     * @param fromTime The starting time point
     */
    public MultiVersionedRangeQuery<K, V> fromTime(Instant fromTime);

    /**
     * Specifies the ending time point for the key query. The range query returns all the records that
     * have timestamp <= {@code toTime}.
     * @param toTime The ending time point
     */
    public MultiVersionedRangeQuery<K, V> toTime(Instant toTime);

    /**
     * The lower bound of the query, if specified.
     */
    public Optional<K> lowerKeyBound();

    /**
     * The upper bound of the query, if specified.
     */
    public Optional<K> upperKeyBound();

    /**
     * The starting time point of the query, if specified.
     */
    public Optional<Instant> fromTime();

    /**
     * The ending time point of the query, if specified.
     */
    public Optional<Instant> toTime();
}
Examples
The following example illustrates the use of the MultiVersionedRangeQuery class to query a versioned state store.
Imagine we have the following records:
put(1, 1, time=2023-01-01T10:00:00.00Z)
put(1, null, time=2023-01-05T10:00:00.00Z)
put(2, 20, time=2023-01-10T10:00:00.00Z)
put(3, 30, time=2023-01-12T10:00:00.00Z)
put(1, 2, time=2023-01-15T10:00:00.00Z)
put(1, 3, time=2023-01-20T10:00:00.00Z)
put(2, 30, time=2023-01-25T10:00:00.00Z)
// example 1: MultiVersionedRangeQuery without specifying any time bound will be interpreted as all versions
final MultiVersionedRangeQuery<Integer, Integer> query1 = MultiVersionedRangeQuery.withKeyRange(1, 2);

final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request1 =
        inStore("my_store").withQuery(query1);

final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult1 = kafkaStreams.query(request1);

// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults1 = versionedRangeResult1.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults1.entrySet()) {
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
            final Integer key = record.key;
            final Integer value = record.value.value();
            // Timestamps are epoch milliseconds.
            final long timestamp = record.value.timestamp();
            // validTo is empty for the latest version of a key, which is printed as "now".
            final Optional<Long> validTo = record.value.validTo();
            System.out.println("key,value: " + key + "," + value
                    + ", timestamp: " + Instant.ofEpochMilli(timestamp)
                    + ", valid till: " + validTo.map(t -> Instant.ofEpochMilli(t).toString()).orElse("now"));
        }
    }
}
/* the printed output will be
    key,value: 1,1, timestamp: 2023-01-01T10:00:00.00Z, valid till: 2023-01-05T10:00:00.00Z
    key,value: 1,2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z
    key,value: 1,3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now
    key,value: 2,20, timestamp: 2023-01-10T10:00:00.00Z, valid till: 2023-01-25T10:00:00.00Z
    key,value: 2,30, timestamp: 2023-01-25T10:00:00.00Z, valid till: now
*/

// example 2: The value of the records with key range (1,2) from 2023-01-17T10:00:00.00Z till 2023-01-30T10:00:00.00Z
MultiVersionedRangeQuery<Integer, Integer> query2 = MultiVersionedRangeQuery.withKeyRange(1, 2);
query2 = query2.fromTime(Instant.parse("2023-01-17T10:00:00.00Z")).toTime(Instant.parse("2023-01-30T10:00:00.00Z"));

final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request2 =
        inStore("my_store").withQuery(query2);

final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult2 = kafkaStreams.query(request2);

// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults2 = versionedRangeResult2.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults2.entrySet()) {
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = entry.getValue().getResult()) {
        while (iterator.hasNext()) {
            final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
            final Integer key = record.key;
            final Integer value = record.value.value();
            final long timestamp = record.value.timestamp();
            final Optional<Long> validTo = record.value.validTo();
            System.out.println("key,value: " + key + "," + value
                    + ", timestamp: " + Instant.ofEpochMilli(timestamp)
                    + ", valid till: " + validTo.map(t -> Instant.ofEpochMilli(t).toString()).orElse("now"));
        }
    }
}
/* the printed output will be
    key,value: 2,30, timestamp: 2023-01-25T10:00:00.00Z, valid till: now
    key,value: 1,3, timestamp: 2023-01-20T10:00:00.00Z, valid till: now
    key,value: 1,2, timestamp: 2023-01-15T10:00:00.00Z, valid till: 2023-01-20T10:00:00.00Z
    key,value: 2,20, timestamp: 2023-01-10T10:00:00.00Z, valid till: 2023-01-25T10:00:00.00Z
*/
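To make the selection rule behind the two examples concrete, the following sketch models the seven put calls in memory and applies a key range plus a time range to them. It is only an illustration under stated assumptions, not how a versioned store is implemented: the class VersionedRangeSketch and its methods are hypothetical, a version is assumed to be valid in [timestamp, validTo), tombstones (null values) are assumed never to be returned, and "now" is modelled as Long.MAX_VALUE.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of a versioned store; NOT the Kafka implementation.
final class VersionedRangeSketch {

    // One returned version of a key, valid in [timestamp, validTo).
    static final class Version {
        final int key;
        final int value;
        final long timestamp;
        final long validTo; // Long.MAX_VALUE models "still valid now"

        Version(final int key, final int value, final long timestamp, final long validTo) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.validTo = validTo;
        }
    }

    // key -> (timestamp in epoch millis -> value); a null value is a tombstone.
    private final TreeMap<Integer, TreeMap<Long, Integer>> store = new TreeMap<>();

    void put(final int key, final Integer value, final String isoTime) {
        store.computeIfAbsent(key, k -> new TreeMap<>())
             .put(Instant.parse(isoTime).toEpochMilli(), value);
    }

    // Returns every non-tombstone version with lowerKey <= key <= upperKey
    // whose validity interval overlaps the time range [from, to].
    List<Version> query(final int lowerKey, final int upperKey, final long from, final long to) {
        final List<Version> result = new ArrayList<>();
        for (final Map.Entry<Integer, TreeMap<Long, Integer>> e
                : store.subMap(lowerKey, true, upperKey, true).entrySet()) {
            final TreeMap<Long, Integer> versions = e.getValue();
            for (final Map.Entry<Long, Integer> v : versions.entrySet()) {
                // A version stays valid until the next write to the same key.
                final Long next = versions.higherKey(v.getKey());
                final long validTo = next == null ? Long.MAX_VALUE : next;
                final boolean overlaps = v.getKey() <= to && validTo > from;
                if (v.getValue() != null && overlaps) {
                    result.add(new Version(e.getKey(), v.getValue(), v.getKey(), validTo));
                }
            }
        }
        return result;
    }

    // Loads the records listed in the Examples section.
    static VersionedRangeSketch exampleStore() {
        final VersionedRangeSketch s = new VersionedRangeSketch();
        s.put(1, 1, "2023-01-01T10:00:00Z");
        s.put(1, null, "2023-01-05T10:00:00Z");
        s.put(2, 20, "2023-01-10T10:00:00Z");
        s.put(3, 30, "2023-01-12T10:00:00Z");
        s.put(1, 2, "2023-01-15T10:00:00Z");
        s.put(1, 3, "2023-01-20T10:00:00Z");
        s.put(2, 30, "2023-01-25T10:00:00Z");
        return s;
    }

    public static void main(final String[] args) {
        final VersionedRangeSketch s = exampleStore();
        // Example 1: no time bound, i.e. [0, MAX).
        System.out.println(s.query(1, 2, 0L, Long.MAX_VALUE).size()); // prints 5
        // Example 2: 2023-01-17 to 2023-01-30.
        final long from = Instant.parse("2023-01-17T10:00:00Z").toEpochMilli();
        final long to = Instant.parse("2023-01-30T10:00:00Z").toEpochMilli();
        System.out.println(s.query(1, 2, from, to).size()); // prints 4
    }
}
```

In the second query the version (1, 2) is returned although it was written on 2023-01-15, before the starting point, because it is still valid at 2023-01-17. Note that this sketch happens to return results in key order, whereas the KIP itself guarantees no ordering.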
Compatibility, Deprecation, and Migration Plan
- Since this is a completely new set of APIs, no backward compatibility concerns are anticipated.
- Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.
Test Plan
The range interactive queries will be tested in the versioned store IQv2 integration test (like non-versioned range queries). Moreover, there will be unit tests wherever needed.
Rejected Alternatives
The initial plan was to provide ordering based on key and/or timestamp. This was removed from the KIP and may be provided by subsequent KIPs based on user demand.