Status
Current state: Adopted
Vote thread: here
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Interactive-Query APIs are designed for usability. Instant-based methods were introduced in KIP-358 to represent time ranges.
On the other hand, the read-write interfaces used by the Processor API are tuned for performance. Therefore, long-based methods are kept there to avoid the performance penalty caused by object allocation.
ReadOnlySessionStore API is misaligned with the improvements introduced into ReadOnlyWindowStore in KIP-358.
This proposal aims to solve this issue, adding Instant-based alternative methods to the interactive query API for Session Stores.
KIP-617 is already moving read methods from SessionStore into ReadOnlySessionStore.
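To illustrate the usability argument, the following sketch (not part of the proposal) expresses the same one-hour range in both styles. The class name `TimeRangeSketch` and its helper methods are hypothetical and exist only for this example; nothing here is Kafka API.

```java
import java.time.Duration;
import java.time.Instant;

class TimeRangeSketch {
    // Long-based style: the caller does the epoch-millisecond arithmetic by hand.
    static long[] lastHourAsMillis(final long nowMs) {
        final long fromMs = nowMs - 60L * 60L * 1000L;
        return new long[] {fromMs, nowMs};
    }

    // Instant-based style: the same range expressed with java.time types.
    static Instant[] lastHourAsInstants(final Instant now) {
        return new Instant[] {now.minus(Duration.ofHours(1)), now};
    }

    public static void main(final String[] args) {
        final Instant now = Instant.ofEpochMilli(1_600_000_000_000L);
        final long[] millis = lastHourAsMillis(now.toEpochMilli());
        final Instant[] instants = lastHourAsInstants(now);
        // Compare the bounds produced by the two styles.
        System.out.println(millis[0] == instants[0].toEpochMilli());
        System.out.println(millis[1] == instants[1].toEpochMilli());
    }
}
```

The Instant-based version allocates objects for each bound, which is acceptable for interactive queries but is the cost the long-based read-write path avoids.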
Public Interfaces
- org.apache.kafka.streams.state.ReadOnlySessionStore
  - add method: KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
  - add method: KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
  - add method: AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime)
  - add method: KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
  - add method: KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
- org.apache.kafka.streams.state.SessionStore
  - add methods with a default implementation calling the long-based method:
    - KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
    - KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
    - AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime)
    - KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
    - KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
Proposed Changes
Changes to ReadOnlySessionStore, taking into account the new methods added in KIP-617:
public interface ReadOnlySessionStore<K, AGG> {
    // Moving read functions from SessionStore to ReadOnlySessionStore
    KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime);
    KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime);
    AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime);
    // To be handled as part of KIP-617
    KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime);
    KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime);
}
public interface SessionStore<K, AGG> extends ReadOnlySessionStore<K, AGG> {
    // Existing methods
    KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
                                                    final long earliestSessionEndTime,
                                                    final long latestSessionStartTime);
    KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
                                                    final K keyTo,
                                                    final long earliestSessionEndTime,
                                                    final long latestSessionStartTime);
    AGG fetchSession(final K key, final long sessionStartTime, final long sessionEndTime);
    // Default implementations: validate each Instant, convert it to epoch milliseconds, and delegate to the long-based method
    @Override
    default KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
                                                            final Instant earliestSessionEndTime,
                                                            final Instant latestSessionStartTime) {
        return findSessions(
            key,
            ApiUtils.validateMillisecondInstant(earliestSessionEndTime, prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")),
            ApiUtils.validateMillisecondInstant(latestSessionStartTime, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime")));
    }
    @Override
    default KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
                                                            final K keyTo,
                                                            final Instant earliestSessionEndTime,
                                                            final Instant latestSessionStartTime) {
        return findSessions(
            keyFrom,
            keyTo,
            ApiUtils.validateMillisecondInstant(earliestSessionEndTime, prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")),
            ApiUtils.validateMillisecondInstant(latestSessionStartTime, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime")));
    }
    @Override
    default AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime) {
        return fetchSession(
            key,
            ApiUtils.validateMillisecondInstant(sessionStartTime, prepareMillisCheckFailMsgPrefix(sessionStartTime, "sessionStartTime")),
            ApiUtils.validateMillisecondInstant(sessionEndTime, prepareMillisCheckFailMsgPrefix(sessionEndTime, "sessionEndTime")));
    }
    // To be handled as part of KIP-617
    KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key,
                                                            final long earliestSessionEndTime,
                                                            final long latestSessionStartTime);
    KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom,
                                                            final K keyTo,
                                                            final long earliestSessionEndTime,
                                                            final long latestSessionStartTime);
    @Override
    default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key,
                                                                    final Instant earliestSessionEndTime,
                                                                    final Instant latestSessionStartTime) {
        return backwardFindSessions(
            key,
            ApiUtils.validateMillisecondInstant(earliestSessionEndTime, prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")),
            ApiUtils.validateMillisecondInstant(latestSessionStartTime, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime")));
    }
    @Override
    default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom,
                                                                    final K keyTo,
                                                                    final Instant earliestSessionEndTime,
                                                                    final Instant latestSessionStartTime) {
        return backwardFindSessions(
            keyFrom,
            keyTo,
            ApiUtils.validateMillisecondInstant(earliestSessionEndTime, prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")),
            ApiUtils.validateMillisecondInstant(latestSessionStartTime, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime")));
    }
}
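The delegation pattern used by the default methods can be tried in isolation with the following self-contained sketch. It is an illustration only: `DelegationSketch`, `SketchSessionStore`, and `toMillis` are hypothetical names, and `toMillis` is an assumed approximation of what the validation helper does (reject null and reject values that overflow a long of milliseconds), not the Kafka implementation.

```java
import java.time.Instant;

class DelegationSketch {
    // Hypothetical, simplified stand-in for a session store; not the Kafka interface.
    interface SketchSessionStore<K, AGG> {
        // Long-based method, as used on the read-write path.
        AGG fetchSession(K key, long sessionStartTime, long sessionEndTime);

        // Instant-based overload: validate, convert to epoch milliseconds, delegate.
        default AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime) {
            return fetchSession(
                key,
                toMillis(sessionStartTime, "sessionStartTime"),
                toMillis(sessionEndTime, "sessionEndTime"));
        }
    }

    // Assumed validation behaviour: null and overflowing values become IllegalArgumentException.
    static long toMillis(final Instant instant, final String name) {
        if (instant == null) {
            throw new IllegalArgumentException(name + " cannot be null");
        }
        try {
            return instant.toEpochMilli();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(name + " cannot be converted to milliseconds", e);
        }
    }

    public static void main(final String[] args) {
        // Toy store whose "aggregate" is simply the session length in milliseconds.
        final SketchSessionStore<String, Long> store = (key, start, end) -> end - start;
        System.out.println(store.fetchSession("k", Instant.ofEpochMilli(1_000L), Instant.ofEpochMilli(4_000L)));
        try {
            store.fetchSession("k", Instant.MAX, Instant.MAX);
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because only the long-based method is abstract, an implementation written against the long-based API picks up the Instant-based overload without any change.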
Compatibility, Deprecation, and Migration Plan
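- Custom IQ session stores that implement only ReadOnlySessionStore will have to implement the new methods; stores that already implement the read-write SessionStore API inherit the default implementations.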
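As a rough picture of what such a custom read-only store would need to provide, the sketch below implements an Instant-based lookup over an in-memory list. Everything in it is hypothetical (`CustomReadOnlyStoreSketch`, `Session`, the list-based storage, and the `List` return type standing in for an iterator). It assumes the matching rule is "session end at or after earliestSessionEndTime and session start at or before latestSessionStartTime".

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

class CustomReadOnlyStoreSketch {
    // Hypothetical session record: key plus start and end timestamps in epoch milliseconds.
    static final class Session {
        final String key;
        final long startMs;
        final long endMs;

        Session(final String key, final long startMs, final long endMs) {
            this.key = key;
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    private final List<Session> sessions = new ArrayList<>();

    void add(final String key, final long startMs, final long endMs) {
        sessions.add(new Session(key, startMs, endMs));
    }

    // Instant-based lookup that a read-only store has to supply itself,
    // because it has no long-based method to delegate to.
    List<Session> findSessions(final String key,
                               final Instant earliestSessionEndTime,
                               final Instant latestSessionStartTime) {
        final long earliestEndMs = earliestSessionEndTime.toEpochMilli();
        final long latestStartMs = latestSessionStartTime.toEpochMilli();
        final List<Session> result = new ArrayList<>();
        for (final Session s : sessions) {
            // Assumed rule: end >= earliestSessionEndTime and start <= latestSessionStartTime.
            if (s.key.equals(key) && s.endMs >= earliestEndMs && s.startMs <= latestStartMs) {
                result.add(s);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final CustomReadOnlyStoreSketch store = new CustomReadOnlyStoreSketch();
        store.add("a", 0L, 100L);
        store.add("a", 200L, 300L);
        store.add("b", 0L, 100L);
        final List<Session> found =
            store.findSessions("a", Instant.ofEpochMilli(150L), Instant.ofEpochMilli(250L));
        System.out.println(found.size());
    }
}
```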
Rejected Alternatives
None yet.