Motivation

Table UDF calls, including ones that call external systems, can be handled efficiently with an asynchronous approach. For example, AsyncScalarFunction (FLIP) transforms a scalar value asynchronously: multiple calls can be in flight at a time, and each result is returned when it completes.

This FLIP takes exactly the same approach for table functions, which return multiple results per call rather than a single one. In fact, such a class already exists: AsyncTableFunction. It is used for lookup joins, but it isn't exposed to the end user the way other UDF types are. This FLIP aims to expose it in the same way as AsyncScalarFunction.

This seems to be effectively the same as FLIP-313.

Scope

The goal is to allow AsyncTableFunction to be used in the same place as normal TableFunctions. Namely:

  • Correlate Query (e.g. SELECT * FROM t1, LATERAL TABLE(func(f1)))

Changelog modes will be passed through, similar to how async scalar functions behave. The implementation referenced in this FLIP will be limited to the following:

  • Ordered Async operators: Some SQL queries could be compatible with an operator which allowed unordered results, which could provide a performance benefit. For now we'll only consider an operator that retains the input ordering.

  • Streaming mode: The design considerations here focus on streaming. Good batch performance might require batching of async calls, but that is not addressed at the moment.

Public Interfaces

The API is unchanged from how AsyncTableFunctions are defined for lookup joins, with a couple of small exceptions. Lookup joins support inferring the input and output types from LookupCallContext, so a generic Row can be used without specifying what it contains. This FLIP won't cover those cases; it covers only the more straightforward ones where both argument and output types are fully specified, as with a conventional TableFunction. For example:

  • An explicit hint with a Row type, as here for the output:

    @FunctionHint(output = @DataTypeHint("ROW<s STRING >"))
    public class AsyncTestTableFunction extends AsyncTableFunction<Row> {
        public void eval(CompletableFuture<Collection<Row>> result, Integer i) {
            result.complete(Arrays.asList(Row.of("Row1 " + i), Row.of("Row2 " + i)));
        }
    }

  • For output, a single field of any non-Row type can be used, and it will be implicitly wrapped in a Row:

    public class TestTableFunction extends AsyncTableFunction<String> {
        public void eval(CompletableFuture<Collection<String>> result, Integer i) {
            result.complete(Arrays.asList("Row1 " + i, "Row2 " + i));
        }
    }

New configurations will be introduced for this functionality, identical in nature to table.exec.async-scalar.*:

table.exec.async-table.buffer-capacity: 10
table.exec.async-table.timeout: 30s
table.exec.async-table.retry-strategy: FIXED_DELAY
table.exec.async-table.retry-delay: 10s
table.exec.async-table.max-attempts: 3

Specifically, the following new configurations will be added:

| Name (prefix table.exec.async-table.) | Meaning |
|---|---|
| buffer-capacity | The number of outstanding requests the operator allows at once. |
| timeout | The total time which can pass before the invocation (including retries) is considered timed out and task execution is failed. |
| retry-strategy | FIXED_DELAY retries after a fixed amount of time. |
| retry-delay | The time to wait between retries for the FIXED_DELAY strategy. Could be the base delay time for a (not yet proposed) exponential backoff. |
| max-attempts | The maximum number of attempts while retrying. |
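
To make the interplay of timeout, retry-delay and max-attempts concrete, here is a minimal, Flink-free sketch of the FIXED_DELAY semantics described above. The class FixedDelayRetrySketch and its method names are hypothetical and exist only for illustration; they are not the operator's actual retry implementation, and the sketch assumes the timeout covers the whole invocation, as the description states.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration only; not Flink's retry implementation.
final class FixedDelayRetrySketch {

    /** Invokes `call` up to maxAttempts times, waiting retryDelayMillis between attempts. */
    static <T> CompletableFuture<T> callWithRetry(
            Supplier<CompletableFuture<T>> call, int maxAttempts,
            long retryDelayMillis, long timeoutMillis, ScheduledExecutorService timer) {
        CompletableFuture<T> result = new CompletableFuture<>();
        // The timeout spans the whole invocation, retries included.
        timer.schedule(
                () -> { result.completeExceptionally(new TimeoutException("async call timed out")); },
                timeoutMillis, TimeUnit.MILLISECONDS);
        attempt(call, 1, maxAttempts, retryDelayMillis, timer, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> call, int attemptNo, int maxAttempts,
            long retryDelayMillis, ScheduledExecutorService timer, CompletableFuture<T> result) {
        if (result.isDone()) {
            return; // the overall timeout has already fired
        }
        call.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptNo >= maxAttempts) {
                result.completeExceptionally(error); // attempts exhausted
            } else {
                // FIXED_DELAY: schedule the next attempt after a constant delay.
                timer.schedule(
                        () -> attempt(call, attemptNo + 1, maxAttempts, retryDelayMillis, timer, result),
                        retryDelayMillis, TimeUnit.MILLISECONDS);
            }
        });
    }

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Fails twice, then succeeds on the third attempt.
        Supplier<CompletableFuture<String>> flaky = () -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (calls.incrementAndGet() < 3) {
                f.completeExceptionally(new IllegalStateException("transient failure"));
            } else {
                f.complete("ok");
            }
            return f;
        };
        System.out.println(callWithRetry(flaky, 3, 10, 30_000, timer).join());
        timer.shutdownNow(); // also discards the still-pending timeout task
    }
}
```

With max-attempts set to 3 the flaky call in main succeeds on its last allowed attempt; with max-attempts set to 2 the same call would fail with the error from the second attempt.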

Proposed Changes

Planner Changes

Split Rules

One of the guiding philosophies to simplify code generation is to allow only a single call (the main async one) at a time at the given operator. To do this, we would like to split out any other calls to their own calcs.

There is an existing rule, PythonCorrelateSplitRule, which is useful for splitting things out from correlates into their own calc. This should be factored out to be reusable as RemoteCorrelateSplitRule, taking a RemoteCalcCallFinder, which can be passed an instance that looks for async table function calls.

Example SQL:

SELECT * FROM t1, LATERAL TABLE(asyncTable(scalarFunction(f1)));


Original RelNode:

FlinkLogicalCorrelate:
  left: FlinkLogicalCalc: projections: f0
  right: FlinkLogicalTableFunctionScan
  call: asyncTable(scalarFunction($cor0.f0))

Becomes:

FlinkLogicalCorrelate:
  left: FlinkLogicalCalc: projections: f1, scalarFunction(f1) as f0
  right: FlinkLogicalTableFunctionScan
  call: asyncTable($1)
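
The rewrite in this example can be modelled on a toy expression tree. The sketch below is an assumption-laden illustration, not the planner rule: CorrelateSplitSketch and its Expr type are hypothetical stand-ins for Calcite's RexNode machinery. It only shows the core idea of moving nested calls into the left-side projection and replacing them with an input reference.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the split; the real rule operates on RexNodes.
final class CorrelateSplitSketch {

    /** Minimal expression tree: a node is either a field reference or a call. */
    static final class Expr {
        final String name;     // field name or function name
        final List<Expr> args; // null for field references

        private Expr(String name, List<Expr> args) {
            this.name = name;
            this.args = args;
        }

        static Expr field(String name) {
            return new Expr(name, null);
        }

        static Expr call(String name, Expr... args) {
            return new Expr(name, Arrays.asList(args));
        }

        boolean isCall() {
            return args != null;
        }

        @Override
        public String toString() {
            if (!isCall()) {
                return name;
            }
            List<String> parts = new ArrayList<>();
            for (Expr arg : args) {
                parts.add(arg.toString());
            }
            return name + "(" + String.join(", ", parts) + ")";
        }
    }

    /**
     * Moves every nested call among the table function's operands into
     * leftProjections and replaces it with a reference to the new column.
     */
    static Expr split(Expr tableCall, List<String> leftProjections) {
        List<Expr> newArgs = new ArrayList<>();
        for (Expr arg : tableCall.args) {
            if (arg.isCall()) {
                String column = "$" + leftProjections.size(); // index of the appended column
                leftProjections.add(arg.toString());
                newArgs.add(Expr.field(column));
            } else {
                newArgs.add(arg);
            }
        }
        return new Expr(tableCall.name, newArgs);
    }

    public static void main(String[] args) {
        List<String> leftProjections = new ArrayList<>(Arrays.asList("f1"));
        Expr rewritten = split(
                Expr.call("asyncTable", Expr.call("scalarFunction", Expr.field("f1"))),
                leftProjections);
        System.out.println(leftProjections + " / " + rewritten);
    }
}
```

After the split, the table function's operands contain only input references, so the correlate operator has a single call left to generate code for.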

Physical Rules

There will also need to be a StreamPhysicalAsyncCorrelateRule, which converts FlinkLogicalCorrelates to StreamPhysicalAsyncCorrelates. This will check for the existence of any async table function calls in the correlate call to determine whether to do that conversion.

Runtime Changes

Code Generation

The primary change is to extend DelegatingResultFuture, which currently handles only lookup joins, so that it wraps the result in a Row where appropriate, since implicit row wrapping is now supported. Beyond that, the call to FunctionCodeGenerator.generateFunction will ask for an AsyncFunction, similar to async calcs.
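
As a rough picture of the wrapping step, the following Flink-free sketch hands the UDF a future and forwards its results downstream, wrapping each plain value in a single-field row. RowWrappingSketch is a hypothetical name, and a List<Object> stands in for Flink's Row; the actual DelegatingResultFuture is structured differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in; a List<Object> models a Row here.
final class RowWrappingSketch {

    /**
     * Returns the future handed to the UDF's eval method. When the UDF
     * completes it, the values are forwarded to `downstream`, each wrapped
     * in a single-field row unless the function already produces rows.
     */
    static <T> CompletableFuture<Collection<T>> delegate(
            CompletableFuture<Collection<List<Object>>> downstream, boolean alreadyRow) {
        CompletableFuture<Collection<T>> udfFuture = new CompletableFuture<>();
        udfFuture.whenComplete((values, error) -> {
            if (error != null) {
                downstream.completeExceptionally(error); // propagate UDF failures
                return;
            }
            List<List<Object>> rows = new ArrayList<>();
            for (T value : values) {
                if (alreadyRow) {
                    @SuppressWarnings("unchecked")
                    List<Object> row = (List<Object>) value;
                    rows.add(row);
                } else {
                    // Implicit wrapping: one value becomes a one-field row.
                    rows.add(Collections.<Object>singletonList(value));
                }
            }
            downstream.complete(rows);
        });
        return udfFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<Collection<List<Object>>> downstream = new CompletableFuture<>();
        CompletableFuture<Collection<String>> udfFuture = delegate(downstream, false);
        udfFuture.complete(Arrays.asList("Row1 7", "Row2 7"));
        System.out.println(downstream.join());
    }
}
```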

Operator

Since the call to the AsyncTableFunction is wrapped in an AsyncFunction taking input rows, we have the benefit of using the existing AsyncWaitOperator, which handles ordering, checkpointing, timeouts and other implementation details. Since only ordered results are handled in this scope, ORDERED will be the behavior.
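
The ordered behavior can be illustrated without Flink. The sketch below is a deliberately simplified, single-threaded model and not AsyncWaitOperator itself: OrderedEmitSketch is a hypothetical name, and checkpointing, timeouts and watermarks are ignored. It keeps at most bufferCapacity requests in flight and always emits the oldest request's rows first, so output follows input order even when later calls finish sooner.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical simplification of ordered async processing.
final class OrderedEmitSketch {

    static <I, O> List<O> process(
            List<I> inputs,
            Function<I, CompletableFuture<Collection<O>>> asyncCall,
            int bufferCapacity) {
        Queue<CompletableFuture<Collection<O>>> inFlight = new ArrayDeque<>();
        List<O> emitted = new ArrayList<>();
        for (I input : inputs) {
            if (inFlight.size() == bufferCapacity) {
                // Buffer full: wait for the oldest request before accepting more input.
                emitted.addAll(inFlight.remove().join());
            }
            inFlight.add(asyncCall.apply(input));
        }
        // Drain the remaining requests, still oldest first.
        while (!inFlight.isEmpty()) {
            emitted.addAll(inFlight.remove().join());
        }
        return emitted;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Later inputs complete sooner, yet the output keeps the input order.
        Function<Integer, CompletableFuture<Collection<String>>> call =
                i -> CompletableFuture.supplyAsync(() -> {
                    pause(i * 20L);
                    return Arrays.asList("Row1 " + i, "Row2 " + i);
                });
        System.out.println(process(Arrays.asList(3, 2, 1), call, 2));
    }
}
```

An unordered variant would emit whichever request completes first, which is the potential performance benefit that the scope section defers.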

Compatibility, Deprecation, and Migration Plan

This should work with all of the existing lookup join cases, so there should be no issues. It uses the same interface but shouldn't change existing behavior.

Correlate query calls with async table functions are newly supported, so they have no backwards-compatibility requirements.

Test Plan

  • Unit tests on all of the components

  • Test cases that cover:

    • The split rule

    • ITCases with a bunch of correlate queries

      • Single non-Row return value

      • Hint with Row return value

      • Operand calc to table function input

      • Async operand calc to table function input

      • SELECT projections in inner correlate query (not supported)

      • WHERE conditions in inner correlate query (not supported)

Rejected Alternatives

Lookup joins:

  • + Code already exists

  • - These are close, though they support different queries than what TableFunctions currently support, namely correlate queries.

  • - They don’t have full UDF support in the table API. E.g.

    tEnv.createTemporarySystemFunction("func", new MyTableFunction());
    tEnv.executeSql(...);