...
Expand | ||
---|---|---|
| ||
|
Changed Classes
Code Block | ||
---|---|---|
| ||
// Listed under "Changed Classes": create(...) takes the KafkaStreams instance together with
// the store provider and the store name, and returns the queryable store view of type T.
public interface QueryableStoreType<T> {
T create(final KafkaStreams streams, final StateStoreProvider storeProvider, final String storeName);
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
// Returns the store named storeName as the requested queryable type.
// validateIsRunning() is invoked first (presumably it rejects calls while the instance is not
// running -- confirm against its definition); the lookup itself is delegated to queryableStoreProvider.
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
validateIsRunning();
return queryableStoreProvider.getStore(storeName, queryableStoreType);
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
// Resolves storeName to a queryable store created by queryableStoreType.
// Global stores take precedence: if the global provider returns any match, the result wraps only
// that provider. Otherwise every provider in storeProviders is consulted, and
// InvalidStateStoreException is thrown when none of them returns a matching store.
public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) {
// Used only as an existence check; the returned view is built over the provider, not this list.
final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
if (!globalStore.isEmpty()) {
return queryableStoreType.create(new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(globalStoreProvider)), storeName);
}
// Collected only to detect the "no matching store in any provider" case.
final List<T> allStores = new ArrayList<>();
for (StateStoreProvider storeProvider : storeProviders) {
allStores.addAll(storeProvider.stores(storeName, queryableStoreType));
}
if (allStores.isEmpty()) {
throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
}
return queryableStoreType.create(
new WrappingStoreProvider(storeProviders),
storeName);
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
// Looks up storeName in globalStateStores.
// Returns an empty list when no store is registered under that name or queryableStoreType does not
// accept it; otherwise returns a single-element list holding the store.
// Throws InvalidStateStoreException when the store is found and accepted but is not open.
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
final StateStore store = globalStateStores.get(storeName);
if (store == null || !queryableStoreType.accepts(store)) {
return Collections.emptyList();
}
if (!store.isOpen()) {
throw new InvalidStateStoreException("the state store, " + storeName + ", is not open.");
}
// Unchecked cast: relies on accepts(store) having returned true above
// (assumes that implies the store is usable as T -- confirm).
return (List<T>) Collections.singletonList(store);
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
// Collects, across all tasks of streamThread, every store named storeName that
// queryableStoreType accepts.
// Returns an empty list when the thread state is DEAD.
// Throws InvalidStateStoreException when isRunningAndNotRebalancing() is false, or when a
// matching store is found but is not open.
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
if (streamThread.state() == StreamThread.State.DEAD) {
return Collections.emptyList();
}
if (!streamThread.isRunningAndNotRebalancing()) {
throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " +
streamThread.state() + ", not RUNNING");
}
final List<T> stores = new ArrayList<>();
for (Task streamTask : streamThread.tasks().values()) {
final StateStore store = streamTask.getStore(storeName);
// Tasks that have no store under this name, or whose store is not accepted, are skipped.
if (store != null && queryableStoreType.accepts(store)) {
if (!store.isOpen()) {
throw new InvalidStateStoreException("Cannot get state store " + storeName + " for task " + streamTask +
" because the store is not open. The state store may have migrated to another instances.");
}
stores.add((T) store);
}
}
return stores;
} |
// TODO:
// When the user calls one of the following methods and an InvalidStateStoreException is thrown, we should check the KafkaStreams state and apply the following rules:
// * If the state is RUNNING or REBALANCING:
// * StateStoreClosedException: should be wrapped in StateStoreRetriableException
// * StateStoreMigratedException: should not be wrapped; it is thrown directly
// * If the state is PENDING_SHUTDOWN, ERROR, or NOT_RUNNING: wrap InvalidStateStoreException in StateStoreFailException
//
// Signature-only listing (method body omitted): the store lookup entry point on KafkaStreams.
public class KafkaStreams {
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType);
}
// Signature-only listing (method bodies omitted): the read operations of the composite key-value store.
public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueStore<K, V> {
public V get(final K key);
public KeyValueIterator<K, V> range(final K from, final K to);
public KeyValueIterator<K, V> all();
public long approximateNumEntries();
}
// Signature-only listing (method bodies omitted): the fetch operations of the composite session store.
public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore<K, V> {
public KeyValueIterator<Windowed<K>, V> fetch(final K key);
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to);
}
// Signature-only listing (method bodies omitted): the fetch/scan operations of the composite window store.
public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K, V> {
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo);
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo);
public KeyValueIterator<Windowed<K>, V> all();
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo);
}
// Signature-only listing (method bodies omitted): the iteration operations of the delegating peeking iterator.
class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K, V>, PeekingKeyValueIterator<K, V> {
public synchronized boolean hasNext();
public synchronized KeyValue<K, V> next();
public KeyValue<K, V> peekNext();
}
|
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public interface QueryableStoreType<T> {
// TODO: pass stream instance parameter
T create(final KafkaStreams streams, final StateStoreProvider storeProvider, final String storeName);
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
// Gathers the stores named storeName of the given type from every provider in storeProviders.
// Throws InvalidStateStoreException when no provider returns a matching store.
public <T> List<T> stores(final String storeName, QueryableStoreType<T> type) {
final List<T> allStores = new ArrayList<>();
for (StateStoreProvider provider : storeProviders) {
final List<T> stores =
provider.stores(storeName, type);
allStores.addAll(stores);
}
if (allStores.isEmpty()) {
throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
}
return allStores;
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) {
final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
if (!globalStore.isEmpty()) {
return queryableStoreType.create(new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(globalStoreProvider)), storeName);
}
final List<T> allStores = new ArrayList<>();
for (StateStoreProvider storeProvider : storeProviders) {
allStores.addAll(storeProvider.stores(storeName, queryableStoreType));
}
if (allStores.isEmpty()) {
// TODO: Replace with StateStoreMigratedException
throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
}
return queryableStoreType.create(
new WrappingStoreProvider(storeProviders),
storeName);
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
final StateStore store = globalStateStores.get(storeName);
if (store == null || !queryableStoreType.accepts(store)) {
return Collections.emptyList();
}
if (!store.isOpen()) {
// TODO: Replace with StateStoreClosedException
throw new InvalidStateStoreException("the state store, " + storeName + ", is not open.");
}
return (List<T>) Collections.singletonList(store);
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
if (streamThread.state() == StreamThread.State.DEAD) {
return Collections.emptyList();
}
if (!streamThread.isRunningAndNotRebalancing()) {
// TODO: Replace with StateStoreMigratedException
throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " +
streamThread.state() + ", not RUNNING");
}
final List<T> stores = new ArrayList<>();
for (Task streamTask : streamThread.tasks().values()) {
final StateStore store = streamTask.getStore(storeName);
if (store != null && queryableStoreType.accepts(store)) {
if (!store.isOpen()) {
// TODO: Replace with StateStoreClosedException
throw new InvalidStateStoreException("Cannot get state store " + storeName + " for task " + streamTask +
" because the store is not open. The state store may have migrated to another instances.");
}
stores.add((T) store);
}
}
return stores;
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public <T> List<T> stores(final String storeName, QueryableStoreType<T> type) {
final List<T> allStores = new ArrayList<>();
for (StateStoreProvider provider : storeProviders) {
final List<T> stores =
provider.stores(storeName, type);
allStores.addAll(stores);
}
if (allStores.isEmpty()) {
// Replace with StateStoreMigratedException
throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
}
return allStores;
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
void validateStoreOpen() {
if (!innerState.isOpen()) {
// TODO: Replace with StateStoreClosedException
throw new InvalidStateStoreException("Store " + innerState.name() + " is currently closed.");
}
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
private void validateStoreOpen() {
if (!open) {
// TODO: Replace with StateStoreClosedException
throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
}
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
// Reports whether the iterator currently points at a valid entry, as given by iter.isValid().
// Throws InvalidStateStoreException when the open flag is false.
public synchronized boolean hasNext() {
if (!open) {
// TODO: Replace with StateStoreClosedException
throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", storeName));
}
return iter.isValid();
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
// Reports whether another entry is available.
// Throws InvalidStateStoreException when the open flag is false.
// When no element is buffered in `next`, one is pulled from `underlying` and stored there.
public synchronized boolean hasNext() {
if (!open) {
// TODO: Replace with StateStoreClosedException
throw new InvalidStateStoreException(String.format("Store %s has closed", storeName));
}
// An element is already buffered; do not advance the underlying iterator again.
if (next != null) {
return true;
}
if (!underlying.hasNext()) {
return false;
}
next = underlying.next();
return true;
} |
...