Current state: Under Discussion
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Currently, IQ throws InvalidStateStoreException for any types of error, that means a user cannot handle different types of error.
Because of that, we should throw different exceptions for each type.
There are the new exceptions:
# Throw to user exception public class StateStoreMigratedException extends InvalidStateStoreException public class StateStoreRetryableException extends InvalidStateStoreException public class StateStoreFailException extends InvalidStateStoreException # Internal exception public class StateStoreClosedException extends InvalidStateStoreException public class StateStoreEmptyException extends InvalidStateStoreException public class StreamThreadNotRunningException extends InvalidStateStoreException |
Three categories exception throw to the user:
StateStoreMigratedException: The store got migrated and not hosted in application instance, the users need to rediscover the store.
Three internal exceptions: StateStoreClosedException, StateStoreEmptyException, StreamThreadNotRunningException
The internal exception will be wrapped as category exception finally.
The following is the public method that users will call:
public class KafkaStreams { public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType); } public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueStore<K, V> { public V get(final K key); public KeyValueIterator<K, V> range(final K from, final K to); public KeyValueIterator<K, V> all(); public long approximateNumEntries(); } public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore<K, V> { public KeyValueIterator<Windowed<K>, V> fetch(final K key); public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to); } public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K, V> { public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo); public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo); public KeyValueIterator<Windowed<K>, V> all(); public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo); } class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K, V>, PeekingKeyValueIterator<K, V> { public synchronized boolean hasNext(); public synchronized KeyValue<K, V> next(); public KeyValue<K, V> peekNext(); } class MeteredWindowStoreIterator<V> implements WindowStoreIterator<V> { public boolean hasNext(); public KeyValue<Long, V> next(); public Long peekNextKey() } |
During user call one of above methods, we should check KafkaStreams state by the following rule when InvalidStateStoreException is thrown:
|
|
|
|
|
|
|
|
|
|
|
None.