Discussion thread
Vote thread
JIRA

FLINK-10972 - Getting issue details... STATUS FLINK-13470 - Getting issue details... STATUS

Release1.9

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Table API is a declarative API to define queries on static and streaming tables. And made tremendous progress in the last two years in terms of both functionality and performance, which only makes our desire to cover even more scenarios stronger. This FLIP proposes to enhancing the functionality and productivity of Table API in a systematic manner. In particular, we seek to support:

  • TableAggregateFunction, i.e., table aggregates are computed for a group of elements, and the function outputs a group of elements.

  • Map Operator, which can apply a scalar function and output multi-column.

  • FlatMap Operator, which can apply a table function and output multi-row.

  • Aggregate Operator, which can apply a aggregate function and output multi-column.

  • FlatAggregate Operator, which can apply a table aggregate function and output multi-row.

Analysis


First, the user defined functions can be classified by the numbers of rows (logically) for their inputs and outputs. The table below is a quick summary.

UDF

Single Row Input

Multiple Row Input

Single Row Output

ScalarFunction

AggregateFunction

Multiple Row Output

TableFunction

TableAggregateFunction

We introduced a new TableAggregateFunction to make this table complete here. This function takes multi-row input and produces multi-row output. In some sense, it behaves like a simple user defined operator.


Secondly,we can also classify the functions by looking at the number of columns for their inputs and outputs. We notice that both ScalarFunction and AggregateFunction can only output a single column, while TableFunction can output multiple columns. We would like to extend the ability of outputting multiple columns to ScalarFunction and AggregateFunction. For this, we follow what we did in TableFunction. If a ScalarFunction or AggregateFunction returns an object of type T (which is a Tuple or Pojo), we can expand T to multiple columns. But this could be problematic for Table.select because T could also be interpreted as the type of a single column. To make the semantics unambiguous, we need to introduce new methods for the new behavior (TableFunction did not suffer from this problem because we used lateral join instead of Table.select for that). The table below summarizes the new methods that we want to introduce for functions which have multi-column outputs.

Table Method

Single Column Output

Multiple Column Output

ScalarFunction

Table.select

Table.map

AggregateFunction

Table.select

GroupedTable.aggregate

TableFunction

N/A

Table.flatMap

TableAggregateFunction

N/A

GroupedTable.flatAggregate

We have introduced four new operators in Table here: Table.map, GroupedTable.aggregate, Table.flatMap, and GroupedTable.flatAggregate. These new operators will expand a complex output T to multiple columns.

Public Interfaces and new operators

TableAggregateFunction

The behavior of table aggregates is most like GroupReduceFunction did, which computed for a group of elements, and output  a group of elements. The TableAggregateFunction can be applied on GroupedTable.flatAggregate() which will revisit later. The interface of TableAggregateFunction as follows:

/**

* Base class for User-Defined table Aggregates.

*

* <p>The behavior of an {@link TableAggregateFunction} can be defined by implementing

* a series of custom methods. An {@link TableAggregateFunction} needs at least three methods:

*  - createAccumulator,

*  - accumulate, and

*  - emitValue or emitValueWithRetract

*

* <p>There are a few other methods that can be optional to have:

*  - retract,

*  - merge

*

* <p>All these methods muse be declared publicly, not static and named exactly as the names

* mentioned above. The methods createAccumulator and emitValue are defined in the

* {@link TableAggregateFunction} functions, while other methods are explained below.

*

*

* {@code

* Processes the input values and update the provided accumulator instance. The method

* accumulate can be overloaded with different custom types and arguments. An AggregateFunction

* requires at least one accumulate() method.

*

* param accumulator the accumulator which contains the current aggregated results

* param [user defined inputs] the input value (usually obtained from a new arrived data).

*

* public void accumulate(ACC accumulator, [user defined inputs])

* }

*

*

* {@code

* Retracts the input values from the accumulator instance. The current design assumes the

* inputs are the values that have been previously accumulated. The method retract can be

* overloaded with different custom types and arguments. This function must be implemented for

* datastream bounded over aggregate.

*

* param accumulator   the accumulator which contains the current aggregated results

* param [user defined inputs] the input value (usually obtained from a new arrived data).aol

*

* public void retract(ACC accumulator, [user defined inputs])

* }

*

*

* {@code

* Merges a group of accumulator instances into one accumulator instance. This function must be

* implemented for datastream session window grouping aggregate and dataset grouping aggregate.

*

* param accumulator  the accumulator which will keep the merged aggregate results. It should

*                    be noted that the accumulator may contain the previous aggregated

*                    results. Therefore user should not replace or clean this instance in the

*                    custom merge method.

* param its  an {@link Iterable} pointed to a group of accumulators that will be

*                    merged.

*

* public void merge(ACC accumulator, Iterable<ACC> its)

* }

*

*

* {@code

* Called every time when an table-valued aggregation result should be materialized.

* The returned value could be either an early and incomplete result

* (periodically emitted as data arrive) or the final result of the table-valued aggregation.

*

* The implementation logic do not need deal with retract messages.

* For Example, if we calculate top3(ASC order), the behavior as follows:

* Assume there are 4 messages: {1, 2, 6, 4}

* - Framework should generate retract message according to DAG pattern.

* -----------------------------------------------------------------------

* | Input |  OutPut | Framework  behavior of when need retract message |

* ------------------------------------------------------------------------

* | 1      | collector.collect(1)|                                                                         |

* -----------------------------------------------------------------------

* | 2      | collector.collect(1)| collector.retract(1)                                          |

* |         | collector.collect(2)|                                                                         |

* -----------------------------------------------------------------------

* | 6      | collector.collect(1)| collector.retract(1)                                          |

* |         | collector.collect(2)| collector.retract(2)                                         |

* |         | collector.collect(6)|                                                                         |

* -----------------------------------------------------------------------

* | 4     | collector.collect(1) | collector.retract(1)                                          |

* |        | collector.collect(2) | collector.retract(2)                                         |

* |        | collector.collect(4) | collector.retract(6)                                         |

* -----------------------------------------------------------------------

*

* public void emitValue(ACC accumulator, Collector<T> out)

* }

*

* {@code

* Called every time when an table-valued aggregation result should be materialized.

* The returned value could be either an early and incomplete result

* (periodically emitted as data arrive) or the final result of the table-valued aggregation.

*

*

* The implementation logic should deal with retract message.

* For Example, if we calculate top3(ASC order), the behavior as follows:

* Assume there are 4 messages: {1, 2, 6, 4}

* - Framework do not need generate the retract message.

* ----------------------------------

* | Input |               OutPut               |

* ----------------------------------

* | 1     | collector.collect(1) |

* ---------------------------------

* | 2     | collector.collect(2) |

* ---------------------------------

* | 6     | collector.collect(6) |

* ---------------------------------

* | 4     | collector.retract(6) |

* |        | collector.collect(4) |       

* ---------------------------------

*

* public void emitUpdateWithRetract(ACC accumulator, Collector<T> out)

* }

*

* @param <T>   the type of the table aggregation result

* @param <ACC> the type of the table aggregation accumulator. The accumulator is used to keep the

*             table aggregated values which are needed to compute an table aggregation result.

*             TableAggregateFunction represents its state using accumulator, thereby the state of the

*             TableAggregateFunction must be put into the accumulator.

*/

public abstract class TableAggregateFunction<T, ACC> extends UserDefinedFunction {

 /**

  * Creates and init the Accumulator for this {@link TableAggregateFunction}.

  *

  * @return the accumulator with the initial value

  */

 public abstract ACC createAccumulator();

 /**

  * Returns the DataType of the TableAggregateFunction's result.

  *

  * @return The DataType of the TableAggregateFunction's result or null if the result type

  *       should be automatically inferred.

  */

 public DataType getResultType() {

    return null;

 }

 /**

  * Returns the DataType of the TableAggregateFunction's accumulator.

  *

  * @return The DataType of the TableAggregateFunction's accumulator or null if the

  *       accumulator type should be automatically inferred.

  */

 public DataType getAccumulatorType() {

    return null;

 }

NOTE:  Since the execution of the Stream operator has two modes, `ACC` and `ACCRetract`, users can achieve better performance by implementing special interfaces for streaming. The table below is a quick summary.


emitValue

emitUpdateWithRetract

emitUpdateWithoutRetract

ACC

Y

N

Y

ACCRetract

Y

Y

N

  • emitValue - for batch and streaming.
  • eimitUpdateWithRetract - only for streaming in ACC mode(need key definition on TableAggregateFunction, under discussion).
  • emitUpdateWithoutRetract - only for streaming in ACCRetract mode.

Collector Interface

public interface Collector<T> {

 /**

  * Emits a record.

  *

  * @param record The record to collect.

  */

 void collect(T msg);

/**

  * Emits a retract record.

  *

  * @param record The record to retract.

  */

 void retract(T msg);

}


Table.Map

Map operator is a new operator of Table. It takes a ScalarFunction which returns a single row with multiple columns. The usage as follows:

val res = tab
  .map(fun: ScalarFunction)  // output has columns ‘a, ‘b, ‘c
  .select(‘a, ‘c)


Table.FlatMap

FlatMap operator is a new operator of Table. It takes a TableFunction which returns multiple rows. The usage as follows:

val res = tab
  .flatMap(fun: TableFunction)  // output has columns ‘a, ‘b, ‘c
  .select(‘a, ‘c)


GroupedTable.aggregate

Agg operator is a new operator of Table/GroupedTable. It takes an AggregateFunction which returns a single row with multiple columns. The usage as follows:

val res = tab
.groupBy(‘a) // leave out groupBy-clause to define global aggregates
.aggregate(fun: AggregateFunction)  // output has columns ‘a, ‘b, ‘c
.select(‘a, ‘c)


GroupedTable.flatAggregate

FlatAgg operator is a new operator of Table/GroupedTable. It takes a TableAggregateFunction which returns multiple rows. The usage as follows:

val res = tab
.groupBy(‘a) // leave out groupBy-clause to define global table aggregates
.flatAggregate(fun: TableAggregateFunction)  // output has columns ‘a, ‘b, ‘c
.select(‘a, ‘c)


Time attribute and group keys

The time attribute is lost after the map/flatMap operation. e.g.:

tab('name, 'age, 'address, 'rowtime)

tab.map(udf('name)).as('col1, 'col2) // Does not include time attributes.


For the aggregate/flatAggregate operation should force users to use select. E.g.:

 val result = tab.window(Tumble ... as 'w)

    .groupBy('w, 'k1, 'k2)

    .aggregate(aggFun('a)) // Generate AggregateTable that can only do select

    .select('w.rowtime, 'k1, 'k2, 'col1, 'col2)  // Cannot contain aggregate expression.


Proposed Changes


See the previous section about public interfaces and table operators for the Table API. The interfaces and operators will be available in Scala and Java Table API. And we will add new function interface of TableAggregateFunction which can using in flatAggregate operator. In addition, we will add checks for all of the changes.

Compatibility, Deprecation, and Migration Plan


This FLIP proposes new functionality and operators for the Table API. The behavior of existing operators is not modified.

Test Plan

This FLIP proposes can check by both It test case and validate test case.

Rejected Alternatives

No rejected alternatives yet.

Implementation Plan

The implementation of this effort can be divided into several subtasks:

  1. Implementation of TableAggregateFunction on static and streaming tables

  2. Implementation of Map Operator on static and streaming tables

  3. Implementation of FlatMap Operator on static and streaming tables

  4. Implementation of Aggregate Operator on static and streaming tables

  5. Implementation of FaltAggregate Operator on static and streaming tables

Future work

  1. Discuss how to support column operations,such as: udf(*)/udf(_)/udf(*.reshape())/tab.drop(columns) etc. Design doc

  2. Discuss the function of GroupedTable.select('*).

  3. Discuss User-Defined window, User-Defined Jion and Iteration.

  4. Introducing nested tables.