SQL is a popular language for big data development. Building SQL extensions for Hudi will greatly reduce the cost of use. This document discusses the SQL extensions on Hudi for the Spark engine.

Extended SQL Syntax

Here is the SQL syntax we need to extend for Hudi.


As Hudi has primary keys, we add a primary key definition to the CREATE TABLE statement, which is not supported in current Spark SQL.


CREATE TABLE [ IF NOT EXISTS ] [database_name.] table_name
[ ( columnTypeList
    [, PRIMARY KEY(column_list)]
  ) ]
[ COMMENT table_comment ]
USING hudi
[ LOCATION location_path ]
[ AS query_statement ]


create table h0 (
  id bigint,
  name string,
  price double,
  primary key(id)
) using hudi

create table h1 using hudi as select *  from src



Merge the data from a source table or query into the target table.


MERGE INTO [database_name.] table_name [AS target_alias]
USING (query_statement | table_name) [ AS source_alias ]
ON merge_condition
WHEN MATCHED [ AND condition ] THEN matched_action 
[ WHEN MATCHED [ AND condition ] THEN matched_action ]
[ WHEN NOT MATCHED [ AND condition ]  THEN not_matched_action ]

merge_condition = bool_expression

matched_action  =
  UPDATE SET column1 = expression1 [, column2 = expression2...] |
  DELETE

not_matched_action  =
  INSERT *  |
  INSERT (column1 [, column2 ...]) VALUES (expression1 [, expression2 ...])


merge into h0 as target
using (
  select 1 as id, 'a1' as name, 10.0 as price
) source
on target.id = source.id
when matched then update set *
when not matched then insert *


Update the data in a Hudi table according to the condition.


UPDATE [database_name.] table_name SET column1 = expression1 [,column2 = expression2...] [ WHERE bool_expression ]


update h0 set price = price + 10 where id > 100


Delete the data in a Hudi table according to the condition.


DELETE FROM [database_name.] table_name  [ WHERE bool_expression ]


delete from h0 where id > 100


Convert other file formats to Hudi.


CONVERT TO ([database_name.] target_table_name | 'target_file_path')
FROM ( [database_name.] source_table_name | 'source_file_path' )


convert to h0 from tbl_parquet
convert to h0 from '/tmp/parquet/p0'
convert to '/tmp/hudi/h0' from tbl_parquet
convert to '/tmp/hudi/h0' from '/tmp/parquet/p0'


Hudi has many CLI commands; we can bring them to SQL.


CLI_COMMAND [ (param_key1 = value1, param_key2 = value2...) ]

The CLI_COMMAND is the same as the Hudi CLI command, and the param keys should also stay the same.


commits show
commit showfiles (commit = '20210114221306', limit = 10)
show rollbacks
savepoint create (commit = '20210114221306')

Implementation on Spark

A DDL/DML/CLI statement goes through the following stages in Spark SQL: parse, resolution, rewrite, and command execution.


In the SQL parse stage, we will inject a HoodieSqlParser into Spark SQL which will parse our extended DDL/DML/CLI syntax to a LogicalPlan. If the HoodieSqlParser fails to parse the SQL statement, Spark will route it to Spark's own SQL parser. So we just need to implement our extended syntax in the HoodieSqlParser.
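To illustrate the fallback behavior, here is a minimal sketch. It is illustrative only: the real HoodieSqlParser implements Spark's ParserInterface and produces LogicalPlans; the string-based types and the syntax check below are simplified stand-ins.

```scala
// A simplified model of the parser chain: try the Hudi parser first and
// fall back to Spark's parser when the statement is not Hudi syntax.
trait SqlParser {
  def parse(sql: String): String // returns a description of the parsed plan
}

class SparkParser extends SqlParser {
  def parse(sql: String): String = s"SparkPlan($sql)"
}

class HoodieSqlParser(delegate: SqlParser) extends SqlParser {
  // only the extended syntax is handled here (hypothetical check)
  private val extendedSyntax = Seq("MERGE INTO", "CONVERT TO")

  def parse(sql: String): String = {
    val upper = sql.trim.toUpperCase
    if (extendedSyntax.exists(upper.startsWith))
      s"HoodiePlan($sql)"  // our extended DDL/DML/CLI syntax
    else
      delegate.parse(sql)  // route everything else to Spark's parser
  }
}
```

Because the chain always ends at the delegate, existing Spark SQL statements keep working unchanged.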


In the resolution stage, some Hudi resolution rules will be injected into Spark SQL to resolve our extended LogicalPlan to a resolved plan, which is a command plan for DDL/DML/CLI.


We may need to rewrite some of the built-in Spark commands for Hudi. For example, we need to rewrite Spark's CreateDataSourceTableCommand to a Hudi realization such as CreateHoodieTableCommand, which will do some initialization for the .hoodie directory and sync the metadata to the metastore.

So we will inject some rewrite rules into Spark SQL after the resolution stage.


The Command#run method will translate the logical plan to Hudi API calls. For example, InsertIntoHudiTableCommand will translate to a call to Hudi's DataFrame API to insert data into Hudi.

Processing the meta fields

Hudi appends five meta fields to the head of the table schema. These are table property fields, and users may not use them in an insert statement. However, Spark SQL validates the target table's field count (which includes the meta fields) against the select statement's output field count, which results in a size-mismatch exception. For example, insert into h0 select 1, 'a1', 10 from s will fail in the validation stage, because the select statement does not have the meta fields while h0 contains them.

In order to solve this problem, we add a rewrite rule for the insert logical relation which appends the five meta fields to the head of the select projection.
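As a rough sketch, the effect of that rewrite rule is just to pad the projection. This is simplified: the real rule operates on the Catalyst LogicalPlan, and the actual values of the meta fields are filled in by Hudi at write time, not literal nulls.

```scala
// Hudi's five meta fields that sit at the head of every table schema.
val MetaFields = Seq(
  "_hoodie_commit_time", "_hoodie_commit_seqno",
  "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name")

// Simplified version of the rewrite rule: prepend one placeholder
// projection per meta field so that the projection size matches the
// target table schema (5 meta fields + the data fields).
def appendMetaFields(selectProjects: Seq[String]): Seq[String] =
  MetaFields.map(_ => "null") ++ selectProjects
```

With this padding, insert into h0 select 1, 'a1', 10 from s validates against h0's eight-column schema (five meta fields plus three data fields) instead of failing the size check.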

MergeIntoCommand implementation

The implementation of MergeIntoCommand is a little more complex than the other commands. Look at the following statement.

merge into h0 as target
using (
  select * from s0
) as source
on target.id = source.id
when matched and delete_flag % 10 != 0 then update set id = source.id, name = source.name, price = source.price + target.price
when matched and delete_flag % 10 = 0 then delete
when not matched and delete_flag % 10 != 0 then insert (id,name,price) values(id, name, price + 1)

There is a constraint on the MERGE ON condition: it must contain an equality expression on the row key, so that we can use the Hudi index to speed up the update & delete.
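A sketch of how such a constraint could be checked, using a toy expression tree in place of Catalyst's Expression. The helper below is hypothetical; the actual validation in the resolution rules may differ.

```scala
// A toy expression tree standing in for Catalyst's Expression.
sealed trait Expr
case class Attr(name: String) extends Expr
case class EqualTo(left: Expr, right: Expr) extends Expr
case class And(left: Expr, right: Expr) extends Expr

// The ON condition can use the hoodie index only if some conjunct is
// an equality expression that touches the row key column.
def containsRowKeyEquality(cond: Expr, rowKey: String): Boolean = cond match {
  case EqualTo(Attr(l), Attr(r)) => l == rowKey || r == rowKey
  case And(l, r) =>
    containsRowKeyEquality(l, rowKey) || containsRowKeyEquality(r, rowKey)
  case _ => false
}
```

For example, `on target.id = source.id and target.dt = source.dt` passes the check when `id` is the row key, while a condition on non-key columns alone would be rejected.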

There are three write operations in the MergeIntoCommand: UPDATE, DELETE and INSERT. We combine the three operations together into one Hudi upsert write operation. We implement an ExpressionPayload which will execute the update & insert & delete expressions and compute the result record to write. Here is the main code for this:

class ExpressionPayload extends BaseAvroPayload {

  // do update
  override def combineAndGetUpdateValue(targetRecord: IndexedRecord, schema: Schema,
      properties: Properties): Option[IndexedRecord] = {
    val sourceRecord = bytesToAvro(recordBytes, schema) // the incoming record
    // first test if the sourceRecord matches the update condition
    // (e.g. delete_flag % 10 != 0 in the example)
    if (matchCondition(properties, HoodiePayloadProps.PAYLOAD_UPDATE_CONDITION_EXPRESSION, sourceRecord)) {
      // get the update expressions (e.g. [source.id, source.name, source.price + target.price])
      // from the properties and convert them to spark Expressions.
      val updateExpression = toExpression(properties.get(HoodiePayloadProps.PAYLOAD_UPDATE_EXPRESSION))
      // doCodeGen for the expression
      val expressionEvaluator = doCodeGen(updateExpression)
      // join the targetRecord with the sourceRecord, because the fields referred
      // to by the expression come from both of them.
      val joinRecord = join(targetRecord, sourceRecord)
      // execute the expression to compute the resultRecord
      val resultRecord = expressionEvaluator.eval(joinRecord)
      return Option.of(resultRecord)
    }
    // secondly test if the sourceRecord matches the delete condition
    if (matchCondition(properties, HoodiePayloadProps.PAYLOAD_DELETE_CONDITION, sourceRecord)) {
      return Option.empty // an empty record means delete in HoodieMergeHandle
    }
    // if no condition matched, return an IGNORED_RECORD which will be ignored by the HoodieMergeHandle.
    Option.of(IGNORED_RECORD)
  }

  // do insert
  override def getInsertValue(schema: Schema, properties: Properties): Option[IndexedRecord] = {
    val sourceRecord = bytesToAvro(recordBytes, schema)
    // test if the sourceRecord matches the insert condition (e.g. delete_flag % 10 != 0 in the example)
    if (matchCondition(properties, HoodiePayloadProps.PAYLOAD_INSERT_CONDITION_EXPRESSION, sourceRecord)) {
      // get the insert expressions (e.g. [id, name, price + 1]) from the properties
      // and convert them to spark Expressions.
      val insertExpression = toExpression(properties.get(HoodiePayloadProps.PAYLOAD_INSERT_EXPRESSION))
      // doCodeGen for the expression
      val expressionEvaluator = doCodeGen(insertExpression)
      // execute the expression to compute the resultRecord
      val resultRecord = expressionEvaluator.eval(sourceRecord)
      return Option.of(resultRecord)
    }
    Option.empty
  }

  // test if the sourceRecord matches the insert, update or delete condition.
  def matchCondition(properties: Properties, conditionExpressionKey: String,
      sourceRecord: IndexedRecord): Boolean = {
    val conditionExpression = toExpression(properties.get(conditionExpressionKey))
    val conditionExpressionEvaluator = doCodeGen(conditionExpression)
    conditionExpressionEvaluator.eval(sourceRecord)
  }
}


The HoodieSparkSessionExtension is the main entrance of the extension, which will inject our SQL parser, resolution rules and rewrite rules into Spark SQL. It looks like the following code:

class HoodieSparkSessionExtension extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Inject the hoodie sql parser
    extensions.injectParser { (session, parser) =>
      new HoodieSqlParser(session, parser)
    }
    // Inject the hoodie resolution rules
    HoodieAnalyzer.customResolutionRules().foreach { rule =>
      extensions.injectResolutionRule { session =>
        rule(session)
      }
    }
    // Inject the post hoc rules to rewrite the resolved plan
    // (e.g. rewrite the CreateDataSourceTableCommand).
    HoodieAnalyzer.customPostHocResolutionRules().foreach { rule =>
      extensions.injectPostHocResolutionRule { session =>
        rule(session)
      }
    }
  }
}
Spark can use the extension when creating the SparkSession:

val spark = SparkSession
      .builder()
      .withExtensions(new HoodieSparkSessionExtension)
      .getOrCreate()


Implementation plan

1. Implement a SQL parser with Antlr4 to extend most of the SQL syntax, including the DDL and DML. There would be a JIRA for this.

2. Implement the resolution rules and rewrite rules for each DDL and DML logical plan, and translate the logical plan to Hudi API calls. There may be a JIRA for each DDL and DML statement.

After this stage is finished, we can use SQL to create tables and insert/update data in Hudi.

3. Finally, we extend the syntax for the rest of the Hudi CLI commands.



  1. Does this proposal only support the Spark engine? In our original discussion, we proposed to use Apache Calcite as the SQL parser layer so that it is orthogonal and can be extended freely.

  2. cc Udit Mehrotra please loop in dongwook here, since he has some working prototype as well. 

    We can combine efforts and hopefully make more magic like how we collaborated on RFC-18/RFC-19

  3. Danny Chen  Been very caught up with the release work these past few weeks. Will take a more holistic look at all the great proposals/suggestions you have and get back early next week. Thanks for your patience!

  4. Hi Danny Chen, I agree that we should have a common SQL layer using Calcite. But it is also necessary to keep separate implementations for each engine, as users have different engine preferences. They may need to use Spark or Flink alone to process Hudi data. So Spark & Flink should both have the ability to support SQL for Hudi.

      On the other hand, the SQL implementations vary widely between Spark & Flink. In particular, Spark has its own SQL parsing framework and SQL syntax which does not use Calcite. So Spark should keep its own implementation via the Spark session extension.

      On the basis of the above, the common SQL layer can do some common SQL parsing and route queries to the engine, Spark or Flink, for processing. I think it will be a separate module. This may be a complicated job given the differences between Spark & Flink. So early on it can be a thin layer which just does the engine-independent processing and routes the other SQL to the engine. In the long term, it can become thicker and thicker.

    Finally, here is a summary of my views:

    1. Keep a separate implementation for Spark using the Spark session extension.

    2. Keep a separate implementation for the Flink engine.

    3. Build a common SQL layer to do the common SQL parsing and gradually improve it.

  5. Hi, everyone. We have supported Spark SQL for Hudi in an internal version. The following is our design document:

    In this solution, SparkSQL Extension is used to support DDL and DML. The difference between this solution and Zhiwei Peng’s is that spark datasource V2 APIs are used.

    We think that the long-term planning of the Hudi community is to implement a common SQL parsing layer. Therefore, it is not recommended to introduce the Spark Antlr4 parser. The introduction of two types of parsers will result in challenges in maintaining syntax consistency and increase subsequent maintenance costs. However, using spark datasource V2 APIs, we do not need to introduce new parsers. Instead, we only need to implement the catalog interface of Hudi. This is also in the direction of the community evolution to spark datasource V2. For example, the Hudi community is implementing Hudi-893 (Add spark datasource V2 reader support for Hudi tables).

    In addition, this solution also takes into account the implementation of V1 and V2 to meet the requirements of existing Spark 2.X users.

  6. Danny Chen Wanted to get your thoughts on Zhiwei Peng's comments  as well. If Flink already uses Calcite, it may be good to just find good ways to integrate Hudi underneath as the storage/table management layer for now? 

    I am +1 on having first class support for both Spark (using the Spark SQL dialect and abstractions) and Flink (and indirectly get Calcite SQL on Hudi tables that way).

    Both of these seem like what we can first attempt, before foraying into a full blown calcite SQL implementation. Wdyt? 

  7. Zhiwei Peng, do you have some thoughts on Jack's proposal as well? 

    IIUC it argues for a common parser, vs this RFC sticks closer to Spark.

    Personally, I feel Spark SQL will be around and some users do switch from delta to hudi etc. 

    So, supporting SparkSQL dialect as-is can be a good step IMO. 

    love to hear thoughts. 

  8. Dongwook can you also please chime in? You have some implementation along these lines already?

    Zhiwei Peng is there a PoC implementation somewhere that I can take a look at?

  9. Hi Vinoth Chandar, the main difference between this RFC and Jack's is that this RFC's implementation is based on DataSource V1, while Jack's implementation is based on DataSource V2. We can extract the common part between V1 and V2 and support both of them.

    I am sorting out the internal code now and will post some of our implementation on GitHub soon.

  10. Thanks, I'll wait for the GH impl.

  11. Thanks Zhiwei Peng for the RFC, it looks good to me. I have a few questions.

    1. Isn't CONVERT the same feature as "CREATE TABLE hudi_table AS SELECT from another_format_table"?
    2. From my test, except for CONVERT, there was no need to add an Antlr4 parser for Spark 3. Is the reason you need to add the Antlr4 parser mainly Spark 2 and the CONVERT SQL?
    3. I wonder how you would handle Hudi configurations, such as partition key, record key, combine key etc., in SQL?

    Just one note from my investigation: the need for adding an Antlr4 parser depends mainly on whether the specific SQL query you want to add is supported by Spark, rather than on whether the DataSource uses DataSource API v1 or v2.

    Although it was just a POC, I was able to support CREATE/UPDATE/INSERT/DELETE/MERGE for Spark 3 using DataSource V1 without adding an Antlr4 parser extension.

  12. Hi Vinoth Chandar Dongwook, thanks for your attention. Here is part of the implementation for Spark SQL support on Hudi:

    Currently I have published DDL, MERGE INTO, CTAS, UPDATE, DELETE and INSERT INTO based on DataSource V1 in this submit. Any suggestion is welcome~

    Thanks Dongwook for your good questions. 

    Isn't CONVERT the same feature as "CREATE TABLE hudi_table AS SELECT from another_format_table"?

    Yes, CONVERT and CTAS are somewhat similar, but CONVERT is simpler than CTAS: it only does the format conversion. Users can use it for data format conversion without having to write a SQL query like CTAS.

    From my test, except CONVERT, there was no need to add Antlr4 parser for Spark 3. Is the reason why you need to add Antlr4 parser mainly for Spark 2 and CONVERT Sql?

    Except for CONVERT, we can also bring the Hudi CLI commands to SQL. On the other hand, defining the PK is not supported in CREATE TABLE for Spark 3, so we also need to extend the syntax. So I think adding Antlr4 is needed for both Spark 2 and Spark 3.

    I wonder how would you handle Hudi configurations? such as partition key, record key, combine key etc, configurations in SQL?

    Yes, it is a good question. Currently I define the primary key in the CREATE TABLE, and it is used as the row key. You can see the DDL definition in this doc. And I use the partition columns defined in the table as the partition fields (enabling URL_ENCODE_PARTITIONING_OPT_KEY for the partition path by default for SQL).

    For the combine key, users can define it in the table options. For other configurations, users can define them via the Set command or in the table options. You can see the test code in HoodieSqlTest.

    1. Zhiwei Peng Reviewing this week. Apologies for the delay. 

  13. Hi Zhiwei Peng  Vinoth Chandar Dongwook. Here is our base implementation of Spark SQL support for DataSource V2:

    Maybe we can extract the common parts between V1 and V2 to support both of them.

    1. Jack  Zhiwei Peng Happy to facilitate this. It's great to see lots of ideas around this. Pulling everything into a feature branch and driving the next steps in an umbrella ticket worked well for the RFC-15,18,19 work in 0.7. We can do a similar one here. Thoughts?

      Dongwook do you have an ETA on getting your implementation also pushed to a branch? It will really help facilitate the next steps from here.

      Can't stress how awesome it is to see lot of us pulling together. 

      1. Hi Vinoth Chandar Jack Dongwook Welcome to see the various options on the table. We can learn from each other's implementation schemes and make this better.

        I have updated the code and published most of our implementation, including DDL, MERGE INTO, DELETE, UPDATE, CTAS and INSERT.

        Any suggestion is welcome~

  14. Thanks Jack, Zhiwei Peng for the answers and your POC links. I've checked both, and overall both look good to me for starting. I think we just need to agree on a few things in detail and split the tasks for the next step.

    Here are things I'd like to hear your opinions.

    • Custom SQL Keywords and parser

    From a SQL compatibility perspective, I think it would be less desirable to add custom SQL keywords if possible; also from a priority perspective, we can always add more custom SQL keywords when feature demand increases.

    For CONVERT, OPTIONS and PRIMARY KEY, we should try to use existing equivalents like CREATE TABLE AS SELECT and TBLPROPERTIES.

    Since CTAS needs to be supported anyway, I find CONVERT less useful.

    Probably Hudi's BOOTSTRAP feature would be one use case for the CONVERT SQL, since it is the equivalent of Delta Lake's in-place convert feature. However, we could simply support BOOTSTRAP as a configuration, SET "hoodie.datasource.write.operation" = "bootstrap", with CTAS, or with CTAS where the source table and target table are the same, like

    CREATE TABLE table1 AS SELECT * FROM table1;

    Also, the PRIMARY KEY keyword is nice to have, but since customers would need to specify other properties like combine key, table type etc. anyway through TBLPROPERTIES, it doesn't seem it would make a big difference in terms of user experience.

    Although I don't object to adding CONVERT, PRIMARY KEY or any other custom keywords, in my opinion they would have lower priority if they need to be added.

    • Configurations

    My team mate is working separately on mainly 2 changes, although it's still in the proposal state.

    1. Externalize all Hudi configurations (separate configuration file)
    2. Save table related properties into file.

    For SQL, all table related properties should be specified through TBLPROPERTIES when a table is created, and they will be saved in the metastore as well as in a file; when the metastore isn't available, like in DataFrame usage, the file should be the source.

    And any other global or session related configurations should be configured via SET SQL or an external configuration file, so SQL doesn't need to specify the OPTIONS keyword for every query like Hive or any other SQL engine does.

    So, for Spark 2 we need to add a custom parser; however, IMO, for Spark 3 it would be optional for now.

    • Regarding DataSource V2 vs V1 

    In general I agree with Jack that Hudi should support DataSource V2; however, for the sake of scope and priority, I think it would make more sense to start with DataSource V1 only, as it is today, and when HUDI-30 is implemented, Spark SQL can also be updated to use DataSource V2.

    • Regarding record key constraint

    There is a constraint for the Merge ON condition, It must contain the rowKey equal expression. So we can use the hoodie index to speed the update & delete

    I had the same concern regarding performance when record keys aren't specified in predicates. From my understanding, the performance concern isn't only for MERGE; for other DML like DELETE and UPDATE, if the predicates don't have record/partition keys, internally we would need to find the record/partition keys first and pass them to Hudi, which could cause performance issues.

    I've discussed this concern with my other team members internally, and mostly they think we should support these statements without the record key constraint; instead it would be enough to warn the user about the possible performance issue when record keys aren't specified in predicates. RFC-15 would improve the performance for this use case, and when it's available, the warning could be removed.

    Please let me know what you think and correct me if I misunderstood anything. Also, I wonder whether it would be better to have an online meeting to decide the details and split the tasks for the next steps.

  15. Hi @Dongwook, thanks for your reply~ Here are my opinions:

    Custom SQL Keywords and parser

    I agree that we can reduce the priority of supporting a custom SQL parser for Spark 3. In our first version, the PRIMARY KEY can be defined in the table options, and we can use CTAS instead of CONVERT. Maybe in the long term we can extend the SQL parser.


    It is all right to save the table related properties (e.g. primary keys, partition columns) to the file and the meta store. But the runtime properties (e.g. insert parallelism) should be excluded.

    And I also have a suggestion: we can support short-name properties for SQL. For example, in HoodieSqlTest, I use "type = 'cow'" to define the COW table type in the table options, matching the short-name property to Hudi's long-name property. Short names will reduce the cost of use for users.

    Regarding DataSource V2 vs V1 

    Agree with you on starting from V1 and updating to V2 after HUDI-30 is finished, because there is a lot of work to do for updating to DataSource V2. We can do it in another thread.

    Regarding record key constraint

    Maybe we can support two kinds of implementations for MergeInto. One for the case where the ON condition contains the row key; this is the implementation mentioned in this article. The other for the case where the ON condition does not contain the row key; there we need to use a join to implement the merge logic.

    For other DML like DELETE and UPDATE, we can get the primary keys from the table properties in the meta store, or from what we saved when we created the Hudi table. See the implementations in DeleteHoodieTableCommand and UpdateHoodieTableCommand.

  16. Hi Dongwook, thanks for your reply.

    I basically agree with you. Here are some of my views:

    • Configurations

    It is necessary to save some useful properties, and for some useful runtime properties, we can use the set statement to set them: set xxxx=xxx

    • DataSource V1 and V2

    DataSource V1 first is OK to me. For the update/delete/merge grammar implementation, Spark 2 needs to add an extra SQL parser; we should keep this implementation consistent with Spark 3.

    • keygenerator constraint

    Maybe we can use ComplexKeyGenerator as the default key generator in SQL, since ComplexKeyGenerator is basically consistent with Hive's behavior.

    • record key constraint

    We should not have a constraint on the merge/update/delete condition, and I agree with you that we can support two kinds of implementations for MergeInto. In fact, in our implementation we implement this feature in two ways: one is doMergeDirectly (call upsert directly), the other one is doMerge by join, pls see. But I think your idea is more common; I will update my code to support it.

  17. Thanks  Zhiwei Peng and Jack for the answers.

    What would be the best way to reach out to you? Do you use Slack? I wonder if we can chat via the Hudi Slack group.

  18. Hi Dongwook Jack, I think we should take this one step further, as we have spent so much time on the discussion. So I plan to submit some PRs over the next few weeks. Early on I will submit an implementation of DDL, INSERT INTO, CTAS and MERGE INTO based on DataSource V1, including the SQL parser and runtime. The runtime needs to consider reuse for V2 as much as possible. After this is finished, we will have a first version of SQL support for Spark. And next we can implement support for V2 and the other features together.

  19. Zhiwei Peng Dongwook Jack  Thanks for all the great discussion.  We can make all three of you proposers for the RFC. 

    I see the following items have been agreed upon (mostly):

    1) DataSource V1 or V2: We will stick to V1 for now. The V2 upgrade will be a separate project.

    2) Antlr4/Spark2 or stick to Spark3: Stick to Spark 3 keywords for now and avoid custom keywords. Again, for now we can use TBLPROPERTIES for passing all configs.

    3) CONVERT vs CTAS: We can just stick to CTAS for now. 

    4) MERGE INTO, using _hoodie_record_key or a non-key field: I think we will have a PR for range indexes etc. in the next few weeks (what I know as of now). But starting with record key based merges/deletes is okay.

    If everyone agrees, I would like to begin with 1 concrete PR that all of us can review, land quickly and iterate on. 

    Could we try and have consensus on who could take that first concrete step? 

    Another person could start defining the follow on sub tasks under the JIRA we have for this feature? Can you please add those details to the RFC as well. 

    Please let me know how I can help here. I want to untangle us and start moving ahead. 

  20. Hi Vinoth Chandar, thanks for your suggestion. I agree with your solution for this.
    I've been concerned about this RFC for a long time and I really want this RFC to land as soon as possible. So could I take the first PR for this? And others can define the follow-on sub tasks under the JIRA, so that we can land this quickly~

  21. Vinoth Chandar Thanks for all the discussion. I agree with your solution for this.

    Maybe we can start with CTAS as the first PR, Zhiwei Peng.

  22. Zhiwei Peng yes please take the lead for the first PR, with the lowest common set of functionality we agree on. What I mentioned above is fine by me. 

    HUDI-1658 is the JIRA for this work, correct?

    Jack any reason you think we should limit to CTAS for now? 

    I will add all three as proposers for the RFC (if you don't mind) and facilitate the sub tasks.

      Jack, Dongwook can you both please add every other open item/follow-up as subtasks there. We can resolve them one by one using comments there.

  23. Vinoth Chandar Yes, HUDI-1658 is the JIRA for this RFC. And I am working on HUDI-1659 for the first PR.

  24. I just turned that into an umbrella JIRA. Maybe just create a sub task under that for the first PR?

  25. OK, I have created a sub task under HUDI-1658 now! And I will provide the first PR with the lowest common set of functionality we agree on.

  26. Sorry for being late, I will try to add some subtasks for HUDI-1658. Vinoth Chandar, my only question is: now we have two ways to create a table, one is to create the table using the Hive style (stored as xxx), the other is the Spark style (using xxx). Which way is suitable, or both?

  27. Hi all, the first PR has been submitted at . Please take a review, thanks~

  28. Vinoth Chandar Zhiwei Peng Dongwook, I have added some subtasks for HUDI-1658, please have a look, thanks.