When a new record comes in, two new windows are created and an unknown number of existing windows are updated. For in-order records, one new window will contain the record while the other will not, as shown in Figure 1. In this example, before c comes in, the second (right) window will be empty, while the first (left) window will need to contain records that have already been processed. Any existing windows that b falls into need to have b's value added to their aggregation.
Aggregating for New Windows
A new record for SlidingWindows will always be associated with two windows. If either of those windows already exists in the WindowStore, its aggregation will simply be updated to include the new record and no duplicate window will be added. If the window does not exist, the correct aggregation will be calculated and the window will be put in the WindowStore. For in-order records, the right window will always be empty and will be created by the first record that comes after the current record and falls within that record's right window. Note: for this to be possible, we need to know the timestamp of the latest record within a window, and therefore we must use a TimestampedWindowStore.
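The two windows associated with a record can be sketched as follows. This is only an illustration: the class and method names are hypothetical, and the exact bounds are an assumption not stated above (a left window that ends at the record's timestamp, a right window that starts 1 ms after it, and inclusive bounds on both ends).

```java
/** Hypothetical helper; the window bounds used here are an assumption. */
final class SlidingWindowBounds {

    /** An inclusive [start, end] interval in milliseconds. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(long timestamp) {
            return start <= timestamp && timestamp <= end;
        }
    }

    /** Window that ends at the record's timestamp, so it includes the record. */
    static Window leftWindow(long recordTime, long windowSize) {
        return new Window(recordTime - windowSize, recordTime);
    }

    /** Window that starts just after the record, so it excludes the record. */
    static Window rightWindow(long recordTime, long windowSize) {
        return new Window(recordTime + 1, recordTime + 1 + windowSize);
    }
}
```

Under these assumptions, a record at time 10 with a window size of 5 would get a left window of [5, 10] and a right window of [11, 16].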
The left window of in-order records and both windows for out-of-order records need to be updated with the values of records that have already been processed, assuming there are existing records that fall within these windows and that these windows do not already exist. Because each record creates or contributes to one window that includes itself and one window that does not, we have the set of all possible windows stored in our WindowStore. Therefore, when we need to find an aggregation to create a new window, that aggregation is already stored somewhere in the WindowStore. Figure 2 illustrates how we will find the aggregation value for the new left window.
Finding the aggregate for the new right window (for out-of-order records) is similarly predictable, as shown in Figure 3.
Aggregating for Existing Windows
When a new record falls within existing windows, a scan is performed in the WindowStore to find windows whose start time lies between recordTime - windowSize and recordTime, since only those windows can contain the record. These windows will have their aggregations updated with the new record's value and will emit the new result.
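A minimal sketch of this scan is shown below. It uses a TreeMap keyed by window start time as a stand-in for the WindowStore and a simple count as the aggregate; both are assumptions made for illustration, as are the class and method names. It also assumes that a window starting at s covers the inclusive range [s, s + windowSize], so the windows containing a record are exactly those with a start time between recordTime - windowSize and recordTime.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical stand-in for the store: window start time -> count aggregate. */
final class ExistingWindowUpdater {
    private final NavigableMap<Long, Long> countsByWindowStart = new TreeMap<>();
    private final long windowSize;

    ExistingWindowUpdater(long windowSize) {
        this.windowSize = windowSize;
    }

    void putWindow(long windowStart, long count) {
        countsByWindowStart.put(windowStart, count);
    }

    Long count(long windowStart) {
        return countsByWindowStart.get(windowStart);
    }

    /**
     * Increments the count of every stored window that contains recordTime
     * and returns the start times of the updated windows in ascending order.
     */
    List<Long> update(long recordTime) {
        List<Long> updated = new ArrayList<>();
        // Range scan over start times in [recordTime - windowSize, recordTime].
        NavigableMap<Long, Long> hits =
            countsByWindowStart.subMap(recordTime - windowSize, true, recordTime, true);
        for (Map.Entry<Long, Long> entry : hits.entrySet()) {
            entry.setValue(entry.getValue() + 1);
            updated.add(entry.getKey());
        }
        return updated;
    }
}
```

In a real processor each updated window would also forward its new result downstream; that step is omitted here.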
Handling Empty Windows
The rules above work in all cases except when the right window is meant to be empty, or when the left window is meant to contain only the current record's aggregate.
If a left window is meant to be empty aside from the record's value, then the window with the closest end time to our record's timestamp will give us an aggregate that we do not want stored in our new record's left window. To detect whether a left window (for either an in-order or out-of-order record) is supposed to be empty aside from the record's value, we can see if it creates the previous record's right window. If a record either creates the previous record's right window, or falls within the previous record's right window (even if the window is already in the window store) we know the aggregate we found is correct. If it does not, then the left window is meant to be empty aside from the current record's value.
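The left-window check described above might look like the sketch below. The names are hypothetical, and the code assumes that the previous record's right window spans the inclusive range [previousRecordTime + 1, previousRecordTime + 1 + windowSize], which is not stated explicitly in this section.

```java
import java.util.OptionalLong;

/** Hypothetical helper illustrating the empty-left-window check. */
final class LeftWindowCheck {

    /**
     * Returns true if the aggregate found for the new left window can be used,
     * that is, if the record falls within the previous record's right window.
     * Returns false if the left window should hold only the current record.
     */
    static boolean foundAggregateIsValid(OptionalLong previousRecordTime,
                                         long recordTime,
                                         long windowSize) {
        if (!previousRecordTime.isPresent()) {
            return false; // no earlier record, so nothing to carry over
        }
        long rightStart = previousRecordTime.getAsLong() + 1; // assumed bound
        long rightEnd = rightStart + windowSize;              // assumed bound
        return rightStart <= recordTime && recordTime <= rightEnd;
    }
}
```

For example, with a previous record at time 10 and a window size of 5, a record at time 14 falls inside the assumed right window [11, 16], so the found aggregate is kept; a record at time 17 does not, so its left window would start from the record's own value.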
If a right window is meant to be empty, then the window with the closest start time to our record's timestamp will give us an aggregate that we do not want stored in our new record's right window. To detect whether a right window (for an out-of-order record) is supposed to be empty, we can check two things: whether the window with the closest start time to our record is a left window, or whether it is a right window and the previous record's right window has already been created. If either of these is true, then the aggregate we found is correct. If it is a right window and the previous record's right window has not been created, then the aggregate is wrong and the new right window is meant to stay empty.