Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-5876

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, interactive queries (IQ) throw InvalidStateStoreException for every type of error, so a user cannot tell the error types apart or handle them differently. We therefore propose throwing a distinct exception for each type.

 

Proposed Changes

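Three categories of exception are exposed to the user:

  • StateStoreRetryableException: just wait and retry the same call.
  • StateStoreMigratedException: the store may have migrated to another instance, so the user needs to rediscover it.
  • StateStoreFailException: fatal error; neither retrying nor rediscovering will help.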

 

public class StateStoreMigratedException extends InvalidStateStoreException
public class StateStoreRetryableException extends InvalidStateStoreException
public class StateStoreFailException extends InvalidStateStoreException
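
As an illustration of how a caller might react to each category, here is a minimal sketch. It is not part of the proposal: the exception classes are local stand-ins that only mirror the hierarchy declared above (so the example compiles without Kafka), and the helper `queryWithHandling`, its parameters, and the `rediscover` callback are hypothetical names introduced for this example.

```java
import java.util.function.Supplier;

class IqExceptionHandlingSketch {

    // Local stand-ins mirroring the proposed hierarchy (assumption: not the real Kafka classes).
    static class InvalidStateStoreException extends RuntimeException {
        InvalidStateStoreException(String message) { super(message); }
    }
    static class StateStoreRetryableException extends InvalidStateStoreException {
        StateStoreRetryableException(String message) { super(message); }
    }
    static class StateStoreMigratedException extends InvalidStateStoreException {
        StateStoreMigratedException(String message) { super(message); }
    }
    static class StateStoreFailException extends InvalidStateStoreException {
        StateStoreFailException(String message) { super(message); }
    }

    // Hypothetical helper: runs a query and reacts to each exception category.
    static <T> T queryWithHandling(Supplier<T> query, Runnable rediscover,
                                   int maxAttempts, long backoffMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return query.get();
            } catch (StateStoreRetryableException e) {
                // Retryable: wait, then repeat the same call.
                sleepOrFail(backoffMs);
            } catch (StateStoreMigratedException e) {
                // Migrated: let the caller look up where the store lives now, then try again.
                rediscover.run();
            } catch (StateStoreFailException e) {
                // Fatal: retrying or rediscovering will not help, so propagate.
                throw e;
            }
        }
        throw new IllegalStateException("query did not succeed after " + maxAttempts + " attempts");
    }

    private static void sleepOrFail(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting to retry", ie);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated query: fails once as retryable, once as migrated, then succeeds.
        String value = queryWithHandling(() -> {
            int n = calls[0]++;
            if (n == 0) throw new StateStoreRetryableException("store is not open");
            if (n == 1) throw new StateStoreMigratedException("store may have migrated");
            return "value-from-store";
        }, () -> System.out.println("rediscovering store location"), 5, 10L);
        System.out.println(value);
    }
}
```

Because all three types extend InvalidStateStoreException, existing code that catches the base exception would keep working; only callers that want finer-grained handling need the separate catch blocks.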

 

 

Call Trace: KafkaStreams#store()

KafkaStreams#store()
==> QueryableStoreProvider#getStore()
==> GlobalStateStoreProvider#stores()
==> StreamThreadStateStoreProvider#stores()
KafkaStreams#store()
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    validateIsRunning();
    try {
        return queryableStoreProvider.getStore(storeName, queryableStoreType);
    } catch (InvalidStateStoreException e) {
        if (state == State.RUNNING || state == State.REBALANCING) {
            if (e instanceof StateStoreClosedException) {
                throw new StateStoreRetryableException(e);
            } else { // e instanceof StateStoreMigratedException
                throw e;
            }
        } else {
            // state == State.PENDING_SHUTDOWN || state == State.ERROR || state == State.NOT_RUNNING
            throw new StateStoreFailException(e);
        }
    }
}
GlobalStateStoreProvider#stores()
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    final StateStore store = globalStateStores.get(storeName);
    if (store == null || !queryableStoreType.accepts(store)) {
        return Collections.emptyList();
    }
    if (!store.isOpen()) {
        // Before:
        //   throw new InvalidStateStoreException("the state store, " + storeName + ", is not open.");
        throw new StateStoreClosedException("the state store, " + storeName + ", is not open.");
    }
    return (List<T>) Collections.singletonList(store);
}
StreamThreadStateStoreProvider#stores()
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    if (streamThread.state() == StreamThread.State.DEAD) {
        return Collections.emptyList();
    }
    if (!streamThread.isRunningAndNotRebalancing()) {
        // Before: 
        //   throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
        throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
    }
    final List<T> stores = new ArrayList<>();
    for (Task streamTask : streamThread.tasks().values()) {
        final StateStore store = streamTask.getStore(storeName);
        if (store != null && queryableStoreType.accepts(store)) {
            if (!store.isOpen()) {
                // Before:
                //   throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
                throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
            }
            stores.add((T) store);
        }
    }
    return stores;
}

 

Call Trace: CompositeReadOnlyKeyValueStore#get()

 

 

 

 

 

