
Proposers

Approvers

Status

Current state



Discussion thread: here

JIRA: here

Released: <Hudi Version>

Abstract

Hudi currently supports only limited schema evolution, which cannot meet the needs of real-world scenarios. The purpose of this design is to support schema evolution in Hudi more comprehensively.

Background

As a business keeps running, business adjustments or business system refactoring are commonly carried out, and these changes often alter the database of the business system. In traditional data lake storage, adjusting a table schema accordingly is not easy.

Hudi schema evolution is in-place table evolution: it does not require costly disruptions such as rebuilding the table or rewriting it into a new table. In addition, it lets the table adapt quickly to changes in the business system, so data can be written into the data lake sooner.

Currently, Hudi only supports implicit schema evolution with sequentially appended columns, which is very limited, so this design adds full schema evolution capability to Hudi.

Features

This design supports the following schema evolution features:

Supported changes

These operations only change the table schema metadata; the data files do not need to be rewritten.

Not supported

Note: partition evolution is not included in this design; it will follow soon after schema evolution.

Overall design

Building on the existing implementation, we add table schema metadata that stores the business field information, and base read and write operations on it. The existing extraMetadata in the commit file is not changed, for compatibility with existing operation interfaces.

Schema metadata format

Table schema metadata examples are as follows:

Schema Metadata Json element definition

 

Multiple versions of the table schema metadata are identified by "version-id".
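The original metadata example is not reproduced here. Purely as an illustration, the Java sketch below models the elements one schema version might carry; the member names (versionId, maxColumnId, id, name, type, optional, doc) are assumptions for readability, not the exact JSON keys used by Hudi.

import java.util.List;

// Hypothetical sketch of one version of the id-schema metadata stored in the commit file.
class TableSchemaMetadata {

  long versionId;      // identifies this schema version; ties it to the commit that produced it
  int maxColumnId;     // highest column id ever assigned, so new columns always get a fresh id
  List<Field> fields;  // ordered list of columns

  static class Field {
    int id;            // stable id: never reused, survives renames and reorders
    String name;       // current display name of the column
    String type;       // e.g. "int", "long", "string", "record", ...
    boolean optional;  // whether null values are allowed
    String doc;        // optional column comment
  }
}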

 

DDL operations on metadata

Schema storage policy


Currently, each write operation produces a commit file saved in the .hoodie directory. Commit files such as commit, deltacommit, and replacecommit contain the Avro schema generated by the write operation, which represents the current version of the table schema.

In this design, the schema information stored in the commit file is changed to support multiple versions: the commit file saves all previous versions of the schema. This is necessary because, under the current clean mechanism, historical commit files are cleaned up and the historical schema information is deleted with them, which would leave historical data unreadable and the history of DDL operations untraceable.

There may be a large number of historical DDL operations, so each commit file may accumulate more and more historical schema versions. This requires a cleanup mechanism that limits the number of retained schema versions: when the schema changes again, historical versions beyond the policy are no longer recorded in the commit file. The retention policy is per table, and each table can set a different number of retained versions.
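As a rough illustration of this retention policy (the SchemaHistory shape and the per-table limit are assumptions, not an existing Hudi API):

import java.util.List;
import java.util.TreeMap;

// Minimal sketch: keep at most maxVersionsRetained schema versions per table.
class SchemaHistory {

  // versionId -> serialized schema, ordered oldest to newest
  private final TreeMap<Long, String> versions = new TreeMap<>();

  void addVersion(long versionId, String schemaJson, int maxVersionsRetained) {
    versions.put(versionId, schemaJson);
    // Once the per-table retention limit is exceeded, the oldest versions are
    // simply not carried forward into the next commit file.
    while (versions.size() > maxVersionsRetained) {
      versions.pollFirstEntry();
    }
  }

  List<Long> retainedVersionIds() {
    return List.copyOf(versions.keySet());
  }
}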

Evolution of the data file:

The data file field name is mapped to the schema metadata field ID.
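To illustrate the mapping (the column names and ids below are made up): the reader resolves columns by field id rather than by name, so the names recorded in an old data file can still be matched against the latest schema metadata.

import java.util.Map;

// Illustration only: match an old data file's columns against the latest schema by id.
class FieldIdMappingExample {
  public static void main(String[] args) {
    // name -> id mapping recorded for the file when it was written (hypothetical ids)
    Map<String, Integer> fileFieldIds = Map.of("uuid", 1, "ts", 2);

    // id -> current name taken from the latest schema metadata (here "ts" was later renamed)
    Map<Integer, String> latestNamesById = Map.of(1, "uuid", 2, "event_time");

    // Reading the old file: id 2 still resolves, now surfaced under its current name.
    fileFieldIds.forEach((nameInFile, id) ->
        System.out.println(nameInFile + " (id=" + id + ") -> " + latestNamesById.get(id)));
  }
}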

 

The impact of evolution:

Solution scheme

According to the above ideas, the detailed design is divided into the following parts:

 

DDL operation process


DDL operations mainly include CREATE TABLE and ALTER TABLE; partition changes are not included in this design.

CREATE TABLE

For the CREATE TABLE operation, the table schema is directly converted into text and saved to the commit file. Only one version of the schema is retained, because there is no historical version yet.

Create table is an initialization operation, so the schema version ID and the commit file name are set to "00000000000000".

ALTER TABLE

The DDL execution process is shown in the figure above. After the DDL statements pass through the Spark engine, they are handed to the Hudi kernel, which classifies the DDL operations as schema changes. The classified changes are then passed to the Schema Evolver module, which applies each type of change to the schema, and finally persists the schema to HDFS/object storage. The key point of the whole process is the transformation performed by the Schema Evolver, as follows:
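A rough sketch of that transformation, assuming illustrative class and method names rather than the actual Hudi API: each classified change is applied to the latest id-schema to produce a new schema version.

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the Schema Evolver: classified DDL changes are applied
// to the latest id-schema, producing a new schema version.
class SchemaEvolverSketch {

  record Field(int id, String name, String type) {}

  record IdSchema(long versionId, int maxColumnId, List<Field> fields) {}

  // Add a column at the end of the schema, assigning a fresh, never-reused id.
  static IdSchema addColumn(IdSchema schema, String name, String type) {
    List<Field> fields = new ArrayList<>(schema.fields());
    int newId = schema.maxColumnId() + 1;
    fields.add(new Field(newId, name, type));
    return new IdSchema(schema.versionId() + 1, newId, fields);
  }

  // Rename a column: only the name changes; the id, and therefore the data files, stay put.
  static IdSchema renameColumn(IdSchema schema, String oldName, String newName) {
    List<Field> fields = new ArrayList<>();
    for (Field f : schema.fields()) {
      fields.add(f.name().equals(oldName) ? new Field(f.id(), newName, f.type()) : f);
    }
    return new IdSchema(schema.versionId() + 1, schema.maxColumnId(), fields);
  }
}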

Change column incompatibility handling policy:

The DDL operation that changes a column type is required to be type-compatible (for example, int can be widened to long, but not the reverse). If an incompatible change is requested, it fails directly by default. To make type changes more flexible, this design also provides an alternative scheme: when an incompatible change occurs, launch a new job to rewrite the full table. Of course, this is very costly, so the default scheme remains to fail directly on incompatible changes.
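A minimal sketch of the default policy, assuming a hypothetical compatibility table (the allowed conversions listed here are illustrative, not Hudi's authoritative list):

import java.util.Map;
import java.util.Set;

// Default policy sketch: widening conversions are allowed, anything else fails fast.
class TypeCompatibility {

  private static final Map<String, Set<String>> ALLOWED_WIDENINGS = Map.of(
      "int", Set.of("long", "float", "double"),
      "long", Set.of("float", "double"),
      "float", Set.of("double"));

  static void checkOrFail(String fromType, String toType) {
    if (fromType.equals(toType)
        || ALLOWED_WIDENINGS.getOrDefault(fromType, Set.of()).contains(toType)) {
      return;
    }
    // Default: incompatible change -> fail directly.
    // The alternative (costly) scheme would instead launch a full-table rewrite job here.
    throw new IllegalArgumentException(
        "Incompatible column type change: " + fromType + " -> " + toType);
  }
}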

Data write process

The overall writing process design is shown in the figure above:

The steps to focus on are:

For normal write operations, the written data is consistent with the current schema and no schema change occurs, so the schema in the commit file can simply inherit the last committed schema. However, Hudi also supports implicitly adding columns during writes; in that scenario a schema change does occur as part of the write operation. In this case, the schema saving process is the same as after a DDL change: the newly generated schema version is saved together with the historical versions.
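A small sketch of that decision, with illustrative method and flag names rather than actual Hudi code:

// Write-path sketch: inherit the previous schema unless the incoming records carry
// new columns (implicit add-column), in which case a new schema version is produced
// and persisted together with the retained historical versions.
class WriteSchemaHandling {

  static String schemaForCommit(String lastCommittedSchema,
                                String incomingWriteSchema,
                                boolean implicitColumnAddDetected) {
    if (!implicitColumnAddDetected) {
      // Normal write: no schema change, simply inherit the last committed schema.
      return lastCommittedSchema;
    }
    // Implicit add-column: handled like a DDL change, i.e. a new version is saved
    // alongside the history (history handling not shown here).
    return incomingWriteSchema;
  }
}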

Data Query process

Introduction to the overall process

The whole reading process is shown in the figure above:

Query method

Query schema construction

Snapshot query

Get the latest schema from the latest commit file.

Incremental query

For an incremental query, the schema version is determined from the specified commit time, and the query schema is then obtained from the commit file.

Read Optimized query

Get the schema version ID from the file name of the base file, and then obtain the query schema from the commit file.
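Putting the three query types together, a hedged sketch of how the query schema might be resolved (the interface and method names are assumptions for illustration):

// Sketch: resolve the query schema for snapshot, incremental and read-optimized queries.
class QuerySchemaResolution {

  interface SchemaLookup {
    String latestSchema();                    // from the latest commit file
    String schemaAtCommit(String commitTime); // versioned schema kept in the commit file
    String schemaForVersion(long versionId);  // version id recorded in the base file name
  }

  static String resolve(String queryType, SchemaLookup lookup,
                        String commitTime, long baseFileVersionId) {
    switch (queryType) {
      case "snapshot":       return lookup.latestSchema();
      case "incremental":    return lookup.schemaAtCommit(commitTime);
      case "read_optimized": return lookup.schemaForVersion(baseFileVersionId);
      default: throw new IllegalArgumentException("Unknown query type: " + queryType);
    }
  }
}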

Construction of read schema

The generation of the read schema depends on the specific query engine, because different query engines' file readers use different schemas. Here we take Spark as an example.

Let's walk through the execution process in the figure above.

First, prune the query schema according to the pruned columns produced by the query engine, then take the corresponding IDs from the pruned query schema one by one.
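A small sketch of that pruning step, using a simplified name-to-id map in place of the real id-schema (the structure and names are assumptions):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: keep only the columns requested by the engine and collect their stable ids,
// which are later used to locate the corresponding columns in the data file.
class QuerySchemaPruning {

  static Map<String, Integer> prune(Map<String, Integer> querySchemaNameToId,
                                    List<String> pruneColumns) {
    Map<String, Integer> pruned = new LinkedHashMap<>();
    for (String col : pruneColumns) {
      Integer id = querySchemaNameToId.get(col);
      if (id != null) {
        pruned.put(col, id);
      }
    }
    return pruned;
  }

  static List<Integer> prunedIds(Map<String, Integer> prunedQuerySchema) {
    return new ArrayList<>(prunedQuerySchema.values());
  }
}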

 

Construction of pushed filters

Expand the pushed filters generated by the query engine. For each simple filter, such as EqualTo, EqualNullSafe, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, In, IsNull, IsNotNull, StringStartsWith, StringEndsWith, StringContains (excluding And, Or, Not), turn it into a new filter according to the process in the figure below. For the complex filters And, Or and Not, process their sub-filters recursively as shown in the figure below to generate new sub-filters, and then recombine them into And, Or, Not filters.
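A rough sketch of that recursive rewrite using Spark's org.apache.spark.sql.sources filter classes; only a few simple filters are shown, and the nameMapping (query-schema name to file-schema name) is assumed to come from the id-schema resolution described earlier.

import java.util.Map;
import org.apache.spark.sql.sources.And;
import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.GreaterThan;
import org.apache.spark.sql.sources.IsNull;
import org.apache.spark.sql.sources.Not;
import org.apache.spark.sql.sources.Or;

// Sketch: rewrite pushed filters so their attribute names match the file schema.
class PushedFilterRewrite {

  static Filter rewrite(Filter filter, Map<String, String> nameMapping) {
    if (filter instanceof EqualTo f) {
      return new EqualTo(rename(f.attribute(), nameMapping), f.value());
    } else if (filter instanceof GreaterThan f) {
      return new GreaterThan(rename(f.attribute(), nameMapping), f.value());
    } else if (filter instanceof IsNull f) {
      return new IsNull(rename(f.attribute(), nameMapping));
    } else if (filter instanceof And f) {
      // Complex filters: rewrite the children recursively, then recombine.
      return new And(rewrite(f.left(), nameMapping), rewrite(f.right(), nameMapping));
    } else if (filter instanceof Or f) {
      return new Or(rewrite(f.left(), nameMapping), rewrite(f.right(), nameMapping));
    } else if (filter instanceof Not f) {
      return new Not(rewrite(f.child(), nameMapping));
    }
    // The remaining simple filters (LessThan, In, StringStartsWith, ...) follow the same pattern.
    return filter;
  }

  private static String rename(String attribute, Map<String, String> nameMapping) {
    return nameMapping.getOrDefault(attribute, attribute);
  }
}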

DDL concurrent operations

An optimistic locking mechanism is adopted: DDL and write operations are executed directly, and the lock is only taken when the final commit is submitted.

The policy for checking whether schemas conflict should be provided as a plug-in, so that users can customize it.
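A possible shape for such a plug-in, with an assumed interface name and signature (not an existing Hudi extension point):

// Sketch of a pluggable schema-conflict check run at commit time under the optimistic lock.
interface SchemaConflictResolutionStrategy {

  // Decide whether the schema produced by this commit conflicts with schema changes
  // committed concurrently. Users can plug in their own policy, e.g. "fail on any
  // concurrent schema change" or "allow disjoint column additions".
  boolean hasConflict(String schemaAtTransactionStart,
                      String schemaAtCommitTime,
                      String schemaProducedByThisCommit);
}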

 

Various engine support capabilities

Some engines use the Hive metastore for metadata management, but the Hive metastore does not support multiple schema versions, so these engines cannot read data with a historical schema.

Old schema data compatibility

Scenario 1: the old Hudi table will not undergo any schema change in the future

This scenario is relatively simple: we can fall back to the original read/write logic of Hudi.

Scenario 2: schema changes are made on the old Hudi table.

Schema evolution operation

When the first schema change is made on an old Hudi table, the first id-schema is created:

1) Convert the old Hudi table's latest Avro schema into an id-schema; this becomes the first id-schema.

2) Any subsequent schema change is applied directly to this first id-schema and saved with the commit file.

Let's give an example:

Now rename operationTime to col1:
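The original figure is not reproduced here. Purely as an illustration (the field ids and version numbers below are made up), the rename changes only the name bound to the field's id, so data files written before the rename remain readable:

// Hypothetical id-schema before and after "rename operationTime to col1":
class RenameOperationTimeExample {
  // version 1: fields = [ (id=1, name="uuid", type="string"), (id=2, name="operationTime", type="long") ]
  // version 2: fields = [ (id=1, name="uuid", type="string"), (id=2, name="col1",          type="long") ]
}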


Read operation:


Once a schema change has been made on an old Hudi table, the first id-schema is created and all the old files are bound to it. Since the old Hudi table only supported adding columns and modifying column types, and Avro/Parquet support those changes natively, using the first id-schema to represent the old data is completely fine.

Now all files in the Hudi table are bound to an id-schema, and queries can follow the chapter Data Query process.

Let us follow the above example to give an explanation:

The old table now contains two files: one is an old file bound to the first id-schema; the other is a new file written after an add-column change and bound to the latest id-schema.

Following the steps in the chapter Data Query process:

1. When we read the old file, the first id-schema is used as the file-schema and the latest id-schema as the query-schema. The merge module then merges the file-schema and the query-schema to produce the final read-schema; once the read-schema is produced, we can read the old file correctly. For how the file-schema and query-schema are merged, see the chapter Data Query process.

2. When we read the new file, the latest id-schema is used as both the file-schema and the query-schema; the remaining process is the same as in (1).

Write operation:

Now we already have an id-schema, so just follow the chapter Data write process.

Rollout/Adoption Plan

Test Plan

<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>