FLIP-123 has implemented HiveQL-compatible DDLs so that users can process metadata in HiveQL. This FLIP aims to provide syntax compatibility for queries. Similar as FLIP-123, this FLIP will improve interoperability with Hive and reduce migration efforts. Besides, this FLIP also makes it possible to extend HiveQL to support streaming features. And with this FLIP, the following typical use cases can be supported:
- Users can migrate their batch Hive jobs to Flink, without needing to modify the SQL scripts.
- Users can write HiveQL to integrate streaming features with Hive tables, e.g. streaming data from Kafka to Hive.
- Users can write HiveQL to process non-Hive tables, either in batch or in streaming jobs.
For migrating users, we believe it's desirable for them to be able to continue write Hive syntax. It not only makes the migration easier, but also helps them leverage Flink for new scenarios more quickly, and thus provides unified batch-streaming experience.
One thing that Flink and Hive have in common is that they both generate a Calcite plan (in form of a
RelNode) during SQL query processing. While Flink uses this
RelNode as its logical plan, Hive only uses it for optimization and later converts it to its own form of logical plan. The inputs to generate the plan are also different. For Flink, the input is a
SqlNode parsed by Calcite, while for Hive, the input is an
ASTNode parsed by Antlr.
The overall idea to support Hive query syntax, is to reuse Hive’s Antlr parser to generate the
ASTNode, and then adapt Hive’s RelNode-generation process to generate a
RelNode that can be used in Flink. Once we have the RelNode, we can simply follow the current processing path in Flink to optimize and finally execute the plan.
In order to process a SQL statement differently, we introduce
HiveParser as a sub-class of
ParserImpl and overrides the
parse method. This is the entry point of this feature.
We will leverage Hive’s Antlr parser to generate the
ASTNode, and Hive’s semantic analyzer to collect semantic info of the SQL statement. Based on these information, we can tell which kind the statement is and process it accordingly.
We have two options to process DDLs.
- Just delegate DDLs to super class and reuse FLIP-123 to process them.
- Process DDLs using the semantics we get from Hive.
In order to provide consistent user experience, we choose option #2 to handle DDLs.
HiveParserCalcitePlanner to generate the
RelNode and create a
PlannerQueryOperation with it.
Since we don’t support Hive ACID tables, we’ll only consider INSERT here.
HiveParserCalcitePlanner to generate the
RelNode for the query and create a
CatalogSinkModifyOperation. To do that,
HiveParser needs to:
- Figure out static partition specs for partitioned table, and add static partition values to Project in the
- Figure out dest schema, if any, and adjust the Project fields in the
- Figure out whether to overwrite existing data
Hive Dialect & Pluggable Parser
FLIP-123 has made SQL dialect as the switch to turn on/off Hive compatibility. In this FLIP, we’ll continue to use it for queries.
Since we have multiple implementations of
org.apache.flink.table.delegation.Parser now, we need to make it pluggable so that the planner knows which one to use. We’ll introduce
ParserFactory to create
Parser instances and use SPI to find the factory to use according to the current dialect.
The factory to create
The factory to create
We need to support switching dialect on a per-statement basis. Therefore blink planner cannot hold a constant
Parser instance, but has to create a new instance if dialect has changed. The updated
This class encapsulates all the logic to generate the
RelNode plan for a query. We follow Hive’s
CalcitePlanner to do it but adapt to our own needs. For example, Hive has its own
RelNode implementations like
HiveJoin, which we’ll change to
LogicalJoin respectively. Moreover, Hive’s
CalcitePlanner doesn’t support all Hive features, e.g. SORT BY or LATERAL VIEW. We can extend the logic in
HiveParserCalcitePlanner and support them.
Support Multiple Hive Versions
In order to support multiple Hive versions, we keep our own copy of Hive code to generate the
ASTNode and do semantic analysis. This means that a SQL statement is processed with the same code no matter which Hive version is in use. The rationale behind this decision is:
- HiveQL syntax is in general backward compatible. So we can use a newer version to support older versions.
- The process to generate
RelNodeplan is tightly coupled with
ASTNodeand semantic analysis. While it’s theoretically possible to make
HiveParserCalcitePlannersupport different versions, that’ll make the logic much more complicated and error-prone.
- The copied code gives us more flexibility to support new features in the future. For example, we can adapt the code to support writing HiveQL for generic tables, or support querying tables across multiple catalogs.
Since most Hive users are still using Hive 2.x or 1.x, we'll copy Hive code from 2.x, which would reduce the required efforts to cover these versions. For new features in Hive 3.x, e.g. table constraints, we'll extend the copied code to support them.
Go Beyond Hive
In order to support the use cases mentioned in the "
Motivation" section, we should ultimately make this feature as a pure SQL dialect and decoupled from Hive tables or functions. To achieve that, we need to:
- Extend Hive syntax, so that it supports identifiers like "catalog.db.table".
Catalogto resolve tables or views.
SqlOperatorTableto resolve functions.
- Extend Hive syntax to support streaming features such as Group Windows.
These efforts are within scope of this FLIP but don't need to be implemented in an MVP release.
New or Changed Public Interfaces
No public interface has to be added or changed.
Compatibility, Deprecation, and Migration Plan
The best way to verify Hive query compatibility is to run Hive’s QFile test with Flink. But in order to limit our code base size and test time, this should not be included in this FLIP, at least not as a regular UT/IT case.
Let Hive generate the
RelNode and then convert it to meet Flink’s requirements. This will inevitably lead to jar conflicts because Hive depends on a quite old Calcite version while Flink basically upgrades Calcite dependency in each major release. In addition, it’s more difficult to support new features.
The following limitations apply when using this feature.
HiveModuleshould be used in order to use Hive built-in functions.
- Some features are not supported due to underlying functionalities are missing. For example, Hive’s
UNIONtype is not supported.
The following diagram shows the process how
HiveParser parses a SQL statement.