...
We propose adding a new method to the KafkaStreams public API:
```java
public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType);
```
...
```java
public interface QueryableStoreType<T> {
    /**
     * Called when searching for {@link StateStore}s to see if they
     * match the type expected by implementors of this interface
     * @param stateStore    The stateStore
     * @return true if it is a match
     */
    boolean accepts(final StateStore stateStore);

    /**
     * Create an instance of T (usually a facade) that developers can use
     * to query the underlying {@link StateStore}s
     * @param storeProvider     provides access to all the underlying StateStore instances of type T
     * @param storeName         The name of the Store
     * @return  T usually a read-only interface over a StateStore @see {@link QueryableStoreTypes.KeyValueStoreType}
     */
    T create(final UnderlyingStoreProvider<T> storeProvider, final String storeName);
}

/**
 * Provides access to {@link org.apache.kafka.streams.processor.StateStore}s as
 * defined by {@link QueryableStoreType}
 * @param <T>
 */
public class UnderlyingStoreProvider<T> {
    /**
     * Get all the StateStores with the given name
     */
    public List<T> getStores(final String storeName);
}
```
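As an illustration of how a user-defined store could plug into this interface, the following is a minimal sketch, not part of the proposal. `StateStore`, `UnderlyingStoreProvider`, and `QueryableStoreType` are reduced stand-ins declared locally so the example compiles without Kafka on the classpath (the provider is modelled as an interface purely for convenience), and `CounterStore`, `ReadOnlyCounterStore`, and `CounterStoreType` are hypothetical names invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CustomStoreTypeSketch {

    // Reduced stand-ins for the proposed API (assumptions, not the real Kafka types).
    interface StateStore {
        String name();
    }

    interface UnderlyingStoreProvider<T> {
        List<T> getStores(String storeName);
    }

    interface QueryableStoreType<T> {
        boolean accepts(StateStore stateStore);
        T create(UnderlyingStoreProvider<T> storeProvider, String storeName);
    }

    // Hypothetical read-only view exposed by a custom store.
    interface ReadOnlyCounterStore {
        long count(String key);
    }

    // Hypothetical custom store: writable internally, queryable through the read-only view.
    static final class CounterStore implements StateStore, ReadOnlyCounterStore {
        private final String name;
        private final Map<String, Long> counts = new HashMap<>();

        CounterStore(String name) {
            this.name = name;
        }

        @Override
        public String name() {
            return name;
        }

        void increment(String key) {
            counts.merge(key, 1L, Long::sum);
        }

        @Override
        public long count(String key) {
            return counts.getOrDefault(key, 0L);
        }
    }

    // The custom QueryableStoreType: accepts() filters stores, create() builds the facade.
    static final class CounterStoreType implements QueryableStoreType<ReadOnlyCounterStore> {
        @Override
        public boolean accepts(StateStore stateStore) {
            return stateStore instanceof ReadOnlyCounterStore;
        }

        @Override
        public ReadOnlyCounterStore create(UnderlyingStoreProvider<ReadOnlyCounterStore> storeProvider,
                                           String storeName) {
            // The facade sums the count over every local store instance with this name.
            return key -> {
                long total = 0;
                for (ReadOnlyCounterStore store : storeProvider.getStores(storeName)) {
                    total += store.count(key);
                }
                return total;
            };
        }
    }

    // Wires two store instances (think: two local partitions) behind one facade.
    static long demo() {
        CounterStore first = new CounterStore("clicks");
        CounterStore second = new CounterStore("clicks");
        first.increment("home");
        first.increment("home");
        second.increment("home");

        List<StateStore> allStores = Arrays.<StateStore>asList(first, second);
        CounterStoreType type = new CounterStoreType();

        // Provider that returns only the stores with a matching name and an accepted type.
        UnderlyingStoreProvider<ReadOnlyCounterStore> provider = name -> {
            List<ReadOnlyCounterStore> matches = new ArrayList<>();
            for (StateStore store : allStores) {
                if (store.name().equals(name) && type.accepts(store)) {
                    matches.add((ReadOnlyCounterStore) store);
                }
            }
            return matches;
        };

        return type.create(provider, "clicks").count("home");
    }
}
```

In this sketch the caller only ever sees `ReadOnlyCounterStore`, so the mutating `increment` method stays hidden behind the facade returned by `create`.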
...
A class that provides the QueryableStoreType implementations that are part of Kafka Streams, i.e.,
```java
public class QueryableStoreTypes {

    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, V>> keyValueStore() {
        return new KeyValueStoreType<>();
    }

    public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStore() {
        return new WindowStoreType<>();
    }

    public static abstract class QueryableStoreTypeMatcher<T> implements QueryableStoreType<T> {

        private final Class matchTo;

        public QueryableStoreTypeMatcher(Class matchTo) {
            this.matchTo = matchTo;
        }

        @SuppressWarnings("unchecked")
        @Override
        public boolean accepts(final StateStore stateStore) {
            return matchTo.isAssignableFrom(stateStore.getClass());
        }
    }

    private static class KeyValueStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {
        KeyValueStoreType() {
            super(ReadOnlyKeyValueStore.class);
        }

        @Override
        public ReadOnlyKeyValueStore<K, V> create(
                final UnderlyingStoreProvider<ReadOnlyKeyValueStore<K, V>> storeProvider,
                final String storeName) {
            return new CompositeReadOnlyStore<>(storeProvider, storeName);
        }
    }

    private static class WindowStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, V>> {
        WindowStoreType() {
            super(ReadOnlyWindowStore.class);
        }

        @Override
        public ReadOnlyWindowStore<K, V> create(
                final UnderlyingStoreProvider<ReadOnlyWindowStore<K, V>> storeProvider,
                final String storeName) {
            return new CompositeReadOnlyWindowStore<>(storeProvider, storeName);
        }
    }
}
```
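The listing above references `CompositeReadOnlyStore` without showing it. One plausible shape for such a facade, offered only as a hedged sketch and not as the actual implementation, is to ask every underlying store with the given name for the key and return the first non-null value. The interfaces below are reduced local stand-ins (only `get` is modelled, and the provider is an interface for convenience), and the `demoStore` helper with its two in-memory maps is hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CompositeStoreSketch {

    // Reduced stand-ins for the proposed interfaces (assumptions, not the real Kafka types).
    interface ReadOnlyKeyValueStore<K, V> {
        V get(K key);
    }

    interface UnderlyingStoreProvider<T> {
        List<T> getStores(String storeName);
    }

    // Hypothetical composite: one read-only view over all local stores with the same name.
    static final class CompositeReadOnlyStore<K, V> implements ReadOnlyKeyValueStore<K, V> {
        private final UnderlyingStoreProvider<ReadOnlyKeyValueStore<K, V>> storeProvider;
        private final String storeName;

        CompositeReadOnlyStore(UnderlyingStoreProvider<ReadOnlyKeyValueStore<K, V>> storeProvider,
                               String storeName) {
            this.storeProvider = storeProvider;
            this.storeName = storeName;
        }

        @Override
        public V get(K key) {
            // Stores are looked up on every call, so the view reflects the provider's current state.
            for (ReadOnlyKeyValueStore<K, V> store : storeProvider.getStores(storeName)) {
                V value = store.get(key);
                if (value != null) {
                    return value;
                }
            }
            return null; // no underlying store holds the key
        }
    }

    // Builds a composite over two in-memory "partitions" of a store named "my-count".
    static ReadOnlyKeyValueStore<String, Long> demoStore() {
        Map<String, Long> partition0 = new HashMap<>();
        partition0.put("a", 1L);
        Map<String, Long> partition1 = new HashMap<>();
        partition1.put("b", 2L);

        ReadOnlyKeyValueStore<String, Long> store0 = partition0::get;
        ReadOnlyKeyValueStore<String, Long> store1 = partition1::get;
        List<ReadOnlyKeyValueStore<String, Long>> stores = Arrays.asList(store0, store1);

        UnderlyingStoreProvider<ReadOnlyKeyValueStore<String, Long>> provider =
                name -> "my-count".equals(name)
                        ? stores
                        : Collections.<ReadOnlyKeyValueStore<String, Long>>emptyList();

        return new CompositeReadOnlyStore<>(provider, "my-count");
    }
}
```

Returning the first non-null value assumes a given key lives in at most one underlying store, which would hold when each store instance backs a distinct partition.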
...
Two new interfaces restrict StateStore access to read-only operations (note that this only applies to implementations that are part of Kafka Streams):
```java
/**
 * A window store that only supports read operations
 *
 * @param <K> Type of keys
 * @param <V> Type of values
 */
public interface ReadOnlyWindowStore<K, V> {

    /**
     * Get all the key-value pairs with the given key and the time range from all
     * the existing windows.
     *
     * @return an iterator over key-value pairs {@code <timestamp, value>}
     */
    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
}

/**
 * A key value store that only supports read operations
 * @param <K> the key type
 * @param <V> the value type
 */
public interface ReadOnlyKeyValueStore<K, V> {

    /**
     * Get the value corresponding to this key
     *
     * @param key The key to fetch
     * @return The value or null if no value is found.
     * @throws NullPointerException If null is used for key.
     */
    V get(K key);

    /**
     * Get an iterator over a given range of keys.
     * This iterator MUST be closed after use.
     *
     * @param from The first key that could be in the range
     * @param to The last key that could be in the range
     * @return The iterator for this range.
     * @throws NullPointerException If null is used for from or to.
     */
    KeyValueIterator<K, V> range(K from, K to);

    /**
     * Return an iterator over all keys in the database.
     * This iterator MUST be closed after use.
     *
     * @return An iterator of all key/value pairs in the store.
     */
    KeyValueIterator<K, V> all();
}
```
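Because the Javadoc requires range and full-scan iterators to be closed, callers would typically wrap them in try-with-resources. The sketch below shows that pattern against an in-memory store. It rests on several assumptions: `KeyValueIterator` is a local stand-in that yields `Map.Entry` pairs and is `AutoCloseable`, `InMemoryReadOnlyStore` and `keysInRange` are hypothetical names, and both range bounds are treated as inclusive, following the "could be in the range" wording.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

final class ReadOnlyViewSketch {

    // Stand-in iterator type (assumption): closeable, yields immutable key/value entries.
    interface KeyValueIterator<K, V> extends Iterator<Map.Entry<K, V>>, AutoCloseable {
        @Override
        void close();
    }

    // Mirrors the shape of the proposed read-only key-value interface.
    interface ReadOnlyKeyValueStore<K, V> {
        V get(K key);
        KeyValueIterator<K, V> range(K from, K to);
        KeyValueIterator<K, V> all();
    }

    // Hypothetical in-memory implementation backed by a sorted copy of the source map.
    static final class InMemoryReadOnlyStore<K, V> implements ReadOnlyKeyValueStore<K, V> {
        private final TreeMap<K, V> data;

        InMemoryReadOnlyStore(Map<K, V> source) {
            this.data = new TreeMap<>(source);
        }

        @Override
        public V get(K key) {
            return data.get(Objects.requireNonNull(key, "key"));
        }

        @Override
        public KeyValueIterator<K, V> range(K from, K to) {
            Objects.requireNonNull(from, "from");
            Objects.requireNonNull(to, "to");
            // Both bounds inclusive (an assumption made for this sketch).
            return wrap(data.subMap(from, true, to, true).entrySet().iterator());
        }

        @Override
        public KeyValueIterator<K, V> all() {
            return wrap(data.entrySet().iterator());
        }

        private static <K, V> KeyValueIterator<K, V> wrap(Iterator<Map.Entry<K, V>> delegate) {
            return new KeyValueIterator<K, V>() {
                @Override
                public boolean hasNext() {
                    return delegate.hasNext();
                }

                @Override
                public Map.Entry<K, V> next() {
                    // Copy into an immutable entry so callers cannot write through setValue().
                    return new AbstractMap.SimpleImmutableEntry<>(delegate.next());
                }

                @Override
                public void close() {
                    // Nothing to release in memory; a store holding native resources would free them here.
                }
            };
        }
    }

    // Collects the keys in [from, to], closing the iterator via try-with-resources.
    static List<String> keysInRange(ReadOnlyKeyValueStore<String, Long> store, String from, String to) {
        List<String> keys = new ArrayList<>();
        try (KeyValueIterator<String, Long> iterator = store.range(from, to)) {
            while (iterator.hasNext()) {
                keys.add(iterator.next().getKey());
            }
        }
        return keys;
    }
}
```

The anonymous iterator inherits the default `Iterator.remove()`, which throws `UnsupportedOperationException`, so the view offers no write path through iteration either.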
...
We can then use the above API to get access to the stores like so:
```java
final ReadOnlyKeyValueStore<String, Long> myCount =
        kafkaStreams.getStore("my-count", QueryableStoreTypes.<String, Long>keyValueStore());

final ReadOnlyWindowStore<String, String> joinOther =
        kafkaStreams.getStore("join-other", QueryableStoreTypes.<String, String>windowStore());
```
...
Discovery API
Exposed APIs from Kafka Streams:
...