Status
Current state: under discussion
Discussion thread:
...
Motivation
FLIP-123 has implemented HiveQL-compatible DDLs so that users can process metadata in HiveQL. This FLIP aims to provide syntax compatibility for queries. Similar to FLIP-123, this FLIP will improve interoperability with Hive and reduce migration efforts. Besides, this FLIP also makes it possible to extend HiveQL to support streaming features. Therefore, users can not only write HiveQL for batch jobs but also run streaming jobs in a HiveQL fashion, which provides a better unified batch-streaming experience for migrating users. With this FLIP, the following typical use cases can be supported:
- Users can migrate their batch Hive jobs to Flink, without needing to modify the SQL scripts.
- Users can write HiveQL to integrate streaming features with Hive tables, e.g. streaming data from Kafka to Hive.
- Users can write HiveQL to process non-Hive tables, either in batch or in streaming jobs.
For migrating users, we believe it's desirable to be able to continue writing Hive syntax. It not only makes the migration easier, but also helps them leverage Flink for new scenarios more quickly, and thus provides a unified batch-streaming experience.
Proposed Changes
The Idea
...
- Just delegate DDLs to super class and reuse FLIP-123 to process them.
- Process DDLs using the semantics we get from Hive.
In order to provide a consistent user experience, we choose option #2 to handle DDLs.
DQL
Use HiveParserCalcitePlanner to generate the RelNode and create a PlannerQueryOperation with it.
...
Since we have multiple implementations of org.apache.flink.table.delegation.Parser now, we need to make it pluggable so that the planner knows which one to use. We’ll introduce ParserFactory to create Parser instances and use SPI to find the factory to use according to the current dialect.
ParserFactory definition:
```java
public interface ParserFactory extends ComponentFactory {
    // Creates a Parser for the dialect this factory is registered for.
    Parser create(CatalogManager catalogManager, PlannerContext plannerContext);
}
```
...
```java
public class DefaultParserFactory implements ParserFactory {

    @Override
    public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
        SqlExprToRexConverterFactory sqlExprToRexConverterFactory =
                plannerContext::createSqlExprToRexConverter;
        return new ParserImpl(
                catalogManager,
                () -> plannerContext.createFlinkPlanner(
                        catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase()),
                plannerContext::createCalciteParser,
                tableSchema -> sqlExprToRexConverterFactory.create(
                        plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)));
    }

    @Override
    public Map<String, String> optionalContext() {
        DescriptorProperties properties = new DescriptorProperties();
        return properties.asMap();
    }

    @Override
    public Map<String, String> requiredContext() {
        // This factory is selected when the configured dialect is the default one.
        DescriptorProperties properties = new DescriptorProperties();
        properties.putString(
                TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.DEFAULT.name().toLowerCase());
        return properties.asMap();
    }

    @Override
    public List<String> supportedProperties() {
        return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
    }
}
```
...
```java
public class HiveParserFactory implements ParserFactory {

    @Override
    public Parser create(CatalogManager catalogManager, PlannerContext plannerContext) {
        SqlExprToRexConverterFactory sqlExprToRexConverterFactory =
                plannerContext::createSqlExprToRexConverter;
        // Unlike ParserImpl, HiveParser also receives the PlannerContext itself.
        return new HiveParser(
                catalogManager,
                () -> plannerContext.createFlinkPlanner(
                        catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase()),
                plannerContext::createCalciteParser,
                tableSchema -> sqlExprToRexConverterFactory.create(
                        plannerContext.getTypeFactory().buildRelNodeRowType(tableSchema)),
                plannerContext);
    }

    @Override
    public Map<String, String> optionalContext() {
        DescriptorProperties properties = new DescriptorProperties();
        return properties.asMap();
    }

    @Override
    public Map<String, String> requiredContext() {
        // This factory is selected when the configured dialect is Hive.
        DescriptorProperties properties = new DescriptorProperties();
        properties.putString(
                TableConfigOptions.TABLE_SQL_DIALECT.key(), SqlDialect.HIVE.name().toLowerCase());
        return properties.asMap();
    }

    @Override
    public List<String> supportedProperties() {
        return Collections.singletonList(TableConfigOptions.TABLE_SQL_DIALECT.key());
    }
}
```
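As a rough illustration of how a dialect-keyed lookup could select between the two factories, the following self-contained sketch mimics the matching step with plain Java types. `SimpleParserFactory`, `FactoryLookup`, `factoryFor`, `find`, and `parserFor` are hypothetical stand-ins rather than Flink classes, and the returned strings are labels instead of real Parser instances; the actual lookup is delegated to Flink's factory service and may apply further checks.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class FactoryLookup {
    // Hypothetical stand-in for a parser factory: only the matching-related part is modeled.
    interface SimpleParserFactory {
        Map<String, String> requiredContext();
        String create(); // returns a label instead of a real Parser
    }

    // Assumed to correspond to the key of TableConfigOptions.TABLE_SQL_DIALECT.
    static final String DIALECT_KEY = "table.sql-dialect";

    static SimpleParserFactory factoryFor(String dialect, String label) {
        return new SimpleParserFactory() {
            @Override
            public Map<String, String> requiredContext() {
                return Collections.singletonMap(DIALECT_KEY, dialect);
            }

            @Override
            public String create() {
                return label;
            }
        };
    }

    // Picks the single factory whose required context is fully contained in the requested properties.
    static SimpleParserFactory find(List<SimpleParserFactory> factories, Map<String, String> props) {
        SimpleParserFactory match = null;
        for (SimpleParserFactory f : factories) {
            if (props.entrySet().containsAll(f.requiredContext().entrySet())) {
                if (match != null) {
                    throw new IllegalStateException("Ambiguous factories for " + props);
                }
                match = f;
            }
        }
        if (match == null) {
            throw new IllegalStateException("No factory found for " + props);
        }
        return match;
    }

    // Resolves the parser label for a lower-case dialect name.
    static String parserFor(String dialect) {
        List<SimpleParserFactory> factories = Arrays.asList(
                factoryFor("default", "ParserImpl"),
                factoryFor("hive", "HiveParser"));
        return find(factories, Collections.singletonMap(DIALECT_KEY, dialect)).create();
    }
}
```

In this sketch an unknown dialect fails fast with an exception instead of silently falling back to the default parser; whether the real lookup behaves the same way is not stated in this FLIP.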
We need to support switching dialect on a per-statement basis. Therefore the blink planner cannot hold a constant Parser instance, but has to create a new instance if the dialect has changed. The updated getParser method:
```scala
override def getParser: Parser = {
  // Re-create the parser only when the configured dialect differs from the cached one.
  if (getTableConfig.getSqlDialect != currentDialect) {
    val parserProps = Map(
      TableConfigOptions.TABLE_SQL_DIALECT.key() -> getTableConfig.getSqlDialect.name().toLowerCase)
    parser = ComponentFactoryService.find(classOf[ParserFactory], parserProps)
      .create(catalogManager, plannerContext)
    currentDialect = getTableConfig.getSqlDialect
  }
  parser
}
```
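The caching behavior described above can be checked in isolation with a small Java model. This is only a sketch: `DialectAwarePlanner`, `parserCreator`, `setSqlDialect`, and the `creations` counter are hypothetical names introduced for illustration, dialects are plain strings, and the parser is an opaque object rather than a real Parser.

```java
import java.util.function.Function;

class DialectAwarePlanner {
    // Hypothetical hook: builds a new parser for the given dialect name.
    private final Function<String, Object> parserCreator;

    private String configuredDialect = "default"; // what the user has currently configured
    private String currentDialect;                // dialect the cached parser was built for
    private Object parser;                        // cached parser instance
    int creations = 0;                            // counts how often a parser was created

    DialectAwarePlanner(Function<String, Object> parserCreator) {
        this.parserCreator = parserCreator;
    }

    void setSqlDialect(String dialect) {
        this.configuredDialect = dialect;
    }

    // Returns the cached parser, re-creating it only if the dialect has changed.
    Object getParser() {
        if (!configuredDialect.equals(currentDialect)) {
            parser = parserCreator.apply(configuredDialect);
            currentDialect = configuredDialect;
            creations++;
        }
        return parser;
    }
}
```

With this shape, consecutive statements in the same dialect share one parser instance, and a new instance is created only for the first statement after a dialect switch.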
HiveParserCalcitePlanner
...
- HiveQL syntax is in general backward compatible. So we can use a newer version to support older versions.
- The process to generate RelNode plan is tightly coupled with ASTNode and semantic analysis. While it’s theoretically possible to make HiveParserCalcitePlanner support different versions, that’ll make the logic much more complicated and error-prone.
- The copied code gives us more flexibility to support new features in the future. For example, we can adapt the code to support writing HiveQL for generic tables, or support querying tables across multiple catalogs.
Since most Hive users are still using Hive 2.x or 1.x, we'll copy Hive code from 2.x, which would reduce the required efforts to cover these versions. For new features in Hive 3.x, e.g. table constraints, we'll extend the copied code to support them.
Go Beyond Hive
In order to support the use cases mentioned in the "Motivation" section, we should ultimately make this feature a pure SQL dialect, decoupled from Hive tables or functions. To achieve that, we need to:
- Extend Hive syntax, so that it supports identifiers like "catalog.db.table".
- Leverage Catalog to resolve tables or views.
- Leverage SqlOperatorTable to resolve functions.
- Extend Hive syntax to support streaming features such as Group Windows.
These efforts are within scope of this FLIP but don't need to be implemented in an MVP release.
New or Changed Public Interfaces
...
The following limitations apply when using this feature.
...
.
- HiveModule should be used in order to use Hive built-in functions.
- Some features are not supported because the underlying functionality is missing. For example, Hive’s UNION type is not supported.
Appendix
...