
  1. Just delegate DDLs to super class and reuse FLIP-123 to process them.
  2. Process DDLs using the semantics we get from Hive.

In order to provide a consistent user experience, we choose option #2 to handle DDLs.


Use HiveParserCalcitePlanner to generate the RelNode and create a PlannerQueryOperation with it.


Since we now have multiple implementations of org.apache.flink.table.delegation.Parser, we need to make the parser pluggable so that the planner knows which one to use. We’ll introduce ParserFactory to create Parser instances and use SPI to find the factory that matches the current dialect.

ParserFactory definition:

Code Block
public interface ParserFactory extends ComponentFactory {

	Parser create(CatalogManager catalogManager, PlannerContext plannerContext);
}


Code Block
public class DefaultParserFactory implements ParserFactory {

	@Override
	public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
		SqlExprToRexConverterFactory sqlExprToRexConverterFactory = plannerContext::createSqlExprToRexConverter;
		return new ParserImpl(
				() -> plannerContext.createFlinkPlanner(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase()),
				tableSchema -> sqlExprToRexConverterFactory.create(plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)));
	}

	@Override
	public Map<String, String> optionalContext() {
		DescriptorProperties properties = new DescriptorProperties();
		return properties.asMap();
	}

	@Override
	public Map<String, String> requiredContext() {
		DescriptorProperties properties = new DescriptorProperties();
		return properties.asMap();
	}

	@Override
	public List<String> supportedProperties() {
		return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
	}
}

Code Block
public class HiveParserFactory implements ParserFactory {

	@Override
	public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
		SqlExprToRexConverterFactory sqlExprToRexConverterFactory = plannerContext::createSqlExprToRexConverter;
		return new HiveParser(
				() -> plannerContext.createFlinkPlanner(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase()),
				tableSchema -> sqlExprToRexConverterFactory.create(plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)),
				// further constructor argument(s) are elided in the source
				);
	}

	@Override
	public Map<String, String> optionalContext() {
		DescriptorProperties properties = new DescriptorProperties();
		return properties.asMap();
	}

	@Override
	public Map<String, String> requiredContext() {
		DescriptorProperties properties = new DescriptorProperties();
		return properties.asMap();
	}

	@Override
	public List<String> supportedProperties() {
		return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
	}
}

We need to support switching dialects on a per-statement basis. Therefore, the blink planner cannot hold a constant Parser instance, but has to create a new instance whenever the dialect has changed. The updated getParser method:

Code Block
  override def getParser: Parser = {
    if (getTableConfig.getSqlDialect != currentDialect) {
      val parserProps = Map(TableConfigOptions.TABLE_SQL_DIALECT.key() ->
        // Assumption: the map value is missing from the source; the lower-cased dialect name is a guess.
        getTableConfig.getSqlDialect.name().toLowerCase)
      parser = ComponentFactoryService.find(classOf[ParserFactory], parserProps)
        .create(catalogManager, plannerContext)
      currentDialect = getTableConfig.getSqlDialect
    }
    parser
  }
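
The caching behaviour described above can be illustrated without any Flink dependency. The following is a minimal sketch, not the planner's actual code: SqlDialect, Parser and ParserFactory here are simplified, hypothetical stand-ins for the Flink types of the same name, the dialect() method is an assumed way of matching a factory to a dialect, and the factory list is passed in directly instead of being discovered through SPI. It only shows that a parser is created when the configured dialect differs from the dialect of the cached instance, and reused otherwise.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical, simplified stand-ins for the Flink types named in this FLIP.
enum SqlDialect { DEFAULT, HIVE }

interface Parser {
    String describe();
}

interface ParserFactory {
    // Assumed matching key: the lower-cased dialect name, e.g. "default" or "hive".
    String dialect();

    Parser create();
}

final class DialectAwarePlanner {
    private final List<ParserFactory> factories;
    private SqlDialect configuredDialect = SqlDialect.DEFAULT;
    private SqlDialect currentDialect; // dialect of the cached parser; null until first use
    private Parser parser;

    DialectAwarePlanner(List<ParserFactory> factories) {
        this.factories = new ArrayList<>(factories);
    }

    void setSqlDialect(SqlDialect dialect) {
        this.configuredDialect = dialect;
    }

    // Re-creates the parser only when the configured dialect has changed.
    Parser getParser() {
        if (configuredDialect != currentDialect) {
            String key = configuredDialect.name().toLowerCase(Locale.ROOT);
            parser = find(key).create();
            currentDialect = configuredDialect;
        }
        return parser;
    }

    // Returns the first factory registered for the given dialect key.
    private ParserFactory find(String key) {
        for (ParserFactory factory : factories) {
            if (factory.dialect().equals(key)) {
                return factory;
            }
        }
        throw new IllegalStateException("No ParserFactory found for dialect: " + key);
    }
}
```

Calling getParser twice without changing the dialect returns the same instance, while switching the dialect between two statements triggers exactly one new create call. In the actual planner, the lookup would go through ComponentFactoryService rather than a hand-written loop.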


This class encapsulates all the logic to generate the RelNode plan for a query. We follow Hive’s CalcitePlanner to do it but adapt to our own needs. For example, Hive has its own RelNode implementations like HiveProject and HiveJoin, which we’ll change to LogicalProject and LogicalJoin respectively. Moreover, Hive’s CalcitePlanner doesn’t support all Hive features, e.g. SORT BY or LATERAL VIEW. We can extend the logic in HiveParserCalcitePlanner and support them.


  1. HiveQL syntax is in general backward compatible. So we can use a newer version to support older versions.
  2. The process to generate RelNode plan is tightly coupled with ASTNode and semantic analysis. While it’s theoretically possible to make HiveParserCalcitePlanner support different versions, that’ll make the logic much more complicated and error-prone.
  3. The copied code gives us more flexibility to support new features in the future. For example, we can adapt the code to support writing HiveQL for generic tables, or support querying tables across multiple catalogs.

Since most Hive users are still using Hive 2.x or 1.x, we'll copy Hive code from 2.x, which reduces the effort required to cover these versions. For new features in Hive 3.x, e.g. table constraints, we'll extend the copied code to support them.

Go Beyond Hive

In order to support the use cases mentioned in the "Motivation" section, we should ultimately make this feature a pure SQL dialect that is decoupled from Hive tables and functions. To achieve that, we need to:

  1. Extend Hive syntax so that it supports identifiers like "catalog.db.table".
  2. Leverage Flink Catalog to retrieve metadata.
  3. Leverage Catalog to resolve tables or views.
  4. Leverage SqlOperatorTable to resolve functions.
  5. Extend Hive syntax to support streaming features such as Group Windows.

These efforts are within the scope of this FLIP but don't need to be implemented in an MVP release. Ultimately, we'll make this feature a pure SQL dialect that is orthogonal to the tables being queried.
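
To make the "catalog.db.table" requirement more concrete, here is a minimal sketch of how a one-, two- or three-part table name could be expanded to a fully qualified path using the session's current catalog and database. The class TableIdentifiers and its qualify method are hypothetical names introduced for illustration and are not part of Flink or Hive. The sketch deliberately ignores quoted identifiers (for example, backtick-quoted names that contain dots), which a real parser would have to handle.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only.
final class TableIdentifiers {
    private TableIdentifiers() {}

    // Expands "table", "db.table" or "catalog.db.table" to [catalog, database, table].
    static List<String> qualify(String identifier, String currentCatalog, String currentDatabase) {
        // The -1 limit keeps trailing empty parts so that "db." is rejected below.
        String[] parts = identifier.split("\\.", -1);
        for (String part : parts) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("Empty name part in identifier: " + identifier);
            }
        }
        switch (parts.length) {
            case 1:
                return Arrays.asList(currentCatalog, currentDatabase, parts[0]);
            case 2:
                return Arrays.asList(currentCatalog, parts[0], parts[1]);
            case 3:
                return Arrays.asList(parts[0], parts[1], parts[2]);
            default:
                throw new IllegalArgumentException("Too many name parts in identifier: " + identifier);
        }
    }
}
```

With a current catalog of "myhive" and a current database of "default", the name "src" would resolve to myhive.default.src, while "other_cat.db1.src" would be used as given. The resolved path could then be handed to the catalog to look up the table or view.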

New or Changed Public Interfaces
