Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The Hive connector reached “production-ready” status in Flink 1.10.0, and more users are trying to use Flink to access their Hive warehouses. While we have covered the most commonly used features in Hive, there are still some pain points in the migration, including:
- Lack of DDL support
- Syntax difference between FlinkSQL and HiveQL
In addition, although we provide a Hive dialect, this dialect currently only serves as a switch to enable creating partitioned tables. Using the Hive dialect doesn’t mean users can write in Hive syntax, which can be counterintuitive.
Therefore, we propose to implement DDL and DML for the Hive connector in a HiveQL-compatible way. Users will get this compatibility when they choose the Hive dialect. DQL is out of the scope of this FLIP and left for the future. With compatible DDL and DML, we believe users can migrate at least some of their scripts without needing to change them.
Proposed Changes
Introduce a New Parser
Since FlinkSQL has already defined some DDLs and DMLs, we won’t change the current parser to support HiveQL; otherwise our grammar would become quite complicated. Instead, we’ll generate a separate parser for the Hive dialect, and define DDL and DML in this parser following HiveQL syntax. This parser can be generated in the flink-sql-parser module alongside the current FlinkSqlParserImpl. If for some reason that doesn’t work, we can also create a new Maven module for it.
DDL Support
Once we have the parser in place, we can implement Hive syntax in the grammar template file. To accommodate Hive-specific features in the DDL, we need to extend the current DDL SqlNodes. For example, Hive allows specifying a location when creating a database, so we’ll extend SqlCreateDatabase to handle the location. Before the Catalog APIs are called, all the extra information will be encoded in the properties of the Catalog object, i.e. CatalogDatabaseImpl in this example. HiveCatalog will need to know these properties and create Hive objects accordingly. Following is the example code extending SqlCreateDatabase.
public class SqlCreateHiveDatabase extends SqlCreateDatabase {

    public static final String DATABASE_LOCATION_URI = "database.location_uri";

    public SqlCreateHiveDatabase(
            SqlParserPos pos,
            SqlIdentifier databaseName,
            SqlNodeList propertyList,
            SqlCharStringLiteral comment,
            SqlCharStringLiteral location,
            boolean ifNotExists) {
        super(pos, databaseName, propertyList, comment, ifNotExists);
        if (location != null) {
            propertyList.add(new SqlTableOption(
                    SqlLiteral.createCharString(DATABASE_LOCATION_URI, location.getParserPosition()),
                    location,
                    location.getParserPosition()));
        }
    }
}
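The property list populated above then reaches the catalog as ordinary properties. Below is a minimal sketch, not the actual operation-converter code, of how the parsed properties might be copied into a CatalogDatabaseImpl before the Catalog API is called; the local variables sqlCreateDatabase, databaseName, comment and catalog are assumed to be obtained elsewhere.

// Minimal sketch of the assumed conversion step (illustrative, not the actual implementation):
// copy the parsed property list, which now includes the encoded location,
// into the properties of the CatalogDatabaseImpl handed to the catalog.
Map<String, String> properties = new HashMap<>();
for (SqlNode node : sqlCreateDatabase.getPropertyList()) {
    SqlTableOption option = (SqlTableOption) node;
    properties.put(option.getKeyString(), option.getValueString());
}
CatalogDatabase database = new CatalogDatabaseImpl(properties, comment);
catalog.createDatabase(databaseName, database, sqlCreateDatabase.isIfNotExists());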
In HiveCatalog::createDatabase, HiveCatalog will extract the location string from the database properties and use it to create the Hive database object.
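As a rough illustration, here is a hedged sketch of how that extraction could look when building the Hive metastore Database object. This is not the actual HiveCatalog code, and the helper name toHiveDatabase is made up for this example.

// Sketch of the HiveCatalog side: pull the encoded location out of the
// database properties and map the rest onto Hive's metastore Database object.
private static Database toHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) {
    Map<String, String> properties = new HashMap<>(catalogDatabase.getProperties());
    // Remove the encoded location so it isn't stored as an ordinary property.
    String location = properties.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI);
    return new Database(
            databaseName,                 // database name
            catalogDatabase.getComment(), // comment
            location,                     // locationUri; null lets the metastore use its default
            properties);                  // remaining properties
}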
DML Support
Since we don’t support Hive’s ACID tables, we’ll just focus on INSERT statements here. The syntax difference for INSERT statements is that Hive requires users to specify all partition column names, even if a partition column is not assigned a static value, while in FlinkSQL such partition columns can be omitted. For example, to write into a table partitioned by p1 and p2 where only p1 gets a static value, Hive requires PARTITION (p1='2020', p2) whereas FlinkSQL accepts PARTITION (p1='2020'). The new parser will use Hive’s syntax.
Hive also supports multi-insert which has the following syntax:
FROM from_statement
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1
[INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2]
[INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2] ...;
We won’t support multi-insert in this FLIP since that’ll need support from the planner, e.g. being able to handle multiple CatalogSinkModifyOperation at a time.
Enable the Hive Parser
We need to allow users to specify the dialect in the sql-client yaml file. We’ll also implement a SqlParserImplFactory which can create the parser according to the dialect/conformance in use. Suppose the new parser for Hive is named FlinkHiveSqlParserImpl. The SqlParserImplFactory implementation, assuming it's named FlinkSqlParserImplFactory, will be something like this:
public class FlinkSqlParserImplFactory implements SqlParserImplFactory {

    private final SqlConformance conformance;

    public FlinkSqlParserImplFactory(SqlConformance conformance) {
        this.conformance = conformance;
    }

    @Override
    public SqlAbstractParserImpl getParser(Reader stream) {
        if (conformance == FlinkSqlConformance.HIVE) {
            return FlinkHiveSqlParserImpl.FACTORY.getParser(stream);
        } else {
            return FlinkSqlParserImpl.FACTORY.getParser(stream);
        }
    }
}
In PlannerContext::getSqlParserConfig, we’ll use FlinkSqlParserImplFactory to create the config:
private SqlParser.Config getSqlParserConfig() {
    return JavaScalaConversionUtil.<SqlParser.Config>toJava(getCalciteConfig(tableConfig).getSqlParserConfig()).orElseGet(
        // we use Java lex because back ticks are easier than double quotes in programming
        // and cases are preserved
        () -> {
            SqlConformance conformance = getSqlConformance();
            return SqlParser
                .configBuilder()
                .setParserFactory(new FlinkSqlParserImplFactory(conformance))
                .setConformance(conformance)
                .setLex(Lex.JAVA)
                .setIdentifierMaxLength(256)
                .build();
        }
    );
}
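For completeness, getSqlConformance() above is expected to map the configured dialect to the conformance consumed by the factory. A minimal sketch, assuming the existing SqlDialect and FlinkSqlConformance enums and the tableConfig field of PlannerContext (the exact error handling is illustrative):

// Sketch of mapping the configured SQL dialect to the parser conformance.
private FlinkSqlConformance getSqlConformance() {
    SqlDialect sqlDialect = tableConfig.getSqlDialect();
    switch (sqlDialect) {
        case HIVE:
            return FlinkSqlConformance.HIVE;
        case DEFAULT:
            return FlinkSqlConformance.DEFAULT;
        default:
            throw new TableException("Unsupported SQL dialect: " + sqlDialect);
    }
}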
New or Changed Public Interfaces
No public interface has to be added or changed.
Migration Plan and Compatibility
N/A
Rejected Alternatives
Copy Hive’s own grammar and parser/analyzer to handle DDL and DML. While this approach brings good compatibility, it probably requires more intrusive changes and makes it more difficult to support all Hive versions.
Limited Scope
Features for which the underlying functionalities are not ready are out of the scope of this FLIP, e.g. Hive’s CREATE TABLE AS and CONCATENATE won’t be supported.
Since the Hive connector only works with the blink planner, we’ll only make sure this feature works with the blink planner; it may or may not work with the old planner.
Calcite and HiveQL have different reserved keywords, e.g. DEFAULT is a reserved keyword in Calcite but a non-reserved keyword in HiveQL. Since Calcite currently doesn't allow us to change reserved keywords, users have to quote such keywords with backticks when using them as identifiers in FlinkSQL.
The following table summarizes the DDLs that will be supported in this FLIP. Unsupported features are also listed so that we can track them and decide whether/how to support them in the future.
|  | Supported | Comment | Not Supported | Comment |
| --- | --- | --- | --- | --- |
| Database | CREATE | | SHOW DATABASES LIKE | Show databases filtering by a regular expression. Missing Catalog API. |
| | DROP | | | |
| | ALTER | | | |
| | USE | | | |
| | SHOW | | | |
| | | | DESCRIBE | We don't have a TableEnvironment API for this. Perhaps it's easier to implement when FLIP-84 is in place. |
| Table | CREATE | Support specifying EXTERNAL, PARTITIONED BY, ROW FORMAT, STORED AS, LOCATION and table properties. Data types will also be in HiveQL syntax, e.g. STRUCT | Bucketed tables | |
| | DROP | | CREATE LIKE | Wait for FLIP-110 |
| | ALTER | Include rename, update table properties, update SerDe properties, update fileformat and update location. | CREATE AS | Missing underlying functionalities, e.g. create the table when the job succeeds. |
| | SHOW | | Temporary tables | Missing underlying functionalities, e.g. removing the files of the temporary table when session ends. |
| | DESCRIBE | | SKEWED BY [STORED AS DIRECTORIES] | Currently we don't use the skew info of a Hive table. |
| | | | STORED BY | We don't support Hive table with a storage handler yet. |
| | | | UNION type | |
| | | | TRANSACTIONAL tables | |
| | | | DROP PURGE | Data will be deleted w/o going to trash. Applies to either a table or partitions. Missing Catalog API. |
| | | | TRUNCATE | Remove all rows from a table or partitions. Missing Catalog APIs. |
| | | | TOUCH, PROTECTION, COMPACT, CONCATENATE, UPDATE COLUMNS | Applies to either a table or partitions. Too Hive-specific or missing underlying functionalities. |
| | | | SHOW TABLES 'regex' | Show tables filtering by a regular expression. Missing Catalog API. |
| | | | FOREIGN KEY, UNIQUE, DEFAULT, CHECK | These constraints are currently not used by the Hive connector. |
| Partition | ALTER | Include add, drop, update fileformat and update location. | Exchange, Discover, Retention, Recover, (Un)Archive | Too Hive-specific or missing underlying functionalities. |
| | SHOW | Support specifying partial spec | RENAME | Update a partition's spec. Missing Catalog API. |
| | | | DESCRIBE | We don't have a TableEnvironment API for this. Perhaps it's easier to implement when FLIP-84 is in place. |
| | | | ALTER with partial spec | Alter multiple partitions matching a partial spec. Missing Catalog API. |
| Column | ALTER | Change name, type, position, comment for a single column. Add new columns. Replace all columns. | | |
| Function | CREATE | | CREATE FUNCTION USING FILE\|JAR… | To support this, we need to be able to dynamically add resources to a session. |
| | DROP | | RELOAD | Hive-specific |
| | SHOW | | SHOW FUNCTIONS LIKE | Show functions filtering by a regular expression. Missing Catalog API. |
| View | CREATE | Wait for FLIP-71 | SHOW VIEWS LIKE | Show views filtering by a regular expression. Missing Catalog API. |
| | DROP | Wait for FLIP-71 | | |
| | ALTER | Wait for FLIP-71 | | |
| | SHOW | Wait for FLIP-71 | | |
| | DESCRIBE | Wait for FLIP-71 | | |
The following table summarizes the DMLs that will be supported in this FLIP. Unsupported features are also listed so that we can track them and decide whether/how to support them in the future.
|  | Supported | Comment | Unsupported | Comment |
| --- | --- | --- | --- | --- |
| DMLs | INSERT INTO/OVERWRITE PARTITION | Support specifying dynamic partition columns in the specification | Multi-insert | |