Status

Current state: Accepted

Discussion thread:  https://lists.apache.org/thread.html/r94af5d3d97e76e7dd9df68cbffff0becc7ba74d15591a8fab84c72fa%40%3Cdev.flink.apache.org%3E

JIRA: FLINK-17101

Released: 1.11.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, a Flink SQL user can define tables within a catalog using DDL. For example, a Kafka topic can be defined as a table with a statement like the one below:

create table kafka_table (
  id bigint,
  age int,
  name STRING
) WITH (
  'connector.type' = 'kafka',
  'update-mode' = 'append',
  'connector.topic' = 'topic.employees',
  'connector.startup-mode' = 'earliest-offset',
  'format.type' = 'csv',
  'format.ignore-parse-errors' = 'false'
)

The table is then defined in the catalog and can be referenced in queries. The table options are persisted in the catalog; to modify them, we have to use another DDL statement such as "ALTER TABLE ...". However, there are cases where users want to modify the table options dynamically, just for a single query:

  • My job crashed and I need to skip the historical records and consume from the latest offset, so I need to change "connector.startup-mode" to "latest-offset"
  • I want to ignore parse errors, so I need to set "format.ignore-parse-errors" to "true"

Another case is with a JDBC sink defined by DDL in the SQL-CLI:

create table jdbc_table (
  a int,
  b varchar
) with (
  'connector.type' = 'jdbc',
  'connector.write.flush.interval' = '5000'
)

To observe the data from the database side, I may want the JDBC sink to flush data more eagerly by setting "connector.write.flush.interval" to "2000".

Such parameter settings are very common and ad hoc. Being able to set them flexibly would improve the user experience of Flink SQL, especially now that we have so many kinds of connectors and so many supported table options.

Flink Dynamic Table Options Proposal

In order to pass around the table options dynamically and flexibly, we use the "table hints" syntax for these options: right after each referenced table in the query, we can specify an options block to indicate which options we want to override.

Note: we use the hint-style syntax, but unlike most table hints, the dynamic options may affect the query result set, so strictly speaking they are not "hints".

Hint Syntax

We choose the Oracle-style hint syntax because its hints are embedded in comments, which does not break SQL compatibility and is very readable. It also avoids introducing too many non-standard SQL keywords.

To supply an Oracle-style hint grammar for both query hints and table hints, we implement a hint grammar that can adapt to most hint use cases:

SQL Hints Syntax
query :
      select
      ...
      from
          table_name1 /*+ hint_content */
          join
          table_name2 /*+ hint_content */
      ...

hint_content :
      hint_item[, hint_item]*

hint_item :
  |   hint_name(k1=v1 [ , k2=v2 ]*)

k :
      string_literal

v :
      string_literal

hint_opt :
      simple_identifier
  |   numeric_literal
  |   string_literal


Example:

SELECT *
FROM
  EMP /*+ OPTIONS('k1'='v1', 'k2'='v2') */
  JOIN
  DEPT /*+ OPTIONS('a.b.c'='v3', 'd.e.f'='v4') */
ON
  EMP.deptno = DEPT.deptno

Public Interfaces

Regenerate New CatalogTable with Given Options

CatalogTable#copy(Map<String, String>) creates a new copy of the original CatalogTable with the given options. We can use this interface to apply the dynamic table options as overrides.

public interface CatalogTable extends CatalogBaseTable {

    /**
    * Returns a copy of this {@code CatalogTable} with given table options {@code options}.
    *
    * @return a new copy of this table with replaced table options
    */
    CatalogTable copy(Map<String, String> options);
}
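
The options map passed to CatalogTable#copy could be computed roughly as in the following sketch, which uses plain maps instead of the Flink classes. The class DynamicOptionsMergeSketch and its merge method are hypothetical names introduced only for this illustration; the one rule taken from this proposal is that options from the hint override the ones from the DDL.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: merges DDL options with OPTIONS hint options. */
final class DynamicOptionsMergeSketch {

    private DynamicOptionsMergeSketch() {}

    /**
     * Returns a new map containing all DDL options, with every key that also
     * appears in the hint replaced by the hint value. The inputs are not modified.
     */
    static Map<String, String> merge(
            Map<String, String> ddlOptions, Map<String, String> hintOptions) {
        Map<String, String> merged = new LinkedHashMap<>(ddlOptions);
        merged.putAll(hintOptions); // hint options win on conflict
        return Collections.unmodifiableMap(merged);
    }
}
```

With the Kafka table from the Motivation section, merging the hint option 'connector.startup-mode'='latest-offset' changes only that key and leaves 'connector.topic' and the other DDL options untouched.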

Global ConfigOption to Enable or Disable

Users can disable this feature entirely through configuration. It is disabled by default (the default value is false).

public class TableConfigOptions {
	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
	public static final ConfigOption<Boolean> TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED =
		key("table.dynamic-table-options.enabled")
			.booleanType()
			.defaultValue(false)
			.withDescription("Enable or disable the OPTIONS hint used to specify table options " +
				"dynamically, if disabled, an exception would be thrown " +
				"if any OPTIONS hint is specified");

}
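
The effect of this switch can be sketched as a simple guard. The class OptionsHintGateSketch, the method checkAllowed and the use of IllegalStateException are assumptions made for this illustration; the actual check and the exception type thrown by the planner may differ.

```java
import java.util.Map;

/** Illustrative only: rejects OPTIONS hints while the feature is disabled. */
final class OptionsHintGateSketch {

    private OptionsHintGateSketch() {}

    /**
     * Throws if the feature flag is off and the query carries at least one
     * dynamic table option; otherwise does nothing.
     */
    static void checkAllowed(boolean dynamicTableOptionsEnabled, Map<String, String> hintOptions) {
        if (!dynamicTableOptionsEnabled && !hintOptions.isEmpty()) {
            throw new IllegalStateException(
                "OPTIONS hint is not allowed because "
                    + "'table.dynamic-table-options.enabled' is false");
        }
    }
}
```

A query without any OPTIONS hint passes the guard regardless of the flag, so existing queries are unaffected by the default value.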

Proposed Change

We have implemented an extensible hints framework in Apache Calcite; for details, please see the doc "Calcite Hints Design" and the JIRA issue CALCITE-482. This framework serves as the basic infrastructure for our dynamic table options.

Customize Hint Strategies

We need to customize our own HintStrategyTable, i.e. the hint items we support and their HintStrategy instances. The HintStrategyTable indicates which hints we support; it also defines the propagation strategy, the hint option checker and the error handler.

  • The propagation strategy tells the planner how the hint items should be propagated within the relational expression tree
  • The option checker checks whether the hint options are valid; the check happens during sql-to-rel conversion (where the RelHint is instantiated)
  • The error handler defines what to do when we encounter a hint error (i.e. the hint content is not as expected); to stay in sync with most SQL vendors, Calcite's default behavior is to log warnings

The code below illustrates how we can construct a HintStrategyTable:

hint strategy example
      HintStrategyTable hintStrategyTable = HintStrategyTable.builder()
        .hintStrategy("no_hash_join", HintPredicates.JOIN)
        .hintStrategy("time_zone", HintPredicates.SET_VAR)
        .hintStrategy("index", HintPredicates.TABLE_SCAN)
        .hintStrategy("OPTIONS", HintPredicates.TABLE_SCAN)
        .hintStrategy(
            "resource", HintPredicates.or(
            HintPredicates.PROJECT, HintPredicates.AGGREGATE, HintPredicates.CALC))
        .hintStrategy("AGG_STRATEGY",
            HintStrategy.builder(HintPredicates.AGGREGATE)
                .optionChecker(
                    (hint, errorHandler) -> errorHandler.check(
                    hint.listOptions.size() == 1
                        && (hint.listOptions.get(0).equalsIgnoreCase("ONE_PHASE")
                        || hint.listOptions.get(0).equalsIgnoreCase("TWO_PHASE")),
                    "Hint {} only allows single option, "
                        + "allowed options: [ONE_PHASE, TWO_PHASE]",
                    hint.hintName)).build())
        .hintStrategy("use_merge_join",
            HintStrategy.builder(
                HintPredicates.and(HintPredicates.JOIN, joinWithFixedTableName()))
                .excludedRules(EnumerableRules.ENUMERABLE_JOIN_RULE).build())
        .build();

Error Handling

Unlike most vendors, Flink's default behavior is to throw an exception when it encounters a hint it cannot recognize or hint options whose format is incorrect.

According to the discussion, throwing exceptions makes it easier for systems that integrate Flink to consume these errors.

This behavior is pluggable through the HintStrategyTable, but there are no plans to expose it as a user-facing configuration.
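
The difference between the lenient policy (log a warning) and the strict policy (throw) can be sketched with a small pluggable handler. Everything in this example is hypothetical: the ErrorHandler interface, the WarningCollector and THROWING implementations and the checkHintName method are not the Calcite or Flink interfaces, and hint names are compared literally for simplicity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Illustrative only: two pluggable reactions to an unsupported hint. */
final class HintErrorHandlingSketch {

    /** Hypothetical callback; not the Calcite or Flink interface. */
    interface ErrorHandler {
        void onError(String message);
    }

    /** Lenient policy: records a warning and lets planning continue. */
    static final class WarningCollector implements ErrorHandler {
        final List<String> warnings = new ArrayList<>();

        @Override
        public void onError(String message) {
            warnings.add(message);
        }
    }

    /** Strict policy: fails fast so that an integrating system sees the error. */
    static final ErrorHandler THROWING = message -> {
        throw new IllegalArgumentException(message);
    };

    private HintErrorHandlingSketch() {}

    /** Reports a hint name that is not in the supported set to the given handler. */
    static void checkHintName(String hintName, Set<String> supportedHints, ErrorHandler handler) {
        if (!supportedHints.contains(hintName)) {
            handler.onError("Unsupported hint: " + hintName);
        }
    }
}
```

With the strict handler, a misspelled hint name surfaces as an exception at planning time instead of being silently ignored.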

The Options Hint

Note: we only support one hint for this FLIP.

The OPTIONS hint is the way to declare dynamic table options. The syntax is:

table_name /*+ OPTIONS('k1'='v1', 'aa.bb.cc'='v2') */


Each dynamic table option is parsed and then handed over to the RelOptTable during sql-to-rel conversion.

Note: the OPTIONS hint may affect the semantics of the query (i.e. the result set may change when the hint is applied).

Options Validation

  • Before we actually create the TableSource/Sink, we call CatalogTable#copy to get a table with the merged options and then use it to find the table factory. There are no constraints on which options can be overridden; any option can appear in the OPTIONS hint
  • The specified options override the options defined in the CREATE TABLE DDL
  • The real options validation happens in each connector's table factory

Options Propagation

Let T be the table to which the hints are attached:

  • If T is a view, the hints are ignored. We do not allow query hints in the table hints context, because that would be ambiguous: a view is actually a query and can itself take query hints;
  • If T comes from a table source registration, we only attach the hints to the table scan node
  • If T comes from a DDL, we merge the DDL properties with the OPTIONS hint options (the hint options override the DDL ones); the table scan node also takes all the declared table hints
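
The three propagation cases can be summarized in a small sketch. The enum TableKind, the Result holder and the propagate method are hypothetical names introduced for this illustration and do not correspond to planner classes; the sketch only models which options the table ends up with and which hints reach the table scan node.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: models the three propagation cases for a table T. */
final class HintPropagationSketch {

    /** Hypothetical classification of the table T that carries the hint. */
    enum TableKind { VIEW, REGISTERED_TABLE_SOURCE, DDL_TABLE }

    /** Outcome of propagation: effective table options and hints on the scan node. */
    static final class Result {
        final Map<String, String> tableOptions;
        final Map<String, String> scanHints;

        Result(Map<String, String> tableOptions, Map<String, String> scanHints) {
            this.tableOptions = tableOptions;
            this.scanHints = scanHints;
        }
    }

    private HintPropagationSketch() {}

    static Result propagate(
            TableKind kind, Map<String, String> tableOptions, Map<String, String> hintOptions) {
        switch (kind) {
            case VIEW:
                // Hints on a view are ignored entirely.
                return new Result(tableOptions, Collections.<String, String>emptyMap());
            case REGISTERED_TABLE_SOURCE:
                // Hints are only attached to the scan node; the options stay as registered.
                return new Result(tableOptions, hintOptions);
            case DDL_TABLE:
                // Hint options override the DDL options, and the scan node keeps the hints.
                Map<String, String> merged = new LinkedHashMap<>(tableOptions);
                merged.putAll(hintOptions);
                return new Result(merged, hintOptions);
            default:
                throw new IllegalArgumentException("Unknown table kind: " + kind);
        }
    }
}
```

Only the DDL case changes the options that are later used to look up the table factory.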

Compatibility, Deprecation, and Migration Plan

This is a new feature and is compatible with older Flink versions. We may deprecate TableScanRule#INSTANCE and refactor ToRelContext (because we use it to pass the table hints, see CALCITE-3769), but this is transparent to users.

Implementation Plan

As a first step, we only implement the OPTIONS table hint:

  • Refactor ToRelContext and remove all TableScanRule usages from Flink
  • Add the Flink HintStrategyTable
  • Modify CatalogSourceTable and FlinkRelBuilder to support merging hint options
  • Modify each connector to merge the options from the OPTIONS hint and the DDL definition

Test Plan

Every new feature can be covered by unit tests. In addition, we can add integration tests for the connectors to verify that the feature works with the existing source/sink implementations.

Rejected Alternatives

The Syntax

Option1: WITH Keyword Following A Table Reference

select ... from my_table WITH (k1=v1, k2=v2)

The WITH keyword in this position is not SQL compliant and would break the main query part (DQL). We think SQL standard compliance matters much more for queries than for DDL; choosing this option would make the Flink SQL query syntax differ from that of the major SQL vendors.

Option2: Use the SQL Comments Without Constraints

select ... from my_table /*+ OPTIONS(k1=v1, k2=v2) */

Some connector properties (e.g. the Kafka topic name or the ZooKeeper endpoint) are related to authorization, so we should be cautious about letting users override them without constraints.

Option3: Create Table LIKE WITH

create table t2 LIKE t1 WITH (k1=v1, k2=v2)

This syntax is proposed in FLIP-110, we rejected it because:

  • The syntax is verbose: users have to define a new temporary table just to override a single table option
  • The parameters are not part of the main query
  • It is not SQL standard compliant

For details, please see https://docs.google.com/document/d/1LnYvdp8R2OLxn9pKntfNQOfj1yKmmSiOA2UKmMUVB0E/edit?usp=sharing.

The Property White/Black List

We discussed setting up a blacklist (or whitelist) for each connector's options, but as the discussion went on, we found that we were designing ourselves into ever more complicated corners, mainly because of these questions:

  • Which connector options are allowed to be set dynamically?
  • How and when should the dynamic and static options be merged?
  • We would have to modify each connector's code base, which is too complex for connector developers

In the end, we decided to disable this feature by default (because of concerns that a hint should not affect the query semantics) but to allow overriding any connector option when it is enabled.

Open Questions

Q1: Do Flink query hints support the QB specification?

The QB (i.e. query block) specification is useful when we want to specify the scope of hint items precisely.

We are considering supporting it in a future version. With the current Calcite code base for SQL parsing and conversion, it is hard to abstract a component equivalent to an Oracle-style query block, at least for now.

Reference

[1] Oracle Using Optimizer Hints

[2] MySQL Optimizer hints

[3] Index hints, using its own extension to the SQL standard

[4] SQL-SERVER Hints (Transact-SQL) - Join

[5] SQL-SERVER Hints (Transact-SQL) - Query

[6] SQL-SERVER Hints (Transact-SQL) - Table

[7] Design of Calcite SQL and Planner Hints
