(Work in Progress)
Current state: Under Discussion
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Currently, IQ (Interactive Queries) throws InvalidStateStoreException for every type of error, which means a user cannot handle different types of error differently. Because of that, we should throw a different exception for each type.
This proposal adds the following new exceptions:
// public class StateStoreClosedException extends InvalidStateStoreException public class StateStoreMigratedException extends InvalidStateStoreException public class StateStoreRetryableException extends InvalidStateStoreException public class StateStoreFailException extends InvalidStateStoreException |
Three categories of exception are thrown to the user:
StateStoreMigratedException: The store has been migrated and is no longer hosted in this application instance; users need to rediscover the store.
The following are the public methods that users will call:
We need to check the KafkaStreams status according to the following rules whenever a state store exception is thrown while the user calls any of the above methods:
|
|
|
|
|
|
|
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) { validateIsRunning(); return queryableStoreProvider.getStore(storeName, queryableStoreType); } |
public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) { final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType); if (!globalStore.isEmpty()) { return queryableStoreType.create(new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(globalStoreProvider)), storeName); } final List<T> allStores = new ArrayList<>(); for (StateStoreProvider storeProvider : storeProviders) { allStores.addAll(storeProvider.stores(storeName, queryableStoreType)); } if (allStores.isEmpty()) { throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); } return queryableStoreType.create( new WrappingStoreProvider(storeProviders), storeName); } |
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) { final StateStore store = globalStateStores.get(storeName); if (store == null || !queryableStoreType.accepts(store)) { return Collections.emptyList(); } if (!store.isOpen()) { throw new InvalidStateStoreException("the state store, " + storeName + ", is not open."); } return (List<T>) Collections.singletonList(store); } |
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) { if (streamThread.state() == StreamThread.State.DEAD) { return Collections.emptyList(); } if (!streamThread.isRunningAndNotRebalancing()) { throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " + streamThread.state() + ", not RUNNING"); } final List<T> stores = new ArrayList<>(); for (Task streamTask : streamThread.tasks().values()) { final StateStore store = streamTask.getStore(storeName); if (store != null && queryableStoreType.accepts(store)) { if (!store.isOpen()) { throw new InvalidStateStoreException("Cannot get state store " + storeName + " for task " + streamTask + " because the store is not open. The state store may have migrated to another instances."); } stores.add((T) store); } } return stores; } |
public <T> List<T> stores(final String storeName, QueryableStoreType<T> type) { final List<T> allStores = new ArrayList<>(); for (StateStoreProvider provider : storeProviders) { final List<T> stores = provider.stores(storeName, type); allStores.addAll(stores); } if (allStores.isEmpty()) { throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); } return allStores; } |
public V get(final K key) { Objects.requireNonNull(key); final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType); for (ReadOnlyKeyValueStore<K, V> store : stores) { try { final V result = store.get(key); if (result != null) { return result; } } catch (InvalidStateStoreException e) { throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata."); } } return null; } public KeyValueIterator<K, V> range(final K from, final K to) { Objects.requireNonNull(from); Objects.requireNonNull(to); final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>> nextIteratorFunction = new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>>() { @Override public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) { try { return store.range(from, to); } catch (InvalidStateStoreException e) { throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata."); } } }; final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType); return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction)); } public KeyValueIterator<K, V> all() { final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>> nextIteratorFunction = new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>>() { @Override public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) { try { return store.all(); } catch (InvalidStateStoreException e) { throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata."); } } }; final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, 
storeType); return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction)); } public long approximateNumEntries() { final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType); long total = 0; for (ReadOnlyKeyValueStore<K, V> store : stores) { total += store.approximateNumEntries(); if (total < 0) { return Long.MAX_VALUE; } } return total; } |
void validateStoreOpen() { if (!innerState.isOpen()) { throw new InvalidStateStoreException("Store " + innerState.name() + " is currently closed."); } } |
private void validateStoreOpen() { if (!open) { throw new InvalidStateStoreException("Store " + this.name + " is currently closed"); } } |
public synchronized boolean hasNext() { if (!open) { throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", storeName)); } return iter.isValid(); } |
public synchronized boolean hasNext() { if (!open) { throw new InvalidStateStoreException(String.format("Store %s has closed", storeName)); } if (next != null) { return true; } if (!underlying.hasNext()) { return false; } next = underlying.next(); return true; } |