...
For supporting range queries, VersionedRangeQuery class is used.
- The methods are composable. Therefore, the meaningless combinations such as withRange(k1, k2).asOf(t1).allVersions() end up throwing a RunTimeException (for example NotSupportedException).
- Defining a query with time range (empty, t1] will be translated into [0, t1]
- Defining a query with time range (t1, empty) will be translated into [t1, MAX)
- A query with no specified time range will be interpreted as a normal range query that returns the records with the latest timestamp.
- As explained in the javadocs, the query returns all valid records within the specified time range.
- The fromTimestamp specifies the starting point. There can be records which have been inserted before the fromTimestamp and are valid in the time range.
- The asOfTimestamp specifies the ending point. Records that have been inserted at asOfTimestamp are returned by the query as well.
- The overall order of the returned records is by Key. The method orderByTimestamp() can make the overall order by timestamp.
- The order for both key and timestamp is by default ascending. They can be changed by the methods withDescendingKeys() and withDescendingTimestamps() respectively.
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.streams.query; /** * Interactive query for retrieving a set of records with keys within a specified key range and time * range. */ @Evolving public final class VersionedRangeQuery<K, V> implements Query<KeyValueIterator<K, VersionedRecord<V>>> { private final Optional<K> lower; private final Optional<K> upper; private final Optional<Instant> fromTimestamp; private final Optional<Instant> asOfTimestamp; private final boolean isKeyAscending; private final boolean isTimeAscending; private final boolean isOrderedByKey; private VersionedRangeQuery( final Optional<K> lower, final Optional<K> upper, final Optional<Instant> fromTimestamp, final Optional<Instant> asOfTimestamp, final boolean isOrderedByKey, final boolean isKeyAscending, final boolean isTimeAscending) { this.lower = lower; this.upper = upper; this.fromTimestamp = fromTimestamp; this.asOfTimestamp = asOfTimestamp; this.isOrderedByKey = isOrderedByKey; this.isKeyAscending = isKeyAscending; this.isTimeAscending = isTimeAscending; } /** * Interactive range query using a lower and upper bound to filter the keys returned. * For each * key the records valid within the specified time range are returned. * In case the time range is * not specified just the latest record for each key is returned. * @param lower The key that specifies the lower bound of the range * @param upper The key that specifies the upper bound of the range * @param <K> The key type * @param <V> The value type */ public static <K, V> VersionedRangeQuery<K, V> withRange(final K lower, final K upper); /** * Interactive range query using a lower bound to filter the keys returned. * For each key the * records valid within the specified time range are returned. * In case the time range is not * specified just the latest record for each key is returned. * @param lower The key that specifies the lower bound of the range * @param <K> The key type * @param <V> The value type */ public static <K, V> VersionedRangeQuery<K, V> withLowerBound(final K lower); /** * Interactive range query using a lower bound to filter the keys returned. * For each key the * records valid within the specified time range are returned. * In case the time range is not * specified just the latest record for each key is returned. * @param upper The key that specifies the lower bound of the range * @param <K> The key type * @param <V> The value type */ public static <K, V> VersionedRangeQuery<K, V> withUpperBound(final K upper); /** * Interactive scan query that returns all records in the store. * For each key the records valid * within the specified time range are returned. * In case the time range is not specified just * the latest record for each key is returned. * @param <K> The key type * @param <V> The value type */ public static <K, V> VersionedRangeQuery<K, V> withNoBounds(); /** * Specifies the starting time point for the key query. The range query returns all the records * that are valid in the time range starting from the timestamp {@code fromTimestamp}. * @param fromTimestamp The starting time point */ public VersionedRangeQuery<K, V> from(Instant fromTimestamp); /** * Specifies the ending time point for the key query. The range query returns all the records that * have timestamp <= {@code asOfTimestamp}. * @param asOfTimestamp The ending time point */ public VersionedRangeQuery<K, V> asOf(Instant asOfTimestamp); /** * Specifies the starting and ending points of the range query as MIN and MAX respectively. * Therefore, the query returns all the existing records in the state store with keys within the * specified key range. * @throws RuntimeException if {@code fromTimestamp} or {@code asOfTimestamp} have been already * specified. */ public VersionedRangeQuery<K, V> allVersions(); /** * Specifies the overall order of returned records by timestamp */ public VersionedRangeQuery<K, V> orderByTimestamp(); /** * Specifies the order of keys as descending. */ public VersionedRangeQuery<K, V> withDescendingKeys(); /** * Specifies the order of the timestamps as descending. */ public VersionedRangeQuery<K, V> withDescendingTimestamps(); /** * The lower bound of the query, if specified. */ public Optional<K> getLowerKeyBound(); /** * The upper bound of the query, if specified */ public Optional<K> getUpperKeyBound(); /** * The starting time point of the query, if specified */ public Optional<Instant> getFromTimestamp(); /** * The ending time point of the query, if specified */ public Optional<Instant> getAsOfTimestamp(); /** * @return true if the query orders the returned records by key */ public boolean isOrderedByKey(); /** * @return true if the query returns records in ascending order of keys */ public boolean isKeyAscending(); /** * @return true if the query returns records in ascending order of timestamps */ public boolean isRangeAscending(); } |
...