Status
Current state: Under Discussion
...
Motivation
Currently, Interactive Queries (IQ) throws InvalidStateStoreException for every type of error, which means a user cannot handle different types of error differently.
To address this, we should throw a different exception for each type.
Proposed Changes
To distinguish between different types of error, we need to handle InvalidStateStoreException more precisely when the public methods listed below are invoked. The main change is to introduce new exceptions that extend InvalidStateStoreException. InvalidStateStoreException itself is no longer thrown; only its new subclasses are.
Code Block |
---|
|
# Two category exceptions
public class RetryableStateStoreException extends InvalidStateStoreException
public class FatalStateStoreException extends InvalidStateStoreException
# Retryable exceptions
public class StreamThreadNotStartedException extends RetryableStateStoreException
public class StreamThreadRebalancingException extends RetryableStateStoreException
public class StateStoreMigratedException extends RetryableStateStoreException
# Fatal exceptions
public class StreamThreadNotRunningException extends InvalidStateStoreException |
The various state store exceptions are classified into two category exceptions: RetryableStateStoreException and FatalStateStoreException. Users can catch these two exceptions if they only need to distinguish whether a call can be retried.
- Retryable exceptions
- StreamThreadNotStartedException: thrown when the stream thread state is CREATED. The user can retry until the state becomes RUNNING.
- StreamThreadRebalancingException: thrown when the stream thread is not running and the stream state is REBALANCING. The user just needs to retry and wait until the rebalance has finished (RUNNING).
- StateStoreMigratedException: thrown when the state store is already closed and the stream state is REBALANCING. The store has been migrated and is no longer hosted in this application instance, so the user needs to rediscover the store.
- Fatal exceptions
- KafkaStreamsNotRunningException: thrown when the stream thread is not running and the stream state is PENDING_SHUTDOWN / NOT_RUNNING / ERROR. The user cannot retry when this exception is thrown.
- StateStoreNotAvailableException: thrown when the state store is closed and the stream thread state is PENDING_SHUTDOWN / NOT_RUNNING / ERROR. The user cannot retry when this exception is thrown.
- UnknownStateStoreException: thrown when an unknown state store is passed.
Three internal exceptions: StateStoreClosedException, StateStoreEmptyException, StreamThreadNotRunningException
The internal exceptions are eventually wrapped as one of the exceptions thrown to the user. Their purpose is to distinguish between the different kinds of InvalidStateStoreException. For example, StreamThreadStateStoreProvider#stores() throws StreamThreadNotRunningException (an internal exception), which is then wrapped as StateStoreRetryableException or StateStoreFailException in KafkaStreams.store() and thrown to the user. StateStoreRetryableException indicates that the application instance is rebalancing and the user should retry; StateStoreFailException indicates a fatal error when accessing the state store, after which the user can neither retry nor rediscover.
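The two category exceptions lend themselves to a simple retry loop on the caller's side. The following is a minimal sketch under stated assumptions, not part of the proposal: the exception classes are local stand-ins named after the ones above, and `RetrySketch`, `queryWithRetry`, `maxAttempts` and `backoffMs` are hypothetical names chosen for illustration.

```java
import java.util.function.Supplier;

// Sketch only: local stand-ins named after the exceptions in this proposal.
class RetrySketch {
    static class InvalidStateStoreException extends RuntimeException {
        InvalidStateStoreException(String message) { super(message); }
    }
    static class RetryableStateStoreException extends InvalidStateStoreException {
        RetryableStateStoreException(String message) { super(message); }
    }
    static class FatalStateStoreException extends InvalidStateStoreException {
        FatalStateStoreException(String message) { super(message); }
    }
    static class StreamThreadRebalancingException extends RetryableStateStoreException {
        StreamThreadRebalancingException(String message) { super(message); }
    }

    // Hypothetical helper: re-runs the query while it fails with a retryable error.
    // A FatalStateStoreException is not caught here, so it reaches the caller at once.
    static <T> T queryWithRetry(Supplier<T> query, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RetryableStateStoreException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return query.get();
            } catch (RetryableStateStoreException e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleepQuietly(backoffMs); // wait before the next attempt
                }
            }
        }
        throw last; // every attempt failed with a retryable error
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", ie);
        }
    }
}
```

A caller that needs finer control can still catch the specific subclasses before falling back to the category types.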
The following are the public methods that users call to get a state store instance:
- KafkaStreams
- store(storeName, queryableStoreType)
Info |
---|
Throw exceptions: StreamThreadNotStartedException, StreamThreadRebalancingException, KafkaStreamsNotRunningException, UnknownStateStoreException |
...
The following are the public methods that users call to get store values:
- KafkaStreams
- interface ReadOnlyKeyValueStore(class CompositeReadOnlyKeyValueStore)
- get(key)
- range(from, to)
- all()
- approximateNumEntries()
- interface ReadOnlySessionStore(class CompositeReadOnlySessionStore)
- fetch(key)
- fetch(from, to)
- interface ReadOnlyWindowStore(class CompositeReadOnlyWindowStore)
- fetch(key, time)
- fetch(key, from, to)
- fetch(from, to, fromTime, toTime)
- all()
- fetchAll()
- WindowStoreIterator(MeteredWindowStoreIterator)
- next()
- hasNext()
- peekNextKey()
...
...
...
...
- to)
- @Deprecated fetch(key, timeFrom, timeTo)
- @Deprecated fetch(from, to, timeFrom, timeTo)
- @Deprecated fetchAll(timeFrom, timeTo)
- interface KeyValueIterator(class DelegatingPeekingKeyValueIterator)
- next()
- hasNext()
- peekNextKey()
Info |
---|
All of the above methods can throw the following exceptions: StreamThreadRebalancingException, StateStoreMigratedException, KafkaStreamsNotRunningException, StateStoreNotAvailableException |
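How a caller might react to each exception thrown by the read methods can be sketched as a small decision function. This is an illustration only: the classes are local stand-ins, the placement of KafkaStreamsNotRunningException and StateStoreNotAvailableException under FatalStateStoreException is an assumption based on the category listing, and `QueryErrorSketch`, `Action` and `actionFor` are hypothetical names.

```java
// Sketch only: local stand-ins named after the exceptions in this proposal.
class QueryErrorSketch {
    static class InvalidStateStoreException extends RuntimeException {
        InvalidStateStoreException(String message) { super(message); }
    }
    static class RetryableStateStoreException extends InvalidStateStoreException {
        RetryableStateStoreException(String message) { super(message); }
    }
    static class FatalStateStoreException extends InvalidStateStoreException {
        FatalStateStoreException(String message) { super(message); }
    }
    static class StreamThreadRebalancingException extends RetryableStateStoreException {
        StreamThreadRebalancingException(String message) { super(message); }
    }
    static class StateStoreMigratedException extends RetryableStateStoreException {
        StateStoreMigratedException(String message) { super(message); }
    }
    // Assumption: the fatal exceptions extend FatalStateStoreException.
    static class KafkaStreamsNotRunningException extends FatalStateStoreException {
        KafkaStreamsNotRunningException(String message) { super(message); }
    }
    static class StateStoreNotAvailableException extends FatalStateStoreException {
        StateStoreNotAvailableException(String message) { super(message); }
    }

    // Hypothetical set of reactions a caller could choose from.
    enum Action { RETRY_SAME_INSTANCE, REDISCOVER_STORE, GIVE_UP }

    static Action actionFor(InvalidStateStoreException e) {
        // Check the most specific type first: a migrated store is retryable,
        // but only after the caller has found the instance that now hosts it.
        if (e instanceof StateStoreMigratedException) {
            return Action.REDISCOVER_STORE;
        }
        if (e instanceof RetryableStateStoreException) {
            return Action.RETRY_SAME_INSTANCE;
        }
        // Fatal exceptions, and anything unclassified, are not retried.
        return Action.GIVE_UP;
    }
}
```

Checking StateStoreMigratedException before its RetryableStateStoreException parent matters here, because the general check would otherwise match first.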
When a user calls one of the above methods and an InvalidStateStoreException is thrown, we should check the KafkaStreams state and apply the following rules:
- If the state is RUNNING or REBALANCING:
- StateStoreClosedException: should be wrapped as StateStoreRetryableException
- StreamThreadNotRunningException: should be wrapped as StateStoreRetryableException
- StateStoreEmptyException: should be wrapped as StateStoreMigratedException
- If the state is PENDING_SHUTDOWN, ERROR or NOT_RUNNING:
- wrap InvalidStateStoreException (including subclasses) as StateStoreFailException
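The wrapping rule above can be sketched as a single function of the current state and the caught exception. This is a rough illustration, not the actual implementation: the `State` enum is a local stand-in mirroring the KafkaStreams state names, the exception classes are local stand-ins, and `WrapRuleSketch` and `wrap` are hypothetical names. The rule does not cover the CREATED state, so the sketch returns the exception unchanged there.

```java
// Sketch only: local stand-ins for the state enum and the exceptions named in the rule.
class WrapRuleSketch {
    enum State { CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    static class InvalidStateStoreException extends RuntimeException {
        InvalidStateStoreException(String message, Throwable cause) { super(message, cause); }
    }
    // Internal exceptions raised inside the store providers.
    static class StateStoreClosedException extends InvalidStateStoreException {
        StateStoreClosedException(String message) { super(message, null); }
    }
    static class StreamThreadNotRunningException extends InvalidStateStoreException {
        StreamThreadNotRunningException(String message) { super(message, null); }
    }
    static class StateStoreEmptyException extends InvalidStateStoreException {
        StateStoreEmptyException(String message) { super(message, null); }
    }
    // Exceptions thrown to the user; each keeps the internal exception as its cause.
    static class StateStoreRetryableException extends InvalidStateStoreException {
        StateStoreRetryableException(InvalidStateStoreException cause) { super(cause.getMessage(), cause); }
    }
    static class StateStoreMigratedException extends InvalidStateStoreException {
        StateStoreMigratedException(InvalidStateStoreException cause) { super(cause.getMessage(), cause); }
    }
    static class StateStoreFailException extends InvalidStateStoreException {
        StateStoreFailException(InvalidStateStoreException cause) { super(cause.getMessage(), cause); }
    }

    static InvalidStateStoreException wrap(State state, InvalidStateStoreException e) {
        switch (state) {
            case RUNNING:
            case REBALANCING:
                if (e instanceof StateStoreClosedException || e instanceof StreamThreadNotRunningException) {
                    return new StateStoreRetryableException(e);
                }
                if (e instanceof StateStoreEmptyException) {
                    return new StateStoreMigratedException(e);
                }
                return e; // not covered by the rule: left as-is in this sketch
            case PENDING_SHUTDOWN:
            case ERROR:
            case NOT_RUNNING:
                return new StateStoreFailException(e); // applies to every subclass
            default:
                return e; // CREATED is not covered by the rule
        }
    }
}
```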
Call Trace
Expand |
---|
title | Call trace 1: KafkaStreams#store() |
---|
|
- KafkaStreams#store() (v)
- QueryableStoreProvider#getStore() (v)
- GlobalStateStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
|
Expand |
---|
title | Call trace 2: ReadOnlyKeyValueStore#get() |
---|
|
- CompositeReadOnlyKeyValueStore#get() (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- MeteredKeyValueBytesStore#get()
- InnerMeteredKeyValueStore#get()
- CachingKeyValueStore#get()
- AbstractStateStore#validateStoreOpen() (v)
- CachingKeyValueStore#getInternal()
- ChangeLoggingKeyValueBytesStore#get()
- RocksDBStore#get()
- RocksDBStore#validateStoreOpen() (v)
- RocksDBStore#getInternal()
|
Expand |
---|
title | Call trace 3: ReadOnlyKeyValueStore#range() |
---|
|
- CompositeReadOnlyKeyValueStore#range() (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return new DelegatingPeekingKeyValueIterator
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositeKeyValueIterator#hasNext()
- NextIteratorFunction#apply() (v)
- MeteredKeyValueBytesStore#range()
- InnerMeteredKeyValueStore#range()
- CachingKeyValueStore#range()
- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingKeyValueBytesStore#range()
- RocksDBStore#range()
- RocksDBStore#validateStoreOpen() (v)
- return new RocksDBRangeIterator()
- return new MergedSortedCacheKeyValueBytesStoreIterator()
- return new MeteredKeyValueIterator()
- return
- return
- return
- CompositeKeyValueIterator#next()
- MeteredKeyValueIterator#next()
- MergedSortedCacheKeyValueBytesStoreIterator#next()
AbstractMergedSortedCacheStoreIterator#next()
- RocksDBRangeIterator#hasNext()
RocksDBIterator#hasNext() (v)
- AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- RocksDBRangeIterator#next()
RocksDBIterator#next()
- RocksDbIterator#hasNext() (v)
- RocksDbIterator#getKeyValue()
- RocksIterator#next()
- return keyvalue entry
- return
- return
- return outerkeyvalue
- return
- return
- DelegatingPeekingKeyValueIterator#next()
|
Expand |
---|
title | Call trace 4: ReadOnlyKeyValueStore#all() |
---|
|
- CompositeReadOnlyKeyValueStore#all() (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return new DelegatingPeekingKeyValueIterator
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositeKeyValueIterator#hasNext()
- NextIteratorFunction#apply() (v)
- MeteredKeyValueBytesStore#all()
- InnerMeteredKeyValueStore#all()
- CachingKeyValueStore#all()
- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingKeyValueBytesStore#all()
- RocksDBStore#all()
- RocksDBStore#validateStoreOpen() (v)
- return new RocksDBIterator()
- return
- return new MergedSortedCacheKeyValueBytesStoreIterator()
- return new MeteredKeyValueIterator()
- return
- return
- return
- CompositeKeyValueIterator#next()
- MeteredKeyValueIterator#next()
- MergedSortedCacheKeyValueBytesStoreIterator#next()
AbstractMergedSortedCacheStoreIterator#next()
- MemoryLRUCacheBytesIterator.hasNext()
- DelegatingPeekingKeyValueIterator.hasNext() (v)
- AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- DelegatingPeekingKeyValueIterator#next()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- return
- return
- return outerkeyvalue
- return
- return
- DelegatingPeekingKeyValueIterator#next()
|
Expand |
---|
title | Call trace 5: ReadOnlyKeyValueStore#approximateNumEntries() |
---|
|
- CompositeReadOnlyKeyValueStore#approximateNumEntries() (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- MeteredKeyValueBytesStore#approximateNumEntries()
- InnerMeteredKeyValueStore#approximateNumEntries()
- CachingKeyValueStore#approximateNumEntries()
- AbstractStateStore#validateStoreOpen() (v)
- RocksDBStore#approximateNumEntries()
- RocksDBStore#validateStoreOpen() (v)
- return value
- return
- return
- return
- return total
|
...
title | Call trace 6: ReadOnlySessionStore#fetch(key) |
---|
- CompositeReadOnlySessionStore#fetch(key) (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- MeteredSessionStore#fetch(key)
- MeteredSessionStore#findSessions()
- CachingSessionStore#findSessions()
- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingSessionBytesStore#findSessions()
- RocksDBSessionStore.findSessions(k)
- RocksDBSessionStore.findSessions(from, to)
- RocksDBSegmentedBytesStore#fetch()
- SessionKeySchema#segmentsToSearch()
- Segments#segments() (v)
- Segments#getSegment()
- ConcurrentHashMap#get()
- Segments#isSegment()
- return segment
- return segments
- return
- return new SegmentIterator()
- return new WrappedSessionStoreIterator()
- return
- return
- return new MergedSortedCacheSessionStoreIterator()
- return new MeteredWindowedKeyValueIterator()
- return
- MeteredWindowedKeyValueIterator#hasNext()
- MergedSortedCacheSessionStoreIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()
- FilteredCacheIterator#hasNext()
- WrappedSessionStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment.range(from, to)
RocksDBStore.range(from, to)
- RocksDBStore.validateStoreOpen() (v)
- return new RocksDBRangeIterator()
- return
- return
- return
- return iterator(MeteredWindowedKeyValueIterator)
- MeteredWindowedKeyValueIterator#next()
- MergedSortedCacheSessionStoreIterator#next()
AbstractMergedSortedCacheStoreIterator#next()
- AbstractMergedSortedCacheStoreIterator#hasNext()
- FilteredCacheIterator#hasNext()
- WrappedSessionStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment.range(from, to)
RocksDBStore.range(from, to)
- RocksDBStore.validateStoreOpen() (v)
- return new RocksDBRangeIterator()
- return
- return
- MergedSortedCacheSessionStoreIterator#nextStoreValue()
AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- WrappedSessionStoreIterator#next()
- SegmentIterator#next()
- RocksDBRangeIterator#next()
RocksDbIterator#next()
- RocksDbIterator#getKeyValue()
- RocksIterator#next()
- return entry
- return
- return
- return
- return
- return
...
Expand |
---|
title | Call trace 7: ReadOnlySessionStore#fetch(from, to) |
---|
|
- CompositeReadOnlySessionStore#fetch(from, to) (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return new DelegatingPeekingKeyValueIterator<>()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositeKeyValueIterator#hasNext()
- NextIteratorFunction#apply(store)
- MeteredSessionStore#fetch(from, to)
- MeteredSessionStore#findSessions(from, to, 0, t)
- CachingSessionStore#findSessions(from, to, 0, t)
- CachingSessionStore#validateStoreOpen()
AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingSessionBytesStore#isOpen()
- RocksDBSessionStore#isOpen()
AbstractStateStore#isOpen()
- RocksDBSegmentedBytesStore#isOpen()
- return
- return
- return
- ChangeLoggingSessionBytesStore#findSessions()
- RocksDBSessionStore#findSessions()
- RocksDBSegmentedBytesStore#fetch()
- SessionKeySchema#segmentsToSearch()
- Segments#segments() (v)
- return new SegmentIterator()
- return new WrappedSessionStoreIterator()
- return
- return
- return new MergedSortedCacheSessionStoreIterator()
- return MeteredWindowedKeyValueIterator()
- return
- return
- MeteredWindowedKeyValueIterator#hasNext()
- MergedSortedCacheSessionStoreIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()
- FilteredCacheIterator#hasNext()
- WrappedSessionStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment.range(from, to)
RocksDBStore.range(from, to)
- RocksDBStore.validateStoreOpen() (v)
- return new RocksDBRangeIterator()
- return
- return
- return
- return
- return
- CompositeKeyValueIterator#next()
- return
- DelegatingPeekingKeyValueIterator#next()
|
Expand |
---|
title | Call trace 8: ReadOnlyWindowStore#fetch(key) |
---|
|
- CompositeReadOnlyWindowStore#fetch(key) (v)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- MeteredWindowStore#fetch()
- CachingWindowStore#fetch(k, tf, tt)
- AbstractStateStore#validateStoreOpen()
- ChangeLoggingWindowBytesStore#isOpen()
- AbstractStateStore#isOpen()
- RocksDBWindowBytesStore#isOpen()
- RocksDBSegmentedBytesStore#isOpen()
- return
- return
- return
- return
- ChangeLoggingWindowBytesStore#fetch()
- RocksDBWindowStoreBytesStore#fetch()
- RocksDBSegmentedBytesStore#fetch()
- WindowKeySchema#segmentsToSearch()
- Segments#segments()
- RocksDBStore#isOpen()
- return segments
- return
- return new SegmentIterator()
- WindowStoreIteratorWrapper.bytesIterator()
- return new WrappedWindowStoreBytesIterator()
- return
- return
- ThreadCache#range()
- return new MemoryLRUCacheBytesIterator
- return new MergedSortedCacheWindowStoreIterator
- return new MeteredWindowStoreIterator
- MeteredWindowStoreIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()
- WrappedWindowStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment#range()
RocksDBStore#range()
- RocksDBStore#validateStoreOpen()
- return RocksDBRangeIterator
- return
- return
- return
- return
- MeteredWindowStoreIterator#hasNext()
- MergedSortedCacheWindowStoreIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()
- WrappedWindowStoreIterator#hasNext()
- SegmentIterator#hasNext()
- Segment#range()
RocksDBStore#range()
- RocksDBStore#validateStoreOpen() (v)
- return RocksDBRangeIterator
- return
- return
- return
- return
- MeteredWindowStoreIterator#next()
- MergedSortedCacheWindowStoreIterator#next()
AbstractMergedSortedCacheStoreIterator#next()
- WrappedWindowStoreBytesIterator#hasNext()
- SegmentIterator#hasNext()
- return
- AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- WrappedWindowStoreBytesIterator#next()
- SegmentIterator#next()
- SegmentIterator#hasNext()
- RocksDBRangeIterator#next()
- RocksDBRangeIterator#hasNext()
- RocksDBIterator#hasNext() (v)
- return
- RocksIterator#next()
- return entry
- return
- return keyvalue
- return
- return
- return keyvalue
|
Expand |
---|
title | Call trace 9: ReadOnlyWindowStore#fetch(from, to) |
---|
|
- CompositeReadOnlyWindowStore#fetch(from, to)
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return
- return new DelegatingPeekingKeyValueIterator()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositeKeyValueIterator#hasNext()
- NextIteratorFunction#apply()
- MeteredWindowStore#fetch()
- CachingWindowStore#fetch()
- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingWindowBytesStore#fetch()
- RocksDBWindowBytesStore#fetch()
- RocksDBSegmentedBytesStore#fetch()
- WindowKeySchema#segmentsToSearch()
- Segments#segments() (v)
- return
- return SegmentIterator
- return keyvalueIterator
- return
- ThreadCache.range()
- return new MemoryLRUCacheBytesIterator
- WindowKeySchema#hasNextCondition()
- return new MergedSortedCacheWindowStoreKeyValueIterator()
- return MeteredWindowKeyValueIterator()
- return
- MeteredWindowedKeyValueIterator#hasNext()
- MergedSortedCacheWindowStoreKeyValueIterator#hasNext()
AbstractMergedSortedCacheStoreIterator
- WrappedWindowStoreBytesIterator#hasNext()
WrappedKeyValueIterator#hasNext()
- SegmentIterator#hasNext()
- Segment#range()
RocksDBStore#range()
- RocksDBStore#validateStoreOpen() (v)
- return RocksDBRangeIterator
- return
- return
- return
- return
- CompositeKeyValueIterator#next()
- MeteredWindowedKeyValueIterator#next()
- MergedSortedCacheWindowStoreKeyValueIterator#next()
AbstractMergedSortedCacheStoreIterator#next()
- AbstractMergedSortedCacheStoreIterator#hasNext()
- AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- WrappedWindowStoreBytesIterator#next()
- SegmentIterator#hasNext()
- SegmentIterator#next()
- SegmentIterator#hasNext() (v)
- RocksDBRangeIterator#next()
RocksDbIterator#next()
- RocksDbIterator#hasNext()
- RocksIterator#next()
- return entry
- return
- return keyvalue
- return
- return keyvalue
- return
- return
- return
- DelegatingPeekingKeyValueIterator#next()
- DelegatingPeekingKeyValueIterator#hasNext()
- return
|
Expand |
---|
title | Call trace 10: ReadOnlyWindowStore#all() |
---|
|
- CompositeReadOnlyWindowStore#all()
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return
- return new DelegatingPeekingKeyValueIterator()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositeKeyValueIterator#hasNext()
- NextIteratorFunction#apply()
- MeteredWindowStore#all()
- CachingWindowStore#all()
- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingWindowBytesStore#all()
- RocksDBWindowBytesStore#all()
- RocksDBSegmentedBytesStore#all()
- Segments#allSegments() (v)
- return new SegmentIterator()
- return keyValueIterator
- return
- ThreadCache#all()
- return new MemoryLRUCacheBytesIterator
- return new MergedSortedCacheWindowStoreKeyValueIterator
- return new MeteredWindowedKeyValueIterator()
- return
- MeteredWindowedKeyValueIterator#hasNext()
- MergedSortedCacheWindowStoreKeyValueIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()
- WrappedKeyValueIterator#hasNext()
- SegmentIterator#hasNext() (v)
- return
- return
- return
- return
- CompositeKeyValueIterator#next()
- MeteredWindowedKeyValueIterator#next()
- MergedSortedCacheWindowStoreKeyValueIterator#next()
AbstractMergedSortedCacheStoreIterator#next()
- WrappedKeyValueIterator#hasNext()
- SegmentIterator#hasNext()
- AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- WrappedKeyValueIterator#next()
- SegmentIterator#next()
- RocksDbIterator#next()
- RocksDbIterator#hasNext() (v)
- RocksIterator.next()
- return entry
- return
- return keyvalue
- return
- return
- return keyvalue
- return
- return
- DelegatingPeekingKeyValueIterator#next()
- DelegatingPeekingKeyValueIterator#hasNext()
- return
|
Expand |
---|
title | Call trace 11: ReadOnlyWindowStore#fetchAll() |
---|
|
- CompositeReadOnlyWindowStore#fetchAll()
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return
- return new DelegatingPeekingKeyValueIterator()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositeKeyValueIterator#hasNext()
- NextIteratorFunction#apply()
- MeteredWindowStore#fetchAll()
- CachingWindowStore#fetchAll()
- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingWindowBytesStore#fetchAll()
- RocksDBWindowBytesStore#fetchAll()
- RocksDBSegmentedBytesStore#fetchAll()
- Segments#allSegments() (v)
- return new SegmentIterator()
- return keyValueIterator
- return
- ThreadCache#all()
- return new MemoryLRUCacheBytesIterator
- return new MergedSortedCacheWindowStoreKeyValueIterator
- return new MeteredWindowedKeyValueIterator()
- return
- MeteredWindowedKeyValueIterator#hasNext()
- MergedSortedCacheWindowStoreKeyValueIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()
- WrappedKeyValueIterator#hasNext()
- SegmentIterator#hasNext() (v)
- return
- return
- return
- return
- CompositeKeyValueIterator#next()
- MeteredWindowedKeyValueIterator#next()
- MergedSortedCacheWindowStoreKeyValueIterator#next()
AbstractMergedSortedCacheStoreIterator#next()
- WrappedKeyValueIterator#hasNext()
- SegmentIterator#hasNext() (v)
- return
- AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- WrappedKeyValueIterator#next()
- SegmentIterator#next()
- RocksDbIterator#next()
- RocksDbIterator#hasNext() (v)
- RocksIterator.next()
- return entry
- return
- return keyvalue
- return
- return
- return keyvalue
- return
- return
- DelegatingPeekingKeyValueIterator#next()
- DelegatingPeekingKeyValueIterator#hasNext()
- return
|
Compatibility, Deprecation, and Migration Plan
...
Rejected Alternatives
None.