Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

A lookup join is typically used to enrich a table with data queried from an external system (backed by a lookup source). However, some data may not be updated in the external system in a timely manner, and users may want to retry the lookup after a delay when such unexpectedly 'missing' data is encountered; currently there is no elegant way to do this. Although Flink supports various joins for streams (regular joins, interval joins, temporal joins, ...), the lookup source is unfortunately often not available as a stream (e.g., Redis).

We want to support such a retryable lookup join, covering both async and sync lookups, to solve the delayed-update issue. As preparatory work for this solution, we proposed FLIP-232[1], which adds general retry support for Async I/O. We prefer to offer this retry capability via query hints, similar to the new join hints proposed in FLINK-27625[2] and FLIP-204[3].

As discussed in the mailing thread, we plan to introduce a unified hint that supports both sync and async lookup, each with or without retry. The support matrix is:

lookup support   | async | retry
sync w/o retry   | N     | N
sync w/ retry    | N     | Y
async w/o retry  | Y     | N
async w/ retry   | Y     | Y


Non-goals:

  1. Lookup sources that can be connected as streams (other types of join can be used instead)
  2. Delaying the processing of all input data by a fixed time: async lookup with retry is not designed for this (lighter approaches should be used instead, e.g., delaying the source consumption or using sync lookup with retry)
  3. Retry on exceptions is not supported (the SQL connectors should handle it)


Public Interfaces

A new query hint, 'LOOKUP', with different hint options ('async'='true|false', 'retry-predicate'='lookup_miss', etc.) covers all related functionality (including FLINK-27625 and the discussion on the connector option 'lookup.async' in FLIP-221[4]). Compared to multiple hints each covering a subset of the functionality, a single hint may be easier for users to understand and use, and specific parameters can be quickly found in the documentation.

The available hint options:

option type | option name     | optional | value type | default value | description
table name  | table           | N        | string     | N/A           | the table name of the lookup source
async       | async           | Y        | boolean    | N/A           | value can be 'true' or 'false' to suggest that the planner choose the corresponding lookup function. If the backend lookup source does not support the suggested lookup mode, the option takes no effect.
async       | output-mode     | Y        | string     | ordered       | value can be 'ordered' or 'allow_unordered'. 'allow_unordered' means that, if users allow unordered results, the planner attempts to use AsyncDataStream.OutputMode.UNORDERED when this does not affect the correctness of the result; otherwise ORDERED is still used. This is consistent with `ExecutionConfigOptions#TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE`.
async       | capacity        | Y        | integer    | 100           | the buffer capacity of the underlying AsyncWaitOperator of the lookup join operator
async       | timeout         | Y        | duration   | 300s          | timeout from the first invocation to the final completion of the asynchronous operation; may include multiple retries, and is reset on failover
retry       | retry-predicate | Y        | string     | N/A           | can be 'lookup_miss', which enables retry if the lookup result is empty
retry       | retry-strategy  | Y        | string     | N/A           | can be 'fixed_delay'
retry       | fixed-delay     | Y        | duration   | N/A           | delay time for the 'fixed_delay' strategy
retry       | max-attempts    | Y        | integer    | N/A           | maximum number of attempts for the 'fixed_delay' strategy
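
The value constraints in the table above can be checked mechanically. The following is a minimal Python sketch, assuming the hint options arrive as a plain string-to-string map; the function names, the duration syntax, and the rule that the four retry options must be set together are assumptions for illustration, not Flink's actual (Java) implementation. It returns a list of problems rather than raising, since whether an invalid hint is rejected or simply ignored is left to the planner.

```python
import re

# Option names taken from the LOOKUP hint option table.
ASYNC_KEYS = {"async", "output-mode", "capacity", "timeout"}
RETRY_KEYS = {"retry-predicate", "retry-strategy", "fixed-delay", "max-attempts"}
ALLOWED_KEYS = {"table"} | ASYNC_KEYS | RETRY_KEYS

# Assumed duration syntax for this sketch only, e.g. '10s', '180s', '500ms', '3min'.
DURATION_RE = re.compile(r"\d+(ms|s|min|h)")


def is_positive_int(text):
    return text.isdigit() and int(text) > 0


def check_lookup_hint(options):
    """Return the problems found in a LOOKUP hint's key-value options (empty if none)."""
    problems = []
    unknown = sorted(set(options) - ALLOWED_KEYS)
    if unknown:
        problems.append(f"unknown options: {unknown}")
    if "table" not in options:
        problems.append("'table' is required")
    if options.get("async", "true") not in ("true", "false"):
        problems.append("'async' must be 'true' or 'false'")
    if options.get("output-mode", "ordered") not in ("ordered", "allow_unordered"):
        problems.append("'output-mode' must be 'ordered' or 'allow_unordered'")
    for key in ("capacity", "max-attempts"):
        if key in options and not is_positive_int(options[key]):
            problems.append(f"'{key}' must be a positive integer")
    for key in ("timeout", "fixed-delay"):
        if key in options and not DURATION_RE.fullmatch(options[key]):
            problems.append(f"'{key}' must be a duration such as '10s'")
    if options.get("retry-predicate", "lookup_miss") != "lookup_miss":
        problems.append("'retry-predicate' only supports 'lookup_miss'")
    if options.get("retry-strategy", "fixed_delay") != "fixed_delay":
        problems.append("'retry-strategy' only supports 'fixed_delay'")
    # Assumption: the four retry options are only meaningful when set together.
    given = RETRY_KEYS & set(options)
    if given and given != RETRY_KEYS:
        problems.append(f"incomplete retry options, missing: {sorted(RETRY_KEYS - given)}")
    return problems
```

For example, check_lookup_hint({"table": "dim1", "async": "true", "timeout": "180s"}) finds no problems, while a hint that sets only 'retry-predicate' is reported as incomplete.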


For connectors that support both async and sync lookup, our advice to connector developers is to implement both the sync and async interfaces if both capabilities have suitable use cases; otherwise, implement only one of them. The planner prefers the async one by default, and users can suggest a different choice to the planner via the 'async'='true|false' option of the LOOKUP query hint.

Because query hints work on a best-effort basis, if a user specifies a hint with an option that cannot be applied, the query plan stays unchanged. For example, if LOOKUP('table'='customer', 'async'='true') is specified but the backend lookup source only implements the sync lookup function, the async lookup hint takes no effect.
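
The selection rule described in the two paragraphs above can be summarized in a few lines. This is a minimal Python sketch, assuming the planner knows which lookup functions the source implements; the names LookupMode and choose_lookup_mode are hypothetical and only model the decision, not Flink's planner code.

```python
from enum import Enum


class LookupMode(Enum):
    SYNC = "sync"
    ASYNC = "async"


def choose_lookup_mode(supports_sync, supports_async, async_hint=None):
    """Return the lookup mode the planner would pick.

    async_hint is the hint's 'async' value ('true' or 'false'), or None if the
    hint (or the option) is absent.
    """
    if not (supports_sync or supports_async):
        raise ValueError("the lookup source provides no lookup function")
    if supports_sync and supports_async:
        # Both available: async is preferred unless the hint asks for sync.
        return LookupMode.SYNC if async_hint == "false" else LookupMode.ASYNC
    # Only one capability: the hint is best effort and takes no effect.
    return LookupMode.ASYNC if supports_async else LookupMode.SYNC
```

Note that a source with only a sync lookup function yields SYNC even when 'async'='true' is given, which matches the best-effort behavior described above.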

1. Sync and Async Mode Lookup

1.1 Sync Lookup

If the connector supports both async and sync lookup, users can set 'async'='false' to suggest that the planner use the sync lookup:

LOOKUP('table'='dim1', 'async'='false')

1.2 Async Lookup

If the connector supports both async and sync lookup, users can set 'async'='true' to suggest that the planner use the async lookup:

LOOKUP('table'='dim1', 'async'='true')

Async-lookup-related parameters can also be configured via hint options (this covers the join-level configuration requirement proposed in FLINK-27625). All key-value hint options except the table name are optional (the job-level configuration is used for any option that is not set):

LOOKUP('table'='dim1', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s')

For example, if the job-level configuration is:

table.exec.async-lookup.output-mode: ORDERED
table.exec.async-lookup.buffer-capacity: 100
table.exec.async-lookup.timeout: 180s


then the following hints:

1. LOOKUP('table'='dim1', 'async'='true', 'output-mode'='allow_unordered')
2. LOOKUP('table'='dim1', 'async'='true', 'timeout'='300s')


are equivalent to:

1. LOOKUP('table'='dim1', 'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s')
2. LOOKUP('table'='dim1', 'async'='true', 'output-mode'='ordered', 'capacity'='100', 'timeout'='300s')
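
The fallback from hint options to job-level configuration shown in these equivalences amounts to a simple merge. A minimal Python sketch, assuming the configuration is available as a string map; the mapping table and function name are hypothetical, and lower-casing the enum value of the output mode is an assumption about how the two spellings correspond.

```python
# Job-level configuration from the example above.
JOB_CONFIG = {
    "table.exec.async-lookup.output-mode": "ORDERED",
    "table.exec.async-lookup.buffer-capacity": "100",
    "table.exec.async-lookup.timeout": "180s",
}

# Which job-level key backs each optional async hint option.
HINT_TO_CONFIG_KEY = {
    "output-mode": "table.exec.async-lookup.output-mode",
    "capacity": "table.exec.async-lookup.buffer-capacity",
    "timeout": "table.exec.async-lookup.timeout",
}


def resolve_async_options(hint, job_config):
    """Fill in every async option the hint leaves unset from the job-level config."""
    resolved = dict(hint)
    for option, config_key in HINT_TO_CONFIG_KEY.items():
        if option not in resolved:
            value = job_config[config_key]
            # The config uses enum names (ORDERED), the hint uses lower case (ordered).
            resolved[option] = value.lower() if option == "output-mode" else value
    return resolved
```

Calling resolve_async_options({'table': 'dim1', 'async': 'true', 'timeout': '300s'}, JOB_CONFIG) yields the same options as the second expanded hint above: 'output-mode' becomes 'ordered', 'capacity' becomes '100', and the explicit '300s' timeout is kept.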

2. Retry Support

The hint option 'retry-predicate'='lookup_miss' enables retry for both sync and async lookups. The retry-related hint options are:

'retry-strategy'='fixed_delay'
'fixed-delay'='10s'
'max-attempts'='3'

e.g., retry on async lookup

LOOKUP('table'='dim1', 'async'='true', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3')

retry on sync lookup

LOOKUP('table'='dim1', 'async'='false', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3')

If the lookup source has only one of the two capabilities, the 'async' option can be omitted:

LOOKUP('table'='dim1', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3')

For the retry strategy, we plan to support a fixed_delay retry strategy first, and this can be extended in the future.
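
To make the fixed_delay options concrete, the sketch below converts the retry-related hint options into a strategy object and lists when lookups would run if every attempt misses. It is a minimal Python illustration under two stated assumptions: FixedDelayRetryStrategy, to_retry_strategy and retry_schedule are hypothetical names (not Flink's internal RetryStrategy), and 'max-attempts' is read here as the number of retries after the first lookup, which the option table does not pin down.

```python
import re
from dataclasses import dataclass
from datetime import timedelta

_UNITS = {"ms": "milliseconds", "s": "seconds", "min": "minutes", "h": "hours"}


def parse_duration(text):
    """Parse strings like '10s' or '500ms' (assumed syntax for this sketch)."""
    match = re.fullmatch(r"(\d+)\s*(ms|s|min|h)", text.strip())
    if match is None:
        raise ValueError(f"invalid duration: {text!r}")
    return timedelta(**{_UNITS[match.group(2)]: int(match.group(1))})


@dataclass(frozen=True)
class FixedDelayRetryStrategy:
    """Hypothetical internal form of 'retry-strategy'='fixed_delay'."""
    delay: timedelta
    max_attempts: int  # assumption: number of retries after the first lookup

    def can_retry(self, retries_done):
        return retries_done < self.max_attempts


def to_retry_strategy(hint):
    """Convert LOOKUP hint options into a strategy, or None if retry is not requested."""
    if hint.get("retry-predicate") != "lookup_miss":
        return None
    if hint.get("retry-strategy") != "fixed_delay":
        raise ValueError("only 'fixed_delay' is supported for now")
    return FixedDelayRetryStrategy(parse_duration(hint["fixed-delay"]), int(hint["max-attempts"]))


def retry_schedule(strategy):
    """Offsets from the first lookup at which lookups run if every attempt misses.

    Lookup latency is ignored; only the fixed delays between attempts are summed.
    """
    offsets, retries = [timedelta(0)], 0
    while strategy.can_retry(retries):
        retries += 1
        offsets.append(strategy.delay * retries)
    return offsets
```

Under these assumptions, 'fixed-delay'='10s' with 'max-attempts'='3' schedules lookups at 0s, 10s, 20s and 30s after the first one; keeping the strategy as a small value object is what would let other strategies be added later without touching the join logic.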

Use Case: Lookup Join Without Retry


-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

Use Case: Lookup Join With Retry

-- retry triggered by empty result, using 10s fixed-delay strategy, max attempts 3. 
SELECT /*+ LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='3') */ 
	o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

In the above example, the lookup source backing the table 'Customers' is JDBC, which provides only a sync lookup function, so the retry takes effect on the sync lookup.

Proposed Changes

The main changes for the new hint are similar to those of FLIP-204; once FLIP-204 is done, adding a join hint will be simpler.

  1. Define the hint strategy: add 'LOOKUP' and the related parsing of its key-value options (the original hints are propagated via FlinkLogicalJoin).
  2. Validate the hint and convert it to the table layer's internal RetryStrategy in the exec lookup join node.

Since FLIP-204 is not done yet, we cannot provide a complete PoC here, but the changes are manageable.

1. Async Retry

Convert the parsed internal RetryStrategy into the runtime's AsyncRetryStrategy, and use the new API with retry support proposed in FLIP-232[1].
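
The intended async behavior (retry on an empty result after a fixed delay, with one timeout covering the first invocation and every retry, as the 'timeout' option describes) can be sketched with Python's asyncio. This is only a model of the semantics under assumed names: lookup_with_retry is hypothetical, and it does not use or mirror the Java AsyncRetryStrategy API added by FLIP-232.

```python
import asyncio


async def lookup_with_retry(lookup, key, delay_s, max_retries, timeout_s):
    """Look up `key`, retrying on an empty result with a fixed delay.

    `timeout_s` bounds the whole operation, from the first invocation to the
    final completion including every retry, mirroring the hint's 'timeout' option.
    """

    async def attempts():
        rows = await lookup(key)
        retries = 0
        while not rows and retries < max_retries:  # retry predicate: lookup miss
            await asyncio.sleep(delay_s)  # non-blocking fixed delay
            retries += 1
            rows = await lookup(key)
        return rows  # may still be empty once retries are exhausted

    # Raises asyncio.TimeoutError if the overall deadline passes.
    return await asyncio.wait_for(attempts(), timeout=timeout_s)
```

Because the delay is awaited rather than slept, other in-flight lookups keep progressing while one key waits for its retry, which is why the async path is the natural fit for retries.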

2. Sync Retry

Add a sync retry implementation to the internal LookupJoinRunner.
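
A sync retry could follow the same loop, except that each delay blocks the caller. The following minimal Python sketch assumes a lookup callable returning a list of rows; sync_lookup_with_retry is a hypothetical name, and the injectable sleep parameter exists only so the delays can be observed without waiting.

```python
import time


def sync_lookup_with_retry(lookup, key, delay_s, max_retries, sleep=time.sleep):
    """Synchronous lookup that retries on an empty result with a fixed delay.

    Unlike the async variant, each delay blocks the calling thread, so in this
    sketch it also holds up every other record handled by the same caller.
    """
    rows = lookup(key)
    retries = 0
    while not rows and retries < max_retries:  # retry predicate: lookup miss
        sleep(delay_s)
        retries += 1
        rows = lookup(key)
    return rows  # still empty after the last retry: treated as a miss by the join
```

The blocking delay is the main trade-off compared with the async variant: throughput drops while a key is being retried, so the sync retry suits sources where late-arriving data is rare.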

3. Prefer Async Lookup Function For The Planner

If the connector supports both async and sync lookup, the planner prefers the async one when no query hint is given. If the user sets the option explicitly, 'async'='false' selects the sync lookup and 'async'='true' selects the async lookup.

Compatibility, Deprecation, and Migration Plan

This feature is backward compatible and transparent to all connectors. Existing connectors that support lookup can easily enable async or sync retry via the new join hint.

Add a follow-up issue to discuss whether to remove the 'lookup.async' option from the HBase connector.

Test Plan

Extend existing cases to cover retry scenarios.

Rejected Alternatives

Candidate 1:

Reject Reason:
If the retry strategy is bound to the table DDL, it is impossible to use different retry strategies for two joins on the same table; moreover, retry behavior relates to the join operation rather than to the table.

Set Retry Strategy Via Table DDL's With Options

-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers',
  -- lookup_miss.retry-strategy
  'lookup_miss.retry-strategy'='fixed-delay', -- 'none' by default
  'lookup_miss.retry-strategy.fixed-delay.delay' = '10 s',
  'lookup_miss.retry-strategy.fixed-delay.max-attempts' = '3'
);

-- join operation stays the same.
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;


Candidate 2:

Reject Reason:
It is hard for users to use, and queries can become overly complex when more retries are needed, even though the semantics are clean enough.

Introduce A New 'DEFER' SQL Operator

The SQL standard has no operator that can express a 'delayed retry join' or a retry operation, and it is hard to define clear semantics for such an extended, retryable join operator.

Based on the idea that SQL operators are orthogonal and composable, a delayed-processing table-valued function 'DEFER' (similar in form to window TVFs) could be introduced: input data is delayed for a declared length of time before being emitted.

DEFER(table-identifier, time-descriptor, duration)
-- table-identifier: specify the table name
-- time-descriptor :  proctime or rowtime
-- duration: delayed duration


Use Case: Combine The DEFER and Join Operation To Simulate One Retry

If more retries are needed, the query has to change as well; a procedure (as in Transact-SQL) may be more suitable here.

-- ① create views for easy reading
CREATE VIEW v_first_join AS
SELECT o.order_id, o.total, o.customer_id, o.proc_time, c.country, c.zip
FROM Orders AS o
-- Note: the original inner join is changed to a left join
LEFT JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

-- view 2: rows whose first join result is empty
CREATE VIEW v_first_join_empty AS
SELECT order_id, total, customer_id, proc_time FROM v_first_join WHERE zip IS NULL;

-- ② Delay the rows whose join result is empty and join them again (similar to a retry), UNION ALL them with the non-empty results, and finally write to downstream
INSERT INTO downstream_table
SELECT order_id, total, country, zip FROM v_first_join WHERE zip IS NOT NULL

UNION ALL

SELECT o.order_id, o.total, c.country, c.zip
FROM TABLE(DEFER(v_first_join_empty, proc_time, 10 s)) AS o
-- Note: back to inner join here
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;


References:

1. FLIP-232: Add Retry Support For Async I/O In DataStream API

2. FLINK-27625: Add query hint for async lookup join

3. FLIP-204: Introduce Hash Lookup Join

4. https://lists.apache.org/thread/1vokqdnnt01yycl7y1p74g556cc8yvtq