Current state: Under Discussion
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Currently, IQ throws InvalidStateStoreException for any types of error, that means a user cannot handle different types of error. Because of that, we should throw different exceptions for each type.
Three categories exception to user
public class StateStoreMigratedException extends InvalidStateStoreException public class StateStoreRetryableException extends InvalidStateStoreException public class StateStoreFailException extends InvalidStateStoreException |
|
|
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) { validateIsRunning(); try { return queryableStoreProvider.getStore(storeName, queryableStoreType); } catch (InvalidStateStoreException e) { if (state==State.RUNNING || state==State.REBALANCING) { if (e instanceof StateStoreClosedException) throw new StateStoreRetryableException(e); else // e instanceof StateStoreMigratedException throw e; } else { // state==State.PENDING_SHUTDOWN || state==State.ERROR || state==State.NOT_RUNNING throw new StateStoreFailException(e); } } } |
public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) { final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType); if (!globalStore.isEmpty()) { return queryableStoreType.create(streams, new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(globalStoreProvider)), storeName); } final List<T> allStores = new ArrayList<>(); for (StateStoreProvider storeProvider : storeProviders) { allStores.addAll(storeProvider.stores(storeName, queryableStoreType)); } if (allStores.isEmpty()) { // throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance."); } return queryableStoreType.create(streams, new WrappingStoreProvider(storeProviders), storeName); } |
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) { final StateStore store = globalStateStores.get(storeName); if (store == null || !queryableStoreType.accepts(store)) { return Collections.emptyList(); } if (!store.isOpen()) { // Before: // throw new InvalidStateStoreException("the state store, " + storeName + ", is not open."); throw new StateStoreClosedException("the state store, " + storeName + ", is not open."); } return (List<T>) Collections.singletonList(store); } |
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) { if (streamThread.state() == StreamThread.State.DEAD) { return Collections.emptyList(); } if (!streamThread.isRunningAndNotRebalancing()) { // Before: // throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance."); } final List<T> stores = new ArrayList<>(); for (Task streamTask : streamThread.tasks().values()) { final StateStore store = streamTask.getStore(storeName); if (store != null && queryableStoreType.accepts(store)) { if (!store.isOpen()) { // Before: // throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance."); } stores.add((T) store); } } return stores; } |
public <T> List<T> stores(final String storeName, QueryableStoreType<T> type) { final List<T> allStores = new ArrayList<>(); for (StateStoreProvider provider : storeProviders) { final List<T> stores = provider.stores(storeName, type); allStores.addAll(stores); } if (allStores.isEmpty()) { // Before: throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance."); } return allStores; } |
void validateStoreOpen() { if (!innerState.isOpen()) { // Before: throw new InvalidStateStoreException("Store " + innerState.name() + " is currently closed."); throw new StateStoreClosedException("Store " + innerState.name() + " is currently closed."); } } |
private void validateStoreOpen() { if (!open) { // Before: throw new InvalidStateStoreException("Store " + this.name + " is currently closed"); throw new StateStoreClosedException("Store " + this.name + " is currently closed"); } } |
public V get(final K key) { Objects.requireNonNull(key); final List<ReadOnlyKeyValueStore<K, V>> stores; try { try { stores = storeProvider.stores(storeName, storeType); } catch (StateStoreClosedException e) { if (streams.state()== KafkaStreams.State.RUNNING || streams.state()== KafkaStreams.State.REBALANCING) throw new StateStoreRetryableException(e); else throw e; } for (ReadOnlyKeyValueStore<K, V> store : stores) { try { final V result = store.get(key); if (result != null) { return result; } } catch (StateStoreClosedException e) { // Before: throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata."); throw new StateStoreMigratedException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata."); } } } catch (InvalidStateStoreException e) { if (streams.state()== KafkaStreams.State.PENDING_SHUTDOWN || streams.state()== KafkaStreams.State.ERROR || streams.state()==KafkaStreams.State.NOT_RUNNING) { throw new StateStoreFailException(e); } else throw e; } return null; } |