Status
Current state: Under Discussion
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, Interactive Queries (IQ) throw InvalidStateStoreException for every type of error, which means a user cannot handle different types of error differently.
Because of that, we should throw different exceptions for each type.
Proposed Changes
The new exceptions are:
...
- If state is RUNNING or REBALANCING:
- StateStoreClosedException: should be wrapped in StateStoreRetryableException
- StreamThreadNotRunningException: should be wrapped in StateStoreRetryableException
- StateStoreEmptyException: should be wrapped in StateStoreMigratedException
- If state is PENDING_SHUTDOWN or ERROR or NOT_RUNNING:
- wrap InvalidStateStoreException (including subclasses) in StateStoreFailException
Call Trace
Expand |
---|
title | Call trace 1: KafkaStreams#store() |
---|
|
- KafkaStreams#store() (v)
- QueryableStoreProvider#getStore() (v)
- GlobalStateStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
|
...
Expand |
---|
title | Call trace 11: ReadOnlyWindowStore#fetchAll() |
---|
|
- CompositeReadOnlyWindowStore#fetchAll()
- WrappingStoreProvider#stores() (v)
- StreamThreadStateStoreProvider#stores() (v)
- return
- return new DelegatingPeekingKeyValueIterator()
- DelegatingPeekingKeyValueIterator#hasNext() (v)
- CompositeKeyValueIterator#hasNext()
- NextIteratorFunction#apply()
- MeteredWindowStore#fetchAll()
- CachingWindowStore#fetchAll()
- AbstractStateStore#validateStoreOpen() (v)
- ChangeLoggingWindowBytesStore#fetchAll()
- RocksDBWindowBytesStore#fetchAll()
- RocksDBSegmentedBytesStore#fetchAll()
- Segments#allSegments() (v)
- return new SegmentIterator()
- return keyValueIterator
- return
- ThreadCache#all()
- return new MemoryLRUCacheBytesIterator
- return new MergedSortedCacheWindowStoreKeyValueIterator
- return new MeteredWindowedKeyValueIterator()
- return
- MeteredWindowedKeyValueIterator#hasNext()
- MergedSortedCacheWindowStoreKeyValueIterator#hasNext()
AbstractMergedSortedCacheStoreIterator#hasNext()
- WrappedKeyValueIterator#hasNext()
- SegmentIterator#hasNext() (v)
- return
- return
- return
- return
- CompositeKeyValueIterator#next()
- MeteredWindowedKeyValueIterator#next()
- MergedSortedCacheWindowStoreKeyValueIterator#next()
AbstractMergedSortedCacheStoreIterator#next()
- WrappedKeyValueIterator#hasNext()
- SegmentIterator#hasNext() (v)
- return
- AbstractMergedSortedCacheStoreIterator#nextStoreValue()
- WrappedKeyValueIterator#next()
- SegmentIterator#next()
- RocksDbIterator#next()
- RocksDbIterator#hasNext() (v)
- RocksIterator.next()
- return entry
- return
- return keyvalue
- return
- return
- return keyvalue
- return
- return
- DelegatingPeekingKeyValueIterator#next()
- DelegatingPeekingKeyValueIterator#hasNext()
- return
|
...
Code Block |
---|
|
/**
 * Describes a kind of queryable store and creates instances of type {@code T}.
 *
 * NOTE(review): only {@code create} is shown in this excerpt; the provider
 * snippets also call {@code accepts(StateStore)} on this type.
 */
public interface QueryableStoreType<T> {

    /**
     * Creates the queryable store of type {@code T} for the named state store.
     *
     * TODO: pass stream instance parameter
     *
     * @param streams       the {@code KafkaStreams} instance handed to the created store
     * @param storeProvider the provider the created store obtains its underlying stores from
     * @param storeName     the name of the state store to expose
     * @return the queryable store
     */
    T create(
        final KafkaStreams streams,
        final StateStoreProvider storeProvider,
        final String storeName);
} |
Code Block |
---|
language | java |
---|
title | class QueryableStoreProvider |
---|
|
/**
 * Looks up the queryable store with the given name, giving precedence to the global store provider.
 *
 * If the global provider returns at least one store, the result wraps only that provider.
 * Otherwise every provider in {@code storeProviders} is asked for the store; if none of them
 * returns one, an {@link InvalidStateStoreException} is thrown.
 *
 * @param storeName          name of the state store to look up
 * @param queryableStoreType type used to select matching stores and to create the result
 * @return the store created by {@code queryableStoreType}
 * @throws InvalidStateStoreException if no provider returns a store for {@code storeName}
 */
public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    final boolean foundGlobally = !globalStoreProvider.stores(storeName, queryableStoreType).isEmpty();
    if (foundGlobally) {
        final List<StateStoreProvider> globalOnly =
            Collections.<StateStoreProvider>singletonList(globalStoreProvider);
        return queryableStoreType.create(new WrappingStoreProvider(globalOnly), storeName);
    }

    // Query every provider: each call may itself throw if its store or thread is unavailable.
    final List<T> localStores = new ArrayList<>();
    for (final StateStoreProvider provider : storeProviders) {
        final List<T> found = provider.stores(storeName, queryableStoreType);
        localStores.addAll(found);
    }

    if (!localStores.isEmpty()) {
        return queryableStoreType.create(new WrappingStoreProvider(storeProviders), storeName);
    }

    // TODO: Replace with StateStoreEmptyException
    throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
} |
Code Block |
---|
language | java |
---|
title | class GlobalStateStoreProvider |
---|
|
/**
 * Returns the global state store registered under {@code storeName}, if it matches the requested type.
 *
 * @param storeName          name of the global state store
 * @param queryableStoreType type the store must be accepted by
 * @return a single-element list holding the store, or an empty list when no store with that
 *         name exists or the type does not accept it
 * @throws InvalidStateStoreException if the matching store is not open
 */
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    final StateStore candidate = globalStateStores.get(storeName);
    final boolean matches = candidate != null && queryableStoreType.accepts(candidate);
    if (!matches) {
        return Collections.emptyList();
    }

    if (candidate.isOpen()) {
        return (List<T>) Collections.singletonList(candidate);
    }

    // TODO: Replace with StateStoreClosedException
    throw new InvalidStateStoreException("the state store, " + storeName + ", is not open.");
} |
Code Block |
---|
language | java |
---|
title | class StreamThreadStateStoreProvider |
---|
|
/**
 * Collects, from every task of the stream thread, the stores named {@code storeName} that the
 * requested type accepts.
 *
 * @param storeName          name of the state store
 * @param queryableStoreType type the stores must be accepted by
 * @return the matching stores; empty when the stream thread is DEAD or no task has a matching store
 * @throws InvalidStateStoreException if the stream thread is not running (or is rebalancing),
 *                                    or if a matching store is not open
 */
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    // A dead thread contributes nothing; this is reported as "no stores", not as an error.
    if (streamThread.state() == StreamThread.State.DEAD) {
        return Collections.emptyList();
    }

    final boolean queryable = streamThread.isRunningAndNotRebalancing();
    if (!queryable) {
        // TODO: Replace with StreamThreadNotRunningException
        throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " +
            streamThread.state() + ", not RUNNING");
    }

    final List<T> matching = new ArrayList<>();
    for (final Task task : streamThread.tasks().values()) {
        final StateStore candidate = task.getStore(storeName);
        if (candidate == null || !queryableStoreType.accepts(candidate)) {
            continue;
        }
        if (!candidate.isOpen()) {
            // TODO: Replace with StateStoreClosedException
            throw new InvalidStateStoreException("Cannot get state store " + storeName + " for task " + task +
                " because the store is not open. The state store may have migrated to another instances.");
        }
        matching.add((T) candidate);
    }
    return matching;
} |
Code Block |
---|
language | java |
---|
title | class WrappingStoreProvider |
---|
|
/**
 * Gathers the stores named {@code storeName} from every wrapped provider.
 *
 * @param storeName name of the state store
 * @param type      type the stores must be accepted by
 * @return all stores returned by the wrapped providers; never empty
 * @throws StateStoreEmptyException if no wrapped provider returns a store
 */
public <T> List<T> stores(final String storeName, QueryableStoreType<T> type) {
    final List<T> collected = new ArrayList<>();
    for (final StateStoreProvider provider : storeProviders) {
        collected.addAll(provider.stores(storeName, type));
    }

    if (!collected.isEmpty()) {
        return collected;
    }

    // No provider returned the store: reported with the dedicated StateStoreEmptyException.
    throw new StateStoreEmptyException("The state store, " + storeName + ", may have migrated to another instance.");
} |
Code Block |
---|
language | java |
---|
title | class AbstractStateStore |
---|
|
/**
 * Verifies that the inner store is open.
 *
 * @throws InvalidStateStoreException if {@code innerState} reports that it is not open
 */
void validateStoreOpen() {
    if (innerState.isOpen()) {
        return;
    }
    // TODO: Replace with StateStoreClosedException
    throw new InvalidStateStoreException("Store " + innerState.name() + " is currently closed.");
} |
Code Block |
---|
language | java |
---|
title | class RocksDBStore |
---|
|
/**
 * Verifies that this store is open.
 *
 * @throws InvalidStateStoreException if the {@code open} flag is not set
 */
private void validateStoreOpen() {
    if (open) {
        return;
    }
    // TODO: Replace with StateStoreClosedException
    throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
} |
Code Block |
---|
language | java |
---|
title | class RocksDBIterator |
---|
|
/**
 * Reports whether the underlying iterator is positioned on a valid entry.
 *
 * @return the result of {@code iter.isValid()}
 * @throws InvalidStateStoreException if the {@code open} flag is not set
 */
public synchronized boolean hasNext() {
    if (open) {
        return iter.isValid();
    }
    // TODO: Replace with StateStoreClosedException
    throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", storeName));
} |
Code Block |
---|
language | java |
---|
title | class DelegatingPeekingKeyValueIterator |
---|
|
/**
 * Reports whether another element is available, fetching and buffering it in {@code next}
 * when nothing is buffered yet.
 *
 * @return {@code true} if an element is already buffered or one was just fetched from the
 *         underlying iterator; {@code false} if the underlying iterator is exhausted
 * @throws InvalidStateStoreException if the {@code open} flag is not set
 */
public synchronized boolean hasNext() {
    if (!open) {
        // TODO: Replace with StateStoreClosedException
        throw new InvalidStateStoreException(String.format("Store %s has closed", storeName));
    }

    if (next == null) {
        // Nothing buffered: pull one element ahead so it can be peeked at.
        if (!underlying.hasNext()) {
            return false;
        }
        next = underlying.next();
    }
    return true;
} |
Code Block |
---|
language | java |
---|
title | class Segments |
---|
|
/**
 * Returns the open segments whose ids cover the time range {@code [timeFrom, timeTo]}.
 *
 * Negative times are treated as 0, and the resulting id range is clamped to
 * {@code [minSegmentId, maxSegmentId]}.
 *
 * @param timeFrom lower bound of the time range
 * @param timeTo   upper bound of the time range
 * @return the existing, open segments in ascending id order
 */
List<Segment> segments(final long timeFrom, final long timeTo) {
    final long clampedFrom = Math.max(0L, timeFrom);
    final long firstId = Math.max(minSegmentId, segmentId(clampedFrom));
    final long clampedTo = Math.min(maxSegmentId * segmentInterval, Math.max(0, timeTo));
    final long lastId = Math.min(maxSegmentId, segmentId(clampedTo));

    final List<Segment> result = new ArrayList<>();
    for (long id = firstId; id <= lastId; id++) {
        final Segment candidate = getSegment(id);
        if (candidate == null || !candidate.isOpen()) {
            continue;
        }
        try {
            result.add(candidate);
        } catch (InvalidStateStoreException ise) { // TODO: Replace with StateStoreClosedException
            // segment may have been closed by streams thread;
        }
    }
    return result;
}
/**
 * Returns every currently open segment, sorted by the segments' natural ordering.
 *
 * @return the sorted list of open segments
 */
List<Segment> allSegments() {
    final List<Segment> openSegments = new ArrayList<>();
    for (final Segment candidate : this.segments.values()) {
        if (!candidate.isOpen()) {
            continue;
        }
        try {
            openSegments.add(candidate);
        } catch (InvalidStateStoreException ise) { // TODO: Replace with StateStoreClosedException
            // segment may have been closed by streams thread;
        }
    }
    Collections.sort(openSegments);
    return openSegments;
} |
Code Block |
---|
language | java |
---|
title | class SegmentIterator |
---|
|
/**
 * Reports whether another entry is available, moving on to later segments when the
 * current one cannot supply one.
 *
 * @return {@code true} if there is a current iterator and the last evaluation of
 *         {@code hasNextCondition} on it returned {@code true}
 */
public boolean hasNext() {
boolean hasNext = false;
// Keep advancing while segments remain and any of the following holds: there is no current
// iterator, hasNextCondition rejects the current iterator, or the current segment is no
// longer open. Note that hasNext is assigned inside the condition, and only when
// currentIterator is non-null (short-circuit evaluation).
while ((currentIterator == null || !(hasNext = hasNextCondition.hasNext(currentIterator)) || !currentSegment.isOpen())
&& segments.hasNext()) {
// NOTE(review): close() presumably releases the iterator of the segment being left behind;
// confirm against its definition.
close();
currentSegment = segments.next();
try {
// With either bound missing, iterate the whole segment; otherwise only the [from, to] range.
if (from == null || to == null) {
currentIterator = currentSegment.all();
} else {
currentIterator = currentSegment.range(from, to);
}
} catch (InvalidStateStoreException e) { // TODO: Replace with StateStoreClosedException
// segment may have been closed so we ignore it.
}
}
return currentIterator != null && hasNext;
} |
Compatibility, Deprecation, and Migration Plan
- All new exceptions extend InvalidStateStoreException, so this change is fully backward compatible.
Rejected Alternatives
None.
Text-to-speech function is limited to 200 characters