| Current State | |
| --- | --- |
| Discussion thread | here |
| JIRA | here |
| Released | <Hudi Version> |
Hudi currently supports only limited schema evolution, which cannot meet the needs of many real-world scenarios. The purpose of this design is to support schema evolution in Hudi more comprehensively.
As a business evolves, business adjustments or system refactoring will generally be carried out, and these changes may require adjusting the business system's database schema. In a traditional data lake, adjusting the schema of a stored table is not easy.
Hudi schema evolution is in-place table evolution: it does not require costly operations such as rebuilding the table or rewriting it into a new table. In addition, it can quickly adapt to changes in the business system and write data into the data lake faster.
Currently, Hudi only supports implicitly appending columns at the end of the schema, which is very limited. This design therefore adds full schema evolution capability to Hudi.
This design supports the following schema evolution features:
Supported changes:
These operations are table schema metadata changes; the data files do not need to be rewritten.
Not supported:
Note: Partition evolution is not included in this design; partition evolution will come soon after schema evolution.
Based on the existing implementation, we add table schema metadata, store the business field information in the table metadata, and base read and write operations on it. The existing extra metadata in the commit file is not adjusted, for compatibility with existing operation interfaces.
Table schema metadata examples are as follows:
Schema metadata JSON element definition
Multiple versions of the table schema metadata are available via "version-id".
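As an illustration only (the field names and layout below are assumptions for this sketch, not Hudi's actual JSON format), versioned schema metadata keyed by "version-id" could be modeled like this:

```python
# Illustrative sketch of versioned table schema metadata. Field names such as
# "versions", "version-id", and "fields" are assumptions for this example,
# not Hudi's actual on-disk layout.
schema_metadata = {
    "versions": [
        {
            "version-id": "00000000000000",   # initial version from CREATE TABLE
            "fields": [
                {"id": 1, "name": "uuid", "type": "string"},
                {"id": 2, "name": "price", "type": "int"},
            ],
        },
        {
            "version-id": "20220101120000",   # after an ALTER TABLE change
            "fields": [
                {"id": 1, "name": "uuid", "type": "string"},
                {"id": 2, "name": "price", "type": "long"},   # int widened to long
                {"id": 3, "name": "city", "type": "string"},  # newly added column
            ],
        },
    ],
}

def schema_for_version(metadata, version_id):
    """Look up one schema version by its version-id."""
    for version in metadata["versions"]:
        if version["version-id"] == version_id:
            return version
    raise KeyError(version_id)

print(len(schema_for_version(schema_metadata, "20220101120000")["fields"]))  # 3
```

Keeping every historical version addressable by its ID is what lets readers resolve old data files against the schema that was current when they were written.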
DDL operations on metadata
Currently, each write operation produces a commit file saved in the .hoodie directory. Commit files such as commit, deltacommit, and replacecommit contain the Avro schema information generated by the write operation. This information represents the current version of the table schema.
In this design, the schema information stored in the commit file is changed to support multiple versions: the commit file saves all previous versions of the schema. In addition, under the current clean mechanism, when historical commit information is cleaned, the historical schema information is also deleted. That would cause historical data to become unreadable and the DDL history to become untraceable.
There may be a large number of historical DDL operations, so each commit file may contain more and more historical schema versions. This requires a cleanup mechanism that sets the number of retained historical schema versions. When the schema changes again, historical versions beyond the retention policy will no longer be recorded in the commit file. The cleanup policy is per table; each table can set a different retained version count.
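The retention policy above can be sketched as follows (a toy model; in Hudi the cleanup would run inside the commit path, and the function name here is hypothetical):

```python
def prune_schema_history(versions, retained_count):
    """Keep only the newest `retained_count` schema versions.

    `versions` is ordered oldest-to-newest; versions beyond the per-table
    retention policy are dropped and will no longer appear in new commit files.
    """
    if retained_count <= 0:
        raise ValueError("retained_count must be positive")
    return versions[-retained_count:]

history = ["v1", "v2", "v3", "v4", "v5"]
print(prune_schema_history(history, 3))  # ['v3', 'v4', 'v5']
```

Because pruning happens when a new schema version is generated, data files older than the oldest retained version can no longer be resolved against their original schema, which is the trade-off the retention count controls.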
The data file field name is mapped to the schema metadata field ID.
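Matching columns by field ID rather than by name is what makes renames metadata-only. A minimal sketch of that resolution (the structures here are illustrative, not Hudi's internal types):

```python
def resolve_columns(file_schema, query_schema):
    """Resolve the data file's columns against the query schema by field ID.

    Both schemas are lists of (field_id, field_name). Because the match is on
    ID, a column renamed after the file was written still resolves correctly;
    a field ID absent from the file means the column was added later and is
    read as null (None here).
    """
    by_id = {fid: name for fid, name in file_schema}
    return {qname: by_id.get(fid) for fid, qname in query_schema}

# File written when column 2 was still called "price"; later it was renamed
# to "fare" and column 3 ("city") was added.
file_schema = [(1, "uuid"), (2, "price")]
query_schema = [(1, "uuid"), (2, "fare"), (3, "city")]
print(resolve_columns(file_schema, query_schema))
# {'uuid': 'uuid', 'fare': 'price', 'city': None}
```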
The impact of evolution:
According to the above ideas, the detailed design is divided into the following parts.
DDL operations mainly include CREATE TABLE and ALTER TABLE, but partition changes are not included in this design.
For the CREATE TABLE operation, directly convert the table schema into text and save it to the commit file. Only one version of the schema is retained because there is no historical version yet.
Create table is an initialization operation, so the schema version ID and commit file name are set to "00000000000000".
The DDL execution process is shown in the figure above. After the DDL statements pass through the Spark engine, they are handed to the Hudi kernel, which classifies the DDL operations as schema changes. The classified changes are then transferred to the Schema Evolvter module, which applies each type of change to the current schema and finally persists the new schema to HDFS/object storage. The key point of the whole process is the transformation performed by the Schema Evolvter, as follows:
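The Schema Evolvter's dispatch on change types can be sketched as a toy model covering add, rename, and delete (the real module also handles type changes and reordering; the function and field names are assumptions for this example):

```python
def apply_change(fields, change):
    """Apply one classified schema change to a list of {id, name, type} fields.

    New columns get a fresh field ID; renames only touch the name, so the
    field ID (and therefore the data files) stay untouched.
    """
    kind = change["kind"]
    if kind == "add":
        new_id = max((f["id"] for f in fields), default=0) + 1
        return fields + [{"id": new_id, "name": change["name"], "type": change["type"]}]
    if kind == "rename":
        return [dict(f, name=change["new_name"]) if f["name"] == change["name"] else f
                for f in fields]
    if kind == "delete":
        return [f for f in fields if f["name"] != change["name"]]
    raise ValueError(f"unknown change kind: {kind}")

fields = [{"id": 1, "name": "uuid", "type": "string"}]
fields = apply_change(fields, {"kind": "add", "name": "price", "type": "int"})
fields = apply_change(fields, {"kind": "rename", "name": "price", "new_name": "fare"})
print(fields)
# [{'id': 1, 'name': 'uuid', 'type': 'string'}, {'id': 2, 'name': 'fare', 'type': 'int'}]
```

After the changes are applied, the resulting field list is persisted as a new schema version alongside the retained historical versions.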
Change column incompatibility handling policy:
The current DDL operation requires column type changes to be compatible (for example, int can be widened to long, but not vice versa). If an incompatible change occurs, the operation fails by default. To make type modification more flexible, this design also provides an alternative scheme: when an incompatible change occurs, launch a new job that rewrites the full table. Of course, this is very expensive, so the default behavior remains to fail directly on incompatible changes.
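The two handling policies above can be sketched like this (the set of allowed widenings is illustrative, not Hudi's full conversion matrix):

```python
# Widening conversions that are metadata-only; anything else is incompatible.
# This table is an assumption for the sketch, not Hudi's actual matrix.
COMPATIBLE_CHANGES = {
    ("int", "long"),
    ("float", "double"),
    ("int", "double"),
}

def check_type_change(old_type, new_type, allow_rewrite=False):
    """Decide how a column type change should be handled.

    Default policy: fail on incompatible changes. With allow_rewrite=True,
    an incompatible change triggers a full-table rewrite job instead.
    """
    if old_type == new_type or (old_type, new_type) in COMPATIBLE_CHANGES:
        return "metadata-only"
    return "rewrite-table" if allow_rewrite else "fail"

print(check_type_change("int", "long"))                      # metadata-only
print(check_type_change("long", "int"))                      # fail
print(check_type_change("long", "int", allow_rewrite=True))  # rewrite-table
```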
The overall writing process design is shown in the figure above:
The steps to focus on are:
For normal write operations, there is no inconsistency between the written data and the current schema, and no schema change occurs; the schema in the commit file simply inherits the last committed schema. However, Hudi itself supports implicitly adding columns during writes. In this scenario, a schema change does occur in the write operation, and the schema saving process is the same as after a DDL change: both the newly generated schema version and the historical schema versions are saved.
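The implicit add-column path on write can be sketched as a reconciliation step (a toy model; the function name and structures are assumptions for this example):

```python
def reconcile_write_schema(table_fields, incoming_fields):
    """Detect columns implicitly added by a write.

    Returns (new_table_fields, changed). Incoming columns missing from the
    table schema are appended with fresh field IDs, mirroring the implicit
    add-column behaviour described above. When nothing changed, the caller
    can simply inherit the last committed schema.
    """
    known = {f["name"] for f in table_fields}
    next_id = max((f["id"] for f in table_fields), default=0) + 1
    new_fields = list(table_fields)
    for name, ftype in incoming_fields:
        if name not in known:
            new_fields.append({"id": next_id, "name": name, "type": ftype})
            next_id += 1
    return new_fields, len(new_fields) != len(table_fields)

table = [{"id": 1, "name": "uuid", "type": "string"}]
evolved, changed = reconcile_write_schema(table, [("uuid", "string"), ("city", "string")])
print(changed, [f["name"] for f in evolved])  # True ['uuid', 'city']
```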
The whole reading process is shown in the figure above:
| Query method | Query schema construction |
| --- | --- |
| Snapshot query | Get the latest schema from the latest commit file |
| Incremental query | Obtain the schema version for the commit time specified by the incremental query, then get the query schema from the commit file |
| Read Optimized query | Get the schema version ID from the file name of the base file, then get the query schema from the commit file |
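The three lookup paths in the table above can be sketched together (a toy model; in particular, embedding the schema version ID at the start of the base file name is an assumption made for this example):

```python
def query_schema(query_type, commits, base_file_name=None, commit_time=None):
    """Pick the schema used to build the query schema (toy model).

    `commits` maps commit time -> schema. For the read-optimized path we
    assume, for illustration only, that base file names embed the schema
    version ID as '<version_id>_<rest>.parquet'.
    """
    if query_type == "snapshot":
        return commits[max(commits)]               # latest commit's schema
    if query_type == "incremental":
        return commits[commit_time]                # schema at the given commit
    if query_type == "read_optimized":
        version_id = base_file_name.split("_")[0]  # version ID from file name
        return commits[version_id]
    raise ValueError(query_type)

commits = {"001": "schema-v1", "002": "schema-v2"}
print(query_schema("snapshot", commits))                                        # schema-v2
print(query_schema("incremental", commits, commit_time="001"))                  # schema-v1
print(query_schema("read_optimized", commits, base_file_name="001_a.parquet"))  # schema-v1
```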
The generation of the read schema is related to the specific query engine, because different query engines' file readers use different schemas. Here is an example with Spark.
Let's walk through the execution process in the figure above.
First, prune the query schema according to the pruned columns generated by the query engine, and take the corresponding ID for each column in the pruned query schema.
Expand the pushed-down filters generated by the query engine. For each leaf filter, such as EqualTo, EqualNullSafe, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, In, IsNull, IsNotNull, StringStartsWith, StringEndsWith, StringContains (excluding And, Or, Not), turn it into a new filter according to the process in the figure below. For the complex filters And, Or, and Not, process their sub-filters recursively as shown in the figure below to generate new sub-filters, then recombine them into And, Or, and Not filters.
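The recursive rewrite described above can be sketched with filters as plain tuples (a toy model of Spark's pushed-down filters, not the actual `org.apache.spark.sql.sources.Filter` classes):

```python
def rewrite_filter(flt, rename):
    """Recursively rewrite pushed-down filters onto the file's column names.

    Leaf filters are tuples like ('EqualTo', column, value) or
    ('IsNull', column); And/Or/Not are rewritten by recursing into their
    sub-filters and recombining, as described above.
    """
    op = flt[0]
    if op in ("And", "Or"):
        return (op, rewrite_filter(flt[1], rename), rewrite_filter(flt[2], rename))
    if op == "Not":
        return (op, rewrite_filter(flt[1], rename))
    # Leaf filter: rename the referenced column, keep the rest unchanged.
    return (op, rename.get(flt[1], flt[1])) + tuple(flt[2:])

rename = {"fare": "price"}  # query-time name -> name stored in the data file
flt = ("And", ("EqualTo", "uuid", "x"), ("Not", ("GreaterThan", "fare", 10)))
print(rewrite_filter(flt, rename))
# ('And', ('EqualTo', 'uuid', 'x'), ('Not', ('GreaterThan', 'price', 10)))
```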
An optimistic locking mechanism is adopted. DDL and write operations are executed directly; the lock is only taken when the final commit is submitted.
The policy for checking whether schemas conflict is provided as a plug-in, which can be customized by users.
Some engines use the Hive Metastore for metadata management, but the Hive Metastore does not support multiple schema versions, so these engines cannot read data with a historical schema.
See the chapter "Data query process".
<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>