...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.streams.query;

import java.time.Instant;
import java.util.Objects;
import java.util.Optional;

/**
 * Interactive query for retrieving a single record based on its key.
 *
 * <p>Instances are immutable: {@link #asOf(Instant)} and {@link #skipCache()} each return a new
 * query object and leave the receiver unchanged.
 *
 * @param <K> The type of the key
 * @param <V> The type of the value that will be retrieved
 */
@Evolving
public final class KeyQuery<K, V> implements Query<V> {

    private final K key;
    private final boolean skipCache;
    private final Optional<Instant> asOfTimestamp;

    private KeyQuery(final K key, final Optional<Instant> asOfTimestamp, final boolean skipCache) {
        this.key = Objects.requireNonNull(key, "key cannot be null");
        this.asOfTimestamp = asOfTimestamp;
        this.skipCache = skipCache;
    }

    /**
     * Creates a query that will retrieve the record identified by {@code key} if it exists
     * (or {@code null} otherwise).
     *
     * <p>The returned query has no timestamp bound and does not skip the cache.
     *
     * @param key The key to retrieve
     * @param <K> The type of the key
     * @param <V> The type of the value that will be retrieved
     * @return a new query for {@code key}
     * @throws NullPointerException if {@code key} is {@code null}
     */
    public static <K, V> KeyQuery<K, V> withKey(final K key) {
        return new KeyQuery<>(key, Optional.empty(), false);
    }

    /**
     * Specifies the upper inclusive bound for the key query. The key query returns the record
     * with the greatest timestamp {@code <= asOfTimestamp}.
     *
     * @param asOfTimestamp The upper inclusive bound for timestamp
     * @return a new query with the same key and cache setting, bounded by {@code asOfTimestamp}
     * @throws NullPointerException if {@code asOfTimestamp} is {@code null}
     */
    public KeyQuery<K, V> asOf(final Instant asOfTimestamp) {
        // Fail with a descriptive message instead of the bare NPE raised by Optional.of(null).
        Objects.requireNonNull(asOfTimestamp, "asOfTimestamp cannot be null");
        return new KeyQuery<>(key, Optional.of(asOfTimestamp), skipCache);
    }

    /**
     * Specifies that the cache should be skipped during query evaluation. This means that the
     * query will always get forwarded to the underlying store.
     *
     * @return a new query with the same key and timestamp bound that skips the cache
     */
    public KeyQuery<K, V> skipCache() {
        return new KeyQuery<>(key, asOfTimestamp, true);
    }

    /**
     * The key that was specified for this query.
     *
     * @return the key; never {@code null}
     */
    public K getKey() {
        return key;
    }

    /**
     * The timestamp of the query, if specified.
     *
     * @return the upper inclusive timestamp bound, or an empty {@link Optional} if
     *         {@link #asOf(Instant)} was not called
     */
    public Optional<Instant> getAsOfTimestamp() {
        return asOfTimestamp;
    }

    /**
     * The flag whether to skip the cache or not during query evaluation.
     *
     * @return {@code true} if the cache is skipped, {@code false} otherwise
     */
    public boolean isSkipCache() {
        return skipCache;
    }
}
Examples....
The following example illustrates the use of the VersionedKeyQuery class to query a versioned state store.
...