...
Code Block | ||
---|---|---|
| ||
package org.apache.kafka.streams.state;
// new classes and interfaces
public class ValueAndTimestamp<V> {
private ValueAndTimestamp(final V value, final long timestamp); // use `make()` instead
public V value();
public long timestamp();
public static <V> ValueAndTimestamp<V> make(final V value, final long timestamp); // returns `null` if `value==null`
public static <V> V getValueOrNull(final ValueAndTimestamp<V> valueAndTimestamp); // returns `null` if `valueAndTimestamp==null`
}
public interface TimestampedKeyValueStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> {}
public interface TimestampedWindowStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> {}
public interface TimestampedSessionStore<K, V> extends SessionStore<K, ValueAndTimestamp<V>> {}
public interface TimestampedBytesStore {
static byte[] convertToTimestampedFormat(final byte[] plainValue);
}
// extend existing classes (omitting existing methods)
public final class Stores {
public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(final String name);
public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates);
public static SessionBytesStoreSupplier persistentTimestampedSessionStore(final String name,
final Duration retentionPeriod);
public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>> timestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde);
public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>> timestampedWindowStoreBuilder(final WindowBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde);
public static <K, V> StoreBuilder<TimestampedSessionStore<K, V>> timestampedSessionStoreBuilder(final SessionBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde);
}
public final class QueryableStoreTypes {
public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> timestampedKeyValueStore();
public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> timestampedWindowStore();
public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, ValueAndTimestamp<V>>> timestampedSessionStore();
}
// test-utils package
public class TopologyTestDriver {
public <K, V> KeyValueStore<K, ValueAndTimestamp<V>> getTimestampedKeyValueStore(final String name);
public <K, V> WindowStore<K, ValueAndTimestamp<V>> getTimestampedWindowStore(final String name);
public <K, V> SessionStore<K, ValueAndTimestamp<V>> getTimestampedSessionStore(final String name);
} |
...