This Confluence has been LDAP enabled, if you are an ASF Committer, please use your LDAP Credentials to login. Any problems file an INFRA jira ticket please.

Child pages
  • KIP-216: IQ should throw different exceptions for different errors

Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Proposed Changes

There are add four the new exceptions:

Code Block
languagejava
# Throw to user exception
public class StateStoreMigratedException extends InvalidStateStoreException
public class StateStoreRetryableException extends InvalidStateStoreException
public class StateStoreFailException extends InvalidStateStoreException

 
# Internal exception
public class StateStoreClosedException extends InvalidStateStoreException
public class StateStoreEmptyException extends InvalidStateStoreException
public class StreamThreadNotRunningException extends InvalidStateStoreException

Three categories exception throw to the user:

  • StateStoreRetryableException: The application instance in the state of rebalancing, the user just need retry and wait until rebalance finished.
  • StateStoreMigratedException: The store got migrated and not hosted in application instance, the users need to rediscover the store.

  • StateStoreFailException: Fatal error when access state store, the user cannot retry or rediscover.

Two Three internal exceptionexceptions: StateStoreClosedException,  StateStoreStreamThreadNotRunningException StateStoreEmptyException, StreamThreadNotRunningException
The internal exception will be wrapped as category exception finally.

The following is the public method that users will call:

...

  • If state is RUNNING or REBALANCING:
    • StateStoreClosedException: should be wrapped  to to StateStoreRetryableException
    • StateStoreMigratedExceptionStreamThreadNotRunningException: should not be wrapped , directly thrownto StateStoreRetryableException
    • StateStoreEmptyException: should be wrapped to StateStoreMigratedException
  • if state is PENDING_SHUTDOWN or ERROR or NOT_RUNNING:
    • wrap InvalidStateStoreException(include sub classes) to StateStoreFailException

...

Code Block
languagejava
titleclass QueryableStoreProvider
public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
    if (!globalStore.isEmpty()) {
        return queryableStoreType.create(new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(globalStoreProvider)), storeName);
    }
    final List<T> allStores = new ArrayList<>();
    for (StateStoreProvider storeProvider : storeProviders) {
        allStores.addAll(storeProvider.stores(storeName, queryableStoreType));
    }
    if (allStores.isEmpty()) {
        // TODO: Replace with StateStoreMigratedExceptionStateStoreEmptyException
        throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
    }
    return queryableStoreType.create(
            new WrappingStoreProvider(storeProviders),
            storeName);
}

...

Code Block
languagejava
titleclass StreamThreadStateStoreProvider
 public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    if (streamThread.state() == StreamThread.State.DEAD) {
        return Collections.emptyList();
    }
    if (!streamThread.isRunningAndNotRebalancing()) {
        // TODO: Replace with StateStoreRetryableExceptionStreamThreadNotRunningException
        throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " +
                streamThread.state() + ", not RUNNING");
    }
    final List<T> stores = new ArrayList<>();
    for (Task streamTask : streamThread.tasks().values()) {
        final StateStore store = streamTask.getStore(storeName);
        if (store != null && queryableStoreType.accepts(store)) {
            if (!store.isOpen()) {
                // TODO: Replace with StateStoreClosedException
                throw new InvalidStateStoreException("Cannot get state store " + storeName + " for task " + streamTask +
                        " because the store is not open. The state store may have migrated to another instances.");
            }
            stores.add((T) store);
        }
    }
    return stores;
}
Code Block
languagejava
titleclass WrappingStoreProvider
public <T> List<T> stores(final String storeName, QueryableStoreType<T> type) {
    final List<T> allStores = new ArrayList<>();
    for (StateStoreProvider provider : storeProviders) {
        final List<T> stores =
            provider.stores(storeName, type);
        allStores.addAll(stores);
    }
    if (allStores.isEmpty()) {
        // TODO: Replace with StateStoreMigratedExceptionStateStoreEmptyException
        throw new InvalidStateStoreExceptionStateStoreEmptyException("The state store, " + storeName + ", may have migrated to another instance.");
    }
    return allStores;
}

...

Code Block
languagejava
titleclass SegmentIterator
public boolean hasNext() {
    boolean hasNext = false;
    while ((currentIterator == null || !(hasNext = hasNextCondition.hasNext(currentIterator)) || !currentSegment.isOpen())
            && segments.hasNext()) {
        close();
        currentSegment = segments.next();
        try {
            if (from == null || to == null) {
                currentIterator = currentSegment.all();
            } else {
                currentIterator = currentSegment.range(from, to);
            }
        } catch (InvalidStateStoreException e) {  // TODO: Replace with StateStoreClosedException
            // segment may have been closed so we ignore it.
        }
    }
    return currentIterator != null && hasNext;
}

 

 

 

Compatibility, Deprecation, and Migration Plan

...