Status
Current state: ["Accepted"]
Discussion thread: https://lists.apache.org/thread.html/r94af5d3d97e76e7dd9df68cbffff0becc7ba74d15591a8fab84c72fa%40%3Cdev.flink.apache.org%3E
JIRA: FLINK-17101
Released: 1.11.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, a Flink SQL user can define tables within a catalog using DDL statements. For example, a Kafka topic can be defined as a table with a statement like the one below:
create table kafka_table (
  id bigint,
  age int,
  name STRING
) WITH (
  'connector.type' = 'kafka',
  'update-mode' = 'append',
  'connector.topic' = 'topic.employees',
  'connector.startup-mode' = 'earliest-offset',
  'format.type' = 'csv',
  'format.ignore-parse-errors' = 'false'
)
The table is then defined in the catalog and can be referenced in queries. The table options are persisted to the catalog; to modify them, we have to use another DDL statement such as "ALTER TABLE ...". But there are cases where a user wants to override table options dynamically, just for one query:
- My job crashed and I need to skip the historical records and consume from the latest offset, so I need to change "connector.startup-mode" to "latest-offset".
- I want to ignore parse errors, so I need to set "format.ignore-parse-errors" to "true".
Another case is a JDBC sink defined by DDL in the SQL CLI:
create table jdbc_table (
  a int,
  b varchar
) with (
  'connector.type' = 'jdbc',
  'connector.write.flush.interval' = '5000'
)
To observe the data from the database side, I may want the JDBC sink to flush data more eagerly and set "connector.write.flush.interval" to "2000".
Such parameter changes are very common and ad hoc. Being able to set them flexibly would improve the user experience of Flink SQL, especially now that we have so many kinds of connectors and so many supported table options.
Flink Dynamic Table Options Proposal
To pass table options around dynamically and flexibly, we use the "table hints" syntax for these options: right after each referenced table in the query, we can specify an options block that indicates which options we want to override.
Note: we use the hint-style syntax, but unlike most normal table hints, the dynamic options may affect the query result set, so strictly speaking this is not a "hint".
Hint Syntax
We choose the Oracle-style hint syntax because its hints are embedded in comments: this does not break SQL compatibility and is very readable. It also avoids introducing too many non-standard SQL keywords.
To supply an Oracle-style hint grammar for both query hints and table hints, we implement a hint grammar that can accommodate most hint use cases:
query :
      select ... from
        table_name1 /*+ hint_content */
        join
        table_name2 /*+ hint_content */
      ...

hint_content :
      hint_item[, hint_item]*

hint_item :
  |   hint_name(k1=v1 [ , k2=v2 ]*)

k :
      string_literal

v :
      string_literal

hint_opt :
      simple_identifier
  |   numeric_literal
  |   string_literal
Example:
SELECT *
FROM EMP /*+ OPTIONS('k1'='v1', 'k2'='v2') */
JOIN DEPT /*+ OPTIONS('a.b.c'='v3', 'd.e.f'='v4') */
ON EMP.deptno = DEPT.deptno
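To make the key-value form of the hint concrete, here is a minimal sketch that extracts the options from a hint comment with regular expressions. It is only an illustration: the class OptionsHintSketch is hypothetical and not part of Flink or Calcite (the real parsing is done by the Calcite SQL parser), and it assumes keys and values are single-quoted string literals without embedded quotes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper, not a Flink or Calcite API: reads the pairs of an OPTIONS hint. */
final class OptionsHintSketch {
    // Matches the whole hint comment and captures the text between the parentheses.
    private static final Pattern HINT = Pattern.compile(
            "/\\*\\+\\s*OPTIONS\\s*\\((.*?)\\)\\s*\\*/",
            Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

    // Matches one 'key'='value' pair; assumes no quote characters inside the literals.
    private static final Pattern PAIR = Pattern.compile("'([^']*)'\\s*=\\s*'([^']*)'");

    static Map<String, String> parse(String hintComment) {
        Matcher hint = HINT.matcher(hintComment);
        if (!hint.find()) {
            throw new IllegalArgumentException("Not an OPTIONS hint: " + hintComment);
        }
        // Keep the declaration order of the options.
        Map<String, String> options = new LinkedHashMap<>();
        Matcher pair = PAIR.matcher(hint.group(1));
        while (pair.find()) {
            options.put(pair.group(1), pair.group(2));
        }
        return options;
    }
}
```

For instance, OptionsHintSketch.parse("/*+ OPTIONS('k1'='v1', 'k2'='v2') */") yields a map with the two entries k1 and k2.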
Public Interfaces
Regenerate New CatalogTable with Given Options
CatalogTable#copy(Map<String, String>) creates a new copy of the original CatalogTable with the given options. We can use this interface to override table options with the dynamic ones.
public interface CatalogTable extends CatalogBaseTable {
    /**
     * Returns a copy of this {@code CatalogTable} with given table options {@code options}.
     *
     * @return a new copy of this table with replaced table options
     */
    CatalogTable copy(Map<String, String> options);
}
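A rough sketch of how such a copy method could be used to overlay dynamic options is shown below. TableSketch is a simplified, hypothetical stand-in for a CatalogTable implementation, and withDynamicOptions is an illustrative helper that does not exist in Flink; only the shape of copy(Map) follows the interface above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical stand-in for a CatalogTable implementation. */
final class TableSketch {
    private final Map<String, String> options;

    TableSketch(Map<String, String> options) {
        // Defensive copy so that later changes to the argument do not leak in.
        this.options = Collections.unmodifiableMap(new HashMap<>(options));
    }

    Map<String, String> getOptions() {
        return options;
    }

    /** Same shape as CatalogTable#copy(Map): a new table with the given options. */
    TableSketch copy(Map<String, String> newOptions) {
        return new TableSketch(newOptions);
    }

    /** Illustrative helper: overlays dynamic options, which win on key conflicts. */
    TableSketch withDynamicOptions(Map<String, String> dynamicOptions) {
        Map<String, String> merged = new HashMap<>(options);
        merged.putAll(dynamicOptions);
        return copy(merged);
    }
}
```

The original table is left untouched, so the options persisted in the catalog stay as they were defined in the DDL.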
Global ConfigOption to Enable or Disable
Users can enable or disable this feature globally. It is disabled by default (the default value is false).
public class TableConfigOptions {
    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public static final ConfigOption<Boolean> TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED =
        key("table.dynamic-table-options.enabled")
            .booleanType()
            .defaultValue(false)
            .withDescription("Enable or disable the OPTIONS hint used to specify table options "
                + "dynamically, if disabled, an exception would be thrown "
                + "if any OPTIONS hint is specified");
}
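The effect of this switch can be sketched as a small guard. The class DynamicOptionsGuard and its use of a plain string map as configuration are assumptions made for illustration; only the option key, its default of false, and the "throw when disabled" behavior come from the description above.

```java
import java.util.Map;

/** Hypothetical guard, not a Flink class; the configuration is modeled as a plain map. */
final class DynamicOptionsGuard {
    static final String KEY = "table.dynamic-table-options.enabled";

    /** Throws if an OPTIONS hint is used while the feature is disabled (the default). */
    static void check(Map<String, String> config, Map<String, String> hintOptions) {
        // A missing key falls back to the default value "false".
        boolean enabled = Boolean.parseBoolean(config.getOrDefault(KEY, "false"));
        if (!enabled && !hintOptions.isEmpty()) {
            throw new IllegalStateException(
                    "OPTIONS hint is not allowed unless '" + KEY + "' is set to true");
        }
    }
}
```

Queries without an OPTIONS hint are unaffected by the switch in this sketch; only a query that actually carries dynamic options is rejected.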
Proposed Change
We have implemented an extensible framework for hints in Apache Calcite. For details, please see the doc "Calcite Hints Design" and the JIRA issue CALCITE-482. This hints framework is the basic infrastructure for our dynamic table options.
Customize Hint Strategies
We need to customize our own HintStrategyTable, i.e. the hint items we support and their HintStrategy instances. The HintStrategyTable indicates which hints we support; it also defines the propagation strategy, the hint option checker and the error handler.
- The propagation strategy tells the planner how the hint items should be propagated within the relational expression tree.
- The option checker checks whether the hint options are valid; the check happens during sql-to-rel conversion (where the RelHint is instantiated).
- The error handler defines what to do when we encounter a hint error (i.e. the hint content is not as expected). To stay consistent with most SQL vendors, the default behavior of the hints framework is to log warnings.
The code below illustrates how we can construct a HintStrategyTable:
builder
    .hintStrategy("no_hash_join", HintPredicates.JOIN)
    .hintStrategy("time_zone", HintPredicates.SET_VAR)
    .hintStrategy("index", HintPredicates.TABLE_SCAN)
    .hintStrategy("OPTIONS", HintPredicates.TABLE_SCAN)
    .hintStrategy(
        "resource", HintPredicates.or(
            HintPredicates.PROJECT, HintPredicates.AGGREGATE, HintPredicates.CALC))
    .hintStrategy("AGG_STRATEGY",
        HintStrategy.builder(HintPredicates.AGGREGATE)
            .optionChecker(
                (hint, errorHandler) -> errorHandler.check(
                    hint.listOptions.size() == 1
                        && (hint.listOptions.get(0).equalsIgnoreCase("ONE_PHASE")
                            || hint.listOptions.get(0).equalsIgnoreCase("TWO_PHASE")),
                    "Hint {} only allows single option, "
                        + "allowed options: [ONE_PHASE, TWO_PHASE]",
                    hint.hintName)).build())
    .hintStrategy("use_merge_join",
        HintStrategy.builder(
            HintPredicates.and(HintPredicates.JOIN, joinWithFixedTableName()))
            .excludedRules(EnumerableRules.ENUMERABLE_JOIN_RULE).build())
    .build();
Error Handling
Unlike most vendors, Flink by default throws an exception if it encounters a hint it cannot recognize or hint options whose format is not correct.
According to the discussion, throwing makes it easier for systems that integrate Flink to consume these errors.
This behavior is pluggable through the HintStrategyTable, but there is no plan to make it configurable by end users.
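The difference between the lenient and the strict behavior can be sketched with a pluggable handler. All names here (HintErrorHandlerSketch, HintErrorHandlers, checkHintName) are hypothetical and do not correspond to Calcite or Flink APIs; matching the hint name case-insensitively is also an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical interface: decides what happens when a hint error is found. */
interface HintErrorHandlerSketch {
    void onError(String message);
}

final class HintErrorHandlers {
    /** Lenient variant: records a warning and lets the query continue. */
    static final class Collecting implements HintErrorHandlerSketch {
        final List<String> warnings = new ArrayList<>();

        @Override
        public void onError(String message) {
            warnings.add(message);
        }
    }

    /** Strict variant: fails as soon as a hint error is reported. */
    static final HintErrorHandlerSketch THROWING = message -> {
        throw new IllegalArgumentException(message);
    };

    /** Reports any hint other than OPTIONS (case-insensitive match is an assumption). */
    static void checkHintName(String hintName, HintErrorHandlerSketch handler) {
        if (!"OPTIONS".equalsIgnoreCase(hintName)) {
            handler.onError("Unrecognized hint: " + hintName);
        }
    }
}
```

With the strict handler, an integrating system sees the failure as an exception rather than having to scan logs for warnings.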
The Options Hint
Note: this FLIP supports only one hint.
The OPTIONS hint is the way to declare dynamic table options. The syntax is:
table_name /*+ OPTIONS('k1'='v1', 'aa.bb.cc'='v2') */
Each dynamic table option is parsed and then handed over to the RelOptTable during sql-to-rel conversion.
Note: the OPTIONS hint may affect the semantics of the query (i.e. the result set may change when the hint is applied).
Options Validation
- Before we actually create the TableSource/Sink, we call CatalogTable#copy to get a table with the merged options and then use it to find the table factory. There are no constraints on which options can be overridden: any option can appear in the OPTIONS hint.
- The specified options override the options defined in the CREATE TABLE DDL.
- The real options validation happens in each connector's table factory.
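As a sketch of the last point, a connector factory could validate the already merged options as follows. KafkaFactorySketch is hypothetical; the set of supported keys is taken from the Kafka example in the Motivation section, and the list of startup modes only contains the two values mentioned in this document, so neither is meant to be complete.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical connector factory: validates options after DDL and hint were merged. */
final class KafkaFactorySketch {
    // Illustrative key set, taken from the Kafka DDL example; not exhaustive.
    private static final Set<String> SUPPORTED_KEYS = new HashSet<>(Arrays.asList(
            "connector.type", "update-mode", "connector.topic",
            "connector.startup-mode", "format.type", "format.ignore-parse-errors"));

    // Only the two startup modes mentioned in this document; not exhaustive.
    private static final Set<String> STARTUP_MODES =
            new HashSet<>(Arrays.asList("earliest-offset", "latest-offset"));

    static void validate(Map<String, String> mergedOptions) {
        for (String key : mergedOptions.keySet()) {
            if (!SUPPORTED_KEYS.contains(key)) {
                throw new IllegalArgumentException("Unsupported option: " + key);
            }
        }
        String mode = mergedOptions.get("connector.startup-mode");
        if (mode != null && !STARTUP_MODES.contains(mode)) {
            throw new IllegalArgumentException("Invalid startup mode: " + mode);
        }
    }
}
```

Because the factory only sees the merged map, it does not need to know whether a value came from the DDL or from the OPTIONS hint.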
Options Propagation
Let T be the table to which the hints are attached:
- If T is a view, the hints are ignored. We do not allow query hints in a table hint context, and a hint on a view would be ambiguous because the view is actually a query and can itself take query hints.
- If T comes from a table source registration, we only attach the hints to the table scan node.
- If T comes from a DDL, we merge the DDL properties with the OPTIONS hint options (the hint options override the DDL ones); the table scan node also carries all the declared table hints.
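The three cases above can be summarized in a small decision function. This is only a sketch: TableOrigin, ResolvedScan and HintPropagationSketch are hypothetical names, and the real logic lives in the planner and catalog integration rather than in one method.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical enum mirroring the three origins of T described above. */
enum TableOrigin { VIEW, REGISTERED_TABLE_SOURCE, DDL }

/** Hypothetical result: options used to create the table, and hints kept on the scan node. */
final class ResolvedScan {
    final Map<String, String> tableOptions;
    final Map<String, String> scanHints;

    ResolvedScan(Map<String, String> tableOptions, Map<String, String> scanHints) {
        this.tableOptions = tableOptions;
        this.scanHints = scanHints;
    }
}

final class HintPropagationSketch {
    static ResolvedScan resolve(
            TableOrigin origin,
            Map<String, String> tableOptions,
            Map<String, String> hintOptions) {
        switch (origin) {
            case VIEW:
                // Hints on a view are ignored entirely.
                return new ResolvedScan(tableOptions, Collections.emptyMap());
            case REGISTERED_TABLE_SOURCE:
                // Hints are attached to the scan node; the options are not merged.
                return new ResolvedScan(tableOptions, hintOptions);
            case DDL:
                // Hint options override the DDL ones, and the scan node keeps the hints.
                Map<String, String> merged = new HashMap<>(tableOptions);
                merged.putAll(hintOptions);
                return new ResolvedScan(merged, hintOptions);
            default:
                throw new IllegalArgumentException("Unknown origin: " + origin);
        }
    }
}
```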
Compatibility, Deprecation, and Migration Plan
This is a new feature and is compatible with older Flink versions. We may deprecate TableScanRule#INSTANCE and refactor ToRelContext (because we use it to pass the table hints, see CALCITE-3769), but this is transparent to users.
Implementation Plan
As a first step, we only implement the OPTIONS table hint:
- Refactor ToRelContext and remove all TableScanRule usages from Flink
- Add the Flink HintStrategyTable
- Modify CatalogSourceTable and FlinkRelBuilder to support merging the hint options
- Modify each connector to merge the options from the OPTIONS hint with the DDL definitions
Test Plan
Every new feature in the implementation can be covered by unit tests. In addition, we can add integration tests for connectors to verify that the feature works with the existing source/sink implementations.
Rejected Alternatives
The Syntax
Option 1: WITH Keyword Following A Table Reference
select ... from my_table WITH (k1=v1, k2=v2)
The WITH keyword here is not SQL compliant and it would break the main query part (DQL). We think SQL standard compliance matters much more for queries than for DDL; choosing this approach would make the Flink SQL query syntax differ from that of the main SQL vendors.
Option 2: Use the SQL Comments Without Constraints
select ... from my_table /*+ OPTIONS(k1=v1, k2=v2) */
Some connector properties (e.g. the Kafka topic name or the ZooKeeper endpoint) are related to authorization, so we should be cautious with these items.
Option 3: Create Table LIKE WITH
create table t2 LIKE t1 WITH (k1=v1, k2=v2)
This syntax is proposed in FLIP-110. We rejected it because:
- The syntax is verbose: users have to define a new temporary table just to override a single table option
- The parameters are not part of the main query
- It is not SQL standard compliant
For details, please see https://docs.google.com/document/d/1LnYvdp8R2OLxn9pKntfNQOfj1yKmmSiOA2UKmMUVB0E/edit?usp=sharing.
The Property White/Black List
We discussed setting up a black list (or white list) of options for each connector, but as the discussion went on, we found that we were designing ourselves into ever more complicated corners, mainly because of these questions:
- Which connector options are allowed to be set dynamically?
- How and when should the dynamic and static options be merged?
- The changes would have to be made in each connector's code base, which is too complex for connector developers
In the end, we decided to disable this feature by default (because of concerns that a hint should not affect query semantics) but to allow any connector option to be overridden.
Open Questions
Q1: Do Flink query hints support the QB specification?
The QB (i.e. query block) specification is useful when we want to specify the scope of the hint items precisely.
We are considering supporting it in a future version. With the current Calcite code base for SQL parsing and conversion, it is hard to abstract a component equivalent to an Oracle-style query block, at least for now.