...
Currently, IQ throws InvalidStateStoreException for any types of error, that means a user cannot handle different types of error.
Because of that, we should throw different exceptions for each type.
Proposed Changes
There append three are add four new exceptions:
Code Block | ||
---|---|---|
| ||
public class StateStoreClosedExceptionStateStoreMigratedException extends InvalidStateStoreException public class StateStoreMigratedExceptionStateStoreRetryableException extends InvalidStateStoreException public class StateStoreRetryableExceptionStateStoreFailException extends InvalidStateStoreException public class StateStoreFailExceptionStateStoreClosedException extends InvalidStateStoreException |
...
- StateStoreRetriableException: The application instance in the state of rebalancing, the user just need retry and wait until rebalance finished.
StateStoreMigratedException: The store got migrated and not hosted in application instance, the users need to rediscover the store.
- StateStoreFailException: Fatal error when access state store, the user cannot retry or rediscover.
One internal exception: StateStoreClosedException.
The following is the public method that users will call:
...
Expand | ||
---|---|---|
| ||
|
Expand | ||
---|---|---|
| ||
|
Expand | ||
---|---|---|
| ||
|
Expand | ||
---|---|---|
| ||
|
Expand | ||
---|---|---|
| ||
|
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) { final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType); if (!globalStore.isEmpty()) { return queryableStoreType.create(new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(globalStoreProvider)), storeName); } final List<T> allStores = new ArrayList<>(); for (StateStoreProvider storeProvider : storeProviders) { allStores.addAll(storeProvider.stores(storeName, queryableStoreType)); } if (allStores.isEmpty()) { // TODO: Replace with StateStoreMigratedException throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); } return queryableStoreType.create( new WrappingStoreProvider(storeProviders), storeName); } |
Code Block | |||||||
---|---|---|---|---|---|---|---|
| |||||||
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) { final StateStore store = globalStateStores.get(storeName); if (store == null || !queryableStoreType.accepts(store)) { return Collections.emptyList(); } if (!store.isOpen()) { // TODO: Replace with StateStoreClosedException throw new InvalidStateStoreException("the state store, " + storeName + ", is not open."); } return (List<T>) Collections.singletonList(store); } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) { if (streamThread.state() == StreamThread.State.DEAD) { return Collections.emptyList(); } if (!streamThread.isRunningAndNotRebalancing()) { // TODO: Replace with StateStoreMigratedException throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " + streamThread.state() + ", not RUNNING"); } final List<T> stores = new ArrayList<>(); for (Task streamTask : streamThread.tasks().values()) { final StateStore store = streamTask.getStore(storeName); if (store != null && queryableStoreType.accepts(store)) { if (!store.isOpen()) { // TODO: Replace with StateStoreClosedException throw new InvalidStateStoreException("Cannot get state store " + storeName + " for task " + streamTask + " because the store is not open. The state store may have migrated to another instances."); } stores.add((T) store); } } return stores; } |
Code Block | |||||||
---|---|---|---|---|---|---|---|
| |||||||
public <T> List<T> stores(final String storeName, QueryableStoreType<T> type) { final List<T> allStores = new ArrayList<>(); for (StateStoreProvider provider : storeProviders) { final List<T> stores = provider.stores(storeName, type); allStores.addAll(stores); } if (allStores.isEmpty()) { // Replace with StateStoreMigratedException throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance."); } return allStores; } |
Code Block | |||||||
---|---|---|---|---|---|---|---|
| |||||||
void validateStoreOpen() void validateStoreOpen() { if (!innerState.isOpen()) { // TODO: Replace with StateStoreClosedException throw new InvalidStateStoreException("Store " + innerState.name() + " is currently closed."); } } |
Code Block | |||||||
---|---|---|---|---|---|---|---|
| |||||||
private void validateStoreOpen() { if (!open) { // TODO: Replace with StateStoreClosedException throw new InvalidStateStoreException("Store " + this.name + " is currently closed"); } } |
Code Block | |||||||
---|---|---|---|---|---|---|---|
| |||||||
public synchronized boolean hasNext() { if (!open) { // TODO: Replace with StateStoreClosedException throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", storeName)); } return iter.isValid(); } |
Code Block | |||||||
---|---|---|---|---|---|---|---|
| |||||||
public synchronized boolean hasNext() { if (!open) { // TODO: Replace with StateStoreClosedException throw new InvalidStateStoreException(String.format("Store %s has closed", storeName)); } if (next != null) { return true; } if (!underlying.hasNext()) { return false; } next = underlying.next(); return true; } |
Code Block | ||||
---|---|---|---|---|
| ||||
List<Segment> segments(final long timeFrom, final long timeTo) {
final long segFrom = Math.max(minSegmentId, segmentId(Math.max(0L, timeFrom)));
final long segTo = Math.min(maxSegmentId, segmentId(Math.min(maxSegmentId * segmentInterval, Math.max(0, timeTo))));
final List<Segment> segments = new ArrayList<>();
for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
Segment segment = getSegment(segmentId);
if (segment != null && segment.isOpen()) {
try {
segments.add(segment);
} catch (InvalidStateStoreException ise) { // TODO: Replace with StateStoreClosedException
// segment may have been closed by streams thread;
}
}
}
return segments;
}
List<Segment> allSegments() {
final List<Segment> segments = new ArrayList<>();
for (Segment segment : this.segments.values()) {
if (segment.isOpen()) {
try {
segments.add(segment);
} catch (InvalidStateStoreException ise) { // TODO: Replace with StateStoreClosedException
// segment may have been closed by streams thread;
}
}
}
Collections.sort(segments);
return segments;
} |
Code Block | ||||
---|---|---|---|---|
| ||||
public boolean hasNext() {
boolean hasNext = false;
while ((currentIterator == null || !(hasNext = hasNextCondition.hasNext(currentIterator)) || !currentSegment.isOpen())
&& segments.hasNext()) {
close();
currentSegment = segments.next();
try {
if (from == null || to == null) {
currentIterator = currentSegment.all();
} else {
currentIterator = currentSegment.range(from, to);
}
} catch (InvalidStateStoreException e) { // TODO: Replace with StateStoreClosedException
// segment may have been closed so we ignore it.
}
}
return currentIterator != null && hasNext;
}
|
Compatibility, Deprecation, and Migration Plan
- All new exceptions extend from InvalidStateStoreException, this change will be fully backward compatible.
Rejected Alternatives
None.
...