...
SessionWindows
Code Block |
---|
/** * Declaration sketch of SessionWindows: only the public method signatures are shown, without bodies. */ public class SessionWindows { /** * Create a new SessionWindows with the specified inactivityGap. * * @param inactivityGap the inactivity gap of the new SessionWindows (unit not stated here; presumably milliseconds, like {@code durationMs} in {@code until} -- TODO confirm) * @return a new SessionWindows with the specified inactivityGap */ public static SessionWindows inactivityGap(final long inactivityGap) /** * Set the window maintain duration in milliseconds of streams time. * This retention time is a guaranteed <i>lower bound</i> for how long * a window will be maintained. * * @param durationMs the window maintain duration, in milliseconds of streams time * @return itself */ public SessionWindows until(long durationMs) } |
SessionMerger
Code Block | ||
---|---|---|
| ||
/** * The interface for merging aggregate values for {@link SessionWindows} with the given key. * * @param <K> key type * @param <T> aggregate value type */ public interface SessionMerger<K, T> { /** * Compute a new aggregate from the key and two aggregates. * NOTE(review): whether {@code aggOne} and {@code aggTwo} arrive in any particular order is not specified here -- confirm. * * @param aggKey the key of the record * @param aggOne the first aggregate * @param aggTwo the second aggregate * @return the new aggregate value */ T apply(K aggKey, T aggOne, T aggTwo); } |
SessionStore
Code Block | ||
---|---|---|
| ||
/** * Interface for storing the aggregated values of sessions. * * @param <K> type of the record keys * @param <AGG> type of the aggregated values */ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K, AGG> { /** * Find any aggregated session values with the matching key and where the * session's end time is {@code >= earliestSessionEndTime}, i.e., the oldest session to * merge with, and the session's start time is {@code <= latestSessionStartTime}, i.e., * the newest session to merge with. * * @param key the record key to match * @param earliestSessionEndTime lower bound (inclusive) on the session end time * @param latestSessionStartTime upper bound (inclusive) on the session start time * @return iterator over the matching session keys and their aggregated values */ KeyValueIterator<SessionKey<K>, AGG> findSessionsToMerge(final K key, final long earliestSessionEndTime, final long latestSessionStartTime); /** * Remove the aggregated value for the session with the matching SessionKey. * * @param sessionKey key of the session whose aggregated value is removed */ void remove(final SessionKey<K> sessionKey); /** * Write the aggregated result for the given SessionKey. * * @param key key of the session being written * @param result the aggregated result to write */ void put(final SessionKey<K> key, AGG result); } |
ReadOnlySessionStore
This is primarily provided for Interactive Queries.
Code Block |
---|
/** * A session store that only supports read operations. * Implementations should be thread-safe as concurrent reads and writes * are expected. * * @param <K> the key type * @param <AGG> the aggregated value type */ @InterfaceStability.Unstable public interface ReadOnlySessionStore<K, AGG> { /** * Retrieve all aggregated sessions for the provided key. * * @param key record key to find aggregated session values for * @return KeyValueIterator containing all sessions for the provided key. */ KeyValueIterator<SessionKey<K>, AGG> fetch(final K key); } |
SessionKey
Code Block | ||
---|---|---|
| ||
/** * Represents the key for a Session Window */ public class SessionKey<K> { // record key private final K key; // session start time private final long start; // session end time private final long end; } |
QueryableStoreTypes
Additional Method
Code Block | ||
---|---|---|
| ||
/** * A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore}. * * @param <K> key type of the store * @param <V> value type of the store (used as the second type argument of {@link ReadOnlySessionStore}) * @return {@link SessionStoreType} */ public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, V>> sessionStore() |
KGroupedStream
Additional methods
Code Block | ||
---|---|---|
| ||
/* Session-window overloads. Each operation is declared twice: once taking a store name (String storeName) and once taking a StateStoreSupplier<SessionStore>. */ /** reduce over the given SessionWindows; store given by name. */ KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final SessionWindows sessionWindows, final String storeName); /** reduce over the given SessionWindows; store given by supplier. */ KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final SessionWindows sessionWindows, final StateStoreSupplier<SessionStore> storeSupplier); /** aggregate over the given SessionWindows, taking a SessionMerger and a Serde for the aggregate value; store given by name. */ <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final Aggregator<K, V, T> aggregator, final SessionMerger<K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final String storeName); /** aggregate over the given SessionWindows, taking a SessionMerger and a Serde for the aggregate value; store given by supplier. */ <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final Aggregator<K, V, T> aggregator, final SessionMerger<K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final StateStoreSupplier<SessionStore> storeSupplier); /** count over the given SessionWindows; store given by name. */ KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String storeName); /** count over the given SessionWindows; store given by supplier. */ KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final StateStoreSupplier<SessionStore> storeSupplier); |
Compatibility, Deprecation, and Migration Plan
...