Discussion thread
Vote thread
JIRA

FLINK-4557

Release 1.3

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Table API is a declarative API to define queries on static and streaming tables. So far, only projection, selection, and union are supported operations on streaming tables. This FLIP proposes to add support for different types of aggregations on top of streaming tables. In particular, we seek to support:

  • Group-window aggregates, i.e., aggregates which are computed for a group of elements. A (time or row-count) window is required to bound the infinite input stream into a finite group.

  • Row-window aggregates, i.e., aggregates which are computed for each row, based on a window (range) of preceding and succeeding rows.

Each type of aggregate shall be supported on keyed/grouped or non-keyed/grouped data streams for streaming tables as well as batch tables.

Since time-windowed aggregates will be the first operation that requires the definition of time, this FLIP also discusses how the Table API handles time characteristics, timestamps, and watermarks.


Public Interfaces

The feature will extend the Table API. We propose the following methods:

Group-Window Aggregates

Group-windows are evaluated once per group and defined using the window(w: Window) method. The Window parameter defines the type and parameters of the window to compute. The window() method can be applied on a Table (yielding a non-keyed DataStream.windowAll) or on a GroupedTable (resulting in a keyed window).

val res = tab
 .groupBy('a) // leave out the groupBy clause to define non-keyed / global windows
 .window(w: Window)
 .select('a, 'b.sum, 'w.start, 'w.end)


The Window defines how rows are mapped to windows. Window is not a custom function that users can extend, but only an API object that is internally handled and translated into a corresponding DataStream operation. The initially supported window definitions are listed below. More window definitions, e.g., in order to incorporate the features of the Trigger DSL (FLIP-9), might be added later.

Tumble (Tumbling Windows)

  • Tumbling Event-time window:
      .window(Tumble over 10.minutes on 'rowtime as 'w)

  • Tumbling Processing-time window:
      .window(Tumble over 10.minutes as 'w)

  • Tumbling Row-count window:
      .window(Tumble over 10.rows)


over

Mandatory.

Defines the length of the window, either by time or row count.

on

Optional. Required for event-time windows.

For streaming tables, on defines the time mode ('rowtime is a logical system attribute); for batch tables, it defines the time attribute over which the query is evaluated.

as

Optional.

Assigns an alias for the window that the following select() clause can refer to in order to access window properties such as window start or end time.
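The way rows are mapped to tumbling windows can be illustrated in plain Scala, independent of Flink. The following is a minimal sketch, assuming time windows are aligned to the epoch; TumbleSketch, timeWindow, and countWindows are hypothetical names used only for illustration and are not part of the proposed API.

```scala
object TumbleSketch {
  // Bounds [start, end) of the epoch-aligned tumbling time window that
  // contains ts. Assumption: ts >= 0, all times in milliseconds.
  def timeWindow(ts: Long, sizeMs: Long): (Long, Long) = {
    val start = ts - (ts % sizeMs)
    (start, start + sizeMs)
  }

  // Row-count tumbling windows: consecutive, non-overlapping groups of
  // `size` rows in arrival order. The last group may be incomplete; how a
  // stream would treat such a group is left open in this sketch.
  def countWindows[T](rows: Seq[T], size: Int): List[Seq[T]] =
    rows.grouped(size).toList
}
```

The window start and end computed this way correspond to the properties that a select() clause would access through the window alias.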

Slide (Sliding Windows)

  • Sliding Event-time window:
      .window(Slide over 10.minutes every 2.minutes on 'rowtime as 'w)

  • Sliding Processing-time window:
      .window(Slide over 10.minutes every 2.minutes as 'w)

  • Sliding Row-count window:
      .window(Slide over 10.rows every 2.rows)


over

Mandatory.

Defines the length of the window, either by time or row count.

every

Mandatory.

Defines how frequently a new window is created, either by time or row count. The creation interval must be of the same type as the window length.

on

Optional. Required for event-time windows.

For streaming tables, on defines the time mode ('rowtime is a logical system attribute); for batch tables, it defines the time attribute over which the query is evaluated.

as

Optional.

Assigns an alias for the window that the following select() clause can refer to in order to access window properties such as window start or end time.
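Unlike a tumbling window, a sliding window definition assigns each row to several overlapping windows. The following plain-Scala sketch shows one way to enumerate them, assuming windows are aligned to the epoch; SlideSketch and windowStarts are hypothetical names and not part of the proposed API.

```scala
object SlideSketch {
  // Start timestamps of all epoch-aligned sliding windows
  // [start, start + size) that contain the timestamp ts.
  // Assumption: ts >= 0; size and slide use the same time unit as ts.
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    // Latest window start that is not after ts.
    val lastStart = ts - (ts % slide)
    // Walk backwards in steps of `slide` while the window still covers ts.
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }
}
```

With a length of 10 and a slide of 2, every row falls into 10 / 2 = 5 windows, which is why sliding windows are more expensive to compute than tumbling windows of the same length.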

Session (Session Windows)

  • Session Event-time window:
      .window(Session withGap 10.minutes on 'rowtime as 'w)

  • Session Processing-time window:
      .window(Session withGap 10.minutes as 'w)


withGap

Mandatory.

Defines the gap between two windows as a time interval.

on

Optional. Required for event-time windows.

For streaming tables, on defines the time mode ('rowtime is a logical system attribute); for batch tables, it defines the time attribute over which the query is evaluated.

as

Optional.

Assigns an alias for the window that the following select() clause can refer to in order to access window properties such as window start or end time.
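Session windows have no fixed length; their bounds follow from the data. A minimal plain-Scala sketch of the grouping is shown below. It assumes that a row extends the current session if it arrives less than the gap after the previous row; the exact handling of a row that is exactly one gap apart is an assumption of this sketch. SessionSketch and sessions are hypothetical names.

```scala
object SessionSketch {
  // Splits timestamps into sessions. A new session starts whenever the
  // distance to the previous timestamp is gap or more (assumed boundary rule).
  def sessions(timestamps: Seq[Long], gap: Long): List[List[Long]] =
    timestamps.sorted
      .foldLeft(List.empty[List[Long]]) {
        // The newest session is kept at the head, newest timestamp first.
        case (current :: closed, ts) if ts - current.head < gap =>
          (ts :: current) :: closed // extend the open session
        case (acc, ts) =>
          List(ts) :: acc // start a new session
      }
      .map(_.reverse)
      .reverse
}
```

For example, the timestamps 1, 3, 20, 22, 50 with a gap of 10 form the three sessions (1, 3), (20, 22), and (50).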

Row-window Aggregates

Row-window aggregates are evaluated for each row. A row window defines, for each row, the range of preceding and succeeding rows over which an aggregation function is evaluated. If more than one aggregation function is evaluated for a result row, each aggregation function can be evaluated over a separate row window. Row windows are similar to standard SQL windows (OVER clause).

Row-windows are defined using the rowWindow(rw: RowWindow*) method. The rw parameter defines one or more row windows. Each row window must have an alias assigned. Aggregates in the select() method must refer to a RowWindow by providing an alias in the over() clause. The rowWindow() method can be applied to a Table, yielding a non-keyed row-window aggregation, or to a GroupedTable, resulting in a keyed row-window aggregation.

val res = tab
  .groupBy('a) // optional; if absent, non-keyed, non-parallel windows are computed
  .rowWindow(RowWindow as 'x, RowWindow as 'y)
  .select('a, 'b.count over 'x as 'xcnt, 'c.count over 'y as 'ycnt, 'x.start, 'x.end)

Event-time row-windows process rows with increasing timestamps (i.e., rows are sorted on timestamp) because the result of aggregations depends on the order of elements in the window. Hence, only ordered processing will guarantee consistent results.

Similar to the group-window definition Window, RowWindow is not an interface that users can implement, but rather a collection of built-in API objects that are internally translated into efficient DataStream operators. There are many different ways to define RowWindows, e.g., by time semantics (processing-time, event-time, row-count), window length (fixed length, variable length), window start (aligned by time, on element arrival), window end (aligned by time, time-out), etc.

The first version of row-window aggregates will not support multiple RowWindow definitions.

The Table API will initially provide a few RowWindow definitions and extend the set upon user request. The following RowWindow definitions will be provided initially.

TumbleRows

A TumbleRows row-window can be defined over a time interval or a row-count and has a fixed length. TumbleRows time windows start at fixed time offsets; row-count windows start with the first element that arrives. Hence, they are very similar to tumbling group-windows with respect to their start and length. What differs is the way the aggregation function is computed. Aggregates are incrementally computed for each row that arrives and include all rows that precede the current row in the TumbleRows window.

  • Tumbling Event-time row-window:
      .rowWindow(TumbleRows over 10.minutes on 'rowtime as 'w)

  • Tumbling Processing-time row-window:
      .rowWindow(TumbleRows over 10.minutes as 'w)

  • Tumbling Row-count row-window:
      .rowWindow(TumbleRows over 10.rows as 'w)



over

Mandatory.

Defines the length of the window. Can be time interval or row-count. Time windows start at fixed offsets (e.g., every hour), row-count windows start with the first arriving element.

on

Optional. Required for event-time windows.

For streaming tables, on defines the time mode ('rowtime is a logical system attribute); for batch tables, it defines the time attribute over which the query is evaluated.

as

Mandatory.

Assigns an alias for the window that the following select() clause can refer to in order to access window properties such as window start or end time.
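The difference between a tumbling group-window and a TumbleRows row-window is that the latter emits one result per input row instead of one result per window. The following plain-Scala sketch models this with a running sum. It assumes epoch-aligned time windows, non-negative timestamps, and that the current row itself is part of its aggregate; TumbleRowsSketch, Row, and runningSum are hypothetical names.

```scala
object TumbleRowsSketch {
  // Hypothetical row type: a value 'b and an event timestamp.
  final case class Row(b: Long, rowtime: Long)

  // For every row (in timestamp order), emits the sum of b over all rows of
  // the same tumbling window up to and including the current row.
  def runningSum(rows: Seq[Row], size: Long): Vector[(Row, Long)] =
    rows
      .sortBy(_.rowtime)
      .foldLeft((Vector.empty[(Row, Long)], Option.empty[Long], 0L)) {
        case ((out, window, sum), r) =>
          val w = r.rowtime - (r.rowtime % size) // start of the row's window
          // Continue the aggregate within a window, restart it in a new one.
          val s = if (window.contains(w)) sum + r.b else r.b
          (out :+ (r -> s), Some(w), s)
      }
      ._1
}
```

Because the aggregate restarts with every new window, the state that must be kept per key is bounded by a single accumulator.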

SlideRows

A SlideRows window is defined over two intervals, a preceding and a succeeding interval. Both intervals can be defined as a time interval or a row-count (both intervals must be of same type). Aggregates are computed for a row (current row) over the valid range of records before and after the current row.

  • Slide Event-time row-window:
      .rowWindow(SlideRows preceding 5.minutes following 5.minutes on 'rowtime as 'w)

  • Slide Processing-time row-window:
      .rowWindow(SlideRows preceding 5.minutes following 5.minutes as 'w)

  • Slide Row-count row-window:
      .rowWindow(SlideRows preceding 5.rows following 5.rows as 'w)

  • Slide unbounded-preceding row-window:
      .rowWindow(SlideRows.unboundedPreceding as 'w)
      .rowWindow(SlideRows.unboundedPreceding following 5.rows as 'w)


preceding, unboundedPreceding

Optional. If not set, following() must be set.

Defines the interval of valid rows preceding the current row. Can be a time interval or a row count. unboundedPreceding() defines an unbounded interval from the start of the stream, i.e., aggregates are incrementally computed for each row from the “beginning” of the stream.

following

Optional. If not set, preceding() must be set.

Defines the interval of valid rows following the current row. Can be time interval or row count.

on

Optional. Required for event-time windows.

For streaming tables, on defines the time mode ('rowtime is a logical system attribute); for batch tables, it defines the time attribute over which the query is evaluated.

as

Mandatory.

Assigns an alias for the window that the following select() clause can refer to in order to access window properties such as window start or end time.
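The row-count variant of SlideRows can be modeled in plain Scala as a sum over a range of positions around the current row. This is a minimal sketch under the assumption that the current row is always part of its own window; SlideRowsSketch and slidingSum are hypothetical names and not part of the proposed API.

```scala
object SlideRowsSketch {
  // For the row at each position, sums up to `preceding` rows before it,
  // the row itself, and up to `following` rows after it.
  def slidingSum(values: IndexedSeq[Long], preceding: Int, following: Int): IndexedSeq[Long] =
    values.indices.map { i =>
      val from = math.max(0, i - preceding)
      val until = math.min(values.length, i + following + 1)
      values.slice(from, until).sum
    }
}
```

In this model, an unbounded preceding interval corresponds to a very large `preceding` value such as Int.MaxValue, which yields a running sum. On a stream, a non-empty following interval implies that the result for a row can only be emitted after the following rows have arrived.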

SessionRows 

A SessionRows window is started with the first element that arrives (per key or global) and ends when no element (for a key or global) arrives for a certain amount of time. Aggregation functions are incrementally computed for each row that arrives and include all rows that precede the current row in the SessionRows window. After the window has been closed, the current aggregation state is discarded. 

  • Session Event-time row-window:
      .rowWindow(SessionRows withGap 10.minutes on 'rowtime as 'w)

  • Session Processing-time row-window:
      .rowWindow(SessionRows withGap 10.minutes as 'w)


withGap

Mandatory.

Defines the timeout after which the window is closed and its state discarded.

on

Optional. Required for event-time windows.

For streaming tables, on defines the time mode ('rowtime is a logical system attribute); for batch tables, it defines the time attribute over which the query is evaluated.

as

Mandatory.

Assigns an alias for the window that the following select() clause can refer to in order to access window properties such as window start or end time.
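The discard-on-timeout behavior of SessionRows can be sketched in plain Scala as a running count that restarts after a gap. The sketch assumes that a row continues the session if it arrives less than the gap after the previous row; the treatment of a row exactly one gap apart is an assumption. SessionRowsSketch and runningCount are hypothetical names.

```scala
object SessionRowsSketch {
  // Emits (timestamp, running count) for every row in timestamp order.
  // The count restarts at 1 when the distance to the previous row is
  // gap or more, which models discarding the aggregation state.
  def runningCount(timestamps: Seq[Long], gap: Long): Seq[(Long, Int)] = {
    val sorted = timestamps.sorted
    sorted.headOption.toList.flatMap { first =>
      sorted.tail.scanLeft((first, 1)) { case ((prev, count), ts) =>
        (ts, if (ts - prev < gap) count + 1 else 1)
      }
    }
  }
}
```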

Event-time Timestamps and Watermarks in the Table API

 

Event-time operators require that all events are annotated with a timestamp and that the stream provides watermarks. There are two ways to create a streaming table: 1) from an existing DataStream or 2) from a StreamTableSource. In the former case, we assume and require that timestamps and watermarks are already assigned to the stream and its records. In the latter case, the StreamTableSource is responsible for assigning timestamps and watermarks. Since a StreamTableSource is just an interface with a method that returns a DataStream, each implementation of this interface must assign timestamps and watermarks itself in order to make the streaming table processable in event time. Hence, a StreamTableSource should provide a method that allows users to configure a custom timestamp and watermark assigner. At the moment, it is not possible to check whether a DataStream provides timestamps and watermarks. A Table API query that includes an event-time operator will fail at execution time if they are not provided (this is the same behavior as for DataStream programs that include event-time operators).

Open Question: Should we extend the StreamTableSource interface (or convert it into an abstract class) to provide methods to assign timestamps and watermarks? If yes, we have another API breaking change. This would also mean that a table source is not immutable anymore.

For streaming tables, we add a keyword “rowtime”, which is represented as a symbol in Scala, to distinguish the processing mode. This means that the keyword looks like a system-provided attribute; however, it is not part of the table’s schema and cannot be accessed elsewhere (e.g., in a select or groupBy clause). Therefore, streaming tables are not allowed to have a “rowtime” column. The benefit of representing this keyword as a system column is that queries with event-time operators can be executed on both streaming and batch tables if the batch tables feature a long column called rowtime.

Proposed Changes

Table API

See the previous section about public interfaces for the Table API. The interfaces will be available in the Scala and Java Table APIs. In addition, we will add checks for the reserved column names when streaming tables are registered. A constructed window will be translated into a logical window for validation.
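The check for reserved column names could look roughly like the following plain-Scala sketch. It assumes that names are compared case-sensitively, which the proposal does not specify; ReservedNamesSketch and checkStreamingSchema are hypothetical names.

```scala
object ReservedNamesSketch {
  // Names reserved for logical system attributes on streaming tables.
  val Reserved: Set[String] = Set("rowtime")

  // Returns Left with an error message if a field uses a reserved name,
  // otherwise Right with the unchanged field names.
  // Assumption: names are compared case-sensitively.
  def checkStreamingSchema(fields: Seq[String]): Either[String, Seq[String]] =
    fields.find(Reserved.contains) match {
      case Some(name) =>
        Left(s"'$name' is reserved and cannot be used as a column name in a streaming table")
      case None =>
        Right(fields)
    }
}
```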

Table API Validator

The Table API Validator must check the correctness of the given window parameters. Parameters defining the size of a window must be either of type interval of milliseconds (for time intervals) or of type long (for row counts). For streaming tables, the optional window column must be the constant “rowtime”. For batch tables, it must be a column that is convertible to a long (e.g., a timestamp is also a valid long). The alias of a window must be checked for uniqueness.

Validation of grouping keys, extraction, and validation of aggregates should happen similarly to the current non-windowed validation. The validator must check for a preceding window/rowWindow clause before applying aggregations on streaming tables. Aggregations without a (row) window are not valid on streams (e.g., “select('f.count)” results in an exception when using streaming tables).

Window aliases must be resolved in validation of select clauses.
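Two of the checks described above, the time attribute of streaming windows and the uniqueness of aliases, can be sketched in plain Scala as follows. The types Size, TimeInterval, RowCount, and LogicalWindow are hypothetical stand-ins for the logical window representation and are not taken from the actual code base; the sealed Size type models the rule that a window size is either a time interval or a row count.

```scala
object WindowValidationSketch {
  // Hypothetical model of a window size: a time interval or a row count.
  sealed trait Size
  final case class TimeInterval(millis: Long) extends Size
  final case class RowCount(rows: Long) extends Size

  // Hypothetical logical window: size, optional time attribute, optional alias.
  final case class LogicalWindow(size: Size, on: Option[String], alias: Option[String])

  // Returns all validation errors for windows defined on a streaming table.
  def validateStreaming(windows: Seq[LogicalWindow]): Seq[String] = {
    // The optional window column must be the constant "rowtime".
    val timeErrors = windows
      .flatMap(_.on)
      .filter(_ != "rowtime")
      .map(c => s"streaming windows must be defined on rowtime, found: $c")
    // Every alias may be assigned only once.
    val aliases = windows.flatMap(_.alias)
    val aliasErrors = aliases
      .diff(aliases.distinct)
      .distinct
      .map(a => s"duplicate window alias: $a")
    timeErrors ++ aliasErrors
  }
}
```

Collecting all errors instead of failing on the first one is a design choice of this sketch; a validator that throws an exception on the first violation would be equally consistent with the description.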

Calcite Optimization

Calcite does not have built-in support for streaming aggregations yet. In order to proceed with the implementation, we introduce custom logical window aggregate nodes extending Calcite’s RelNodes. We might change this structure once Calcite provides the appropriate logical operations by default. At the beginning, optimization will be kept to a minimum.

Table API Runtime Code

Each stream aggregation will have the same result when executed on a batch table.

Group-Window Aggregates

The initial group-windows can be represented with the corresponding built-in windows of the DataStream API. On batch tables, the implementation of grouped-window aggregates depends on the window type:

  • Tumbling and Sliding windows can be computed by applying a (Flat)MapFunction that extracts the grouping key(s) for each row and applying a regular groupBy aggregation on all keys (non-window and window keys).

  • Session windows can be computed by partitioning the table on the non-window grouping keys and applying a groupReduce with a group sort on the window-key.
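The batch plan for tumbling windows described above can be sketched on ordinary Scala collections: a map step attaches the window key to each row, and a regular grouped aggregation runs on the combined key. This is only a model of the approach, not DataSet API code; it assumes epoch-aligned windows and non-negative timestamps, and BatchTumbleSketch, Row, and sumPerWindow are hypothetical names.

```scala
object BatchTumbleSketch {
  // Hypothetical row type: grouping key a, value b, timestamp ts.
  final case class Row(a: String, b: Long, ts: Long)

  // Result tuples: (a, sum of b, window start, window end).
  def sumPerWindow(rows: Seq[Row], size: Long): Seq[(String, Long, Long, Long)] =
    rows
      // Map step: attach the window start as an additional (window) key.
      .map(r => ((r.a, r.ts - (r.ts % size)), r.b))
      // Regular grouped aggregation on non-window and window keys.
      .groupBy(_._1)
      .toSeq
      .map { case ((a, start), vs) => (a, vs.map(_._2).sum, start, start + size) }
      .sortBy(t => (t._1, t._3))
}
```

For sliding windows, the map step would become a flat map that emits one copy of the row per window the row belongs to; the grouped aggregation stays the same.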

It should be possible to reuse the existing Table API aggregation functions for group-windows on streaming and batch tables.

Row-Window Aggregates

Row-windows have no counterpart in the DataStream API yet. Hence, this feature will require new custom streaming operators that take care of, for instance, state expiry. Row-window aggregates on batch tables can be implemented as DataSet MapPartitionFunctions. Initially, we only support row windows with a single window definition. Therefore, we might implement a custom operator / function for each type of row-window.

It should be possible to reuse the existing Table API aggregation functions for row-windows on streaming and batch tables.

DataStream API / DataSet API

The discussed state-expiry feature would be helpful to implement the timeout of row-windows. However, the feature can also be implemented using a custom stream operator, so no modifications of the DataStream or DataSet API are necessary.

Compatibility, Deprecation, and Migration Plan

This FLIP proposes new functionality and operators for the Table API. The behavior of existing operators is not modified. However, it introduces the “rowtime” keyword which can no longer be used as a column name in streaming tables. Existing streaming Table API programs that include tables with such columns will fail at compile time. The number of users affected by this change should be very low (if any) since the restricted feature set of the streaming Table API does not encourage its use.


Test Plan

Some of the proposed window operations are hard to test in end-to-end integration tests. Processing-time windows depend on processing speed and row-count windows on the order of the input values. Event-time windows do not face this problem if records are ordered by timestamp and watermarks are correctly assigned. We will provide tests for individual DataStream operators and DataSet functions.

Rejected Alternatives

No rejected alternatives yet.

Implementation Plan

The implementation of this effort can be divided into several subtasks:

  1. Implementation of group-windows on streaming tables. This includes implementing the API of group-windows, the logical validation for group-windows, and the definition of the “rowtime” keyword. Group-windows on batch tables won’t be initially supported and will throw an exception.

  2. Implementation of tumbling and sliding group-windows on batch tables.

  3. Implementation of session group-windows on batch tables.

  4. Implementation of SessionRows row-windows for streaming tables. This includes the definition of row-windows and the SessionRows row-window type. SessionRows row-windows on batch tables won’t be initially supported and will throw an exception.

  5. Implementation of TumbleRows row-windows for streaming tables. This includes the definition of the TumbleRows row-window type. TumbleRows row-windows on batch tables won’t be initially supported and will throw an exception.

  6. Implementation of SlideRows row-windows for streaming tables. This includes the definition of the SlideRows row-window type and its validation. SlideRows row-windows on batch tables won’t be initially supported and will throw an exception.

  7. Implementation of SessionRows row-windows for batch tables.

  8. Implementation of TumbleRows row-windows for batch tables.

  9. Implementation of SlideRows row-windows for batch tables.