...
Currently, IQ (interactive queries) throws InvalidStateStoreException for every type of error, which means a user cannot handle different types of error differently. We should therefore throw a distinct exception for each type.
Proposed Changes
Three categories of exception are exposed to the user:
- StateStoreRetryableException: wait, then simply retry
- StateStoreMigratedException: the store must be rediscovered
- StateStoreFailException: fatal error; neither retry nor rediscovery will help
Three new exceptions are added:
```java
// public class StateStoreClosedException extends InvalidStateStoreException
public class StateStoreMigratedException extends InvalidStateStoreException
public class StateStoreRetryableException extends InvalidStateStoreException
public class StateStoreFailException extends InvalidStateStoreException
```
In detail, the three categories of exception thrown to the user are:
- StateStoreRetryableException: The application instance is rebalancing. The user only needs to wait and retry until the rebalance has finished.
- StateStoreMigratedException: The store has migrated and is no longer hosted on this application instance. The user needs to rediscover the store.
- StateStoreFailException: A fatal error occurred while accessing the state store. The user can neither retry nor rediscover.
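The handling implied by the three categories can be sketched from the caller's side as follows. This is only an illustration under stated assumptions: the exception classes are local stand-ins for the proposed types, and `queryWithHandling`, its `rediscover` callback, and the attempt/backoff parameters are hypothetical names that are not part of this proposal.

```java
import java.util.function.Supplier;

public class IqErrorHandlingSketch {

    // Local stand-ins for the proposed exception types (assumption: the real
    // classes would be provided by Kafka Streams, not defined by the user).
    public static class InvalidStateStoreException extends RuntimeException {
        public InvalidStateStoreException(String message) { super(message); }
    }
    public static class StateStoreRetryableException extends InvalidStateStoreException {
        public StateStoreRetryableException(String message) { super(message); }
    }
    public static class StateStoreMigratedException extends InvalidStateStoreException {
        public StateStoreMigratedException(String message) { super(message); }
    }
    public static class StateStoreFailException extends InvalidStateStoreException {
        public StateStoreFailException(String message) { super(message); }
    }

    /**
     * Hypothetical helper: runs a query and reacts to each exception category.
     * 'rediscover' stands for whatever the application does to locate the store again.
     */
    public static <T> T queryWithHandling(Supplier<T> query, Runnable rediscover,
                                          int maxAttempts, long backoffMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return query.get();
            } catch (StateStoreRetryableException e) {
                // Rebalance in progress: wait, then retry the same query.
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
            } catch (StateStoreMigratedException e) {
                // Store moved to another instance: rediscover, then retry.
                rediscover.run();
            }
            // StateStoreFailException is deliberately not caught: it is fatal
            // and propagates to the caller.
        }
        throw new IllegalStateException("query did not succeed after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The first call simulates a rebalance, the second one succeeds.
        String value = queryWithHandling(() -> {
            if (calls[0]++ == 0) {
                throw new StateStoreRetryableException("rebalancing");
            }
            return "v1";
        }, () -> { }, 3, 10L);
        System.out.println(value); // prints "v1"
    }
}
```

Because the retryable and migrated exceptions are sibling subclasses, the caller can give each its own catch block and let the fatal case fall through.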
The following are the public methods that users will call:
- KafkaStreams
  - stores()
- ReadOnlyKeyValueStore (CompositeReadOnlyKeyValueStore)
  - get(k)
  - range(from, to)
  - all()
  - approximateNumEntries()
- ReadOnlySessionStore (CompositeReadOnlySessionStore)
  - fetch(k)
  - fetch(from, to)
- ReadOnlyWindowStore (CompositeReadOnlyWindowStore)
  - fetch(k, rf, tt)
  - fetch(from, to, rf, tt)
  - all()
  - fetchAll()
- KeyValueIterator (DelegatingPeekingKeyValueIterator)
  - next()
  - hasNext()
  - peekNextKey()
When any state store exception is thrown while the user calls one of the methods above, we need to check the KafkaStreams state and apply the following rule:
- If the state is RUNNING or REBALANCING
  - wrap the InvalidStateStoreException in a StateStoreRetryableException
- If the state is PENDING_SHUTDOWN, ERROR, or NOT_RUNNING
  - wrap the InvalidStateStoreException in a StateStoreFailException
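The rule above might look roughly like the following. This is a minimal sketch under assumptions: the `State` enum only mirrors the five states named in the rule, the exception classes are local stand-ins, and the `wrap` helper is a hypothetical name, not an existing Kafka Streams method.

```java
public class StateStoreExceptionWrapperSketch {

    // Assumption: only the states named in the rule are modelled here.
    public enum State { RUNNING, REBALANCING, PENDING_SHUTDOWN, ERROR, NOT_RUNNING }

    // Local stand-ins for the proposed exception types.
    public static class InvalidStateStoreException extends RuntimeException {
        public InvalidStateStoreException(String message, Throwable cause) { super(message, cause); }
    }
    public static class StateStoreRetryableException extends InvalidStateStoreException {
        public StateStoreRetryableException(String message, Throwable cause) { super(message, cause); }
    }
    public static class StateStoreFailException extends InvalidStateStoreException {
        public StateStoreFailException(String message, Throwable cause) { super(message, cause); }
    }

    /** Hypothetical helper: picks the exception category from the current state. */
    public static InvalidStateStoreException wrap(State state, InvalidStateStoreException cause) {
        switch (state) {
            case RUNNING:
            case REBALANCING:
                // The instance is alive, so the caller may wait and retry.
                return new StateStoreRetryableException(cause.getMessage(), cause);
            case PENDING_SHUTDOWN:
            case ERROR:
            case NOT_RUNNING:
                // The instance is going away or has failed, so retrying cannot help.
                return new StateStoreFailException(cause.getMessage(), cause);
            default:
                // States not covered by the rule are left unchanged (assumption).
                return cause;
        }
    }

    public static void main(String[] args) {
        InvalidStateStoreException original = new InvalidStateStoreException("store is closed", null);
        InvalidStateStoreException wrapped = wrap(State.REBALANCING, original);
        System.out.println(wrapped.getClass().getSimpleName()); // prints "StateStoreRetryableException"
    }
}
```

Keeping the original exception as the cause preserves the underlying message and stack trace for debugging.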
Call Trace
...
```java
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    // A dead thread hosts no stores.
    if (streamThread.state() == StreamThread.State.DEAD) {
        return Collections.emptyList();
    }
    // Stores cannot be queried while the thread is rebalancing or not running.
    if (!streamThread.isRunningAndNotRebalancing()) {
        throw new InvalidStateStoreException("Cannot get state store " + storeName +
                " because the stream thread is " + streamThread.state() + ", not RUNNING");
    }
    final List<T> stores = new ArrayList<>();
    for (Task streamTask : streamThread.tasks().values()) {
        final StateStore store = streamTask.getStore(storeName);
        if (store != null && queryableStoreType.accepts(store)) {
            // A closed store usually means it has moved to another instance.
            if (!store.isOpen()) {
                throw new InvalidStateStoreException("Cannot get state store " + storeName +
                        " for task " + streamTask + " because the store is not open. " +
                        "The state store may have migrated to another instance.");
            }
            stores.add((T) store);
        }
    }
    return stores;
}
```
...
```java
public synchronized boolean hasNext() {
    // Fail fast if the underlying store has been closed.
    if (!open) {
        throw new InvalidStateStoreException(String.format("Store %s has closed", storeName));
    }
    // An element has already been peeked and not yet consumed.
    if (next != null) {
        return true;
    }
    if (!underlying.hasNext()) {
        return false;
    }
    // Cache the next element so that it can be peeked.
    next = underlying.next();
    return true;
}
```