When a new record comes in, there are two new windows created and an unknown number of existing windows updated. For in order records, one new window will contain the record while the other will not contain the record, shown in Figure 1. With in order records, in this example before c comes in, the second (right) window will be empty while the first (left) window will need to contain records that have already been processed. Any already existing windows that falls into need to be updated with b's value in their aggregation

Figure 1

Screen Shot 2020-07-15 at 10.18.18 AM.png

Aggregating for New Windows

A new record for SlidingWindows will always create two new windows. If either of those windows already exist in the windows store, their aggregation will simply be updated to include the new record, but no duplicate window will be added to the WindowStore. For in-order records, the left window will always be empty. It will be stored in the window store with the its aggregate initialized to the user defined initializer, but its output will not be emitted until it contains a value. We store empty windows in order to make future lookups for overlapping windows simpler.

The left window of in-order records and both windows for out-of-order records need to be updated with the values of records that have already been processed. Because each record creates one new window that includes itself and one window that does not, we have the set of all possible windows stored in our WindowStore. Therefore, when we need to find an aggregation to create a new window, that aggregation is already stored somewhere in the WindowStore. Figure 2 illustrates how we will find the aggregation value for the new left window.

Figure 2

Finding the aggregate for the new right window (for out-of-order records) is similarly predictable, as shown in Figure 3.

Figure 3

Aggregating for Existing Windows

When a new record falls within existing records, a scan is performed in the WindowStore to find windows that have a starting time of <= recordTime - windowSize. These windows will have their aggregations updated with the new record's value and will emit the new result. 

A thorough example can be found here.

  • No labels