You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 8 Next »

Status

Current stateUnder Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The DSL currently supports windowed aggregations for only two types of time-based window: hopping and tumbling. A third kind of window is defined, but only used in join operations: sliding windows. Users needing sliding window semantics can approximate them with hopping windows where the advance time is 1, but this workaround only artificially resembles the sliding window model; aggregates will be output for every hopping window, of which there will likely be a large number (specifically, the size of the window in milliseconds). The semantics for out-of-order data also differ, as sliding windows have no concept of a grace period and rather represent only "the current result for the current window"

Public Interfaces

Rather than adapt the existing TimeWindows interface (which provides semantics for tumbling and hopping windows), I propose to add a separate SlidingWindows class. This will resemble a stripped-down version of the TimeWindows class, and have only one public API (plus several methods in needs to override from the abstract base class Windows<W>):

public final class SlidingWindows extends Windows<TimeWindow> {    

	public static SlidingWindows of(final Duration size);

	@Override
	public Map<Long, TimeWindow> windowsFor(final long timestamp);

	@Override
    public long gracePeriodMs() { return 0; }

	@Override
    public long size() { return sizeMs;}

	@Override
    public boolean equals(final Object o);
 
	@Override
    public int hashCode();

	@Override
	public String toString();
}

This would effectively be used in the same manner as TimeWindows. For example to do a counting aggregation with sliding windows, you would have something like

final KTable<Windowed<String>, Long> counts = source
            .groupBy((key, value) -> value)
            .windowedBy(SlidingWindows.of(Duration.ofSeconds(5)))
            .count();


Proposed Changes

The semantics of the sliding window based aggregations are as follows:

Only one window is defined for each record being processed, which ends at the current stream time and begins "window size" milliseconds before. The window thus "slides" forward in time, advancing when new data arrives that advances the stream time. Some notes:

  • Both time bounds are inclusive
  • At most one record is forwarded when new data arrives
  • Out of order data that still falls within the current window is simply added to the running aggregate
  • Out of order data that arrives outside the current window is dropped
  • The window size is effectively the retention period for the records, and the grace period is zero


Simple Design

As an initial, POC implementation we can process sliding window aggregates by storing, for each record, the current aggregate of a window that begins at its timestamp. When a new record arrives we can then compute the new total aggregate for the window by aggregating its value with the first aggregate value defined in the window. We would then have to traverse the rest of the window to update the aggregate for each window that starts at/before the new record's timestamp.

Pros: The main advantage of this is that it requires minimal storage. It would also require minimal code changes, as it is only needs to store (key, Agg) pairs and is therefore compatible with the current KStreamWindowAggregateProcessor.

Cons: Not particularly efficient: for N records in the window it requires O(n) aggregations as well as O(n) writes to the underlying store.

POC PR can be found here

O(sqrt(N)) Design

As mentioned above, the initial design is not ideal. The two issues with the simple implementation are that it takes linear time, and requires linear writes to the underlying store. One possibility to improve on this design is as follows:

Break up the window into separate buckets, with each bucket storing its collection of values as well as the "running aggregate". By "running aggregate" I mean not the aggregate of the bucket alone, but the total aggregate from the end of that bucket to the end of the window. When a new record arrives, the aggregate is computed by aggregating: the new value, all the values in the first bucket, and the running aggregate of the first bucket. The running aggregates of the other buckets are updated as well (if applicable). 

The values would be stored in the usual window store (which can be either persistent or in-memory), and the "running aggregates" could also be either persistent or in-memory. If an in-memory data structure is used, upon restore the values can be used to recompute the aggregates. If persistent, the aggregates could be held either in a separate RocksDB instance or in the same persistent window store as the values by using some special prefix to distinguish actual values from running aggregates.

Pros: If we have N records and M buckets, the time complexity of each incoming record is (N/M) + M and there are at most M writes to the underling store. Optimizing for time complexity alone gives M = sqrt(N) for overall O(sqrt(N)) (although there are other considerations – see below)

Cons: Additional spatial complexity (N + M storage is needed). If the running aggregates are held in-memory, restore time is increased; if held in the same RocksDB instance as the window store we increase the size and writes to that store with performance implications; if held in a separate RocksDB instance there is additional memory overhead (relative to reusing the window store's RocksDB)

Determining 'M': M should be chosen carefully considering the relative expense of aggregations and writes to the underlying store – the appropriate value for M will depend, for example, on whether the underlying store is RocksDB or in-memory. Also we may consdier optimizations such as loading the first (oldest) bucket into memory for quicker reads, in which case M must be large enough s.t. N/M values will fit in memory.

Compatibility, Deprecation, and Migration Plan

N/A

Rejected Alternatives

Rather than store a running aggregate for each record, we could just store the values themselves and then compute the aggregate over all of them. This is O(n) time still but wouldn't require as many writes to the underlying store so it would presumably be a better design. However it would require a different kind of store in KStreamWindowAggregateProcessor, which if we are going to do we may as well shoot for better than linear time

In general, many alternatives had to be rejected because the current aggregator can only apply a value to an aggregate; it can not be used to combine two aggregates. For example it might seem better to store the actual aggregate of each bucket rather than the "running aggregate" .. unfortunately we would have no way to combine them.


  • No labels