...
Code Block | ||||
---|---|---|---|---|
| ||||
// Materialize "my_topic" as a table backed by the persistent versioned store "my_store",
// created with a 10-minute Duration argument.
builder.table(
    "my_topic",
    Consumed.with(Serdes.Integer(), Serdes.Integer()),
    Materialized.as(Stores.persistentVersionedKeyValueStore(
        "my_store",
        Duration.ofMinutes(10)
    ))
);
// Query key 1 as of the given instant. The explicit type witness keeps the value type
// from being inferred as Object in the chained call.
final VersionedKeyQuery<Integer, Integer> query =
    VersionedKeyQuery.<Integer, Integer>withKey(1).asOf(Instant.parse("2023-08-03T10:37:30.00Z"));
// A single-key, single-timestamp query produces one VersionedRecord (not an iterator),
// so the request and result are both typed on VersionedRecord<Integer>.
final StateQueryRequest<VersionedRecord<Integer>> request =
    inStore("my_store").withQuery(query);
final StateQueryResult<VersionedRecord<Integer>> result = kafkaStreams.query(request); |
...