...
Code Block | ||
---|---|---|
| ||
stream.groupByKey().aggregate(initializer, aggregator, merger, SessionWindows.inactivityGap(FIVE_MINUTES) .until(ONE_HOUR), aggregateValueSerde, “session-store”); |
In order to process SessionWindows we’ll need to add a new Processor. This will be responsible for creating sessions, merging existing sessions into sessions with larger windows, and producing aggregates from a session’s values.
On each incoming record the process method will:
Find any existing adjacent sessions that fall within either start or end within the inactivity gap, i,e., sessionStore.fetch(userKey, timestamp - gap, timestamp + gap)where the end time of the session is > now - inactivity gap, or the start time is < now + inactivity gap.
Merge any existing sessions into a new larger session using the SessionMerger to merge the aggregates of the existing sessions.
Aggregate the value record being processed with the merged session.
Store the new merged session in the SessionStore.
Remove any merged sessions from the SessionStore.
Late arriving data
Late arriving data is mostly treated the same as non-late arriving data, i.e., it can create a new session or be merged into an existing one. The only difference is that if the data has arrived after the retention period, defined by SessionWindows.until(..), a new session will be created and aggregated, but it will not be persisted to the store.
SessionWindows
We propose to add a new class SessionWindows. SessionWindows will be able to be used with new overloaded operations on KGroupedStream, i.e, aggregate(…), count(..), reduce(…). A SessionWindows will have a defined gap, that represents the period of inactivity. It will also provide a method, until(...), to specify how long the data is retained for, i.e., to allow for late arriving data.
SessionStore
We propose to add a new type of StateStore, SessionStore. A SessionStore, is a segmented store, similar to a WindowStore, but the segments are indexed by session end time. We index by end time so that we can expire (remove) the Segments containing sessions where session endTime < stream-time - retention-period.
The records in the SessionStore will be stored by a SessionKey. The SessionKey is a composite of the record key, window start, and window end times. The start and end times of the SessionKey are driven by the data. If the Session only has a single value then start == end. The segment a Session is stored in is determined by SessionKey.end. Fetch requests against the SessionStore use both the SessionKey.start and Session.end to find sessions to merge.
Each Segment is for a particular interval of time. To work out which Segment a session belongs in we simply divide SessionKey.end by the segment interval. The segment interval is calculated as Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL).
SessionKey End | Segment Index |
---|---|
0 | 0 |
500 | 0 |
1000 | 1 |
2000 | 2 |
Put
As session aggregates arrive, i.e., on put, the implementation of SessionStore will:
use SessionKey.end to get or create the Segment to store the aggregate in
If the Segment is non-null, we add the aggregate to the Segment.
If the Segment was null, this signals that the record is late and has arrived after the retention period. This record is not added to the store.
Fetch
When SessionStore.fetch(...) is called we find all the aggregates for the record key where SessionKey.end >= earliestEndTime && SessionKey.start <= latestStartTime. In order to do this:
Find the Segments to search by getting all Segments starting from earliestEndTime
Define the range query as:
from = (record-key, end=earliestEndTime, start=0)
to = (record-key, end=Long.MAX_VALUE, start=Long.MAX_VALUE)
latestStartTime)
For example, if for an arbitrary record key we had the following sessions in the store:
Session Start | Session End |
---|---|
0 | 99 |
101 | 200 |
201 | 300 |
301 | 400 |
...