...
Proposed Changes
The following new exceptions are added:
```java
// Exceptions thrown to the user
public class StateStoreMigratedException extends InvalidStateStoreException
public class StateStoreRetryableException extends InvalidStateStoreException
public class StateStoreFailException extends InvalidStateStoreException

// Internal exceptions
public class StateStoreClosedException extends InvalidStateStoreException
public class StateStoreEmptyException extends InvalidStateStoreException
public class StreamThreadNotRunningException extends InvalidStateStoreException
```
Three categories of exception are thrown to the user:
- StateStoreRetryableException: The application instance is rebalancing; the user only needs to retry and wait until the rebalance has finished.
- StateStoreMigratedException: The store has migrated and is no longer hosted on this application instance; the user needs to rediscover the store.
- StateStoreFailException: A fatal error occurred while accessing the state store; the user cannot retry or rediscover.
Three internal exceptions: StateStoreClosedException, StateStoreEmptyException, StreamThreadNotRunningException.
Each internal exception is eventually wrapped in one of the category exceptions above.
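As an illustration of how a caller might react to the three categories, here is a minimal sketch. The exception classes are local stand-ins for the proposed types (they are not taken from the Kafka Streams library), and `StoreQueryHelper`, `queryWithRetry`, `maxAttempts`, and `backoffMs` are hypothetical names introduced only for this example.

```java
import java.util.function.Supplier;

// Local stand-ins for the proposed exception types (assumption: not the real Kafka classes).
class InvalidStateStoreException extends RuntimeException {
    InvalidStateStoreException(String message) { super(message); }
}
class StateStoreRetryableException extends InvalidStateStoreException {
    StateStoreRetryableException(String message) { super(message); }
}
class StateStoreMigratedException extends InvalidStateStoreException {
    StateStoreMigratedException(String message) { super(message); }
}
class StateStoreFailException extends InvalidStateStoreException {
    StateStoreFailException(String message) { super(message); }
}

// Hypothetical helper: retries only on the retryable category.
final class StoreQueryHelper {
    private StoreQueryHelper() { }

    static <T> T queryWithRetry(Supplier<T> query, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        StateStoreRetryableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return query.get();
            } catch (StateStoreRetryableException e) {
                // Rebalance in progress: wait and try again.
                last = e;
                if (attempt == maxAttempts) {
                    break;
                }
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    // Stop retrying if the calling thread is interrupted.
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
            // StateStoreMigratedException and StateStoreFailException are not caught here:
            // the caller must rediscover the store or give up, respectively.
        }
        throw last;
    }
}
```

In this sketch a caller would wrap a store lookup or query in `queryWithRetry`, catch StateStoreMigratedException to trigger rediscovery, and treat StateStoreFailException as unrecoverable.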
The following is the public method that users will call:
...
- If the state is RUNNING or REBALANCING:
  - StateStoreClosedException: should be wrapped in StateStoreRetryableException
  - StreamThreadNotRunningException: should be wrapped in StateStoreRetryableException
  - StateStoreEmptyException: should be wrapped in StateStoreMigratedException
- If the state is PENDING_SHUTDOWN, ERROR, or NOT_RUNNING:
  - Wrap InvalidStateStoreException (including subclasses) in StateStoreFailException
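The wrapping rules above could be sketched roughly as follows. This is only an illustration under stated assumptions: the exception classes and the `StreamsState` enum are local stand-ins (the enum lists only the five states named in the rules), and `ExceptionWrapper.wrap` is a hypothetical helper, not an existing Kafka Streams method.

```java
// Local stand-ins for the proposed types (assumption: not the real Kafka classes).
class InvalidStateStoreException extends RuntimeException {
    InvalidStateStoreException(String message, Throwable cause) { super(message, cause); }
}
class StateStoreRetryableException extends InvalidStateStoreException {
    StateStoreRetryableException(String message, Throwable cause) { super(message, cause); }
}
class StateStoreMigratedException extends InvalidStateStoreException {
    StateStoreMigratedException(String message, Throwable cause) { super(message, cause); }
}
class StateStoreFailException extends InvalidStateStoreException {
    StateStoreFailException(String message, Throwable cause) { super(message, cause); }
}
class StateStoreClosedException extends InvalidStateStoreException {
    StateStoreClosedException(String message, Throwable cause) { super(message, cause); }
}
class StateStoreEmptyException extends InvalidStateStoreException {
    StateStoreEmptyException(String message, Throwable cause) { super(message, cause); }
}
class StreamThreadNotRunningException extends InvalidStateStoreException {
    StreamThreadNotRunningException(String message, Throwable cause) { super(message, cause); }
}

// Stand-in for the application state; only the states named in the rules are listed.
enum StreamsState { RUNNING, REBALANCING, PENDING_SHUTDOWN, ERROR, NOT_RUNNING }

// Hypothetical helper that maps an internal exception to a category exception.
final class ExceptionWrapper {
    private ExceptionWrapper() { }

    static InvalidStateStoreException wrap(InvalidStateStoreException e, StreamsState state) {
        switch (state) {
            case RUNNING:
            case REBALANCING:
                if (e instanceof StateStoreClosedException
                        || e instanceof StreamThreadNotRunningException) {
                    return new StateStoreRetryableException(e.getMessage(), e);
                }
                if (e instanceof StateStoreEmptyException) {
                    return new StateStoreMigratedException(e.getMessage(), e);
                }
                // Assumption: anything not covered by the rules is passed through unchanged.
                return e;
            default:
                // PENDING_SHUTDOWN, ERROR, NOT_RUNNING: every InvalidStateStoreException is fatal.
                return new StateStoreFailException(e.getMessage(), e);
        }
    }
}
```

Keeping the original exception as the cause preserves the internal detail for logging while the user only needs to handle the three category types.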
...
```java
public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
    if (!globalStore.isEmpty()) {
        return queryableStoreType.create(
            new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(globalStoreProvider)),
            storeName);
    }
    final List<T> allStores = new ArrayList<>();
    for (StateStoreProvider storeProvider : storeProviders) {
        allStores.addAll(storeProvider.stores(storeName, queryableStoreType));
    }
    if (allStores.isEmpty()) {
        // TODO: Replace with StateStoreEmptyException
        throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
    }
    return queryableStoreType.create(
        new WrappingStoreProvider(storeProviders),
        storeName);
}
```
...
```java
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    if (streamThread.state() == StreamThread.State.DEAD) {
        return Collections.emptyList();
    }
    if (!streamThread.isRunningAndNotRebalancing()) {
        // TODO: Replace with StreamThreadNotRunningException
        throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " +
            streamThread.state() + ", not RUNNING");
    }
    final List<T> stores = new ArrayList<>();
    for (Task streamTask : streamThread.tasks().values()) {
        final StateStore store = streamTask.getStore(storeName);
        if (store != null && queryableStoreType.accepts(store)) {
            if (!store.isOpen()) {
                // TODO: Replace with StateStoreClosedException
                throw new InvalidStateStoreException("Cannot get state store " + storeName + " for task " + streamTask +
                    " because the store is not open. The state store may have migrated to another instances.");
            }
            stores.add((T) store);
        }
    }
    return stores;
}
```
```java
public <T> List<T> stores(final String storeName, QueryableStoreType<T> type) {
    final List<T> allStores = new ArrayList<>();
    for (StateStoreProvider provider : storeProviders) {
        final List<T> stores = provider.stores(storeName, type);
        allStores.addAll(stores);
    }
    if (allStores.isEmpty()) {
        // TODO: Replace with StateStoreEmptyException
        throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
    }
    return allStores;
}
```
...
```java
public boolean hasNext() {
    boolean hasNext = false;
    while ((currentIterator == null
            || !(hasNext = hasNextCondition.hasNext(currentIterator))
            || !currentSegment.isOpen())
            && segments.hasNext()) {
        close();
        currentSegment = segments.next();
        try {
            if (from == null || to == null) {
                currentIterator = currentSegment.all();
            } else {
                currentIterator = currentSegment.range(from, to);
            }
        } catch (InvalidStateStoreException e) {
            // TODO: Replace with StateStoreClosedException
            // segment may have been closed so we ignore it.
        }
    }
    return currentIterator != null && hasNext;
}
```
Compatibility, Deprecation, and Migration Plan
...