Status
Current state: Accepted
Discussion thread: here
JIRA: KAFKA-3909
Released: 0.10.1.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block |
---|
public interface QueryableStoreType<T> {
/**
* Called when searching for {@link StateStore}s to see if they
* match the type expected by implementors of this interface.
* @param stateStore the StateStore to check
* @return true if it is a match
*/
boolean accepts(final StateStore stateStore);
/**
* Create an instance of T (usually a facade) that developers can use
* to query the underlying {@link StateStore}s.
* @param storeProvider provides access to all the underlying StateStore instances of type T
* @param storeName the name of the store
* @return T, usually a read-only interface over a StateStore; see {@link QueryableStoreTypes#keyValueStore()}
*/
T create(final StateStoreProvider storeProvider, final String storeName);
} |
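To illustrate how accepts() and create() cooperate with the StateStoreProvider interface that create() receives, here is a minimal plain-Java sketch with no Kafka dependency. StateStore, QueryableStoreType and StateStoreProvider are simplified stand-ins for the interfaces in this KIP; ReadOnlyLookup, MapStore, LocalProvider and LookupType are hypothetical names used only for illustration. The provider keeps stores that have the requested name and that the store type accepts; create() returns a facade that asks every matching instance in turn.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch, not Kafka code: simplified stand-ins showing how accepts()
// filters store instances and create() builds a facade over every match.
final class StoreProviderSketch {

    /** Stand-in for Kafka's StateStore; only the name matters here. */
    interface StateStore {
        String name();
    }

    interface QueryableStoreType<T> {
        boolean accepts(StateStore stateStore);
        T create(StateStoreProvider storeProvider, String storeName);
    }

    interface StateStoreProvider {
        <T> List<T> stores(String storeName, QueryableStoreType<T> queryableStoreType);
    }

    /** Hypothetical read-only view used only by this sketch. */
    interface ReadOnlyLookup {
        String get(String key);
    }

    /** Hypothetical in-memory store instance, e.g. one per assigned partition. */
    static final class MapStore implements StateStore, ReadOnlyLookup {
        private final String name;
        private final Map<String, String> data = new HashMap<>();
        MapStore(String name) { this.name = name; }
        MapStore put(String key, String value) { data.put(key, value); return this; }
        @Override public String name() { return name; }
        @Override public String get(String key) { return data.get(key); }
    }

    /** Provider over all local store instances. */
    static final class LocalProvider implements StateStoreProvider {
        private final List<StateStore> allStores;
        LocalProvider(List<StateStore> allStores) { this.allStores = allStores; }

        @Override
        @SuppressWarnings("unchecked")
        public <T> List<T> stores(String storeName, QueryableStoreType<T> type) {
            List<T> result = new ArrayList<>();
            for (StateStore store : allStores) {
                // Keep stores with the requested name that the store type accepts.
                if (store.name().equals(storeName) && type.accepts(store)) {
                    result.add((T) store); // safe only because accepts() checked the type
                }
            }
            return result;
        }
    }

    /** Store type for ReadOnlyLookup; create() returns a facade over all matches. */
    static final class LookupType implements QueryableStoreType<ReadOnlyLookup> {
        @Override
        public boolean accepts(StateStore stateStore) {
            return stateStore instanceof ReadOnlyLookup;
        }

        @Override
        public ReadOnlyLookup create(StateStoreProvider storeProvider, String storeName) {
            // Query each underlying instance; the first non-null value wins.
            return key -> {
                for (ReadOnlyLookup store : storeProvider.stores(storeName, this)) {
                    String value = store.get(key);
                    if (value != null) {
                        return value;
                    }
                }
                return null;
            };
        }
    }

    static StateStoreProvider sampleProvider() {
        List<StateStore> stores = new ArrayList<>();
        stores.add(new MapStore("counts").put("a", "1")); // e.g. partition 0
        stores.add(new MapStore("counts").put("b", "2")); // e.g. partition 1
        stores.add(new MapStore("other").put("a", "x"));  // different store name
        stores.add(() -> "counts");                       // right name, wrong type
        return new LocalProvider(stores);
    }

    public static void main(String[] args) {
        LookupType type = new LookupType();
        StateStoreProvider provider = sampleProvider();
        System.out.println(provider.stores("counts", type).size()); // prints 2
        ReadOnlyLookup counts = type.create(provider, "counts");
        System.out.println(counts.get("b")); // prints 2
        System.out.println(counts.get("z")); // prints null
    }
}
```

The unchecked cast in stores() is only sound because accepts() has already confirmed that the instance implements the requested view, which is why the type check lives in the store type rather than in the provider.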
Two new interfaces to restrict StateStore access to read-only operations (note: this only applies to the implementations that are part of Kafka Streams):
Code Block |
---|
/**
* A window store that only supports read operations
*
* @param <K> Type of keys
* @param <V> Type of values
*/
public interface ReadOnlyWindowStore<K, V> {
/**
* Get all the key-value pairs with the given key and the time range from all
* the existing windows.
*
* @param key the key to fetch
* @param timeFrom the start of the time range
* @param timeTo the end of the time range
* @return an iterator over key-value pairs {@code <timestamp, value>}
*/
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
}
/**
* A key value store that only supports read operations
* @param <K> the key type
* @param <V> the value type
*/
public interface ReadOnlyKeyValueStore<K, V> {
/**
* Get the value corresponding to this key
*
* @param key The key to fetch
* @return The value or null if no value is found.
* @throws NullPointerException If null is used for key.
*/
V get(K key);
/**
* Get an iterator over a given range of keys.
* This iterator MUST be closed after use.
*
* @param from The first key that could be in the range
* @param to The last key that could be in the range
* @return The iterator for this range.
* @throws NullPointerException If null is used for from or to.
*/
KeyValueIterator<K, V> range(K from, K to);
/**
* Return an iterator over all key/value pairs in this store.
* This iterator MUST be closed after use.
*
* @return An iterator of all key/value pairs in the store.
*/
KeyValueIterator<K, V> all();
} |
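The read-only contracts above can be made concrete with a small in-memory sketch. This is plain Java, not Kafka's implementation: InMemoryReadOnlyKeyValueStore, InMemoryReadOnlyWindowStore and the simplified KeyValueIterator are hypothetical names, and the choice to treat both range bounds and both time bounds as inclusive is an assumption of this sketch. Sorted maps (TreeMap) give range() and fetch() directly via subMap, and the iterator is closed with try-with-resources, as the javadoc above requires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;

// Plain-Java sketch, not Kafka code: in-memory stores exposing get/range/all
// and a windowed fetch over sorted maps.
final class ReadOnlyStoreSketch {

    /** Simplified KeyValueIterator; must be closed after use, as the javadoc requires. */
    interface KeyValueIterator<K, V> extends Iterator<Map.Entry<K, V>>, AutoCloseable {
        @Override
        void close(); // narrowed: throws no checked exception
    }

    private static <K, V> KeyValueIterator<K, V> iteratorOver(Map<K, V> view) {
        Iterator<Map.Entry<K, V>> it = view.entrySet().iterator();
        return new KeyValueIterator<K, V>() {
            private boolean closed;
            @Override public boolean hasNext() { return !closed && it.hasNext(); }
            @Override public Map.Entry<K, V> next() {
                if (closed) throw new IllegalStateException("iterator is closed");
                return it.next();
            }
            @Override public void close() { closed = true; } // a real store would free resources here
        };
    }

    static final class InMemoryReadOnlyKeyValueStore<K extends Comparable<K>, V> {
        private final NavigableMap<K, V> data = new TreeMap<>();

        InMemoryReadOnlyKeyValueStore<K, V> load(K key, V value) { data.put(key, value); return this; }

        V get(K key) { return data.get(Objects.requireNonNull(key, "key")); }

        /** Both bounds inclusive; TreeMap.subMap throws IllegalArgumentException if from > to. */
        KeyValueIterator<K, V> range(K from, K to) {
            Objects.requireNonNull(from, "from");
            Objects.requireNonNull(to, "to");
            return iteratorOver(data.subMap(from, true, to, true));
        }

        KeyValueIterator<K, V> all() { return iteratorOver(data); }
    }

    static final class InMemoryReadOnlyWindowStore<K, V> {
        private final Map<K, NavigableMap<Long, V>> byKey = new HashMap<>();

        InMemoryReadOnlyWindowStore<K, V> load(K key, long timestamp, V value) {
            byKey.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
            return this;
        }

        /** Returns <timestamp, value> pairs for the key with timeFrom <= timestamp <= timeTo. */
        KeyValueIterator<Long, V> fetch(K key, long timeFrom, long timeTo) {
            NavigableMap<Long, V> windows = byKey.getOrDefault(key, new TreeMap<>());
            return iteratorOver(windows.subMap(timeFrom, true, timeTo, true));
        }
    }

    static <K, V> List<K> keys(KeyValueIterator<K, V> source) {
        List<K> out = new ArrayList<>();
        try (KeyValueIterator<K, V> it = source) { // always close, as required
            while (it.hasNext()) {
                out.add(it.next().getKey());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        InMemoryReadOnlyKeyValueStore<String, Long> counts = new InMemoryReadOnlyKeyValueStore<String, Long>()
                .load("apple", 3L).load("banana", 5L).load("cherry", 7L);
        System.out.println(counts.get("banana"));                  // prints 5
        System.out.println(keys(counts.range("apple", "banana"))); // prints [apple, banana]
        System.out.println(keys(counts.all()));                    // prints [apple, banana, cherry]
        InMemoryReadOnlyWindowStore<String, String> joins = new InMemoryReadOnlyWindowStore<String, String>()
                .load("k", 1000L, "a").load("k", 2000L, "b").load("k", 3000L, "c");
        System.out.println(keys(joins.fetch("k", 1500L, 3000L)));  // prints [2000, 3000]
    }
}
```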
We can then use the above API to get access to the stores like so:
Code Block |
---|
final ReadOnlyKeyValueStore<String, Long> myCount =
    kafkaStreams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore());
final ReadOnlyWindowStore<String, String> joinOther =
    kafkaStreams.store("join-other", QueryableStoreTypes.<String, String>windowStore()); |
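As the REST example below notes, a store handle may not be available until partitions have been assigned, or while a rebalance is in progress, so callers often retry the lookup. The following is a plain-Java sketch of such a bounded retry loop; waitForStore is a hypothetical helper and the Supplier stands in for a call such as kafkaStreams.store(name, type). It assumes, as the example does, that an unavailable store is signalled by null; if a given version signals this by throwing instead (for example an InvalidStateStoreException), the supplier would need to catch that and return null.

```java
import java.util.function.Supplier;

// Plain-Java sketch: retry a store lookup until it yields a handle.
// "lookup" is a hypothetical stand-in for kafkaStreams.store(name, type).
final class StoreRetrySketch {

    static <T> T waitForStore(Supplier<T> lookup, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            T store = lookup.get();
            if (store != null) {
                return store; // partitions assigned, store is queryable
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis); // give assignment / rebalance time to finish
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for store", e);
                }
            }
        }
        throw new IllegalStateException("store not available after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated lookup that returns null twice, then a handle.
        String handle = waitForStore(() -> ++calls[0] >= 3 ? "store-handle" : null, 5, 10L);
        System.out.println(handle + " after " + calls[0] + " calls"); // prints store-handle after 3 calls
    }
}
```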
Discovery API
Exposed APIs from Kafka Streams:
Code Block |
---|
/**
 * A new config will be added to StreamsConfig:
 * a user-defined endpoint that can be used to connect to remote KafkaStreams instances.
 * Should be in the format host:port
 */
public static final String APPLICATION_SERVER_CONFIG = "application.server";
public class HostInfo {
    String hostname;    /* hostname for instance that contains state store */
    int port;           /* listening port for instance that contains state store */
}
public class StreamsMetadata {
    private final HostInfo hostInfo;                   /* hostInfo from above */
    private final Set<String> stateStores;             /* state stores on this instance */
    private final Set<TopicPartition> topicPartitions; /* TopicPartitions on this instance */
}
// The following methods are exposed on KafkaStreams:
/**
 * Find all of the instances of {@link StreamsMetadata} in a {@link KafkaStreams application}
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @return collection containing all instances of {@link StreamsMetadata} in this application
 */
public Collection<StreamsMetadata> allMetadata();
/**
 * Find all instances of {@link StreamsMetadata} for a given storeName
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName the storeName to find metadata for
 * @return A collection containing instances of {@link StreamsMetadata} that have the provided storeName
 */
public Collection<StreamsMetadata> allMetadataForStore(final String storeName);
/**
 * Find the {@link StreamsMetadata} for a given storeName and key.
 * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore};
 * this method provides a way of finding which host it would exist on.
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName Name of the store
 * @param key Key to use for partitioning
 * @param keySerializer Serializer for the key
 * @param <K> key type
 * @return The {@link StreamsMetadata} for the storeName and key
 */
public <K> StreamsMetadata metadataForKey(final String storeName,
                                          final K key,
                                          final Serializer<K> keySerializer);
/**
 * Find the {@link StreamsMetadata} for a given storeName and key.
 * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore};
 * this method provides a way of finding which host it would exist on.
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName Name of the store
 * @param key Key to use for partitioning
 * @param partitioner Partitioner for the store
 * @param <K> key type
 * @return The {@link StreamsMetadata} for the storeName and key
 */
public <K> StreamsMetadata metadataForKey(final String storeName,
                                          final K key,
                                          final StreamPartitioner<K, ?> partitioner); |
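The idea behind metadataForKey can be pictured as three steps: serialize the key, map the bytes to a partition, then look up which instance currently owns that partition. The plain-Java sketch below shows only that idea and is not Kafka's implementation: Arrays.hashCode is a stand-in hash (Kafka's default partitioner uses murmur2), and the partition-to-host map is a hypothetical stand-in for the metadata that instances learn during rebalances. KeyLocatorSketch and its methods are illustrative names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch, not Kafka code: serialize key -> pick partition -> find owner.
// Arrays.hashCode is a stand-in hash; Kafka's default partitioner uses murmur2.
final class KeyLocatorSketch {

    /** Simplified HostInfo: the application.server endpoint of one instance. */
    static final class HostInfo {
        final String host;
        final int port;

        HostInfo(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    private final int numPartitions;
    // Hypothetical stand-in for the partition -> instance metadata shared at rebalance time.
    private final Map<Integer, HostInfo> ownerByPartition = new HashMap<>();

    KeyLocatorSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    void assign(int partition, HostInfo owner) {
        ownerByPartition.put(partition, owner);
    }

    static int partitionFor(byte[] keyBytes, int numPartitions) {
        // Clear the sign bit so the modulus is never negative.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    /** Owner of the key's partition, or null if that partition is currently unassigned. */
    HostInfo hostForKey(String key) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // StringSerializer defaults to UTF-8
        return ownerByPartition.get(partitionFor(keyBytes, numPartitions));
    }

    public static void main(String[] args) {
        KeyLocatorSketch locator = new KeyLocatorSketch(4);
        HostInfo a = new HostInfo("host-a", 7070);
        HostInfo b = new HostInfo("host-b", 7070);
        locator.assign(0, a);
        locator.assign(1, b);
        locator.assign(2, a);
        locator.assign(3, b);
        // The same key always maps to the same partition, and therefore the same host.
        System.out.println("my-key -> " + locator.hostForKey("my-key"));
    }
}
```

This also shows why the result is only a point-in-time view: the key-to-partition step is deterministic, but the partition-to-host map changes whenever partitions are reassigned.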
Below is an example of how a developer might use the Discovery API:
Code Block |
---|
// Imports assumed by this example (JAX-RS and Kafka Streams):
import java.util.Collection;
import java.util.Properties;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StreamsMetadata;
public static void main(String[] args) throws Exception {
    Properties streamsConfiguration = new Properties();
    ...
    /**
     * To use the Discovery API the developer must provide a host:port pair that
     * maps to an embedded service listening on this address, i.e.,
     * it is up to the developer to define the protocol and create the service
     */
    streamsConfiguration.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:7070");
    ...
    KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
    streams.start();
    /**
     * Start your embedded service listening on the endpoint
     * provided above
     */
    new QueryableStateProxy(streams).start(7070);
}
/**
 * Example REST proxy that uses the Discovery API to locate the
 * KafkaStreams instances that StateStores are running on. A developer would first connect
 * to a well-known instance to find where a particular store, or store with key,
 * is located. They'd then use the returned StreamsMetadata
 * to connect to the appropriate instances and perform queries, i.e.:
 * StreamsMetadata instance = http.get("http://well-known-host:8080/state/instance/my-store/my-key");
 * Long result = http.get("http://" + instance.host() + ":" + instance.port() + "/state/stores/my-store/my-key");
 */
@Path("state")
public class QueryableStateProxy {
    /**
     * The KafkaStreams instance knows about all of the other instances
     * in the application. This is maintained by the StreamPartitionAssignor
     * on partition assignments (rebalances)
     */
    private final KafkaStreams streams;
    public QueryableStateProxy(final KafkaStreams streams) {
        this.streams = streams;
    }
    @GET()
    @Path("/instances")
    public Response streamsMetadata() {
        // get the current collection of StreamsMetadata
        final Collection<StreamsMetadata> metadata = streams.allMetadata();
        return respondWithMetadata(metadata);
    }
    @GET()
    @Path("/instances/{storeName}")
    public Response streamsMetadataForStore(@PathParam("storeName") String store) {
        // find all the metadata that have the provided store
        final Collection<StreamsMetadata> metadata = streams.allMetadataForStore(store);
        return respondWithMetadata(metadata);
    }
    @GET()
    @Path("/instance/{storeName}/{key}")
    public Response streamsMetadataForStoreAndKey(@PathParam("storeName") String store,
                                                  @PathParam("key") String key) {
        // locate the instance that would have the store with the provided key (if the key exists)
        final StreamsMetadata metadata = streams.metadataForKey(store, key, new StringSerializer());
        if (metadata == null) {
            return Response.noContent().build();
        }
        return Response.ok(metadata.toString()).build();
    }
    @GET()
    @Path("/stores/{storeName}/{key}")
    public Response byKey(@PathParam("storeName") String storeName, @PathParam("key") String key) {
        // Get a handle on the store for the provided storeName
        final ReadOnlyKeyValueStore<String, Long> store = streams.store(storeName,
            QueryableStoreTypes.<String, Long>keyValueStore());
        // store may not exist or might not exist yet, i.e., if partitions haven't been assigned or
        // a rebalance is in progress
        if (store == null) {
            return Response.noContent().build();
        }
        // we have a store so we can get the result
        final Long result = store.get(key);
        if (result == null) {
            return Response.noContent().build();
        }
        return Response.ok(result).build();
    }
    // Simple placeholder rendering of the metadata; a real service would likely return JSON
    private Response respondWithMetadata(final Collection<StreamsMetadata> metadata) {
        return Response.ok(metadata.toString()).build();
    }
    public void start(int port) {
        // start your favourite http server
        ...
    }
}
|
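The example relies on each instance advertising its own "host:port" through APPLICATION_SERVER_CONFIG and on clients making two hops: one to a well-known instance to locate the key, then one to the owning instance. Below is a plain-Java sketch of parsing that value and building the two URLs used in the example's comment. EndpointSketch, parse, instanceUrl and storeUrl are hypothetical helpers; the sketch splits on the last ':' (IPv6 literals are out of scope) and assumes store names and keys are already URL-safe.

```java
// Plain-Java sketch: parse an application.server style "host:port" value and
// build the two request URLs from the REST proxy example. Names are hypothetical.
final class EndpointSketch {

    static final class HostInfo {
        final String host;
        final int port;

        HostInfo(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Parses "host:port", splitting on the last ':'; rejects missing parts and bad ports. */
    static HostInfo parse(String endpoint) {
        int colon = endpoint == null ? -1 : endpoint.lastIndexOf(':');
        if (colon <= 0 || colon == endpoint.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got: " + endpoint);
        }
        String host = endpoint.substring(0, colon);
        int port;
        try {
            port = Integer.parseInt(endpoint.substring(colon + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("port is not a number in: " + endpoint, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range in: " + endpoint);
        }
        return new HostInfo(host, port);
    }

    /** First hop: ask a well-known instance which instance would hold the key. */
    static String instanceUrl(HostInfo wellKnown, String storeName, String key) {
        return "http://" + wellKnown.host + ":" + wellKnown.port + "/state/instance/" + storeName + "/" + key;
    }

    /** Second hop: query the owning instance directly. */
    static String storeUrl(HostInfo owner, String storeName, String key) {
        return "http://" + owner.host + ":" + owner.port + "/state/stores/" + storeName + "/" + key;
    }

    public static void main(String[] args) {
        HostInfo self = parse("localhost:7070"); // the APPLICATION_SERVER_CONFIG value used above
        System.out.println(storeUrl(self, "my-store", "my-key"));
        // prints http://localhost:7070/state/stores/my-store/my-key
    }
}
```

Validating the endpoint at startup means a typo in the configured value fails fast on the instance itself, rather than surfacing later as unreachable addresses handed out to other instances.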
Developer Guide for Custom Stores
If you want to make a customized store queryable, you'll need to implement the QueryableStoreType interface. Below are example implementations for KeyValueStore and WindowStore. You may also want to provide an interface that restricts operations to read-only, and a composite type that provides a façade over the potentially many instances of the underlying store (see the example CompositeReadOnlyKeyValueStore below).
Code Block |
---|
public class QueryableStoreTypes {
public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, V>> keyValueStore() {
return new KeyValueStoreType<>();
}
public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStore() {
return new WindowStoreType<>();
}
    public static abstract class QueryableStoreTypeMatcher<T> implements QueryableStoreType<T> {
        private final Class<?> matchTo;
        public QueryableStoreTypeMatcher(final Class<?> matchTo) {
            this.matchTo = matchTo;
        }
        @Override
        public boolean accepts(final StateStore stateStore) {
            // a store matches if it implements the expected read-only interface
            return matchTo.isAssignableFrom(stateStore.getClass());
        }
    }
    private static class KeyValueStoreType<K, V> extends
            QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {
        KeyValueStoreType() {
            super(ReadOnlyKeyValueStore.class);
        }
        @Override
        public ReadOnlyKeyValueStore<K, V> create(final StateStoreProvider storeProvider,
                                                  final String storeName) {
            return new CompositeReadOnlyKeyValueStore<>(storeProvider, this, storeName);
        }
    }
    private static class WindowStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, V>> {
        WindowStoreType() {
            super(ReadOnlyWindowStore.class);
        }
        @Override
        public ReadOnlyWindowStore<K, V> create(final StateStoreProvider storeProvider,
                                                final String storeName) {
            return new CompositeReadOnlyWindowStore<>(storeProvider, this, storeName);
        }
    }
}
public interface StateStoreProvider {
    /**
     * Find instances of StateStore that are accepted by {@link QueryableStoreType#accepts} and
     * have the provided storeName.
     *
     * @param storeName name of the store
     * @param queryableStoreType filter stores based on this queryableStoreType
     * @param <T> The type of the Store
     * @return List of the instances of the store in this topology. Empty List if not found
     */
    <T> List<T> stores(String storeName, QueryableStoreType<T> queryableStoreType);
} |
Code Block |
---|
/**
 * A wrapper over the underlying {@link ReadOnlyKeyValueStore}s found in a {@link
 * org.apache.kafka.streams.processor.internals.ProcessorTopology}
 *
 * @param <K> key type
 * @param <V> value type
 */
public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueStore<K, V> {
    private final StateStoreProvider storeProvider;
    private final QueryableStoreType<ReadOnlyKeyValueStore<K, V>> storeType;
    private final String storeName;
    public CompositeReadOnlyKeyValueStore(final StateStoreProvider storeProvider,
                                          final QueryableStoreType<ReadOnlyKeyValueStore<K, V>> storeType,
                                          final String storeName) {
        this.storeProvider = storeProvider;
        this.storeType = storeType;
        this.storeName = storeName;
    }
    @Override
    public V get(final K key) {
        // ask each underlying store instance; the first non-null value wins
        final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
        for (ReadOnlyKeyValueStore<K, V> store : stores) {
            final V result = store.get(key);
            if (result != null) {
                return result;
            }
        }
        return null;
    }
    // range(K from, K to) and all() are omitted from this example
} |
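The composite example implements only get(). A composite all() has to combine one iterator per underlying store instance and, since each iterator MUST be closed after use, close all of them. The plain-Java sketch below shows one way to do that by chaining iterators; it is an illustration under simplified types, not Kafka's implementation, and CompositeIteratorSketch, iteratorOf, chain and drainKeys are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;

// Plain-Java sketch, not Kafka code: a composite all() that chains the iterators
// of every underlying store instance and closes all of them when it is closed.
final class CompositeIteratorSketch {

    /** Simplified KeyValueIterator that must be closed after use. */
    interface KeyValueIterator<K, V> extends Iterator<Map.Entry<K, V>>, AutoCloseable {
        @Override
        void close();
    }

    /** Wraps one underlying instance's data (e.g. one per assigned partition). */
    static <K, V> KeyValueIterator<K, V> iteratorOf(Map<K, V> data) {
        Iterator<Map.Entry<K, V>> it = data.entrySet().iterator();
        return new KeyValueIterator<K, V>() {
            @Override public boolean hasNext() { return it.hasNext(); }
            @Override public Map.Entry<K, V> next() { return it.next(); }
            @Override public void close() { }
        };
    }

    /** Visits each part in order; entries are NOT globally sorted across parts. */
    static <K, V> KeyValueIterator<K, V> chain(List<KeyValueIterator<K, V>> parts) {
        return new KeyValueIterator<K, V>() {
            private int index = 0;

            @Override
            public boolean hasNext() {
                // Skip exhausted (or empty) parts.
                while (index < parts.size()) {
                    if (parts.get(index).hasNext()) {
                        return true;
                    }
                    index++;
                }
                return false;
            }

            @Override
            public Map.Entry<K, V> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return parts.get(index).next();
            }

            @Override
            public void close() {
                for (KeyValueIterator<K, V> part : parts) {
                    part.close(); // every underlying iterator must be closed
                }
            }
        };
    }

    static <K, V> List<K> drainKeys(KeyValueIterator<K, V> source) {
        List<K> keys = new ArrayList<>();
        try (KeyValueIterator<K, V> it = source) {
            while (it.hasNext()) {
                keys.add(it.next().getKey());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        Map<String, Integer> partition0 = new TreeMap<>();
        partition0.put("a", 1);
        partition0.put("c", 3);
        Map<String, Integer> partition1 = new TreeMap<>();
        partition1.put("b", 2);
        List<KeyValueIterator<String, Integer>> parts = new ArrayList<>();
        parts.add(iteratorOf(partition0));
        parts.add(iteratorOf(new TreeMap<String, Integer>())); // an empty instance is skipped
        parts.add(iteratorOf(partition1));
        System.out.println(drainKeys(chain(parts))); // prints [a, c, b]
    }
}
```

Chaining is the simplest choice but yields per-instance order only; a range() whose results must be globally sorted would instead need to merge the underlying sorted iterators.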
Proposed implementation outline
...