Status

Current stateUnder Discussion

Discussion thread: here

JIRA

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, IQ throws InvalidStateStoreException for any types of error, that means a user cannot handle different types of error. Because of that, we should throw different exceptions for each type.

 

Proposed Changes

Three categories exception to user

 

public class StateStoreClosedException extends InvalidStateStoreException
public class StateStoreMigratedException extends InvalidStateStoreException
public class StateStoreRetryableException extends InvalidStateStoreException
public class StateStoreFailException extends InvalidStateStoreException

 

Call Trace

  • KafkaStreams#store() (v)
    • QueryableStoreProvider#getStore() (v)
      • GlobalStateStore#stores() (v)
      • StreamThreadStateStroeProvider#stores() (v)
  • CompositeReadOnlyKeyValueStore#get() (v)
    • WrappingStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)
    • MeteredKeyValueBytesStore#get()
      • InnerMeteredKeyValueStore#get()
        • CachingKeyValueStore#get()
          • AbstractStateStore#validateStoreOpen() (v)
            • RocksDBStore#isOpen()
        • CachingKeyValueStore#getInternal()
          • ChangeLoggingKeyValueBytesStore#get()
            • RocksDBStore#get()
              • RocksDBStore#validateStoreOpen() (v)
            • RocksDBStore#getInternal()
  • CompositeReadOnlyKeyValueStore#range() (v)
    • WrappingStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)
    • return new DelegatingPeekingKeyValueIterator<>()
  • DelegatingPeekingKeyValueIterator#hasNext()
    • CompositKeyValueIterator#hasNext()
      • NextIteratorFunction#apply()
        • MeteredKeyValueBytesStore#range()
          • InnerMeteredKeyValueStore#range()
            • CachingKeyValueStore#range()
              • AbstractStateStore#validateStoreOpen() (v)
              • ChangeLoggingKeyValueBytesStore#range()
                • RocksDBStore#range()
                  • RocksDBStore#validateStoreOpen() (v)
                  • return new RocksDBRangeIterator()
              • return new MergedSortedCacheKeyValueBytesStoreIterator()
            • return new MeteredKeyValueIterator()
          • return
        • return
    • CompositKeyValueIterator#next()
      • MeteredKeyValueIterator#next()
        • MergedSortedCacheKeyValueBytesStoreIterator#next()
          AbstractMergedSortedCacheStoreIterator#next()
          • RocksDBRangeIterator#hasNext()
            RocksDBIterator#hasNext() (v)
              • RocksIterator#isValid()
          • AbstractMergedStortedCacheStoreIterator#nextSrtoreValue()
            • RocksDBRangeIterator#next()
              RocksDBIterator#next()
              • RocksDBIterator#hasNext() (v)
              • RocksDBIterator#getKeyValue()
              • RocksIterator#next()
              • return keyvalue entry
            • return
          • return
        • return outerkeyvalue
      • return
    • return true
  • DelegatingPeekingKeyValueIterator#next()
    • return keyvalue