Discussion thread
Vote thread
JIRA: FLINK-17198

Release: 1.11

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Hive connector reached “production-ready” status in Flink 1.10.0, and more users are trying to use Flink to access their Hive warehouses. While we have covered the most commonly used Hive features, there are still some pain points in the migration, including:

  • Lack of DDL support
  • Syntax difference between FlinkSQL and HiveQL

In addition, although we provide a Hive dialect, this dialect currently only serves as a switch to enable creating partitioned tables. Using the Hive dialect doesn’t mean users can write in Hive syntax, which can be counterintuitive.

Therefore, we propose to implement DDL and DML for the Hive connector in a HiveQL-compatible way. Users will get this compatibility when they choose to use the Hive dialect. DQL is out of the scope of this FLIP and is left for the future. With compatible DDL and DML, we believe users can migrate at least some of their scripts without having to change them.

Proposed Changes

Introduce a New Parser

Since FlinkSQL has already defined some DDL and DML of its own, we won’t change the current parser to support HiveQL; otherwise the grammar would become quite complicated. Instead, we’ll generate a separate parser for the Hive dialect and define DDL and DML in this parser following HiveQL syntax. This parser can be generated in the flink-sql-parser module alongside the current FlinkSqlParserImpl. If for some reason that doesn’t work, we can also create a new Maven module for it.

DDL Support

Once we have the parser in place, we can implement Hive syntax in the grammar template file. To accommodate Hive-specific features in the DDL, we need to extend the current DDL SqlNodes. For example, Hive allows specifying a location when creating a database, so we’ll extend SqlCreateDatabase to handle the location. Before calling Catalog APIs, all the extra information will be encoded in the properties of the Catalog object, i.e. CatalogDatabaseImpl in this example. HiveCatalog will need to recognize these properties and create Hive objects accordingly. The following example code extends SqlCreateDatabase:

public class SqlCreateHiveDatabase extends SqlCreateDatabase {

   public static final String DATABASE_LOCATION_URI = "database.location_uri";

   public SqlCreateHiveDatabase(SqlParserPos pos, SqlIdentifier databaseName, SqlNodeList propertyList,
         SqlCharStringLiteral comment, SqlCharStringLiteral location, boolean ifNotExists) {
      super(pos, databaseName, propertyList, comment, ifNotExists);
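      // Encode the location as a regular property so it can travel through
      // the Catalog APIs without changing their signatures.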
      if (location != null) {
         propertyList.add(new SqlTableOption(
               SqlLiteral.createCharString(DATABASE_LOCATION_URI, location.getParserPosition()),
               location,
               location.getParserPosition()));
      }
   }
}

In HiveCatalog::createDatabase, HiveCatalog will extract the location string from database properties and use it to create the Hive database object.
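
For illustration, here's a minimal sketch of what that could look like; the client field (the catalog's metastore client wrapper) and the omitted validation/ignoreIfExists handling are simplifications, and Database is Hive's metastore API class:

public void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists)
      throws DatabaseAlreadyExistException, CatalogException {
   // Copy the properties so we can strip out the ones HiveCatalog consumes itself.
   Map<String, String> properties = new HashMap<>(database.getProperties());
   // Pull out the location encoded by SqlCreateHiveDatabase, so it becomes the
   // Hive database's locationUri rather than a plain parameter.
   String locationUri = properties.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI);
   Database hiveDatabase = new Database(databaseName, database.getComment(), locationUri, properties);
   // Delegate to the Hive metastore client (error handling simplified).
   client.createDatabase(hiveDatabase);
}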

DML Support

Since we don’t support Hive’s ACID tables, we’ll focus on INSERT statements here. The syntax difference regarding INSERT statements is that Hive requires users to specify all partition column names, even if a partition column is not assigned a static value, whereas in FlinkSQL such partition columns can be omitted. We’ll adopt Hive’s syntax in the new parser.
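
For example, with a hypothetical table t partitioned by p1 (given a static value) and p2 (dynamic):

-- Hive syntax: the dynamic partition column p2 must still be listed
INSERT OVERWRITE TABLE t PARTITION (p1 = 'a', p2) SELECT x, y, p2 FROM src;

-- Current FlinkSQL syntax: dynamic partition columns are simply omitted
INSERT OVERWRITE t PARTITION (p1 = 'a') SELECT x, y, p2 FROM src;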

Hive also supports multi-insert, which has the following syntax:


FROM from_statement
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1
[INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2]
[INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2] ...;


We won’t support multi-insert in this FLIP since it would require support from the planner, e.g. being able to handle multiple CatalogSinkModifyOperations at a time.

Enable the Hive Parser

We need to allow users to specify the dialect in the SQL Client YAML file, and we'll implement a SqlParserImplFactory that creates the parser according to the dialect/conformance in use. Suppose the new parser for Hive is named FlinkHiveSqlParserImpl. The SqlParserImplFactory implementation, assuming it's named FlinkSqlParserImplFactory, will look something like this:

public class FlinkSqlParserImplFactory implements SqlParserImplFactory {

	private final SqlConformance conformance;

	public FlinkSqlParserImplFactory(SqlConformance conformance) {
		this.conformance = conformance;
	}

	@Override
	public SqlAbstractParserImpl getParser(Reader stream) {
		if (conformance == FlinkSqlConformance.HIVE) {
			return FlinkHiveSqlParserImpl.FACTORY.getParser(stream);
		} else {
			return FlinkSqlParserImpl.FACTORY.getParser(stream);
		}
	}
}
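
The conformance passed to this factory comes from the dialect that users configure on TableConfig. A sketch of that mapping, assuming the existing SqlDialect enum (the actual wiring lives in PlannerContext::getSqlConformance):

	private FlinkSqlConformance getSqlConformance() {
		SqlDialect sqlDialect = tableConfig.getSqlDialect();
		switch (sqlDialect) {
			case HIVE:
				// Selects FlinkHiveSqlParserImpl through the factory above.
				return FlinkSqlConformance.HIVE;
			case DEFAULT:
				return FlinkSqlConformance.DEFAULT;
			default:
				throw new TableException("Unsupported SQL dialect: " + sqlDialect);
		}
	}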


In PlannerContext::getSqlParserConfig, we’ll use FlinkSqlParserImplFactory to create the config:

	private SqlParser.Config getSqlParserConfig() {
		return JavaScalaConversionUtil.<SqlParser.Config>toJava(getCalciteConfig(tableConfig).getSqlParserConfig()).orElseGet(
				// we use Java lex because back ticks are easier than double quotes in programming
				// and cases are preserved
				() -> {
					SqlConformance conformance = getSqlConformance();
					return SqlParser
							.configBuilder()
							.setParserFactory(new FlinkSqlParserImplFactory(conformance))
							.setConformance(conformance)
							.setLex(Lex.JAVA)
							.setIdentifierMaxLength(256)
							.build();
				}
		);
	}
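
For reference, the dialect can already be switched programmatically through TableConfig. Once the new parser is wired in, running Hive-style DDL could look like the following sketch (the statement shown is illustrative):

// Switch to the Hive dialect before submitting HiveQL-style statements.
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// Hive-style CREATE DATABASE with a LOCATION clause, parsed into SqlCreateHiveDatabase.
tableEnv.sqlUpdate("CREATE DATABASE db1 LOCATION '/warehouse/db1'");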


New or Changed Public Interfaces

No public interface has to be added or changed.

Migration Plan and Compatibility

N/A

Rejected Alternatives

Copy Hive’s own grammar and parser/analyzer to handle DDL and DML. While this approach would bring good compatibility, it probably requires more intrusive changes and makes it more difficult to support all Hive versions.

Limited Scope

Features for which the underlying functionalities are not ready are out of the scope of this FLIP, e.g. Hive’s CREATE TABLE AS and CONCATENATE won’t be supported.

Since the Hive connector only works with the blink planner, we’ll only make sure this feature works with the blink planner; it may or may not work with the old planner.

Calcite and HiveQL have different reserved keywords, e.g. DEFAULT is a reserved keyword in Calcite but a non-reserved keyword in HiveQL. Since Calcite currently doesn't allow us to change reserved keywords, users have to backtick-quote them when using them as identifiers in FlinkSQL.
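
For example, to use DEFAULT as a column name (hypothetical table, for illustration):

-- DEFAULT is reserved in Calcite, so it must be quoted with backticks:
CREATE TABLE foo (`default` INT);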

The following table summarizes the DDLs that will be supported in this FLIP. Unsupported features are also listed so that we can track them and decide whether/how to support them in the future.

| Object | Supported | Comment | Not Supported | Comment |
| --- | --- | --- | --- | --- |
| Database | CREATE | | SHOW DATABASES LIKE | Show databases filtering by a regular expression. Missing Catalog API. |
| | DROP | | | |
| | ALTER | | | |
| | USE | | | |
| | SHOW | | | |
| | DESCRIBE | We don't have a TableEnvironment API for this. Perhaps it's easier to implement when FLIP-84 is in place. | | |
| Table | CREATE | Support specifying EXTERNAL, PARTITIONED BY, ROW FORMAT, STORED AS, LOCATION and table properties. Data types will also be in HiveQL syntax, e.g. STRUCT | Bucketed tables | |
| | DROP | | CREATE LIKE | Wait for FLIP-110 |
| | ALTER | Include rename, update table properties, update SerDe properties, update fileformat and update location. | CREATE AS | Missing underlying functionalities, e.g. create the table when the job succeeds. |
| | SHOW | | Temporary tables | Missing underlying functionalities, e.g. removing the files of the temporary table when session ends. |
| | DESCRIBE | | SKEWED BY [STORED AS DIRECTORIES] | Currently we don't use the skew info of a Hive table. |
| | | | STORED BY | We don't support Hive table with a storage handler yet. |
| | | | UNION type | |
| | | | TRANSACTIONAL tables | |
| | | | DROP PURGE | Data will be deleted w/o going to trash. Applies to either a table or partitions. Missing Catalog API. |
| | | | TRUNCATE | Remove all rows from a table or partitions. Missing Catalog APIs. |
| | | | TOUCH, PROTECTION, COMPACT, CONCATENATE, UPDATE COLUMNS | Applies to either a table or partitions. Too Hive-specific or missing underlying functionalities. |
| | | | SHOW TABLES 'regex' | Show tables filtering by a regular expression. Missing Catalog API. |
| | | | FOREIGN KEY, UNIQUE, DEFAULT, CHECK | These constraints are currently not used by the Hive connector. |
| Partition | ALTER | Include add, drop, update fileformat and update location. | Exchange, Discover, Retention, Recover, (Un)Archive | Too Hive-specific or missing underlying functionalities. |
| | SHOW | Support specifying partial spec | RENAME | Update a partition's spec. Missing Catalog API. |
| | DESCRIBE | We don't have a TableEnvironment API for this. Perhaps it's easier to implement when FLIP-84 is in place. | ALTER with partial spec | Alter multiple partitions matching a partial spec. Missing Catalog API. |
| Column | ALTER | Change name, type, position, comment for a single column. Add new columns. Replace all columns. | | |
| Function | CREATE | | CREATE FUNCTION USING FILE\|JAR… | To support this, we need to be able to dynamically add resources to a session. |
| | DROP | | RELOAD | Hive-specific |
| | SHOW | | SHOW FUNCTIONS LIKE | Show functions filtering by a regular expression. Missing Catalog API. |
| View | CREATE | Wait for FLIP-71 | SHOW VIEWS LIKE | Show views filtering by a regular expression. Missing Catalog API. |
| | DROP | Wait for FLIP-71 | | |
| | ALTER | Wait for FLIP-71 | | |
| | SHOW | Wait for FLIP-71 | | |
| | DESCRIBE | Wait for FLIP-71 | | |

The following table summarizes the DMLs that will be supported in this FLIP. Unsupported features are also listed so that we can track them and decide whether/how to support them in the future.


| | Supported | Comment | Unsupported | Comment |
| --- | --- | --- | --- | --- |
| DMLs | INSERT INTO/OVERWRITE PARTITION | Support specifying dynamic partition columns in the specification | Multi-insert | |