...
JIRA: KAFKA-9445 (ASF JIRA)
Discussion:
...
https://www.mail-archive.com/dev@kafka.apache.org/msg104287.html
Motivation:
Whenever a call is made to get a particular key from a Kafka Streams instance, it currently returns a store wrapper that contains a list of the stores for all the running tasks and, with KIP-535 (Allow state stores to serve stale reads during rebalance), all the restoring/replica tasks on the instance, via StreamThreadStateStoreProvider#stores().
...
Public Interfaces:
- Adding a new class StoreQueryParameters to provide user options to the QueryableStoreProvider layer, so that it can understand what kind of stores a user wants. It would currently include whether a user is okay with serving stale data and whether the user already knows the partition of the store they are looking at. Since store name and partition would be a unique combination, a taskId can be generated from this information to return the store for that particular task.
...
```java
package org.apache.kafka.streams;

import org.apache.kafka.streams.state.QueryableStoreType;

/**
 * Represents all the query options that a user can provide to state what kind of stores it is expecting.
 * The options are whether the user wants to enable/disable stale stores, and whether the user knows
 * the partition that it specifically wants to fetch.
 * If this information is not provided, the default behavior is to fetch the stores for all the
 * partitions available on that instance for that particular store name.
 * It contains a partition, which for a point query can be populated from the KeyQueryMetadata.
 */
public class StoreQueryParameters<T> {

    private Integer partition;
    private boolean staleStores;
    private final String storeName;
    private final QueryableStoreType<T> queryableStoreType;

    private StoreQueryParameters(final String storeName, final QueryableStoreType<T> queryableStoreType) {
        this.storeName = storeName;
        this.queryableStoreType = queryableStoreType;
    }

    public static <T> StoreQueryParameters<T> fromNameAndType(final String storeName,
                                                              final QueryableStoreType<T> queryableStoreType) {
        return new StoreQueryParameters<>(storeName, queryableStoreType);
    }

    /**
     * Set the specific partition to be fetched from the stores list.
     *
     * @param partition the partition whose store should be returned
     * @return this StoreQueryParameters with the partition set
     */
    public StoreQueryParameters<T> withPartition(final Integer partition) {
        this.partition = partition;
        return this;
    }

    /**
     * Enable stale (standby, restoring) stores in addition to running stores.
     *
     * @return this StoreQueryParameters with stale stores enabled
     */
    public StoreQueryParameters<T> enableStaleStores() {
        this.staleStores = true;
        return this;
    }

    /**
     * Get the partition to be used to fetch the list of queryable stores from QueryableStoreProvider.
     * If this returns null, no specific partition has been requested, so all the local partitions
     * for the store will be returned.
     *
     * @return Integer partition
     */
    public Integer partition() {
        return partition;
    }

    /**
     * Get the stale-stores flag. If true, include standbys and recovering stores along with running stores.
     *
     * @return boolean staleStores
     */
    public boolean staleStoresEnabled() {
        return staleStores;
    }

    /**
     * Get the store name for which the key is queried by the user.
     *
     * @return String storeName
     */
    public String storeName() {
        return storeName;
    }

    /**
     * Get the queryable store type for which the key is queried by the user.
     *
     * @return QueryableStoreType queryableStoreType
     */
    public QueryableStoreType<T> queryableStoreType() {
        return queryableStoreType;
    }
}
```
- Changing KafkaStreams#store(final String storeName, final QueryableStoreType<T> queryableStoreType, final boolean staleStores) in favour of the function mentioned below, as this one hasn't been released yet.
...
```java
public class KafkaStreams {

    @Deprecated
    public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
        return store(StoreQueryParameters.fromNameAndType(storeName, queryableStoreType));
    }

    // remove (was added via KIP-535 and was never released):
    // public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType, final boolean staleStores);

    // newly added
    /**
     * Get a facade wrapping the local {@link StateStore} instances with the provided {@link StoreQueryParameters}.
     * StoreQueryParameters requires {@code storeName} and {@code queryableStoreType} to be set; a store is
     * returned only if its type is accepted by the provided
     * {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
     * The optional parameters of StoreQueryParameters are {@code partition} and {@code staleStores}.
     * The returned object can be used to query the {@link StateStore} instances.
     *
     * @param storeQueryParameters
     *        If StoreQueryParameters.fromNameAndType(storeName, queryableStoreType).withPartition(int partition)
     *        is used, it allows queries on the specific partition, irrespective of whether it is a standby
     *        or a restoring replica, in addition to active ones.
     *        If StoreQueryParameters.fromNameAndType(storeName, queryableStoreType).enableStaleStores()
     *        is used, it allows queries on standbys and restoring replicas in addition to active ones
     *        for all the local partitions on the instance.
     *        If StoreQueryParameters.fromNameAndType(storeName, queryableStoreType).enableStaleStores().withPartition(int partition)
     *        is used, it allows queries on the specific partition, irrespective of whether it is a standby
     *        or a restoring replica, in addition to active ones.
     *        By default, if just storeQueryParameters is used, it returns all the local partitions
     *        for the store which are in running state.
     * @param <T> return type
     * @return A facade wrapping the local {@link StateStore} instances
     * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
     *         {@code queryableStoreType} doesn't exist
     */
    public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
        validateIsRunningOrRebalancing();
        return queryableStoreProvider.getStore(storeQueryParameters);
    }
}
```
Proposed Changes:
- Add a new public class StoreQueryParameters to set options for what kind of stores a user wants.
- Create a taskId from the combination of store name and partition provided by the user.
- In StreamThreadStateStoreProvider.java, return only the stores for the task requested by the user, and also check the condition to return only running stores or standby/recovering stores as well.
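The selection described in the bullets above can be illustrated with a minimal, self-contained sketch. It does not use the Kafka Streams classes: `StoreSelectionSketch`, `TaskState`, `LocalStore` and `select` are hypothetical stand-ins invented for this example. It also assumes that the partition filter and the stale-store flag are applied as two independent conditions, which is only one reading of the bullets; the actual check lives in StreamThreadStateStoreProvider and may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StoreSelectionSketch {

    // Hypothetical task states; the names are illustrative, not Kafka's own.
    enum TaskState { RUNNING, RESTORING, STANDBY }

    // Hypothetical stand-in for one local task's copy of a state store.
    // Store name plus partition identifies the owning task.
    static final class LocalStore {
        final String storeName;
        final int partition;
        final TaskState state;

        LocalStore(final String storeName, final int partition, final TaskState state) {
            this.storeName = storeName;
            this.partition = partition;
            this.state = state;
        }

        @Override
        public String toString() {
            return storeName + "-" + partition + "(" + state + ")";
        }
    }

    /**
     * Select the local stores matching the query options.
     * A null partition means "all local partitions of this store".
     * Assumption: stale (standby/restoring) stores are returned only when the flag is set.
     */
    static List<LocalStore> select(final List<LocalStore> local,
                                   final String storeName,
                                   final Integer partition,
                                   final boolean staleStoresEnabled) {
        final List<LocalStore> selected = new ArrayList<>();
        for (final LocalStore store : local) {
            final boolean sameStore = store.storeName.equals(storeName);
            final boolean samePartition = partition == null || partition == store.partition;
            final boolean stateAllowed = staleStoresEnabled || store.state == TaskState.RUNNING;
            if (sameStore && samePartition && stateAllowed) {
                selected.add(store);
            }
        }
        return selected;
    }

    public static void main(final String[] args) {
        final List<LocalStore> local = Arrays.asList(
            new LocalStore("counts", 0, TaskState.RUNNING),
            new LocalStore("counts", 1, TaskState.STANDBY),
            new LocalStore("counts", 2, TaskState.RESTORING));

        // Default: every running local partition of the store.
        System.out.println(select(local, "counts", null, false)); // [counts-0(RUNNING)]
        // Stale stores enabled: standbys and restoring stores are included.
        System.out.println(select(local, "counts", null, true).size()); // 3
        // Specific partition plus stale stores: only that task's store.
        System.out.println(select(local, "counts", 1, true)); // [counts-1(STANDBY)]
    }
}
```

Keeping the options in one parameter object means further filters can be added later without touching the signature of the lookup method, which is the same motivation the KIP gives for StoreQueryParameters.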
...
- KafkaStreams#store(final String storeName, final QueryableStoreType<T> queryableStoreType, final boolean includeStaleStores) will be changed to the one mentioned in the Public Interfaces changes. Since the mentioned function is not released yet in any version, no deprecation is required.
- Deprecating the store(final String storeName, final QueryableStoreType<T> queryableStoreType) method in favour of public <T> T store(final StoreQueryParameters<T> storeQueryParameters), as both store name and queryableStoreType have been added to StoreQueryParameters.
Rejected Alternatives:
- Overload the QueryableStoreProvider#getStore() and StreamThreadStateStoreProvider#stores() with new parameters to pass a list of partitions along with the currently passed flag includeStaleStores.