Status
Current state: Accepted
Discussion thread: https://lists.apache.org/thread/3s69dhv3rp4s0kysnslqbvyqo3qf7zq5
Vote thread: https://lists.apache.org/thread/837r63gdwzoqryvp3gbf67941g706s5d
Jira: FLINK-33397
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
FLIP-292 [1] extends CompiledPlan so that operator-level state TTL can be configured by modifying the JSON file. During that discussion, it was pointed out that a SQL hint could also cover some specific, simple scenarios in a way that is user-friendly, convenient, and unambiguous [2]. In this FLIP, we propose to support this feature.
Specifically, we aim to support configuring state TTL via SQL hints for stream regular joins [3] and group aggregates [4], the most typical use cases, which have unambiguous semantics and hint propagation. Other use cases are excluded because their hint propagation is more complex; we may tackle them once the current design reaches consensus.
Public Interfaces
The proposed changes do not introduce interfaces that need to be annotated with Public/PublicEvolving.
Proposed Changes
To configure different state TTLs for stream joins or group aggregates, users specify a new hint (`STATE_TTL`) in the SELECT clause, similar to the existing SQL hints [5].
query:
  select /*+ STATE_TTL(kv_option[,kv_option]) */ ...

kv_option:
  key_string_literal = value_string_literal

key_string_literal:
  table_name | view_name | table_alias | query_block_alias

value_string_literal:
  ttl_duration
Stream Regular Join Example
-- hint with table name as key
SELECT /*+ STATE_TTL('orders'= '1d', 'customers' = '20d') */ *
FROM orders
LEFT OUTER JOIN customers ON orders.o_custkey = customers.c_custkey;

-- hint with table alias as key
SELECT /*+ STATE_TTL('o'= '12h', 'c' = '3d') */ *
FROM orders o
LEFT OUTER JOIN customers c ON o.o_custkey = c.c_custkey;

-- hint with temporary view name as key
CREATE TEMPORARY VIEW left_input AS ...;
CREATE TEMPORARY VIEW right_input AS ...;
SELECT /*+ STATE_TTL('left_input'= '36000s', 'right_input' = '15h') */ *
FROM left_input
JOIN right_input ON left_input.join_key = right_input.join_key;
Group Aggregate Example
-- hint with table name as key
SELECT /*+ STATE_TTL('orders' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
FROM orders
GROUP BY o_orderkey;

-- hint with table alias as key
SELECT /*+ STATE_TTL('o' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
FROM orders AS o
GROUP BY o_orderkey;

-- hint with query block alias as key
SELECT /*+ STATE_TTL('tmp' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
FROM (
    SELECT o_orderkey, o_totalprice
    FROM orders
    WHERE o_shippriority = 0
) tmp
GROUP BY o_orderkey;
Stream Regular Join mix Group Aggregate Example
-- the group aggregate query block as the input of the regular join
SELECT /*+ STATE_TTL('agg' = '1d', 'customers' = '7d') */ *
FROM (
    SELECT /*+ STATE_TTL('orders' = '30d') */ o_custkey, COUNT(o_orderkey) AS order_num
    FROM orders
    GROUP BY o_custkey
) agg
LEFT OUTER JOIN customers ON agg.o_custkey = customers.c_custkey;

-- the regular join query block as the input of the group aggregate
SELECT /*+ STATE_TTL('tmp' = '1d') */ c_custkey, SUM(o_totalprice) AS revenue
FROM (
    SELECT /*+ STATE_TTL('o' = '12h', 'c' = '3d') */ *
    FROM orders o
    LEFT OUTER JOIN customers c ON o.o_custkey = c.c_custkey
) tmp
GROUP BY c_custkey;
Exception Handling
When users incorrectly use STATE_TTL hints, there are two possible scenarios:
Scenario 1: The user specifies STATE_TTL in a query block where it can be applied, but with a wrong hint key, for example a non-existent table name, table alias, or query block alias. In this case, an exception should be thrown explicitly. For example, the following query throws org.apache.flink.table.api.ValidationException with the message "The options of following hints cannot match the name of input tables or views:\n`foo` in `state_ttl`". This check is already performed by JoinHintResolver (see JoinHintResolver.java#L193).
-- hint with a non-existent table name as key
SELECT /*+ STATE_TTL('foo'= '1d', 'customers' = '20d') */ *
FROM orders
LEFT OUTER JOIN customers ON orders.o_custkey = customers.c_custkey;
We can perform a similar check for the aggregate node when a hint key is invalid (this check is new, since no aggregate hint validation existed before).
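A minimal, dependency-free sketch of such a key check follows, assuming the aggregate node can expose the set of names that identify its input (table name, view name, table alias, or query block alias). The class and method names are hypothetical, a plain IllegalArgumentException stands in for Flink's ValidationException, and the way several unmatched keys are joined in the message is an assumption; only the single-key message format comes from the text above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch of validating STATE_TTL hint keys against the names of a node's
 * inputs. Not Flink's implementation; names are illustrative only.
 */
final class StateTtlKeyCheckSketch {

    static void validateKeys(List<String> hintKeys, Set<String> inputNames) {
        List<String> unmatched = new ArrayList<>();
        for (String key : hintKeys) {
            // A key is valid only if it names one of the inputs of this node.
            if (!inputNames.contains(key)) {
                unmatched.add("`" + key + "`");
            }
        }
        if (!unmatched.isEmpty()) {
            // Flink would raise a ValidationException; IllegalArgumentException keeps
            // this sketch free of Flink dependencies.
            throw new IllegalArgumentException(
                    "The options of following hints cannot match the name of input tables or views:\n"
                            + String.join(", ", unmatched)
                            + " in `state_ttl`");
        }
    }
}
```

With inputs {orders, customers}, the key list [foo, customers] fails with the message ending in "`foo` in `state_ttl`", while [customers] passes silently.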
Scenario 2: The user includes the STATE_TTL hint in a query block that does not support it. In this case, the hint is ignored without throwing an exception. This is intentional, to remain consistent with the current hint behavior.
For example, SHUFFLE_MERGE is a batch join hint that only supports joins with an equi-join condition (see hints/#shuffle_merge). Nevertheless, a streaming query that uses it with a non-equi condition still yields a valid plan and execution result; the hint is quietly ignored.
-- explain the query under streaming mode
EXPLAIN SELECT /*+ SHUFFLE_MERGE('o') */* from orders o LEFT JOIN lineitem l ON o.o_totalprice > l.l_extendedprice;

== Optimized Physical Plan ==
Join(joinType=[LeftOuterJoin], where=[>(o_totalprice, l_extendedprice)], select=[o_orderkey, o_custkey, o_status, o_totalprice, l_linenumber, l_orderkey, l_partkey, l_extendedprice], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[SHUFFLE_MERGE options:[LEFT]]]])
:- Exchange(distribution=[single])
:  +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[o_orderkey, o_custkey, o_status, o_totalprice])
+- Exchange(distribution=[single])
   +- TableSourceScan(table=[[default_catalog, default_database, lineitem]], fields=[l_linenumber, l_orderkey, l_partkey, l_extendedprice])
The reason is explained in FLIP-229: Introduces Join Hint for Flink SQL Batch Job, quoted as follows:
> 3. Check when optimizing
> In the optimization stage of the optimizer, it is judged that this Join Hint can be applied. If it cannot be applied, the behavior is to print a warning log instead of throwing an exception directly.
Hint Strategy
HintStrategyTable.builder()
    .hintStrategy("STATE_TTL",
        HintStrategy.builder(
                HintPredicates.or(
                    HintPredicates.JOIN, HintPredicates.AGGREGATE))
            .optionChecker(STATE_TTL_OPTION_CHECKER)
            .build())
    .build();
The STATE_TTL_OPTION_CHECKER ensures that the hint options are non-empty and that each value is a valid duration.
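The two checks can be sketched without Calcite as follows. This is a hypothetical illustration, not the real STATE_TTL_OPTION_CHECKER: it does not implement Calcite's HintOptionChecker interface, and the accepted duration syntax (a number followed by one of `d`, `h`, `min`, `s`, `ms`, with a bare number read as milliseconds) is an assumed subset chosen for the sketch rather than Flink's full duration parser.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of the STATE_TTL option checks: non-empty options, valid durations. */
final class StateTtlOptionCheckSketch {

    // Assumed syntax: "<number><unit>", unit in {ms, s, min, h, d}; no unit means milliseconds.
    private static final Pattern DURATION = Pattern.compile("(\\d+)\\s*(ms|s|min|h|d)?");

    static Duration parseTtl(String text) {
        Matcher m = DURATION.matcher(text.trim().toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Invalid state TTL duration: '" + text + "'");
        }
        // Long.parseLong throws NumberFormatException (an IllegalArgumentException) on overflow.
        long amount = Long.parseLong(m.group(1));
        String unit = m.group(2) == null ? "ms" : m.group(2);
        switch (unit) {
            case "d": return Duration.ofDays(amount);
            case "h": return Duration.ofHours(amount);
            case "min": return Duration.ofMinutes(amount);
            case "s": return Duration.ofSeconds(amount);
            default: return Duration.ofMillis(amount);
        }
    }

    /** Returns true if the options are non-empty and every value parses as a duration. */
    static boolean checkOptions(Map<String, String> kvOptions) {
        if (kvOptions == null || kvOptions.isEmpty()) {
            return false;
        }
        for (String value : kvOptions.values()) {
            try {
                parseTtl(value);
            } catch (IllegalArgumentException e) {
                return false;
            }
        }
        return true;
    }
}
```

Under these assumptions, values used in the examples such as '1d', '36000s', and '15h' pass, while an empty hint or a value like '1 week' is rejected.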
Hint Propagation in Optimizer
Currently, SqlToRelConverter attaches an alias hint at the root of the query block when a join hint is detected (see SqlToRelConverter.java#L2289). ClearJoinHintWithInvalidPropagationShuttle then clears invalid hints propagated from the outer query block by checking the hint strategy and comparing each hint's relative inherit path with that of the query block's alias hint. This ensures that hints from other query blocks are not attached to the current join node.
The STATE_TTL hint takes a similar approach: the hint only affects the current query block and does not affect the state TTL in its subqueries, which aligns with the current hint behavior. Likewise, `ClearJoinHintWithInvalidPropagationShuttle` will override the visit method to remove propagated hints from LogicalAggregate nodes.
@Override
public RelNode visit(LogicalAggregate aggregate) {
    List<RelHint> hints = ((Hintable) aggregate).getHints();
    Optional<RelHint> firstAliasHint =
            hints.stream()
                    .filter(hint -> FlinkHints.HINT_ALIAS.equals(hint.hintName))
                    .findFirst();
    if (!firstAliasHint.isPresent()) {
        return super.visit(aggregate);
    }

    // compare the inheritPath of the alias hint and other hints to find hints propagated from the outer query block
    List<RelHint> joinHintsFromOuterQueryBlock = ...;
    if (joinHintsFromOuterQueryBlock.isEmpty()) {
        return super.visit(aggregate);
    }

    RelNode newAggregate = aggregate;
    ClearOuterJoinHintShuttle clearOuterJoinHintShuttle;
    for (RelHint outerJoinHint : joinHintsFromOuterQueryBlock) {
        clearOuterJoinHintShuttle = new ClearOuterJoinHintShuttle(outerJoinHint);
        newAggregate = newAggregate.accept(clearOuterJoinHintShuttle);
    }
    return super.visit(newAggregate);
}
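The elided comparison step above can be illustrated with a simplified, dependency-free model. The `Hint` record is a hypothetical stand-in for Calcite's RelHint (a name plus an inherit path), and the alias hint name "ALIAS" is assumed. The rule encoded here is also an assumption: because the alias hint is attached at the root of the current query block, a hint defined in the same block reaches the node with an inherit path of the same length, whereas a hint attached in an outer block arrives with a longer path.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/** Simplified model of detecting hints propagated from an outer query block. */
final class OuterHintFilterSketch {

    /** Hypothetical stand-in for RelHint: hint name plus inherit path. */
    record Hint(String hintName, List<Integer> inheritPath) {}

    static final String ALIAS = "ALIAS"; // assumed name of the alias hint

    static List<Hint> hintsFromOuterQueryBlock(List<Hint> hints, String hintName) {
        Optional<Hint> alias =
                hints.stream().filter(h -> ALIAS.equals(h.hintName())).findFirst();
        if (alias.isEmpty()) {
            // Without an alias hint there is no query block boundary to compare against.
            return List.of();
        }
        int aliasDepth = alias.get().inheritPath().size();
        // A longer inherit path than the alias hint's means the hint was attached higher up.
        return hints.stream()
                .filter(h -> hintName.equals(h.hintName()))
                .filter(h -> h.inheritPath().size() > aliasDepth)
                .collect(Collectors.toList());
    }
}
```

In this model, given an alias hint with path [0], a STATE_TTL hint with path [0] belongs to the current block and is kept, while one with path [0, 0] is reported as coming from the outer block and would be cleared. The sketch requires Java 16+ for records.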
Expected Physical Plan
We elaborate on the details as follows.
1. Cascade Joins
For cascade joins like SELECT /*+ STATE_TTL('A' = '...', 'B' = '...', 'C' = '...')*/ * FROM A JOIN B ON ... JOIN C ON ..., the specified state TTLs are interpreted as the left and right state TTLs of the first join operator and the right state TTL of the second join operator (in bottom-up order). The left state TTL of the second join operator is retrieved from the configuration table.exec.state.ttl.
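The assignment rule above can be sketched for a left-deep chain of joins. This is a hypothetical helper, not Flink code; it also assumes that a table without a hint entry falls back to the default (table.exec.state.ttl), which is consistent with the plan below showing only RIGHT for the second join.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: map STATE_TTL values onto a left-deep chain A JOIN B JOIN C ... */
final class CascadeJoinTtlSketch {

    record JoinTtl(Duration left, Duration right) {}

    static List<JoinTtl> assign(
            List<String> tables, Map<String, Duration> hint, Duration defaultTtl) {
        List<JoinTtl> joins = new ArrayList<>();
        // Join i (bottom-up, starting at 1) joins the result so far with tables.get(i).
        for (int i = 1; i < tables.size(); i++) {
            // Only the bottom join has a named left input; upper joins use the default.
            Duration left = i == 1 ? hint.getOrDefault(tables.get(0), defaultTtl) : defaultTtl;
            Duration right = hint.getOrDefault(tables.get(i), defaultTtl);
            joins.add(new JoinTtl(left, right));
        }
        return joins;
    }
}
```

For the aliases o, l, c with TTLs 3d, 1d, and 10d, the sketch yields LEFT=259200000 ms and RIGHT=86400000 ms for the first join, and the default plus RIGHT=864000000 ms for the second, matching the join hints in the EXPLAIN output that follows.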
EXPLAIN SELECT /*+ STATE_TTL('o' = '3d', 'l' = '1d', 'c' = '10d') */* FROM orders o
LEFT OUTER JOIN lineitem l ON o.o_orderkey = l.l_orderkey
LEFT OUTER JOIN customers c ON o.o_custkey = c.c_custkey;

== Optimized Physical Plan ==
Join(joinType=[LeftOuterJoin], where=[=(o_custkey, c_custkey)], select=[o_orderkey, o_custkey, o_status, l_linenumber, l_orderkey, l_partkey, l_extendedprice, c_custkey, c_address], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[state_ttl options:{RIGHT=864000000}]]])
:- Exchange(distribution=[hash[o_custkey]])
:  +- Join(joinType=[LeftOuterJoin], where=[=(o_orderkey, l_orderkey)], select=[o_orderkey, o_custkey, o_status, l_linenumber, l_orderkey, l_partkey, l_extendedprice], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[state_ttl options:{LEFT=259200000, RIGHT=86400000}]]])
:     :- Exchange(distribution=[hash[o_orderkey]])
:     :  +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[o_orderkey, o_custkey, o_status])
:     +- Exchange(distribution=[hash[l_orderkey]])
:        +- TableSourceScan(table=[[default_catalog, default_database, lineitem]], fields=[l_linenumber, l_orderkey, l_partkey, l_extendedprice])
+- Exchange(distribution=[hash[c_custkey]])
   +- TableSourceScan(table=[[default_catalog, default_database, customers]], fields=[c_custkey, c_address])
If users need to set a specific value for the left state of the second join, they can split the cascade join clause into separate query blocks, for example:

CREATE TEMPORARY VIEW V AS
SELECT /*+ STATE_TTL('A' = '${ttl_A}', 'B' = '${ttl_B}')*/ * FROM A JOIN B ON ...;

SELECT /*+ STATE_TTL('V' = '${ttl_V}', 'C' = '${ttl_C}')*/ * FROM V JOIN C ON ...;
2. Group Aggregate
EXPLAIN SELECT /*+ STATE_TTL('lineitem' = '1d')*/ l_partkey, SUM(l_extendedprice) AS revenue
FROM lineitem
GROUP BY l_partkey;

== Optimized Physical Plan ==
GroupAggregate(groupBy=[l_partkey], select=[l_partkey, SUM(l_extendedprice) AS revenue], hints=[[[state_ttl options:{lineitem=86400000}]]])
+- Exchange(distribution=[hash[l_partkey]])
   +- Calc(select=[l_partkey, l_extendedprice])
      +- TableSourceScan(table=[[default_catalog, default_database, lineitem]], fields=[l_linenumber, l_orderkey, l_partkey, l_extendedprice])

-- right input subquery contains group aggregate
EXPLAIN SELECT /*+ STATE_TTL('o' = '3d', 'agg' = '1d', 'c' = '10d') */* FROM orders o
LEFT OUTER JOIN (
    SELECT /*+ STATE_TTL('lineitem' = '1h')*/ l_orderkey, l_partkey, SUM(l_extendedprice) AS revenue
    FROM lineitem
    GROUP BY l_orderkey, l_partkey
) agg ON o.o_orderkey = agg.l_orderkey
LEFT OUTER JOIN customers c ON o.o_custkey = c.c_custkey;

== Optimized Physical Plan ==
Join(joinType=[LeftOuterJoin], where=[=(o_custkey, c_custkey)], select=[o_orderkey, o_custkey, o_status, l_orderkey, l_partkey, revenue, c_custkey, c_address], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[state_ttl options:{RIGHT=864000000}]]])
:- Exchange(distribution=[hash[o_custkey]])
:  +- Join(joinType=[LeftOuterJoin], where=[=(o_orderkey, l_orderkey)], select=[o_orderkey, o_custkey, o_status, l_orderkey, l_partkey, revenue], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey], joinHints=[[[state_ttl options:{LEFT=259200000, RIGHT=86400000}]]])
:     :- Exchange(distribution=[hash[o_orderkey]])
:     :  +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[o_orderkey, o_custkey, o_status])
:     +- Exchange(distribution=[hash[l_orderkey]])
:        +- GroupAggregate(groupBy=[l_orderkey, l_partkey], select=[l_orderkey, l_partkey, SUM(l_extendedprice) AS revenue], hints=[[[state_ttl options:{lineitem=3600000}]]])
:           +- Exchange(distribution=[hash[l_orderkey, l_partkey]])
:              +- Calc(select=[l_orderkey, l_partkey, l_extendedprice])
:                 +- TableSourceScan(table=[[default_catalog, default_database, lineitem]], fields=[l_linenumber, l_orderkey, l_partkey, l_extendedprice])
+- Exchange(distribution=[hash[c_custkey]])
   +- TableSourceScan(table=[[default_catalog, default_database, customers]], fields=[c_custkey, c_address])

-- subquery contains regular join operation
EXPLAIN SELECT /*+ STATE_TTL('tmp' = '48h')*/ o_orderkey, c_custkey, SUM(l_extendedprice) AS revenue
FROM (
    SELECT /*+ STATE_TTL('o' = '3d', 'l' = '1d', 'c' = '10d') */* FROM orders o
    LEFT OUTER JOIN lineitem l ON o.o_orderkey = l.l_orderkey
    LEFT OUTER JOIN customers c ON o.o_custkey = c.c_custkey
) tmp
GROUP BY o_orderkey, c_custkey;

== Optimized Physical Plan ==
GroupAggregate(groupBy=[o_orderkey, c_custkey], select=[o_orderkey, c_custkey, SUM_RETRACT(l_extendedprice) AS revenue], hints=[[[state_ttl options:{tmp=172800000}]]])
+- Exchange(distribution=[hash[o_orderkey, c_custkey]])
   +- Calc(select=[o_orderkey, c_custkey, l_extendedprice])
      +- Join(joinType=[LeftOuterJoin], where=[=(o_custkey, c_custkey)], select=[o_orderkey, o_custkey, l_extendedprice, c_custkey], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[state_ttl options:{RIGHT=864000000}]]])
         :- Exchange(distribution=[hash[o_custkey]])
         :  +- Calc(select=[o_orderkey, o_custkey, l_extendedprice])
         :     +- Join(joinType=[LeftOuterJoin], where=[=(o_orderkey, l_orderkey)], select=[o_orderkey, o_custkey, l_orderkey, l_extendedprice], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[state_ttl options:{LEFT=259200000, RIGHT=86400000}]]])
         :        :- Exchange(distribution=[hash[o_orderkey]])
         :        :  +- Calc(select=[o_orderkey, o_custkey])
         :        :     +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[o_orderkey, o_custkey, o_status])
         :        +- Exchange(distribution=[hash[l_orderkey]])
         :           +- Calc(select=[l_orderkey, l_extendedprice])
         :              +- TableSourceScan(table=[[default_catalog, default_database, lineitem]], fields=[l_linenumber, l_orderkey, l_partkey, l_extendedprice])
         +- Exchange(distribution=[hash[c_custkey]])
            +- Calc(select=[c_custkey])
               +- TableSourceScan(table=[[default_catalog, default_database, customers]], fields=[c_custkey, c_address])
Pass Hint Value to the Operator's StateMetadata
Take the stream regular join as an example. The JoinHintResolver validates and resolves the state TTL hints, and normalizes each TTL duration to milliseconds, which aligns with the default time unit of table.exec.state.ttl.
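The resolution and normalization step for a single join can be sketched as below. This is a hypothetical helper, not JoinHintResolver's code. It assumes the hint values have already been validated as durations and that each input can expose the names that directly identify it (table name, view name, or alias). The "LEFT"/"RIGHT" keys mirror the `options:{LEFT=..., RIGHT=...}` shown in the plans above.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: resolve STATE_TTL keys to LEFT/RIGHT and normalize to milliseconds. */
final class JoinTtlResolveSketch {

    static Map<String, Long> resolve(
            Map<String, Duration> hint, Set<String> leftNames, Set<String> rightNames) {
        Map<String, Long> resolved = new LinkedHashMap<>();
        hint.forEach(
                (key, ttl) -> {
                    if (leftNames.contains(key)) {
                        resolved.put("LEFT", ttl.toMillis());
                    } else if (rightNames.contains(key)) {
                        resolved.put("RIGHT", ttl.toMillis());
                    }
                    // Keys naming neither direct input are skipped here (e.g. tables of a
                    // lower join in a cascade); whether a key matches any input at all is
                    // the separate validation described in Scenario 1.
                });
        return resolved;
    }
}
```

For the alias example STATE_TTL('o' = '12h', 'c' = '3d'), this yields {LEFT=43200000, RIGHT=259200000}.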
The StreamPhysicalJoinRule passes the validated RelHint as an argument when creating StreamPhysicalJoin. StreamPhysicalJoin#translateToExecNode then converts the STATE_TTL hint into a map from the input ordinal to the specified TTL value.
override def translateToExecNode(): ExecNode[_] = {
+   val ttlFromHint = new util.HashMap[JInt, JLong]
+   getHints
+     .filter(hint => StateTtlHint.isStateTtlHint(hint.hintName))
+     .forEach {
+       hint =>
+         hint.kvOptions.forEach(
+           (input, ttl) =>
+             ttlFromHint.put(if (input == JoinStrategy.LEFT_INPUT) 0 else 1, ttl.toLong))
+     }
    new StreamExecJoin(
      unwrapTableConfig(this),
      joinSpec,
      getUpsertKeys(left, joinSpec.getLeftKeys),
      getUpsertKeys(right, joinSpec.getRightKeys),
      InputProperty.DEFAULT,
      InputProperty.DEFAULT,
+     ttlFromHint,
      FlinkTypeFactory.toLogicalRowType(getRowType),
      getRelDetailedDescription)
  }
StreamExecJoin gives priority to the values from the hint when setting the state TTL of the stream join operator.
public StreamExecJoin(
        ReadableConfig tableConfig,
        JoinSpec joinSpec,
        List<int[]> leftUpsertKeys,
        List<int[]> rightUpsertKeys,
        InputProperty leftInputProperty,
        InputProperty rightInputProperty,
+       Map<Integer, Long> stateTtlFromHint, // key is the input ordinal, value is the TTL retrieved from hint
        RowType outputType,
        String description) {
    this(
            ExecNodeContext.newNodeId(),
            ExecNodeContext.newContext(StreamExecJoin.class),
            ExecNodeContext.newPersistedConfig(StreamExecJoin.class, tableConfig),
            joinSpec,
            leftUpsertKeys,
            rightUpsertKeys,
+           StateMetadata.getMultiInputOperatorDefaultMeta(
+                   stateTtlFromHint, tableConfig, LEFT_STATE_NAME, RIGHT_STATE_NAME),
            Lists.newArrayList(leftInputProperty, rightInputProperty),
            outputType,
            description);
}
public static List<StateMetadata> getMultiInputOperatorDefaultMeta(
+       Map<Integer, Long> stateTtlFromHint,
        ReadableConfig tableConfig,
        String... stateNameList) {
    Duration ttlFromTableConf = tableConfig.get(ExecutionConfigOptions.IDLE_STATE_RETENTION);
    List<StateMetadata> stateMetadataList = new ArrayList<>(stateNameList.length);
    for (int i = 0; i < stateNameList.length; i++) {
+       Duration stateTtl =
+               stateTtlFromHint.containsKey(i)
+                       ? Duration.ofMillis(stateTtlFromHint.get(i))
+                       : ttlFromTableConf;
        stateMetadataList.add(new StateMetadata(i, stateTtl, stateNameList[i]));
    }
    return stateMetadataList;
}
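To make the priority rule easy to try outside the planner, here is a standalone model of the two steps above: converting resolved hint options into an input-ordinal map, then preferring hint values over the table.exec.state.ttl default. `StateMeta` is a hypothetical stand-in for Flink's StateMetadata, and the state names used with it are illustrative only.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Standalone model of hint-over-default state TTL priority; not Flink's classes. */
final class StateTtlPrioritySketch {

    /** Hypothetical stand-in for StateMetadata. */
    record StateMeta(int stateIndex, Duration ttl, String stateName) {}

    /** Converts {LEFT=ms, RIGHT=ms} options into {0=ms, 1=ms}. */
    static Map<Integer, Long> toOrdinalMap(Map<String, String> kvOptions) {
        Map<Integer, Long> ttlFromHint = new HashMap<>();
        kvOptions.forEach(
                (input, ttl) -> ttlFromHint.put("LEFT".equals(input) ? 0 : 1, Long.parseLong(ttl)));
        return ttlFromHint;
    }

    /** Uses the hint TTL for an input if present, otherwise the configured default. */
    static List<StateMeta> buildMeta(
            Map<Integer, Long> ttlFromHint, Duration ttlFromTableConf, String... stateNames) {
        List<StateMeta> result = new ArrayList<>(stateNames.length);
        for (int i = 0; i < stateNames.length; i++) {
            Duration ttl =
                    ttlFromHint.containsKey(i)
                            ? Duration.ofMillis(ttlFromHint.get(i))
                            : ttlFromTableConf;
            result.add(new StateMeta(i, ttl, stateNames[i]));
        }
        return result;
    }
}
```

For the upper join in the cascade example (options {RIGHT=864000000}) with a configured default of 1 hour, the left state keeps the 1-hour default and the right state gets 10 days.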
Affected StreamPhysicalRels
- StreamPhysicalJoin
- StreamPhysicalGroupAggregate
- StreamPhysicalGlobalGroupAggregate (to support two-phase optimization)
- StreamPhysicalIncrementalGroupAggregate (to support partial-global-agg and final-local-agg combine optimization)
Parameter Granularity and Priority
Way to change state TTL for a Table API/SQL pipeline | Granularity | Priority |
---|---|---|
SET 'table.exec.state.ttl' = '...' | The value applies to the whole pipeline; all stateful operators use it as their state TTL by default. | This is the default state TTL configuration; it can be overridden by the STATE_TTL hint or by modifying the value in the serialized CompiledPlan. |
SELECT /*+ STATE_TTL(...) */ ... | The value applies only to the specific operators (currently join and group aggregate) translated from the hinted query block. | The hint takes precedence over the default table.exec.state.ttl. The value is serialized to the CompiledPlan during plan translation. |
Modify the serialized JSON content of the CompiledPlan | The TTL of each stateful operator is explicitly serialized as an entry in the JSON. Modifying the JSON file can change the TTL of any stateful operator. | The state metadata value derives from either (1) table.exec.state.ttl or (2) the STATE_TTL hint. However, if users first enable the hint, compile the query to a JSON file, manually change the TTL of the join operator, and then submit the job via the plan, the deserialized compiled plan uses the last modified value as the final TTL. |
Compatibility, Deprecation, and Migration Plan
This feature will be introduced as an opt-in enhancement and will not affect existing SQL queries or applications. The hint will be backward-compatible, allowing users to adopt the feature in their queries gradually.
Test Plan
Comprehensive unit tests will be added to ensure the correctness of parsing and the optimized plan of the STATE_TTL hint. Integration tests will also be conducted to validate the functionality of stream join and group aggregate operators with state TTL configured using SQL hints.
Rejected Alternatives
SELECT /*+ STATE_TTL('table' = 'orders', 'table.exec.state.ttl' = '1d'),
           STATE_TTL('table' = 'customers', 'table.exec.state.ttl' = '20d') */ *
FROM orders
LEFT OUTER JOIN customers ON orders.o_custkey = customers.c_custkey
This KV-option style was rejected because it is too verbose.
References
[1] FLIP-292: Enhance COMPILED PLAN to support operator-level state TTL configuration
[2] https://lists.apache.org/thread/f4hz13g9gykdcwdvf0ws5xkgkytck2w1
[4] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/
[5] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/