...
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-15257
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
The main goal is to support interactive queries over versioned state stores (KIP-889) in AK. This KIP proposes implementing the following query types.
Key Queries:
- single-key latest-value lookup
- single-key lookup with timestamp (upper) bound
- single-key query with timestamp range
- single-key all versions query
Range Queries:
- key-range latest-value query
- key-range with lower bound latest-value query
- key-range with upper bound latest-value query
- all-keys (no bound) latest-value query
- key-range query with timestamp (upper) bound
- key-range with lower bound with timestamp (upper) bound
- key-range with upper bound with timestamp (upper) bound
- all-keys (no bound) with timestamp (upper) bound
- key-range query with timestamp range
- key-range query with lower bound with timestamp range
- key-range query with upper bound with timestamp range
- all-keys (no bound) with timestamp range
- key-range query all-versions
- key-range query with lower bound all-versions
- key-range query with upper bound all-versions
- all-keys query (no bound) all-versions (entire store)
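The four temporal modes above (latest value, timestamp upper bound, timestamp range, all versions) differ only in which versions of a key they return. The toy model below illustrates this for a single key. It is a hypothetical sketch, not Kafka's versioned store implementation: the class name VersionHistorySketch is made up, values are Strings, history retention is ignored, and the timestamp range is assumed to be inclusive on both ends, which the list above does not specify.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical in-memory model of one key's version history.
 * Illustrates query semantics only; not Kafka's implementation.
 */
class VersionHistorySketch {
    // Record timestamp -> value, kept sorted by timestamp.
    private final NavigableMap<Long, String> versions = new TreeMap<>();

    void put(long timestamp, String value) {
        versions.put(timestamp, value);
    }

    /** Latest-value lookup: the record with the greatest timestamp, or null if none. */
    Map.Entry<Long, String> latest() {
        return versions.lastEntry();
    }

    /** Timestamp (upper) bound: the record with the greatest timestamp <= untilTimestamp, or null. */
    Map.Entry<Long, String> asOf(long untilTimestamp) {
        return versions.floorEntry(untilTimestamp);
    }

    /** Timestamp range: records with asOfTimestamp <= timestamp <= untilTimestamp (assumed inclusive). */
    List<Map.Entry<Long, String>> range(long asOfTimestamp, long untilTimestamp) {
        return new ArrayList<>(versions.subMap(asOfTimestamp, true, untilTimestamp, true).entrySet());
    }

    /** All versions, ordered from oldest to newest. */
    List<Map.Entry<Long, String>> allVersions() {
        return new ArrayList<>(versions.entrySet());
    }
}
```

The key-range variants apply the same four modes to every key selected by the key bounds.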
Public Interfaces
This KIP proposes two new public classes, VersionedKeyQuery and VersionedRangeQuery, which are described in the next section. In addition, the public interface ValueIterator is used to iterate over the values returned by a single-key query, where each value corresponds to a timestamp.
Proposed Changes
For single-key queries, the VersionedKeyQuery class and the ValueIterator interface are used.
...
```java
package org.apache.kafka.streams.query;

import java.util.Optional;
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.VersionedRecord;

@Evolving
public final class VersionedRangeQuery<K, V> implements Query<KeyValueIterator<K, VersionedRecord<V>>> {

    private final Optional<K> lower;
    private final Optional<K> upper;
    private final Optional<Long> asOfTimestamp;
    private final Optional<Long> untilTimestamp;

    private VersionedRangeQuery(
        final Optional<K> lower,
        final Optional<K> upper,
        final Optional<Long> asOfTimestamp,
        final Optional<Long> untilTimestamp);

    /**
     * Interactive range query using a lower and upper bound to filter the keys returned.
     * For each key, only the record with the latest timestamp is returned.
     *
     * @param lower The key that specifies the lower bound of the range
     * @param upper The key that specifies the upper bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> VersionedRangeQuery<K, V> keyRangeLatestValue(final K lower, final K upper);

    /**
     * Interactive range query using a lower bound to filter the keys returned.
     * For each key, only the record with the latest timestamp is returned.
     *
     * @param lower The key that specifies the lower bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> VersionedRangeQuery<K, V> withLowerBoundLatestValue(final K lower);

    /**
     * Interactive range query using an upper bound to filter the keys returned.
     * For each key, only the record with the latest timestamp is returned.
     *
     * @param upper The key that specifies the upper bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> VersionedRangeQuery<K, V> withUpperBoundLatestValue(final K upper);

    /**
     * Interactive scan query that returns all records in the store.
     * For each key, only the record with the latest timestamp is returned.
     *
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> VersionedRangeQuery<K, V> withNoBoundLatestValue();

    /**
     * Interactive range query using a lower and upper bound to filter the keys returned.
     * For each key, only the record with the greatest timestamp <= untilTimestamp is returned.
     *
     * @param lower The key that specifies the lower bound of the range
     * @param upper The key that specifies the upper bound of the range
     * @param untilTimestamp The upper bound for the record timestamp
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> VersionedRangeQuery<K, V> keyRangeWithTimestampBound(final K lower, final K upper, final long untilTimestamp);

    /**
     * Interactive range query using a lower bound to filter the keys returned.
     * For each key, only the record with the greatest timestamp <= untilTimestamp is returned.
     *
     * @param lower The key that specifies the lower bound of the range
     * @param untilTimestamp The upper bound for the record timestamp
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> VersionedRangeQuery<K, V> withLowerBoundWithTimestampBound(final K lower, final long untilTimestamp);

    /**
     * Interactive range query using an upper bound to filter the keys returned.
     * For each key, only the record with the greatest timestamp <= untilTimestamp is returned.
     *
     * @param upper The key that specifies the upper bound of the range
     * @param untilTimestamp The upper bound for the record timestamp
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> VersionedRangeQuery<K, V> withUpperBoundWithTimestampBound(final K upper, final long untilTimestamp);

    /**
     * Interactive scan query that returns all records in the store.
     * For each key, only the record with the greatest timestamp <= untilTimestamp is returned.
     *
     * @param untilTimestamp The upper bound for the record timestamp
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> VersionedRangeQuery<K, V> withNoBoundWithTimestampBound(final long untilTimestamp);

    /**
     * Interactive range query using a lower and upper bound to filter the keys returned.
     * For each key, all the records within the specified time range are returned.
     *
     * @param lower The key that specifies the lower bound of the range
     * @param upper The key that specifies the upper bound of the range
     * @param asOfTimestamp The lower bound of the time range
     * @param untilTimestamp The upper bound of the time range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> VersionedRangeQuery<K, V> keyRangeWithTimestampRange(final K lower, final K upper, final long asOfTimestamp, final long untilTimestamp);

    /**
     * Interactive range query using a lower bound to filter the keys returned.
     * For each key, all the records within the specified time range are returned.
     *
     * @param lower The key that specifies the lower bound of the range
     * @param asOfTimestamp The lower bound of the time range
     * @param untilTimestamp The upper bound of the time range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> VersionedRangeQuery<K, V> withLowerBoundWithTimestampRange(final K lower, final long asOfTimestamp, final long untilTimestamp);

    /**
     * Interactive range query using an upper bound to filter the keys returned.
     * For each key, all the records within the specified time range are returned.
     *
     * @param upper The key that specifies the upper bound of the range
     * @param asOfTimestamp The lower bound of the time range
     * @param untilTimestamp The upper bound of the time range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> VersionedRangeQuery<K, V> withUpperBoundWithTimestampRange(final K upper, final long asOfTimestamp, final long untilTimestamp);

    /**
     * Interactive scan query that returns all records in the store.
     * For each key, all the records within the specified time range are returned.
     *
     * @param asOfTimestamp The lower bound of the time range
     * @param untilTimestamp The upper bound of the time range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> VersionedRangeQuery<K, V> withNoBoundWithTimestampRange(final long asOfTimestamp, final long untilTimestamp);

    /**
     * Interactive range query using a lower and upper bound to filter the keys returned.
     * For each key, all values from the oldest to the newest record existing in the state store are returned.
     *
     * @param lower The key that specifies the lower bound of the range
     * @param upper The key that specifies the upper bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> VersionedRangeQuery<K, V> keyRangeAllVersions(final K lower, final K upper);

    /**
     * Interactive range query using a lower bound to filter the keys returned.
     * For each key, all values from the oldest to the newest record existing in the state store are returned.
     *
     * @param lower The key that specifies the lower bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> VersionedRangeQuery<K, V> withLowerBoundAllVersions(final K lower);

    /**
     * Interactive range query using an upper bound to filter the keys returned.
     * For each key, all values from the oldest to the newest record existing in the state store are returned.
     *
     * @param upper The key that specifies the upper bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> VersionedRangeQuery<K, V> withUpperBoundAllVersions(final K upper);

    /**
     * Interactive scan query that returns all records in the store.
     * For each key, all values from the oldest to the newest record existing in the state store are returned.
     *
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> VersionedRangeQuery<K, V> withNoBoundAllVersions();

    /**
     * The lower bound of the query, if specified.
     */
    public Optional<K> getLowerBound();

    /**
     * The upper bound of the query, if specified.
     */
    public Optional<K> getUpperBound();

    /**
     * The lower bound of the timestamp range of the query, if specified.
     */
    public Optional<Long> getAsOfTimestamp();

    /**
     * The upper bound for the timestamp of the query, if specified.
     */
    public Optional<Long> getUntilTimestamp();
}
```
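The key-range factory methods differ only in which of the two Optional key fields they populate. Assuming that the withLowerBound... methods set only lower, the withUpperBound... methods set only upper, and the withNoBound... methods set neither, key selection could be modeled as below. KeyRangeSketch and selectKeys are hypothetical names, both key bounds are assumed inclusive, and the sketch covers only which keys are visited, not which versions of each key are returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

/**
 * Hypothetical model of how Optional key bounds could select keys.
 * An empty Optional means "unbounded on that side"; bounds are assumed inclusive.
 */
final class KeyRangeSketch {
    static <K extends Comparable<K>, V> List<K> selectKeys(
            final NavigableMap<K, V> store, final Optional<K> lower, final Optional<K> upper) {
        NavigableMap<K, V> view = store;
        if (lower.isPresent()) {
            view = view.tailMap(lower.get(), true); // keep keys >= lower
        }
        if (upper.isPresent()) {
            view = view.headMap(upper.get(), true); // keep keys <= upper
        }
        return new ArrayList<>(view.keySet());
    }
}
```

With this model, keyRangeLatestValue(2, 3) corresponds to selectKeys(store, Optional.of(2), Optional.of(3)), and withNoBoundLatestValue() to passing two empty Optionals.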
Examples
The following example illustrates the use of the VersionedKeyQuery class to query a versioned state store.
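```java
import static org.apache.kafka.streams.query.StateQueryRequest.inStore;

import java.util.Map;
import java.util.Map.Entry;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.VersionedRecord;

// VersionedKeyQuery and ValueIterator are the types proposed in this KIP;
// kafkaStreams is a running KafkaStreams instance.
final VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKeyAllVersions(1);
final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>> request =
    inStore("my_store").withQuery(query);
final StateQueryResult<ValueIterator<VersionedRecord<Integer>>> versionedKeyResult =
    kafkaStreams.query(request);

// Get the results from all partitions.
final Map<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> partitionResults =
    versionedKeyResult.getPartitionResults();
for (final Entry<Integer, QueryResult<ValueIterator<VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) {
    final QueryResult<ValueIterator<VersionedRecord<Integer>>> result = entry.getValue();
    // getResult() throws for a failed partition, so skip failures.
    if (!result.isSuccess()) {
        continue;
    }
    // try-with-resources closes the iterator when iteration finishes.
    try (final ValueIterator<VersionedRecord<Integer>> iterator = result.getResult()) {
        while (iterator.hasNext()) {
            final VersionedRecord<Integer> record = iterator.next();
            final long timestamp = record.timestamp();
            final Integer value = record.value();
        }
    }
}
```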
The following example illustrates the use of the VersionedRangeQuery class to query a versioned state store.
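```java
import static org.apache.kafka.streams.query.StateQueryRequest.inStore;

import java.util.Map;
import java.util.Map.Entry;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.VersionedRecord;

// Keys 1 to 2, records between 2023-07-24T12:19:09Z and 2023-07-26T12:19:09Z.
// Kafka record timestamps are epoch milliseconds, not seconds.
final VersionedRangeQuery<Integer, Integer> query =
    VersionedRangeQuery.keyRangeWithTimestampRange(1, 2, 1_690_201_149_000L, 1_690_373_949_000L);
final StateQueryRequest<KeyValueIterator<Integer, VersionedRecord<Integer>>> request =
    inStore("my_store").withQuery(query);
final StateQueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> versionedRangeResult =
    kafkaStreams.query(request);

// Get the results from all partitions.
final Map<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> partitionResults =
    versionedRangeResult.getPartitionResults();
for (final Entry<Integer, QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>>> entry : partitionResults.entrySet()) {
    final QueryResult<KeyValueIterator<Integer, VersionedRecord<Integer>>> result = entry.getValue();
    // getResult() throws for a failed partition, so skip failures.
    if (!result.isSuccess()) {
        continue;
    }
    // KeyValueIterator must be closed; try-with-resources does that.
    try (final KeyValueIterator<Integer, VersionedRecord<Integer>> iterator = result.getResult()) {
        while (iterator.hasNext()) {
            final KeyValue<Integer, VersionedRecord<Integer>> record = iterator.next();
            final Integer key = record.key;
            final Integer value = record.value.value();
            final long timestamp = record.value.timestamp();
        }
    }
}
```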
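Kafka record timestamps are milliseconds since the Unix epoch, so the asOfTimestamp and untilTimestamp arguments of a timestamp-range query must be in milliseconds too. When the bounds come from calendar times, java.time can compute them directly and avoid a seconds-versus-milliseconds mix-up. The sketch below uses a hypothetical holder class, QueryTimestamps, for the window from 2023-07-24T12:19:09Z to 2023-07-26T12:19:09Z.

```java
import java.time.Instant;

/** Hypothetical holder for query bounds derived from ISO-8601 instants. */
final class QueryTimestamps {
    // Instant.toEpochMilli() yields milliseconds since 1970-01-01T00:00:00Z.
    static final long AS_OF = Instant.parse("2023-07-24T12:19:09Z").toEpochMilli();
    static final long UNTIL = Instant.parse("2023-07-26T12:19:09Z").toEpochMilli();
}
```

These constants could then be passed as the third and fourth arguments of keyRangeWithTimestampRange.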
...