Motivation
To handle table UDF calls efficiently, including calls to external systems, an asynchronous approach can be used. For example, AsyncScalarFunction (FLIP) transforms a scalar value with multiple calls in flight at a time, and returns each result when it completes.
This FLIP takes exactly the same approach for table functions, which in effect return multiple results rather than a single one. In fact, such a class already exists: AsyncTableFunction. It is used for lookup joins, but isn't exposed to end users the way other UDF types are. This FLIP aims to expose it in the same way as AsyncScalarFunction.
This seems to be effectively the same as FLIP-313.
Scope
The goal is to allow AsyncTableFunction to be used in the same places as normal TableFunctions, namely:
Correlate queries (e.g. SELECT * FROM t1, LATERAL TABLE(func(f1)))
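Semantically, a correlate query calls the table function once per input row and joins that row with every row the function returns. The standalone sketch below models this without Flink so that it runs on its own; the class `CorrelateSketch`, its `correlate` method, and representing rows as `List<Object>` are illustrative assumptions, not planner or runtime code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of LATERAL TABLE semantics (not Flink code):
// rows are List<Object>, and the table function maps one argument to many output rows.
class CorrelateSketch {
    static List<List<Object>> correlate(
            List<List<Object>> input,
            int argIndex,
            Function<Object, List<List<Object>>> tableFunction) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> row : input) {
            // Call the function once per input row, passing the correlated field (e.g. f1).
            for (List<Object> produced : tableFunction.apply(row.get(argIndex))) {
                // Append each produced row to the input row's fields; an input row
                // for which the function returns nothing produces no output (inner join).
                List<Object> joined = new ArrayList<>(row);
                joined.addAll(produced);
                out.add(joined);
            }
        }
        return out;
    }
}
```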
Changelog modes will be passed through, as they are for async scalar functions. The implementation described in this FLIP will be limited to the following:
Ordered async operators: Some SQL queries could be compatible with an operator that allows unordered results, which could improve performance. For now, we'll only consider an operator that retains the input ordering.
Streaming mode: Some of the design considerations here are focused on streaming. To get good performance in batch mode, we might want to allow batching of async calls, but we're not addressing this at the moment.
Public Interfaces
The API is unchanged from how AsyncTableFunctions are defined for lookup joins, with a couple of small exceptions. Lookup joins support inferring the input and output types from the LookupCallContext, so a generic Row can be used without specifying what it contains. This FLIP won't cover those cases; it covers only the more straightforward ones where both the argument and output types are well specified, as with a conventional TableFunction. For example:
An explicit hint with a Row type, as here for the output:

```java
@FunctionHint(output = @DataTypeHint("ROW<s STRING>"))
public class AsyncTestTableFunction extends AsyncTableFunction<Row> {
    // Completes the future with two rows for each input value.
    public void eval(CompletableFuture<Collection<Row>> result, Integer i) {
        result.complete(Arrays.asList(Row.of("Row1 " + i), Row.of("Row2 " + i)));
    }
}
```

For output, a single field of any non-Row type can be used, and it will be implicitly wrapped in a Row:

```java
public class TestTableFunction extends AsyncTableFunction<String> {
    // Each String result is implicitly wrapped in a single-field Row.
    public void eval(CompletableFuture<Collection<String>> result, Integer i) {
        result.complete(Arrays.asList("Row1 " + i, "Row2 " + i));
    }
}
```
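The examples above complete the future immediately. For a function that calls an external system, the benefit of the async API is that eval returns right away and the future is completed later, for example from a client's callback thread. Below is a minimal standalone sketch of that pattern, assuming a hypothetical `lookupRemote` call simulated with `CompletableFuture.supplyAsync`; the Flink base class is omitted so it compiles without Flink on the classpath, and `RemoteTableFunctionSketch` is an illustrative name.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Standalone sketch: in Flink this class would extend AsyncTableFunction<String>.
class RemoteTableFunctionSketch {
    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    // Hypothetical stand-in for a non-blocking client call to an external system.
    private CompletableFuture<Collection<String>> lookupRemote(Integer key) {
        return CompletableFuture.supplyAsync(
                () -> Arrays.asList("Row1 " + key, "Row2 " + key), executor);
    }

    // eval returns immediately; the result future is completed on another thread.
    public void eval(CompletableFuture<Collection<String>> result, Integer i) {
        lookupRemote(i).whenComplete((rows, error) -> {
            if (error != null) {
                result.completeExceptionally(error); // surface failures to the caller
            } else {
                result.complete(rows);
            }
        });
    }

    // Releases the simulated client's threads.
    public void close() {
        executor.shutdown();
    }
}
```

In a real function, the client or executor would typically be created in the function's open method and released in close.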
New configurations will be introduced for this functionality, analogous to table.exec.async-scalar.*:

```
table.exec.async-table.buffer-capacity: 10
table.exec.async-table.timeout: 30s
table.exec.async-table.retry-strategy: FIXED_DELAY
table.exec.async-table.retry-delay: 10s
table.exec.async-table.max-attempts: 3
```
Specifically, the following new configurations will be added:
| Name (prefix table.exec.async-table) | Meaning |
|---|---|
| buffer-capacity | The maximum number of outstanding (in-flight) requests the operator allows at once |
| timeout | The total time that may pass before an invocation (including retries) is considered timed out and task execution fails |
| retry-strategy | The retry strategy to use; FIXED_DELAY retries after a fixed amount of time |
| retry-delay | The time to wait between retries for the FIXED_DELAY strategy. Could also serve as the base delay for a (not yet proposed) exponential backoff. |
| max-attempts | The maximum number of attempts while retrying |
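To make the interplay of timeout, retry-strategy, retry-delay, and max-attempts concrete, here is a standalone sketch of a FIXED_DELAY retry loop bounded by both an attempt count and a total deadline. It illustrates the semantics in the table under our own assumptions (synchronous attempts, failure signalled by an unchecked exception, deadline checked only between attempts) and is not the operator's actual retry implementation; `FixedDelayRetry` and `call` are hypothetical names.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical sketch of FIXED_DELAY retry semantics; not Flink's implementation.
class FixedDelayRetry {
    static <T> T call(Supplier<T> attempt, int maxAttempts, Duration retryDelay, Duration timeout) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("max-attempts must be >= 1");
        }
        // The timeout bounds the whole invocation, including every retry.
        long deadline = System.nanoTime() + timeout.toNanos();
        RuntimeException last = null;
        for (int n = 1; n <= maxAttempts; n++) {
            try {
                return attempt.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and possibly retry
            }
            if (n == maxAttempts) {
                break;
            }
            // Simplification: the deadline is only checked between attempts.
            if (System.nanoTime() + retryDelay.toNanos() > deadline) {
                throw new RuntimeException(new TimeoutException("invocation timed out"));
            }
            try {
                Thread.sleep(retryDelay.toMillis()); // FIXED_DELAY: same wait every time
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ie);
            }
        }
        // All attempts failed: the task would be failed with the last error.
        throw last;
    }
}
```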
Proposed Changes
Planner Changes
Split Rules
To simplify code generation, one guiding principle is to allow only a single call (the main async one) in a given operator. To achieve this, any other calls are split out into their own calcs.
The existing rule PythonCorrelateSplitRule already splits calls out of correlates into their own calcs. It should be refactored into a reusable RemoteCorrelateSplitRule that takes a RemoteCalcCallFinder; for this FLIP, it is passed a finder that looks for async table function calls.
Example SQL:
SELECT * FROM t1, LATERAL TABLE(asyncTable(scalarFunction(f1)));
[Figure: the original RelNode and the RelNode it becomes after the split rule is applied]
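The effect of the split rule on the example query can be sketched with a toy expression model: computed operands of the async call are hoisted into a preceding calc as new named fields, so the correlate is left with a single call over plain field references. Everything here (`Expr`, `Field`, `Call`, `SplitSketch.split`, and the `calcN` field naming) is hypothetical and much simpler than a Calcite RexNode-based rule such as RemoteCorrelateSplitRule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Toy expression model (hypothetical); the real rule operates on Calcite RexNodes.
abstract class Expr {}

class Field extends Expr {
    final String name;
    Field(String name) { this.name = name; }
    @Override public String toString() { return name; }
}

class Call extends Expr {
    final String function;
    final boolean async;
    final List<Expr> operands;

    Call(String function, boolean async, List<Expr> operands) {
        this.function = function;
        this.async = async;
        this.operands = operands;
    }

    @Override public String toString() {
        return function + "(" + operands.stream().map(Object::toString)
                .collect(Collectors.joining(", ")) + ")";
    }
}

class SplitSketch {
    // Rewrites the async call so that all of its operands are field references.
    // Each computed operand is appended to calcProjections as a new named field,
    // standing in for the calc that is placed below the correlate.
    static Call split(Call asyncCall, List<String> calcProjections) {
        List<Expr> newOperands = new ArrayList<>();
        for (Expr operand : asyncCall.operands) {
            if (operand instanceof Field) {
                newOperands.add(operand); // already an input reference, keep as-is
            } else {
                String name = "calc" + calcProjections.size();
                calcProjections.add(operand + " AS " + name);
                newOperands.add(new Field(name));
            }
        }
        return new Call(asyncCall.function, asyncCall.async, newOperands);
    }
}
```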
Physical Rules
There will also need to be a StreamPhysicalAsyncCorrelateRule, which converts FlinkLogicalCorrelates to StreamPhysicalAsyncCorrelates. This will check for the existence of any async table function calls in the correlate call to determine whether to do that conversion.
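The check this rule performs can be pictured as a recursive search of the correlate's call for any async table function. Below is a hypothetical sketch over a toy call tree; `ExprNode`, `AsyncCorrelateCheck`, `containsAsyncTableCall`, and `choosePhysicalNode` are illustrative names, not planner classes, and the returned strings merely name the physical nodes.

```java
import java.util.List;

// Toy call tree (hypothetical); the planner inspects the correlate's RexCall instead.
class ExprNode {
    final String name;
    final boolean asyncTableFunction; // true if this node calls an AsyncTableFunction
    final List<ExprNode> operands;     // empty for input field references

    ExprNode(String name, boolean asyncTableFunction, List<ExprNode> operands) {
        this.name = name;
        this.asyncTableFunction = asyncTableFunction;
        this.operands = operands;
    }
}

class AsyncCorrelateCheck {
    // True if this node, or any node nested in its operands, is an async table function call.
    static boolean containsAsyncTableCall(ExprNode node) {
        if (node.asyncTableFunction) {
            return true;
        }
        for (ExprNode operand : node.operands) {
            if (containsAsyncTableCall(operand)) {
                return true;
            }
        }
        return false;
    }

    // Convert to the async physical correlate only when the check passes.
    static String choosePhysicalNode(ExprNode correlateCall) {
        return containsAsyncTableCall(correlateCall)
                ? "StreamPhysicalAsyncCorrelate"
                : "StreamPhysicalCorrelate";
    }
}
```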
Runtime Changes
Code Generation
The primary change is to extend DelegatingResultFuture, which currently handles only lookup joins, so that it wraps each result in a Row where appropriate, since implicit row wrapping is now supported. Beyond that, the call to FunctionCodeGenerator.generateFunction will ask for an AsyncFunction, as is done for async calcs.
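The implicit wrapping can be pictured as a delegating callback: the user function completes a future with its raw results, and the delegate forwards them downstream, wrapping each non-Row value in a single-field row. In this standalone sketch, rows are modeled as `List<Object>` instead of Flink's Row/RowData, and `WrappingResultFuture` and `wrapInRow` are hypothetical names; it is not the actual DelegatingResultFuture code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of a delegating result future that applies implicit row wrapping.
// Rows are modeled as List<Object> instead of Flink's Row/RowData.
class WrappingResultFuture {
    // The future handed to the user's eval(...) method.
    final CompletableFuture<Collection<Object>> userFuture = new CompletableFuture<>();

    @SuppressWarnings("unchecked")
    WrappingResultFuture(CompletableFuture<Collection<List<Object>>> downstream, boolean wrapInRow) {
        userFuture.whenComplete((results, error) -> {
            if (error != null) {
                downstream.completeExceptionally(error); // propagate failures unchanged
                return;
            }
            try {
                List<List<Object>> rows = new ArrayList<>();
                for (Object value : results) {
                    // Non-Row output types become single-field rows; row outputs pass through.
                    rows.add(wrapInRow ? Collections.singletonList(value) : (List<Object>) value);
                }
                downstream.complete(rows);
            } catch (RuntimeException e) {
                downstream.completeExceptionally(e); // e.g. an unexpected result type
            }
        });
    }
}
```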
Operator
Since the call to the AsyncTableFunction is wrapped in an AsyncFunction that takes input rows, we can reuse the existing AsyncWaitOperator, which handles ordering, checkpointing, timeouts, and other implementation details. Since only ordered results are handled in this scope, the operator will run in ORDERED mode.
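For intuition about what ORDERED mode and buffer-capacity mean for this operator, here is a simplified standalone model: at most `capacity` requests are in flight, results may complete in any order, and they are emitted strictly in input order. It is a sketch under our own assumptions, written with plain CompletableFutures, and not AsyncWaitOperator's implementation (which also handles checkpointing, timeouts, and non-blocking emission); `OrderedAsyncSketch` and `process` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Simplified model of ordered async processing with a bounded in-flight buffer.
class OrderedAsyncSketch {
    static <I, O> List<O> process(List<I> inputs, int capacity,
                                  Function<I, CompletableFuture<O>> asyncCall) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1");
        }
        Deque<CompletableFuture<O>> inFlight = new ArrayDeque<>(); // kept in input order
        List<O> emitted = new ArrayList<>();
        for (I input : inputs) {
            if (inFlight.size() == capacity) {
                // Buffer full: in ordered mode a slot frees only when the oldest
                // request completes, so wait for it (back-pressure) and emit it.
                emitted.add(inFlight.removeFirst().join());
            }
            inFlight.addLast(asyncCall.apply(input));
        }
        // Drain the remaining requests, still strictly in input order.
        while (!inFlight.isEmpty()) {
            emitted.add(inFlight.removeFirst().join());
        }
        return emitted;
    }
}
```

Later inputs finishing first does not change the output order; it only means their results wait in the buffer until every earlier result has been emitted.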
Compatibility, Deprecation, and Migration Plan
This should work with all existing lookup join cases: it uses the same interface and should not change existing behavior.
Correlate queries are a newly supported use, so they raise no backward compatibility concerns.
Test Plan
Unit tests on all of the components
Test cases that cover:
The split rule
ITCases with a variety of correlate queries:
Single non-Row return value
Hint with Row return value
Operand calc to table function input
Async operand calc to table function input
SELECT projections in inner correlate query (not supported)
WHERE conditions in inner correlate query (not supported)
Rejected Alternatives
Lookup joins:
+ Code already exists
- These are close, but they support different queries than TableFunctions do; in particular, they don't support correlate queries.
- They don't have full UDF support in the Table API, e.g.:

```java
// Standard registration and SQL use of a table function
tEnv.createTemporarySystemFunction("func", new MyTableFunction());
tEnv.executeSql(...);
```