...
Code Block |
---|
language | java |
---|
firstline | 1 |
---|
title | KeyQueryVersionedKeyQuery.java |
---|
|
package org.apache.kafka.streams.query;
/**
* Interactive query for retrieving a single record from a versioned state store based on its key and timestamp.
* <p>
* See KIP-960 for more details.
*/
@Evolving
public final class KeyQuery<KVersionedKeyQuery<K, V> implements Query<V>Query<VersionedRecord<V>> {
private final K key;
private final boolean skipCache;
private final Optional<Instant> asOfTimestamp;
private KeyQueryVersionedKeyQuery(final K key, final Optional<Instant> asOfTimestamp, final boolean skipCache) {
this.key = Objects.requireNonNull(key);
this.asOfTimestamp = asOfTimestamp;
this.skipCache = skipCache;
}
/**
* Creates a query that will retrieve the record from a versioned state store identified by {@code key} if it exists
* (or {@code null} otherwise).
* @param key The key to retrieve
* @param <K> The type of the key
* @param <V> The type of the value that will be retrieved
*/
public static <K, V> KeyQuery<KVersionedKeyQuery<K, V> withKey(final K key);
/**
* Specifies the upper inclusive bound for the key query. The key query returns the record
* with the greatest timestamp <= asOfTimestamp
* @param asOfTimestamp The upper inclusive bound for timestamp
*/
public KeyQuery<KVersionedKeyQuery<K, V> asOf(final Instant asOfTimestamp);
/**
* Specifies that the cache should be skipped during query evaluation. This means, that the query
* will always get forwarded to the underlying store.
*/
public KeyQuery<K, V> skipCache();
/**
* The key that was specified for this query.
*/
public K getKey();
/**
* The timestamp of the query, if specified
*/
public Optional<Instant> getAsOfTimestamp();
/**
* The flag whether to skip the cache or not during query evaluation.
*/
public boolean isSkipCache();
}
} |
Examples....
The following example illustrates the use of the VersionedKeyQuery class to query a versioned state store.
Code Block |
---|
language | java |
---|
linenumbers | true |
---|
|
builder.table(
"my_topic",
Consumed.with(Serdes.Integer(), Serdes.Integer()),
Materialized.as(Stores.persistentVersionedKeyValueStore(
"my_store",
Duration.ofMillis(HISTORY_RETENTION)
))
);
final KeyQuery<IntegerVersionedKeyQuery<Integer, ValueAndTimestamp<Integer>>VersionedRecord<Integer>> query = KeyQueryVersionedKeyQuery.withKey(1).asOf(Instant.parse("2023-08-03T10:37:30.00Z");
final StateQueryRequest<ValueIterator<VersionedRecord<Integer>>>StateQueryRequest<VersionedRecord<Integer>> request =
inStore("my_store").withQuery(query);
final StateQueryResult<ValueAndTimestamp<Integer>>StateQueryResult<VersionedRecord<Integer>> result = kafkaStreams.query(request); |
...