Status
Current state: Accepted
Discussion thread: https://lists.apache.org/thread/3s69dhv3rp4s0kysnslqbvyqo3qf7zq5
Vote thread: https://lists.apache.org/thread/837r63gdwzoqryvp3gbf67941g706s5d
Jira: FLINK-33397
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
FLIP-292 [1] expands the functionality of CompiledPlan to enable the configuration of operator-level state TTL by modifying the JSON file. During that discussion, it was mentioned that a SQL hint could also address some specific, simple scenarios in a way that is user-friendly, convenient, and unambiguous [2]. This FLIP proposes to support that feature.
Specifically, we aim to support using SQL hints to configure state TTL for stream regular join [3] and group aggregate [4], the most typical use cases with unambiguous semantics and hint propagation. We have excluded other use cases as the hint propagation mechanism is more complex, and we might want to tackle them after the current design reaches a consensus.
Public Interfaces
The proposed changes do not introduce interfaces that need to be annotated with Public/PublicEvolving.
Proposed Changes
To enable configuring different state TTLs for stream join or group aggregate, users need to specify a new hint (`STATE_TTL`) in the select clause, similar to the current SQL hints [5].
query:
  select /*+ STATE_TTL(kv_option[,kv_option]) */ ...

kv_option:
  key_string_literal = value_string_literal

key_string_literal:
  table_name | view_name | table_alias | query_block_alias

value_string_literal:
  ttl_duration
Stream Regular Join Example
-- hint with table name as key
SELECT /*+ STATE_TTL('orders'= '1d', 'customers' = '20d') */ * FROM orders LEFT OUTER JOIN customers
ON orders.o_custkey = customers.c_custkey;
-- hint with table alias as key
SELECT /*+ STATE_TTL('o'= '12h', 'c' = '3d') */ * FROM orders o LEFT OUTER JOIN customers c
ON o.o_custkey = c.c_custkey;
-- hint with temporary view name as key
CREATE TEMPORARY VIEW left_input AS ...;
CREATE TEMPORARY VIEW right_input AS ...;
SELECT /*+ STATE_TTL('left_input'= '36000s', 'right_input' = '15h') */ * FROM left_input JOIN right_input
ON left_input.join_key = right_input.join_key;
Group Aggregate Example
-- hint with table name as key
SELECT /*+ STATE_TTL('orders' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
FROM orders
GROUP BY o_orderkey;
-- hint with table alias as key
SELECT /*+ STATE_TTL('o' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
FROM orders AS o
GROUP BY o_orderkey;
-- hint with query block alias as key
SELECT /*+ STATE_TTL('tmp' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
FROM (
SELECT o_orderkey, o_totalprice
FROM orders
WHERE o_shippriority = 0
) tmp
GROUP BY o_orderkey;
Mixed Stream Regular Join and Group Aggregate Example
-- the group aggregate query block as the input of the regular join
SELECT /*+ STATE_TTL('agg' = '1d', 'customers' = '7d') */ *
FROM (
SELECT /*+ STATE_TTL('orders' = '30d') */ o_custkey, COUNT(o_orderkey) AS order_num
FROM orders
GROUP BY o_custkey
) agg
LEFT OUTER JOIN customers
ON agg.o_custkey = customers.c_custkey;
-- the regular join query block as the input of the group aggregate
SELECT /*+ STATE_TTL('tmp' = '1d') */ c_custkey, SUM(o_totalprice) AS revenue
FROM (
SELECT /*+ STATE_TTL('o' = '12h', 'c' = '3d') */ *
FROM orders o LEFT OUTER JOIN customers c
ON o.o_custkey = c.c_custkey
) tmp
GROUP BY c_custkey;
Exception Handling
When users incorrectly use STATE_TTL hints, there are two possible scenarios:
Scenario 1: The user specifies a STATE_TTL hint that can be applied to the current query block, but with a wrong hint key, for example a non-existent table name, table alias, or query block alias. In this case, an exception should be thrown explicitly. E.g., the following query throws org.apache.flink.table.api.ValidationException with the message "The options of following hints cannot match the name of input tables or views:\n`foo` in `state_ttl`". This check is already performed by JoinHintResolver (see JoinHintResolver.java#L193).
-- hint with a non-existent table name as key
SELECT /*+ STATE_TTL('foo'= '1d', 'customers' = '20d') */ * FROM orders LEFT OUTER JOIN customers
ON orders.o_custkey = customers.c_custkey;
We can do a similar check for the aggregate node when there is an invalid hint key (this is newly introduced since there has been no aggregate hint check before).
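The key check described for Scenario 1 can be pictured with a small, self-contained Java sketch. It is only an illustration under stated assumptions: the class and method names are hypothetical, IllegalArgumentException stands in for Flink's ValidationException, and the comma-separated formatting for more than one unmatched key is a guess (the text above only quotes the single-key message).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Flink: rejects hint keys that match no input name.
final class StateTtlKeyCheckSketch {

    static void validateKeys(Map<String, String> hintOptions, Set<String> inputNames) {
        List<String> unmatched = new ArrayList<>();
        for (String key : hintOptions.keySet()) {
            if (!inputNames.contains(key)) {
                unmatched.add(key);
            }
        }
        if (!unmatched.isEmpty()) {
            // IllegalArgumentException stands in for ValidationException in this sketch.
            throw new IllegalArgumentException(
                    "The options of following hints cannot match the name of input tables or views:\n`"
                            + String.join(", ", unmatched)
                            + "` in `state_ttl`");
        }
    }
}
```

With input names {orders, customers}, the options of the query above ('foo' and 'customers') would be rejected because 'foo' matches neither input.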
Scenario 2: The user includes the STATE_TTL hint in a query block that does not support it. In this case, the hint will be ignored without throwing an exception. This is intentional, to remain consistent with the current hint behavior.
E.g., SHUFFLE_MERGE is a batch join hint that only supports joins with an equi-join condition (see hints/#shuffle_merge). Under streaming mode and with a non-equi condition, we still get a valid plan and execution result, and the hint is quietly ignored.
-- explain the query under streaming mode
EXPLAIN SELECT /*+ SHUFFLE_MERGE('o') */* from
orders o LEFT JOIN
lineitem l
ON o.o_totalprice > l.l_extendedprice;
== Optimized Physical Plan ==
Join(joinType=[LeftOuterJoin], where=[>(o_totalprice, l_extendedprice)], select=[o_orderkey, o_custkey, o_status, o_totalprice, l_linenumber, l_orderkey, l_partkey, l_extendedprice], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[SHUFFLE_MERGE options:[LEFT]]]])
:- Exchange(distribution=[single])
: +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[o_orderkey, o_custkey, o_status, o_totalprice])
+- Exchange(distribution=[single])
+- TableSourceScan(table=[[default_catalog, default_database, lineitem]], fields=[l_linenumber, l_orderkey, l_partkey, l_extendedprice])
The reason is explained in FLIP-229: Introduces Join Hint for Flink SQL Batch Job, quoted as follows.
> 3. Check when optimizing
> In the optimization stage of the optimizer, it is judged that this Join Hint can be applied. If it cannot be applied, the behavior is to print a warning log instead of throwing an exception directly.
Hint Strategy
HintStrategyTable.builder()
.hintStrategy("STATE_TTL",
HintStrategy.builder(
HintPredicates.or(
HintPredicates.JOIN, HintPredicates.AGGREGATE))
.optionChecker(STATE_TTL_OPTION_CHECKER)
.build())
.build();
The STATE_TTL_OPTION_CHECKER will ensure the hint options are non-empty and the value is a valid duration.
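As a rough illustration of what such a checker has to verify, the following self-contained Java sketch rejects empty option maps and values that are not durations, and normalizes valid values to milliseconds. The class name, the method names, and the accepted unit list (ms, s, min, h, d) are assumptions made for this example; it uses plain JDK classes rather than Flink's own duration parsing.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: illustrates the two checks described above.
final class StateTtlOptionCheckSketch {

    // Assumed syntax: <digits><unit>, with a small illustrative set of units.
    private static final Pattern TTL = Pattern.compile("(\\d+)\\s*(ms|s|min|h|d)");

    static Duration parseTtl(String text) {
        Matcher m = TTL.matcher(text.trim().toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Invalid TTL duration: '" + text + "'");
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "ms":
                return Duration.ofMillis(amount);
            case "s":
                return Duration.ofSeconds(amount);
            case "min":
                return Duration.ofMinutes(amount);
            case "h":
                return Duration.ofHours(amount);
            default: // "d"
                return Duration.ofDays(amount);
        }
    }

    // Checks that the options are non-empty and returns each TTL in milliseconds.
    static Map<String, Long> checkAndNormalize(Map<String, String> kvOptions) {
        if (kvOptions.isEmpty()) {
            throw new IllegalArgumentException("STATE_TTL hint options must not be empty");
        }
        Map<String, Long> ttlMillis = new LinkedHashMap<>();
        for (Map.Entry<String, String> option : kvOptions.entrySet()) {
            ttlMillis.put(option.getKey(), parseTtl(option.getValue()).toMillis());
        }
        return ttlMillis;
    }
}
```

Under these assumptions, the values '36000s' and '15h' from the temporary view example would normalize to 36000000 and 54000000 milliseconds.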
Hint Propagation in Optimizer
Currently, SqlToRelConverter will attach an alias hint at the root of the query block if the join hint is detected (see SqlToRelConverter.java#L2289), and then ClearJoinHintWithInvalidPropagationShuttle will clear the invalid hints that propagated from the outer query block by checking the hint strategy and comparing the relative inherit path to the query block's alias hint. This ensures that hints from other nodes will not be attached to the current join node.
The STATE_TTL hint will apply a similar approach, which means the hint only affects the current query block and does not affect the state TTL in the subquery, which aligns with the current hint behavior. Likewise, `ClearJoinHintWithInvalidPropagationShuttle` will override the visit method to remove the propagated hints for FlinkLogicalAggregate.
@Override
public RelNode visit(LogicalAggregate aggregate) {
List<RelHint> hints = ((Hintable) aggregate).getHints();
Optional<RelHint> firstAliasHint =
hints.stream()
.filter(hint -> FlinkHints.HINT_ALIAS.equals(hint.hintName))
.findFirst();
if (!firstAliasHint.isPresent()) {
return super.visit(aggregate);
}
// compare the inheritPath of alias hint and other hints to find propagated hints from the outer query block
List<RelHint> joinHintsFromOuterQueryBlock = ...;
if (joinHintsFromOuterQueryBlock.isEmpty()) {
return super.visit(aggregate);
}
RelNode newAggregate = aggregate;
ClearOuterJoinHintShuttle clearOuterJoinHintShuttle;
for (RelHint outerJoinHint : joinHintsFromOuterQueryBlock) {
clearOuterJoinHintShuttle = new ClearOuterJoinHintShuttle(outerJoinHint);
newAggregate = newAggregate.accept(clearOuterJoinHintShuttle);
}
return super.visit(newAggregate);
}
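The elided comparison in the snippet above is not spelled out in this document. One plausible reading, sketched below with plain JDK types, assumes that a hint's inheritPath grows by one element each time it is propagated to a child node, so a hint that originates in an outer query block arrives with a longer path than the alias hint attached at the root of the current block. The Hint class, the "ALIAS" name, and the size comparison are all assumptions for illustration and may differ from the actual Flink implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of hint propagation, not Calcite's RelHint.
final class InheritPathSketch {

    static final class Hint {
        final String name;
        final List<Integer> inheritPath; // assumed: one element per propagation step

        Hint(String name, Integer... inheritPath) {
            this.name = name;
            this.inheritPath = Arrays.asList(inheritPath);
        }
    }

    // Returns the non-alias hints whose path is longer than the alias hint's path,
    // i.e. the hints assumed to come from an outer query block.
    static List<Hint> hintsFromOuterBlock(List<Hint> hints, String aliasHintName) {
        Hint alias = null;
        for (Hint hint : hints) {
            if (hint.name.equals(aliasHintName)) {
                alias = hint;
                break;
            }
        }
        List<Hint> outer = new ArrayList<>();
        if (alias == null) {
            return outer; // no alias hint: nothing to compare against
        }
        for (Hint hint : hints) {
            if (!hint.name.equals(aliasHintName)
                    && hint.inheritPath.size() > alias.inheritPath.size()) {
                outer.add(hint);
            }
        }
        return outer;
    }
}
```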
Expected Physical Plan
We elaborate on the details as follows.
1. Cascade Joins
For cascade joins like SELECT /*+ STATE_TTL('A' = '...', 'B' = '...', 'C' = '...')*/ * FROM A JOIN B ON ... JOIN C ON ... , the specified state TTLs will be interpreted, in bottom-up order, as the left and right state TTL for the first join operator and the right state TTL for the second join operator. The left state TTL for the second join operator will be retrieved from the configuration table.exec.state.ttl.
EXPLAIN SELECT /*+ STATE_TTL('o' = '3d', 'l' = '1d', 'c' = '10d') */* FROM
orders o LEFT OUTER JOIN
lineitem l
ON o.o_orderkey = l.l_orderkey
LEFT OUTER JOIN customers c
ON o.o_custkey = c.c_custkey;
== Optimized Physical Plan ==
Join(joinType=[LeftOuterJoin], where=[=(o_custkey, c_custkey)], select=[o_orderkey, o_custkey, o_status, l_linenumber, l_orderkey, l_partkey, l_extendedprice, c_custkey, c_address], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[state_ttl options:{RIGHT=864000000}]]])
:- Exchange(distribution=[hash[o_custkey]])
: +- Join(joinType=[LeftOuterJoin], where=[=(o_orderkey, l_orderkey)], select=[o_orderkey, o_custkey, o_status, l_linenumber, l_orderkey, l_partkey, l_extendedprice], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[state_ttl options:{LEFT=259200000, RIGHT=86400000}]]])
: :- Exchange(distribution=[hash[o_orderkey]])
: : +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[o_orderkey, o_custkey, o_status])
: +- Exchange(distribution=[hash[l_orderkey]])
: +- TableSourceScan(table=[[default_catalog, default_database, lineitem]], fields=[l_linenumber, l_orderkey, l_partkey, l_extendedprice])
+- Exchange(distribution=[hash[c_custkey]])
+- TableSourceScan(table=[[default_catalog, default_database, customers]], fields=[c_custkey, c_address])
If users need to set a specific value for the left state TTL of the second join operator, they can split the cascade join clause into query blocks like
CREATE TEMPORARY VIEW V AS SELECT /*+ STATE_TTL('A' = '${ttl_A}', 'B' = '${ttl_B}')*/ * FROM A JOIN B ON...;
SELECT /*+ STATE_TTL('V' = '${ttl_V}', 'C' = '${ttl_C}')*/ * FROM V JOIN C ON ...;
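The cascade-join rule in this section can be modeled in a few lines of plain Java. This is a simplified sketch, not planner code: the Node, Input, and Join types and the resolve method are hypothetical, and the model only covers the case where a hint key names a direct input of a join.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a left-deep join tree; not Flink planner classes.
final class CascadeJoinTtlSketch {

    interface Node {}

    static final class Input implements Node {
        final String alias;

        Input(String alias) {
            this.alias = alias;
        }
    }

    static final class Join implements Node {
        final Node left;
        final Node right;

        Join(Node left, Node right) {
            this.left = left;
            this.right = right;
        }
    }

    // Collects the per-join hint options in bottom-up order.
    static void resolve(Node node, Map<String, Long> hintMillis, List<Map<String, Long>> out) {
        if (!(node instanceof Join)) {
            return;
        }
        Join join = (Join) node;
        resolve(join.left, hintMillis, out);
        resolve(join.right, hintMillis, out);
        Map<String, Long> options = new LinkedHashMap<>();
        putIfNamed(options, "LEFT", join.left, hintMillis);
        putIfNamed(options, "RIGHT", join.right, hintMillis);
        out.add(options);
    }

    // Only a named input can match a hint key; a nested join has no name, so that
    // side gets no option and falls back to table.exec.state.ttl.
    private static void putIfNamed(
            Map<String, Long> options, String side, Node input, Map<String, Long> hintMillis) {
        if (input instanceof Input) {
            Long ttl = hintMillis.get(((Input) input).alias);
            if (ttl != null) {
                options.put(side, ttl);
            }
        }
    }
}
```

For the tree Join(Join(o, l), c) with the hint values 'o' = 3d, 'l' = 1d, 'c' = 10d expressed in milliseconds, the sketch produces {LEFT=259200000, RIGHT=86400000} for the first join and {RIGHT=864000000} for the second, matching the joinHints shown in the plan above.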
2. Group Aggregate
EXPLAIN SELECT /*+ STATE_TTL('lineitem' = '1d')*/ l_partkey, SUM(l_extendedprice) AS revenue
FROM lineitem
GROUP BY l_partkey;
== Optimized Physical Plan ==
GroupAggregate(groupBy=[l_partkey], select=[l_partkey, SUM(l_extendedprice) AS revenue], hints=[[[state_ttl options:{lineitem=86400000}]]])
+- Exchange(distribution=[hash[l_partkey]])
+- Calc(select=[l_partkey, l_extendedprice])
+- TableSourceScan(table=[[default_catalog, default_database, lineitem]], fields=[l_linenumber, l_orderkey, l_partkey, l_extendedprice])
-- left input subquery contains group aggregate
EXPLAIN
SELECT /*+ STATE_TTL('o' = '3d', 'agg' = '1d', 'c' = '10d') */* FROM
orders o
LEFT OUTER JOIN
(
SELECT /*+ STATE_TTL('lineitem' = '1h')*/ l_orderkey, l_partkey, SUM(l_extendedprice) AS revenue
FROM lineitem
GROUP BY l_orderkey, l_partkey
) agg
ON o.o_orderkey = agg.l_orderkey
LEFT OUTER JOIN customers c
ON o.o_custkey = c.c_custkey;
== Optimized Physical Plan ==
Join(joinType=[LeftOuterJoin], where=[=(o_custkey, c_custkey)], select=[o_orderkey, o_custkey, o_status, l_orderkey, l_partkey, revenue, c_custkey, c_address], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[state_ttl options:{RIGHT=864000000}]]])
:- Exchange(distribution=[hash[o_custkey]])
: +- Join(joinType=[LeftOuterJoin], where=[=(o_orderkey, l_orderkey)], select=[o_orderkey, o_custkey, o_status, l_orderkey, l_partkey, revenue], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey], joinHints=[[[state_ttl options:{LEFT=259200000, RIGHT=86400000}]]])
: :- Exchange(distribution=[hash[o_orderkey]])
: : +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[o_orderkey, o_custkey, o_status])
: +- Exchange(distribution=[hash[l_orderkey]])
: +- GroupAggregate(groupBy=[l_orderkey, l_partkey], select=[l_orderkey, l_partkey, SUM(l_extendedprice) AS revenue], hints=[[[state_ttl options:{lineitem=3600000}]]])
: +- Exchange(distribution=[hash[l_orderkey, l_partkey]])
: +- Calc(select=[l_orderkey, l_partkey, l_extendedprice])
: +- TableSourceScan(table=[[default_catalog, default_database, lineitem]], fields=[l_linenumber, l_orderkey, l_partkey, l_extendedprice])
+- Exchange(distribution=[hash[c_custkey]])
+- TableSourceScan(table=[[default_catalog, default_database, customers]], fields=[c_custkey, c_address])
-- subquery contains regular join operation
EXPLAIN
SELECT /*+ STATE_TTL('tmp' = '48h')*/ o_orderkey, c_custkey, SUM(l_extendedprice) AS revenue
FROM (
SELECT /*+ STATE_TTL('o' = '3d', 'l' = '1d', 'c' = '10d') */*
FROM orders o LEFT OUTER JOIN lineitem l
ON o.o_orderkey = l.l_orderkey
LEFT OUTER JOIN customers c
ON o.o_custkey = c.c_custkey
) tmp
GROUP BY o_orderkey, c_custkey;
== Optimized Physical Plan ==
GroupAggregate(groupBy=[o_orderkey, c_custkey], select=[o_orderkey, c_custkey, SUM_RETRACT(l_extendedprice) AS revenue], hints=[[[state_ttl options:{tmp=172800000}]]])
+- Exchange(distribution=[hash[o_orderkey, c_custkey]])
+- Calc(select=[o_orderkey, c_custkey, l_extendedprice])
+- Join(joinType=[LeftOuterJoin], where=[=(o_custkey, c_custkey)], select=[o_orderkey, o_custkey, l_extendedprice, c_custkey], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[state_ttl options:{RIGHT=864000000}]]])
:- Exchange(distribution=[hash[o_custkey]])
: +- Calc(select=[o_orderkey, o_custkey, l_extendedprice])
: +- Join(joinType=[LeftOuterJoin], where=[=(o_orderkey, l_orderkey)], select=[o_orderkey, o_custkey, l_orderkey, l_extendedprice], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[state_ttl options:{LEFT=259200000, RIGHT=86400000}]]])
: :- Exchange(distribution=[hash[o_orderkey]])
: : +- Calc(select=[o_orderkey, o_custkey])
: : +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[o_orderkey, o_custkey, o_status])
: +- Exchange(distribution=[hash[l_orderkey]])
: +- Calc(select=[l_orderkey, l_extendedprice])
: +- TableSourceScan(table=[[default_catalog, default_database, lineitem]], fields=[l_linenumber, l_orderkey, l_partkey, l_extendedprice])
+- Exchange(distribution=[hash[c_custkey]])
+- Calc(select=[c_custkey])
+- TableSourceScan(table=[[default_catalog, default_database, customers]], fields=[c_custkey, c_address])
Pass Hint Value to the Operator's StateMetadata
Take stream regular join as an example. The JoinHintResolver will validate and resolve the state TTL hints, and normalize the TTL duration value to milliseconds, which aligns with the default time unit of table.exec.state.ttl.
The StreamPhysicalJoinRule will pass the validated RelHint as the argument to create StreamPhysicalJoin. StreamPhysicalJoin#translateToExecNode then converts the STATE_TTL hint to a map from the input ordinal to the specified TTL value.
override def translateToExecNode(): ExecNode[_] = {
+ val ttlFromHint = new util.HashMap[JInt, JLong]
+ getHints
+ .filter(hint => StateTtlHint.isStateTtlHint(hint.hintName))
+ .forEach {
+ hint =>
+ hint.kvOptions.forEach(
+ (input, ttl) =>
+ ttlFromHint.put(if (input == JoinStrategy.LEFT_INPUT) 0 else 1, ttl.toLong))
+ }
new StreamExecJoin(
unwrapTableConfig(this),
joinSpec,
getUpsertKeys(left, joinSpec.getLeftKeys),
getUpsertKeys(right, joinSpec.getRightKeys),
InputProperty.DEFAULT,
InputProperty.DEFAULT,
+ ttlFromHint,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription)
}
StreamExecJoin will prioritize taking values from the hint as the TTL for the stream join operator.
public StreamExecJoin(
ReadableConfig tableConfig,
JoinSpec joinSpec,
List<int[]> leftUpsertKeys,
List<int[]> rightUpsertKeys,
InputProperty leftInputProperty,
InputProperty rightInputProperty,
+ Map<Integer, Long> stateTtlFromHint, // key is the input ordinal, value is the TTL retrieved from hint
RowType outputType,
String description) {
this(
ExecNodeContext.newNodeId(),
ExecNodeContext.newContext(StreamExecJoin.class),
ExecNodeContext.newPersistedConfig(StreamExecJoin.class, tableConfig),
joinSpec,
leftUpsertKeys,
rightUpsertKeys,
+ StateMetadata.getMultiInputOperatorDefaultMeta(
+ stateTtlFromHint, tableConfig, LEFT_STATE_NAME, RIGHT_STATE_NAME),
Lists.newArrayList(leftInputProperty, rightInputProperty),
outputType,
description);
}
public static List<StateMetadata> getMultiInputOperatorDefaultMeta(
+ Map<Integer, Long> stateTtlFromHint,
ReadableConfig tableConfig,
String... stateNameList) {
Duration ttlFromTableConf = tableConfig.get(ExecutionConfigOptions.IDLE_STATE_RETENTION);
List<StateMetadata> stateMetadataList = new ArrayList<>(stateNameList.length);
for (int i = 0; i < stateNameList.length; i++) {
+ Duration stateTtl =
+ stateTtlFromHint.containsKey(i)
+ ? Duration.ofMillis(stateTtlFromHint.get(i))
+ : ttlFromTableConf;
stateMetadataList.add(new StateMetadata(i, stateTtl, stateNameList[i]));
}
return stateMetadataList;
}
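To make the fallback concrete, here is a stripped-down, runnable variant of the same loop that uses only JDK types. The class and method names are hypothetical, and a plain Duration list stands in for the StateMetadata list.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the StateMetadata construction shown above.
final class JoinStateTtlFallbackSketch {

    // For each input ordinal, prefer the hinted TTL (in ms) and fall back to the table config TTL.
    static List<Duration> resolve(
            Map<Integer, Long> stateTtlFromHint, Duration ttlFromTableConf, int inputCount) {
        List<Duration> result = new ArrayList<>(inputCount);
        for (int i = 0; i < inputCount; i++) {
            Long hinted = stateTtlFromHint.get(i);
            result.add(hinted != null ? Duration.ofMillis(hinted) : ttlFromTableConf);
        }
        return result;
    }
}
```

For the second join of the cascade example, where only the right input (ordinal 1) carries a hinted value of 864000000 ms, the left state would keep the table-level TTL while the right state would use 10 days.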
Affected StreamPhysicalRels
- StreamPhysicalJoin
- StreamPhysicalGroupAggregate
- StreamPhysicalGlobalGroupAggregate (to support two-phase optimization)
- StreamPhysicalIncrementalGroupAggregate (to support partial-global-agg and final-local-agg combine optimization)
Parameter Granularity and Priority
| Different ways to change state TTL for Table API/SQL pipeline | Granularity | Priority |
|---|---|---|
| SET 'table.exec.state.ttl' = '...' | The value applies to the whole pipeline; all stateful operators use it as the state TTL by default. | This is the default state TTL configuration. It can be overridden by the STATE_TTL hint or by modifying the value in the serialized CompiledPlan. |
| SELECT /*+ STATE_TTL(...) */ ... | The value only applies to the specific operator (currently join and group aggregate) that is translated from the hinted query block containing the specific clause. | The hint takes precedence over the default table.exec.state.ttl. The value is serialized to the CompiledPlan during the plan translation phase. |
| Modify serialized JSON content of CompiledPlan | The TTL for each stateful operator is explicitly serialized as an entry of the JSON. Modifying the JSON file can change the TTL for any stateful operator. | The state metadata value derives from either <1> table.exec.state.ttl or <2> the STATE_TTL hint. However, if users first enable the hint, compile the query to a JSON file, manually change the TTL for the join operator, and then submit the job via the plan, the deserialized compiled plan accepts the last modified value as the final TTL. |
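The precedence described in the table can be summarized as a two-step fallback. The sketch below is an illustration only: the class name, method name, and the use of Optional parameters are assumptions, and it models one operator state rather than a whole plan.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical summary of the priority column; not Flink code.
final class StateTtlPrecedenceSketch {

    static Duration effectiveTtl(
            Duration tableConfigTtl, Optional<Duration> hintTtl, Optional<Duration> editedPlanTtl) {
        // Value written to the compiled plan: the hint if present, else table.exec.state.ttl.
        Duration serialized = hintTtl.orElse(tableConfigTtl);
        // A manual edit of the serialized plan, if any, is the last modified value and wins.
        return editedPlanTtl.orElse(serialized);
    }
}
```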
Compatibility, Deprecation, and Migration Plan
This feature will be introduced as an opt-in enhancement and will not affect existing SQL queries or applications. The hint will be backward-compatible, allowing users to adopt the feature in their queries gradually.
Test Plan
Comprehensive unit tests will be added to ensure the correctness of parsing and the optimized plan of the STATE_TTL hint. Integration tests will also be conducted to validate the functionality of stream join and group aggregate operators with state TTL configured using SQL hints.
Rejected Alternatives
SELECT /*+ STATE_TTL('table' = 'orders', 'table.exec.state.ttl' = '1d'), STATE_TTL('table' = 'customers', 'table.exec.state.ttl' = '20d') */ *
FROM orders LEFT OUTER JOIN customers
ON orders.o_custkey = customers.c_custkey
This KV-options form was rejected because it is too verbose.
References
[1] FLIP-292: Enhance COMPILED PLAN to support operator-level state TTL configuration
[2] https://lists.apache.org/thread/f4hz13g9gykdcwdvf0ws5xkgkytck2w1
[4] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/
[5] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/