Motivation

UDF execution can potentially be rather complex, from calling an external system to fetch data while computing a function, to executing the function externally as a remote procedure call. In order to handle table UDF calls efficiently, including ones which may call external systems or do RPCs, an asynchronous approach can be used. For example, there is AsyncScalarFunction (FLIP) which can be used to do a transformation on a scalar value, with multiple in-flight at a time, allowing results to be returned when completed. 

AggregateFunctions might want to make similar calls as well, and as with scalar functions, a blocking approach is very inefficient. The async approach used above doesn’t lend itself as well to aggregate implementations, though. For one, they are stateful, and order is critical when applying an accumulate or retract call. Other calls just don’t make sense to issue in parallel, including the getValue call: usually only the value of the final accumulator is needed.

AggregateFunctions need a different approach which can handle many in-flight inputs at once, but which maintains a well-defined ordering and can, for example, evaluate getValue at the end of a call. This can be solved by a batch API that will be called a BundledAggregateFunction. Note that the term “bundle” is used so as not to confuse it with other meanings of the word batch. This API gains efficiency by handling many inputs at once while maintaining all the semantics required by aggregates.

Note that such a batch API is likely not appropriate for use by most end-users, but is ideal for efficient implementation of RPCs and external calls which themselves handle batches.

Public Interfaces

The proposal is to create a new interface which can be implemented by an existing AggregateFunction. The intention is that a class can implement either the classic interface or the bundled version.

public interface BundledAggregateFunction extends FunctionDefinition {
    /**
     * Whether the implementor has a bundledAccumulateRetract method which should be used in place
     * of createAccumulator, accumulate, retract, getValue.
     */
    boolean supportsBundling();

    /**
     * Whether the implementation has retract functionality.
     */
    boolean supportsRetracting();

    /**
     * Whether the implementation has merge functionality.
     */
    boolean supportsMerging();

    /**
     * The main bundled method call which takes many inputs and applies them to the
     * accumulator by executing the appropriate accumulate or retract calls.
     */
    default public List<BundledKeySegmentApplied> bundledAccumulateRetract(List<BundledKeySegment> batch) throws Exception {
      throw new UnsupportedOperationException("Must be implemented if supportsBundling() is true");
    }
}

If supportsBundling returns true, an implementing class would override the method bundledAccumulateRetract, which will then be invoked in place of the normal methods createAccumulator, getValue, accumulate and retract.

It takes a list of BundledKeySegment objects and returns a same-sized list containing corresponding BundledKeySegmentApplied objects.

A BundledKeySegment contains incoming rows for a given key, with the key’s own accumulator. Non-bundled operators would normally break this up into calls to createAccumulator, getValue, and accumulate, but with BundledKeySegment it can be done with a single call. A List<BundledKeySegment> therefore allows for batches of input rows to be processed across a batch of keys.

Here is the partial definition of BundledKeySegment:

/**
 * One segment where all elements in the segment are for the same key.
 */
public class BundledKeySegment {
    /** The common key of the segment. */
    private final RowData key;

    /** The input rows to apply, where all rows are for the common key. */
    private final List<RowData> rows;

    /** 
     * The accumulator value under the current key. Can be empty at the start.
     * If there is more than one entry, the accumulators will be merged before
     * any rows are accumulated or retracted. 
     */
    private final List<RowData> accumulators;

    /**
     * If set, returns the updated value after each row is applied rather than
     * only the final value.
     */
    private final boolean updatedValuesAfterEachRow;

    // Getters for accessing fields
    ...
}

Here, rows are input rows, each with an appropriate RowKind indicating whether it should be taken as an accumulate or a retract. An implementor can use RowDataUtil.isAccumulateMsg and RowDataUtil.isRetractMsg on each row to know whether to apply accumulate or retract logic, many rows at a time.

The accumulators are the state, if any, before any rows have been applied. If there is more than one entry in the list, the accumulators are first merged into one. Note that it can also be empty which means that there is no current state and a new accumulator should be created in the implementor, similar to createAccumulator.

Lastly, updatedValuesAfterEachRow indicates whether the implementor should return the value at only the beginning and end of the call, or also after every input row is applied. The latter behavior is required by some operators.

Here is the partial definition of BundledKeySegmentApplied:

/**
 * The resulting accumulator and values after applying rows associated with a single
 * key.
 */
public class BundledKeySegmentApplied {
    /** The final accumulator after all rows are applied. */
    private final RowData accumulator;

    /** The value at the start of the call before any rows have been applied. */
    private final RowData startingValue;

    /** The value at the end of the call after all rows have been applied. */
    private final RowData finalValue;

    /** The value after each row has been applied. */
    private final List<RowData> updatedValuesAfterEachRow;

    // Getters for accessing fields
    ...
}

This represents the result of applying all of those rows at once, with the resulting accumulator, value, and the optional updatedValuesAfterEachRow set if requested.

Note that both the input and output take internal RowData rather than Row. It would be possible to create versions for both depending on whether internal or external types are required by the TypeInference, but support for Row is out of the scope of this FLIP.

The following new configurations will also be added:

Name (Prefix table.exec.bundled-agg) | Meaning
allow-latency | The amount of time allowed to wait for a full batch size
size | The maximum size of input rows in a bundle, across all included keys
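
How these two settings could interact is not spelled out above, so the following is only a minimal sketch under stated assumptions: a bundle is flushed either when it holds `size` rows across all keys, or when its oldest row has waited `allow-latency`. The class BundleBuffer and all of its members are hypothetical names, not Flink classes, and keys and rows are simplified to String and Long.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical buffer, not a Flink class. Timestamps are passed in to keep it deterministic. */
final class BundleBuffer {
    private final int maxRows;             // models table.exec.bundled-agg.size
    private final long allowLatencyMillis; // models table.exec.bundled-agg.allow-latency
    private final Map<String, List<Long>> rowsByKey = new LinkedHashMap<>();
    private int bufferedRows = 0;
    private long firstRowAtMillis = -1L;

    BundleBuffer(int maxRows, long allowLatencyMillis) {
        this.maxRows = maxRows;
        this.allowLatencyMillis = allowLatencyMillis;
    }

    /** Buffers one row under its key; returns true once the row limit is reached. */
    boolean add(String key, long row, long nowMillis) {
        if (bufferedRows == 0) {
            firstRowAtMillis = nowMillis;
        }
        rowsByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        bufferedRows++;
        return bufferedRows >= maxRows;
    }

    /** True when a non-empty bundle has waited at least the allowed latency. */
    boolean latencyExpired(long nowMillis) {
        return bufferedRows > 0 && nowMillis - firstRowAtMillis >= allowLatencyMillis;
    }

    /** Returns the buffered rows grouped by key and resets the buffer. */
    Map<String, List<Long>> flush() {
        Map<String, List<Long>> bundle = new LinkedHashMap<>(rowsByKey);
        rowsByKey.clear();
        bufferedRows = 0;
        firstRowAtMillis = -1L;
        return bundle;
    }
}
```

In this sketch the row limit counts rows across all keys, matching the description of `size`; whether a real operator would also flush on checkpoints or watermarks is left open here.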

Group By Example

A query which can illustrate a series of interactions is the following:

-- Group by even and odd values, and compute AVG
SELECT v1 % 2, MyAvg(v1)
    FROM (
      -- Create data which is grouped by k1, so updates will create retracts
      SELECT k1, LAST_VALUE(v1) as v1
      FROM (VALUES (1, 1), (2, 2), (5, 5), (2, 6), (1, 3)) AS t (k1, v1)
      GROUP BY k1)
GROUP BY v1 % 2;

Imagine we have an implementation of an average function, MyAvg, which implements the bundled API. This could have an accumulator of the format Row<sum BIGINT, count BIGINT>. It’s useful to step through the API calls to see it in use.

The above query has two keys for v1 % 2 (evens and odds), and so up to two BundledKeySegment instances per bundledAccumulateRetract call. For simplicity, assume in this example that a bundle can only process 3 input rows at a time:

Bundle 0

Request (List<BundledKeySegment>):

{
 "key": 1,
 "rows": [ "+I[1]", "+I[5]" ],
 "accumulators": []
},
{
 "key": 0,
 "rows": [ "+I[2]" ],
 "accumulators": []
}

Notes: Accumulators are not set since this is the first request for both keys.

Response (List<BundledKeySegmentApplied>):

{
 "accumulator": "+I[6, 2]",
 "startingValue": "+I[null]",
 "finalValue": "+I[3]"
},
{
 "accumulator": "+I[2, 1]",
 "startingValue": "+I[null]",
 "finalValue": "+I[2]"
}

Notes: Both accumulators are now set. Starting values are null (since there are initially no data points), and final values are from consuming the rows. The odd response is from accumulating 1 and 5, which means sum is 6, count is 2, and final value is 6/2 = 3.

Bundle 1

Request (List<BundledKeySegment>):

{
 "key": 1,
 "rows": [ "-U[1]" ],
 "accumulators": ["+I[6, 2]"]
},
{
 "key": 0,
 "rows": [ "-U[2]", "+U[6]" ],
 "accumulators": ["+I[2, 1]"]
}

Notes: Now accumulators are set and the inputs include some retractable rows.

Response (List<BundledKeySegmentApplied>):

{
 "accumulator": "+I[5, 1]",
 "startingValue": "+I[3]",
 "finalValue": "+I[5]"
},
{
 "accumulator": "+I[6, 1]",
 "startingValue": "+I[2]",
 "finalValue": "+I[6]"
}

Notes: The odd response retracts 1, and so now has a sum of 5 and count of 1, leaving a final value of 5. The even response effectively replaces 2 with 6.

Bundle 2

Request (List<BundledKeySegment>):

{
 "key": 1,
 "rows": [ "+U[3]" ],
 "accumulators": ["+I[5, 1]"]
}

Notes: Finally the odd segment gets 3. Notice that there is no even segment because there is no even row data.

Response (List<BundledKeySegmentApplied>):

{
 "accumulator": "+I[8, 2]",
 "startingValue": "+I[5]",
 "finalValue": "+I[4]"
}

Notes: The final odd response shows a sum of 8, count of 2, and final value of 8/2 = 4.
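
The arithmetic for the odd key in the walkthrough above can be reproduced with a small self-contained model. This is only a sketch: BundledAvgWalkthrough, Change and apply are hypothetical names using plain Java types rather than RowData, and a null average stands for the "+I[null]" value of an empty accumulator.

```java
import java.util.Arrays;
import java.util.List;

/** Self-contained model of the walkthrough; none of these types are Flink classes. */
final class BundledAvgWalkthrough {

    /** Hypothetical stand-in for an input row: a value plus whether it accumulates or retracts. */
    static final class Change {
        final boolean accumulate;
        final long value;

        Change(boolean accumulate, long value) {
            this.accumulate = accumulate;
            this.value = value;
        }
    }

    /**
     * Applies the changes of one key segment to a {sum, count} accumulator.
     * Returns {sum, count, startingValue, finalValue}.
     */
    static Long[] apply(long[] accumulatorOrNull, List<Change> changes) {
        long sum = accumulatorOrNull == null ? 0L : accumulatorOrNull[0];
        long count = accumulatorOrNull == null ? 0L : accumulatorOrNull[1];
        Long startingValue = average(sum, count);
        for (Change c : changes) {
            sum += c.accumulate ? c.value : -c.value;
            count += c.accumulate ? 1 : -1;
        }
        return new Long[] {sum, count, startingValue, average(sum, count)};
    }

    /** Integer average, or null when there are no data points. */
    static Long average(long sum, long count) {
        return count == 0 ? null : sum / count;
    }

    public static void main(String[] args) {
        // Odd key across bundles 0, 1 and 2 of the walkthrough.
        Long[] b0 = apply(null, Arrays.asList(new Change(true, 1), new Change(true, 5)));
        Long[] b1 = apply(new long[] {b0[0], b0[1]}, Arrays.asList(new Change(false, 1)));
        Long[] b2 = apply(new long[] {b1[0], b1[1]}, Arrays.asList(new Change(true, 3)));
        System.out.println(Arrays.toString(b0)); // [6, 2, null, 3]
        System.out.println(Arrays.toString(b1)); // [5, 1, 3, 5]
        System.out.println(Arrays.toString(b2)); // [8, 2, 5, 4]
    }
}
```

Each call receives the accumulator returned by the previous one, which mirrors how the operator is expected to pass stored state back in through the accumulators field.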

Example Bundled Average Aggregation Definition

It’s also useful to consider how a simple, local implementation of MyAvg might be defined:

@FunctionHint(accumulator = @DataTypeHint("ROW<sum BIGINT, count BIGINT>"))
public static class AvgAggregate extends AggregateFunction<Long, Row> implements BundledAggregateFunction {

    private static final long serialVersionUID = 4585229396060575732L;
    
    // Normal definitions of accumulate, getValue, etc
    ...
    
    @Override
    public boolean supportsBundling() {
        return true;
    }

    @Override
    public boolean supportsRetracting() {
        return true;
    }

    @Override
    public boolean supportsMerging() {
        return true;
    }

    @Override
    public List<BundledKeySegmentApplied> bundledAccumulateRetract(List<BundledKeySegment> batch) throws Exception {
        List<BundledKeySegmentApplied> updates = new ArrayList<>();
        for (BundledKeySegment segment : batch) {
            // Start from an empty accumulator, or merge the existing ones into one.
            final GenericRowData acc;
            if (segment.getAccumulators().isEmpty()) {
                acc = GenericRowData.of(0L, 0L);
            } else {
                acc = merge(segment.getAccumulators());
            }

            // Value before any of this segment's rows are applied.
            RowData previousValue = GenericRowData.of(getAverage(acc.getLong(0), acc.getLong(1)));

            List<RowData> valueUpdates = new ArrayList<>();
            for (RowData row : segment.getRows()) {
                if (RowDataUtil.isAccumulateMsg(row)) {
                    acc.setField(0, acc.getLong(0) + row.getLong(0));
                    acc.setField(1, acc.getLong(1) + 1);
                } else {
                    acc.setField(0, acc.getLong(0) - row.getLong(0));
                    acc.setField(1, acc.getLong(1) - 1);
                }
                if (segment.getUpdatedValuesAfterEachRow()) {
                    valueUpdates.add(GenericRowData.of(getAverage(acc.getLong(0), acc.getLong(1))));
                }
            }

            // Value after all of this segment's rows are applied.
            RowData newValue = GenericRowData.of(getAverage(acc.getLong(0), acc.getLong(1)));
            updates.add(
                    new BundledKeySegmentApplied(
                            acc, previousValue, newValue, valueUpdates));
        }
        return updates;
    }

    // Helper assumed by this example: the average is null while there are no data points.
    private static Long getAverage(long sum, long count) {
        return count == 0 ? null : sum / count;
    }

    private GenericRowData merge(List<RowData> accs) {
        GenericRowData result = GenericRowData.of(0L, 0L);
        for (RowData acc : accs) {
            result.setField(0, result.getLong(0) + acc.getLong(0));
            result.setField(1, result.getLong(1) + acc.getLong(1));
        }
        return result;
    }
}



Proposed Changes

Code Generation

In order to invoke the user UDF method bundledAccumulateRetract, we must generate all of the relevant code for it. This is all handled by implementers of AggsHandleFunctionBase.

To accommodate the new logic, a new method is to be added to AggsHandleFunctionBase:

public interface AggsHandleFunctionBase extends Function {
 ...
  public List<BundledKeySegmentApplied> bundledAccumulateRetract(List<BundledKeySegment> bundle)
          throws Exception;
}

This new bundledAccumulateRetract call is mutually exclusive with the conventional createAccumulator, getValue, accumulate and retract calls, so operators are expected to call one or the other.

The main implementation of this class is created in AggsHandlerCodeGenerator, which delegates to different codegens depending on the function called (e.g. imperative, declarative). A new BundledImperativeAggCodeGen will be added, which handles only implementers of BundledAggregateFunction.

There is complexity when there are multiple aggregate function types because codegen creates a single AggsHandleFunction which handles all calls. For example:

SELECT BUNDLED_AGG(a), CONVENTIONAL_AGG(b), SUM(c), d FROM Input group by d;

This would contain a bundled imperative, a normal imperative, and a declarative call. Because the use of bundledAccumulateRetract is mutually exclusive with the other methods, the generated implementation of bundledAccumulateRetract must simulate the conventional calls to createAccumulator, getValue, accumulate and retract for the non-bundled function types.
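
To make the simulation concrete, the sketch below replays the classic call sequence for one key segment on behalf of a conventional aggregate. It is an illustration only, not the generated code: MixedAggSketch, ConventionalAgg, SumAgg, Change and simulate are hypothetical names, and accumulators are simplified to long arrays.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of simulating conventional calls inside a bundled call. */
final class MixedAggSketch {

    /** Hypothetical conventional aggregate with the classic per-row methods. */
    interface ConventionalAgg {
        long[] createAccumulator();
        void accumulate(long[] acc, long value);
        void retract(long[] acc, long value);
        Long getValue(long[] acc);
    }

    /** A conventional SUM, used here purely for illustration. */
    static final class SumAgg implements ConventionalAgg {
        public long[] createAccumulator() { return new long[] {0L}; }
        public void accumulate(long[] acc, long value) { acc[0] += value; }
        public void retract(long[] acc, long value) { acc[0] -= value; }
        public Long getValue(long[] acc) { return acc[0]; }
    }

    /** Stand-in for an input row: a value plus whether it accumulates or retracts. */
    static final class Change {
        final boolean accumulate;
        final long value;

        Change(boolean accumulate, long value) {
            this.accumulate = accumulate;
            this.value = value;
        }
    }

    /**
     * Replays createAccumulator / getValue / accumulate / retract / getValue for one
     * segment and returns {startingValue, finalValue}. A non-null accumulator is
     * updated in place.
     */
    static Long[] simulate(ConventionalAgg agg, long[] accOrNull, List<Change> rows) {
        long[] acc = accOrNull == null ? agg.createAccumulator() : accOrNull;
        Long startingValue = agg.getValue(acc);
        for (Change c : rows) {
            if (c.accumulate) {
                agg.accumulate(acc, c.value);
            } else {
                agg.retract(acc, c.value);
            }
        }
        return new Long[] {startingValue, agg.getValue(acc)};
    }

    public static void main(String[] args) {
        List<Change> rows =
                Arrays.asList(new Change(true, 2), new Change(false, 2), new Change(true, 6));
        System.out.println(Arrays.toString(simulate(new SumAgg(), null, rows))); // [0, 6]
    }
}
```

A generated handler would presumably run a loop like this per segment for each non-bundled aggregate and then combine the per-aggregate results into one BundledKeySegmentApplied, but that combination step is not modelled here.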

Operators

Once we have an AggsHandleFunction with the bundledAccumulateRetract call, it can be invoked in new operators, aiming for the same coverage as existing support for Aggregate functions.

An existing operator might have code similar to the following:

RowData acc = accState.value();
if (acc == null) {
  acc = function.createAccumulators();
}
function.setAccumulators(acc);
RowData prevAggValue = function.getValue();
for (RowData input : inputRows) {
  if (isAccumulateMsg(input)) {
      function.accumulate(input);
  } else {
      function.retract(input);
  }
}
RowData newAggValue = function.getValue();

This can now be written in a new operator as follows:

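RowData acc = accState.value();
// An empty accumulator list signals that there is no state yet for this key.
List<RowData> accs = acc == null ? Collections.emptyList() : Collections.singletonList(acc);
BundledKeySegment request = new BundledKeySegment(currentKey, inputRows, accs, false);
// The call takes a list of segments and returns one result per segment, in order.
BundledKeySegmentApplied update =
    function.bundledAccumulateRetract(Collections.singletonList(request)).get(0);
RowData prevAggValue = update.getStartingValue();
RowData newAggValue = update.getFinalValue();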

Note that some of the complexity, but also the flexibility, is now contained in the UDF.

Group By: Can utilize MapBundleFunction as a base for batching inputs. After collecting a bundle as a Map<RowData, List<RowData>> buffer, it’s the perfect format to be used with the new batching interface since we effectively have a key-based list of inputs to apply.
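
As a rough illustration of why the buffered map fits the interface, the sketch below turns a key-to-rows buffer plus per-key state into one segment per key. It assumes plain Java stand-ins: GroupByBundleSketch, Segment and toSegments are hypothetical names, keys are Strings, and rows and accumulators are Longs rather than RowData.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical model of building bundle segments from a buffered map. */
final class GroupByBundleSketch {

    /** Stand-in for BundledKeySegment, using plain Java types. */
    static final class Segment {
        final String key;
        final List<Long> rows;
        final List<Long> accumulators;

        Segment(String key, List<Long> rows, List<Long> accumulators) {
            this.key = key;
            this.rows = rows;
            this.accumulators = accumulators;
        }
    }

    /**
     * Builds one segment per buffered key, in the buffer's iteration order.
     * A key without stored state gets an empty accumulator list.
     */
    static List<Segment> toSegments(Map<String, List<Long>> buffer, Map<String, Long> state) {
        List<Segment> segments = new ArrayList<>();
        for (Map.Entry<String, List<Long>> entry : buffer.entrySet()) {
            Long acc = state.get(entry.getKey());
            List<Long> accs =
                    acc == null ? Collections.<Long>emptyList() : Collections.singletonList(acc);
            segments.add(new Segment(entry.getKey(), entry.getValue(), accs));
        }
        return segments;
    }
}
```

Because the response list is the same size as the request list, the operator can match the i-th result back to the i-th segment's key when writing state and emitting output.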

Over Operators: Can model over operators roughly on the ones in the package org.apache.flink.table.runtime.operators.over.

These operators all utilize callbacks for row time or processing time, at which point they collect rows for a given key and can call the bundledAccumulateRetract interface.

Note that the implementation of a bundled RowTimeRowsUnboundedPrecedingFunction needs the value after each input row has been applied to the accumulator, so it would set updatedValuesAfterEachRow to true.
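
The effect of the flag can be seen in a minimal sketch that uses a running sum in place of a real aggregate. PerRowValues and valuesAfterEachRow are hypothetical names, and rows are simplified to Long values that are always accumulated.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the updatedValuesAfterEachRow flag. */
final class PerRowValues {

    /**
     * Adds each row to a running sum. When the flag is set, the returned list
     * holds the value after every row; otherwise it stays empty.
     */
    static List<Long> valuesAfterEachRow(
            long startingSum, List<Long> rows, boolean updatedValuesAfterEachRow) {
        List<Long> values = new ArrayList<>();
        long sum = startingSum;
        for (long row : rows) {
            sum += row;
            if (updatedValuesAfterEachRow) {
                values.add(sum);
            }
        }
        return values;
    }
}
```

With a starting sum of 10 and rows 1, 2, 3, the flag yields the running values 11, 13, 16, which is the per-row output an unbounded-preceding over aggregation emits; without the flag only the final value would be needed.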

Window Operator: Can add a new BundledCombiner which can combine many input rows at once with the accumulator – may also introduce a new combine method which allows for multiple keys at once instead of one key at a time.

Compatibility, Deprecation, and Migration Plan

This FLIP would introduce a new interface. Non-implementers, including existing aggregate functions, should not see any changes to behavior.

Test Plan

The change will be covered by unit tests for all of the new functionality as well as IT Cases for all of the newly added operators. Some of these cases include:

  • Group by queries without retractions
  • Group by queries with retractions
  • Group by with multiple bundled aggregates without retractions
  • Group by with multiple bundled aggregates with retractions
  • Group by with a bundled aggregate, a system function, and a conventional aggregate UDF
  • Over functions (including the above combinations as well)
    • RowTime Rows Bounded
    • RowTime Range Bounded
    • RowTime Row Unbounded
    • RowTime Row Bounded
    • ... Others if implemented

Rejected Alternatives

  • Asynchronous calls. Ordering issues are the main consideration.