Status
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, with FLIP-123 and FLIP-152, we have supported the Hive dialect. But the support depends heavily on the Flink planner, and the interfaces involved are mostly internal, which makes them neither convenient nor suitable for implementing other dialects.
So, this FLIP introduces a pluggable dialect mechanism with public interfaces, making it convenient to support other dialects. At the same time, it intends to solve a legacy problem brought by the Hive dialect support: the Hive connector is coupled to the Flink planner, which brings much complexity and maintenance burden.
Public Interfaces
We propose the following interfaces:
- Parser
- ParserFactory
- CatalogRegistry
- OperationTreeBuilder
Parser
/** Provides methods for parsing SQL objects from a SQL string. */
@PublicEvolving
public interface Parser {
    // the interface already exists in the current codebase; here we just expose it.
}
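For reference, the existing interface (org.apache.flink.table.delegation.Parser) declares roughly the following methods, which a dialect implementation therefore has to provide:

/** Entry point for parsing SQL statements expressed as a String. */
List<Operation> parse(String statement);

/** Entry point for parsing SQL identifiers expressed as a String. */
UnresolvedIdentifier parseIdentifier(String identifier);

/** Entry point for parsing SQL expressions expressed as a String. */
ResolvedExpression parseSqlExpression(
        String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType);

/** Returns completion hints for the given statement at the given cursor position. */
String[] getCompletionHints(String statement, int position);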
ParserFactory
/**
 * Factory that creates {@link Parser}.
 *
 * <p>The {@link #factoryIdentifier()} is identified by matching it against {@link
 * TableConfigOptions#TABLE_SQL_DIALECT}.
 */
@PublicEvolving
public interface ParserFactory extends Factory {

    /** Creates a new parser. */
    Parser create(Context context);

    /** Context provided when a parser is created. */
    @PublicEvolving
    interface Context {
        CatalogRegistry getCatalogRegistry(); // interface provided for getting catalogs, qualifying identifiers, etc.

        OperationTreeBuilder getOperationTreeBuilder(); // interface provided to build Operations.
    }
}
CatalogRegistry
/** A catalog registry for dealing with catalogs. */
@PublicEvolving
public interface CatalogRegistry {

    String getCurrentDatabase();

    String getCurrentCatalog();

    ObjectIdentifier qualifyIdentifier(UnresolvedIdentifier identifier);

    Optional<Catalog> getCatalog(String catalogName);

    Optional<ResolvedCatalogBaseTable<?>> getResolvedCatalogBaseTable(
            ObjectIdentifier objectIdentifier);

    boolean isTemporaryTable(ObjectIdentifier objectIdentifier);

    Optional<CatalogPartition> getPartition(
            ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec);
}
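As an illustration, a dialect's converter might use the registry roughly as follows when resolving a table reference (a sketch only; catalogRegistry comes from ParserFactory.Context):

// qualify a possibly partially-specified identifier against the current catalog/database
ObjectIdentifier identifier =
        catalogRegistry.qualifyIdentifier(UnresolvedIdentifier.of("db", "t"));

// look up the resolved table metadata, failing if the table does not exist
ResolvedCatalogBaseTable<?> table =
        catalogRegistry
                .getResolvedCatalogBaseTable(identifier)
                .orElseThrow(
                        () -> new ValidationException("Table " + identifier + " not found"));

// temporary tables may need special handling in some dialects
boolean temporary = catalogRegistry.isTemporaryTable(identifier);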
OperationTreeBuilder
/** A builder for building {@link org.apache.flink.table.operations.Operation}. */
@PublicEvolving
public interface OperationTreeBuilder {

    QueryOperation project(List<Expression> projectList, QueryOperation child);

    QueryOperation project(
            List<Expression> projectList, QueryOperation child, boolean explicitAlias);

    QueryOperation project(
            List<Expression> projectList, QueryOperation child, List<OverWindow> overWindows);

    // omit other public methods already implemented in OperationTreeBuilder
}
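For instance, a converter could translate SELECT a AS x FROM t into an operation tree roughly like this (a sketch; child stands for the already-converted scan of t, and the boolean overload is assumed to mark the aliases as explicit):

// project column `a` and alias it to `x` on top of an already-built child operation
QueryOperation projected =
        operationTreeBuilder.project(
                Arrays.asList(Expressions.$("a").as("x")), child, true);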
Implementation details
The public interfaces referred to above will be added to the module flink-table-api-java. Then, when we want to support another dialect, we need to include this module and implement ParserFactory to create a Parser (and, optionally, an ExtendedOperationExecutor) for the specific dialect.
Example for supporting another dialect
Here, I would like to give an example of supporting a MySQL dialect.
MySQLParserFactory
public class MySQLParserFactory implements ParserFactory {

    @Override
    public String factoryIdentifier() {
        return "mysql";
    }

    @Override
    public Parser create(Context context) {
        return new MySQLParser(context);
    }
}
MySQLParser
public class MySQLParser implements Parser {

    private final OperationTreeBuilder operationTreeBuilder;

    public MySQLParser(ParserFactory.Context context) {
        this.operationTreeBuilder = context.getOperationTreeBuilder();
    }

    @Override
    public List<Operation> parse(String statement) {
        // parse the statement to an AST (Abstract Syntax Tree)
        MySQLAST mysqlAST = parseStatement(statement);
        // convert the AST to Flink's operation tree
        return convertToOperation(mysqlAST);
    }

    private List<Operation> convertToOperation(MySQLAST mysqlAST) {
        // may look like:
        Operation operation =
                operationTreeBuilder.project(
                        Arrays.asList(Expressions.$("f0")), new SourceQueryOperation(xx));
        return Collections.singletonList(operation);
    }

    @Override
    public UnresolvedIdentifier parseIdentifier(String identifier) {
        // may need to parse the identifier `db.t` to the string array [db, t]
        String[] names = parseMySQLIdentifier(identifier);
        return UnresolvedIdentifier.of(names);
    }

    @Override
    public ResolvedExpression parseSqlExpression(
            String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType) {
        // parse sqlExpression to a ResolvedExpression
        return parseMySQLExpression(sqlExpression, inputRowType, outputType);
    }

    @Override
    public String[] getCompletionHints(String statement, int position) {
        // just for example, return an empty string array directly
        return new String[0];
    }
}
Then, specify the full class name of MySQLParserFactory in the resource file META-INF/services/org.apache.flink.table.factories.Factory so that it can be discovered by the Java SPI mechanism.
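The service file then contains a single line with the factory's fully qualified name (the package com.example.mysql is hypothetical):

com.example.mysql.MySQLParserFactory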
After that, you can switch to the MySQL dialect by setting table.sql-dialect to mysql while executing SQL.
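A minimal sketch of doing this from the Table API (assuming the factory above is on the classpath; the option key is the one backing TableConfigOptions#TABLE_SQL_DIALECT):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MySQLDialectExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());
        // switch the parser to the dialect registered under the identifier "mysql"
        tEnv.getConfig().getConfiguration().setString("table.sql-dialect", "mysql");
        // this statement is now parsed by MySQLParser
        tEnv.executeSql("SELECT 1").print();
    }
}

In the SQL client, the equivalent is SET 'table.sql-dialect' = 'mysql';.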
Implementation for migrating Hive dialect
To support the Hive dialect, we should also follow this style: parse the statement to an AST and convert it to Flink's operation tree via OperationTreeBuilder. But we have already implemented the Hive dialect, and the current implementation converts SQL to Calcite's RelNode, which is consistent with Hive's own implementation when using CBO. Migrating would take much effort, as the Hive dialect codebase would have to be rewritten entirely; it is hard to migrate to Flink's operation tree in one shot.
So the temporary way is to introduce a slim module called flink-table-calcite-bridge. This module contains the Calcite dependencies needed for writing planner plugins (e.g. SQL dialects) that interact with Calcite APIs. More precisely, it is currently intended to provide the ability to create RelNode, which involves accessing the RelOptCluster, RelBuilder, etc. provided by PlannerContext.
But it is internal and designed only for the Hive connector. In the end, the Hive dialect should be migrated to Flink's operation tree and the module can be dropped. To do this, we propose to introduce the following internal interfaces/classes in the new module flink-table-calcite-bridge:
CalciteContext
/** Context for creating RelNode. */
@Internal
public interface CalciteContext extends ParserFactory.Context {

    CalciteCatalogReader createCatalogReader(
            boolean lenientCaseSensitivity, String currentCatalog, String currentDatabase);

    RelOptCluster getCluster();

    FrameworkConfig createFrameworkConfig();

    RelDataTypeFactory getTypeFactory();

    RelBuilder createRelBuilder(String currentCatalog, String currentDatabase);
}
These methods have already been implemented in PlannerContext, but we need to expose them so that the Hive connector can use them.
CalciteQueryOperation
/** Wrapper for Calcite RelNode tree. */
@Internal
public class CalciteQueryOperation implements QueryOperation {

    private final RelNode calciteTree;
    private final ResolvedSchema resolvedSchema;

    public CalciteQueryOperation(RelNode calciteTree, ResolvedSchema resolvedSchema) {
        this.calciteTree = calciteTree;
        this.resolvedSchema = resolvedSchema;
    }
}
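With these two pieces, the Hive parser can keep producing RelNode trees and hand them back to the planner wrapped in a regular QueryOperation. A rough sketch of what this could look like inside the Hive dialect's Parser implementation (hiveAstToRelNode and deriveSchema are placeholders, not part of the proposal):

// the context passed to the parser is a CalciteContext in the Hive case
RelBuilder relBuilder =
        calciteContext.createRelBuilder(
                catalogRegistry.getCurrentCatalog(), catalogRegistry.getCurrentDatabase());

// translate the Hive AST to a Calcite RelNode, as the existing implementation does
RelNode relNode = hiveAstToRelNode(hiveAst, relBuilder);

// wrap the RelNode so it can flow through Flink as a regular QueryOperation
QueryOperation operation = new CalciteQueryOperation(relNode, deriveSchema(relNode));
return Collections.singletonList(operation);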
Proposed Changes
- Introduce some public interfaces for supporting other dialects in the flink-table-api-java module
- Introduce a slim module called flink-table-calcite-bridge providing the Calcite dependency as part of migrating the Hive dialect. It is only a temporary bridge for the migration; the dependency and related logic should be dropped in the end.
Compatibility, Deprecation, and Migration Plan
For the Hive dialect, first replace the dependency on flink-table-planner with flink-table-calcite-bridge, and then migrate it to Flink's operation tree, at which point flink-table-calcite-bridge can be dropped.
Test Plan
This is pure refactoring work, which can be verified by the existing tests.