Proposers
- scbai
- ???
Approvers
Status
Current state:
Current State | |
---|---|
UNDER DISCUSSION | |
IN PROGRESS | |
ABANDONED | |
COMPLETED | |
INACTIVE | |
Discussion thread: here
JIRA: here
Released: <Hudi Version>
Abstract
Hudi currently supports only limited schema evolution, which cannot meet the needs of real-world scenarios. The purpose of this design is to support schema evolution in Hudi more comprehensively.
Background
As a business continues to operate, business adjustments or business system reconstruction are generally carried out, and these operations may change the business system's database; in traditional data lake storage, adjusting a table schema is not easy.
Hudi schema evolution is in-place table evolution: it does not require costly disruptions such as rebuilding the table or rewriting it into a new table. In addition, it can quickly adapt to adjustments in the business system and get data written into the data lake faster.
Currently, Hudi only supports implicitly appending columns to the end of the schema, which is very limited; this design therefore adds full schema evolution capability to Hudi.
Features
This design supports the following schema evolution features:
Supported changes
- Add : add a new column to the table or to a nested struct
- Drop: remove an existing column from the table or a nested struct
- Rename: rename an existing column or field in a nested struct
- Update: widen the type of a column, struct field, map key, map value, or array element
- Reorder: change the order of columns or fields in a nested struct
These operations are table schema metadata changes; the data files do not need to be rewritten.
Not supported
- map keys do not support adding or dropping struct fields that would change equality
Note: Partition evolution is not included in this design; it will come soon after schema evolution.
Overall design
Based on the existing implementation, table schema metadata is added to store the business field information, and read and write operations are based on it. The existing extra metadata in the commit file is not adjusted, for compatibility with existing operation interfaces.
Schema metadata format
Table schema metadata examples are as follows:
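A minimal sketch of the metadata, based only on the element definitions below; the field names and values shown are illustrative, not taken from an actual table:
```json
{
  "max-column-id": 3,
  "type": "struct",
  "version-id": "20220101000000",
  "fields": [
    { "id": 1, "name": "uuid", "type": "string", "required": true,  "default": null },
    { "id": 2, "name": "data", "type": "string", "required": false, "default": null },
    { "id": 3, "name": "ts",   "type": "long",   "required": false, "default": 0 }
  ]
}
```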
Schema metadata JSON element definitions
- max-column-id: the maximum column ID in the current table; when a new column is added, the value is incremented by 1
- type: the schema type; the default value is "struct"
- version-id: the commit time of the DDL operation, consistent with the commit file
- fields: the column definitions
  - id: the ID of the column; the ID will be stored with the column in Parquet
  - name: the name of the column, consistent with the business system; both DDL and DML reference columns through this field
  - type: the data type of the column
  - required: whether the column must be non-NULL
  - default: the default value written when this column is missing from newly written data
Multiple versions of the table schema metadata can be retrieved by "version-id".
DDL operations on metadata
- Add: for an Add operation that appends a column, append the column's information to the end of the schema and assign it a new ID (the ID value is incremented by 1). For an Add after/before operation, insert the column at the corresponding position in the schema and likewise assign it a new ID (incremented by 1), as sketched after this list.
- Change: directly change the name, attributes, and type of the field corresponding to the column in the schema; the ID stays the same
- Reorder: adjust the column's position in the schema; the ID stays the same
- Delete: delete the field corresponding to the column from the schema
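The sketch below illustrates this bookkeeping with hypothetical, simplified types (not the actual Hudi classes): an added column gets a fresh ID derived from max-column-id, while rename and delete never touch or reuse existing IDs.
```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the schema metadata; not the actual Hudi classes.
class Field {
    final int id;
    String name;
    String type;

    Field(int id, String name, String type) {
        this.id = id;
        this.name = name;
        this.type = type;
    }
}

class TableSchema {
    int maxColumnId;                               // "max-column-id" in the metadata
    final List<Field> fields = new ArrayList<>();

    // Add: append the new column at the end of the schema and assign a new ID (+1).
    void addColumn(String name, String type) {
        maxColumnId += 1;
        fields.add(new Field(maxColumnId, name, type));
    }

    // Rename (a Change on the name): only the name changes; the ID stays the same.
    void renameColumn(String oldName, String newName) {
        for (Field f : fields) {
            if (f.name.equals(oldName)) {
                f.name = newName;
                return;
            }
        }
    }

    // Delete: remove the field; its ID is never reused because max-column-id only grows.
    void dropColumn(String name) {
        fields.removeIf(f -> f.name.equals(name));
    }
}
```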
Schema storage policy
Currently, each write operation produces a commit file saved in the .hoodie directory. Commit files such as commit, deltacommit, and replacecommit contain the Avro schema generated by the write operation, which represents the current version of the table schema.
In this design, the schema information previously stored in the commit file is extended to support multiple versions: the commit file saves all previous versions of the schema. In addition, under the current clean mechanism, when historical commit information is cleaned, the historical schema information is deleted with it; this prevents historical data from being read correctly and makes historical DDL operations untraceable.
There may be a large number of historical DDL operations, so each commit file may contain more and more historical schema versions. This requires a cleanup mechanism that sets the number of retained historical schema versions: when the schema changes again, historical versions beyond the policy are no longer recorded in the commit file. The cleanup policy is per table, and each table can set a different number of retained versions.
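A sketch of how such a per-table setting could be supplied; the property name below is hypothetical and only illustrates the idea of a table-level retention count, it is not an existing Hudi config:
```java
import java.util.Properties;

public class SchemaRetentionConfigExample {
    public static void main(String[] args) {
        Properties tableProps = new Properties();
        // Hypothetical property name, shown only to illustrate a per-table retention
        // count for the historical schema versions kept in commit files.
        tableProps.setProperty("hoodie.schema.history.retained.versions", "10");
        System.out.println(tableProps);
    }
}
```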
Evolution of the data file:
The data file field names are mapped to the schema metadata field IDs.
The impact of evolution:
- Write data: the incoming data is checked against the metadata and written to the data file.
- Query data: obtain the schema of the corresponding version from the metadata and complete the read operation.
- Table definition: DDL operations operate directly on the metadata, not on the data files.
Solution scheme
According to the above ideas, the detailed design is divided into the following parts:
- The Hudi kernel supports schema evolution
- Each engine adapts to the schema evolution of the kernel
- Clustering, compaction, clean, and rollback support schema evolution
DDL operation process
DDL operations mainly include CREATE TABLE and ALTER TABLE; partition changes are not included in this design.
CREATE Table
For the CREATE TABLE operation, the table schema is converted directly into text and saved to the commit file. Only one version of the schema is retained, because there is no historical schema version yet.
Create table is an initialization operation, so the schema version ID and commit file name are set to "00000000000000".
Alter table
The DDL execution process is shown in the figure above. After the DDL statements pass through the Spark engine, they are passed to the Hudi kernel, which classifies the DDL operations as schema changes. The classified changes are then handed to the Schema Evolver module, which applies the different types of changes to the schema and finally persists the schema to HDFS/object storage. The key point of the whole process is the transformation performed by the Schema Evolver, as follows:
Change column incompatibility handling policy:
The current DDL operation requires that a column type change be compatible (for example, int can be widened to long, but not the reverse). If an incompatible change occurs, it fails directly by default. To make type modification more flexible, this design also provides an alternative scheme: when an incompatible change occurs, a new job is launched to rewrite the full table. Of course, this is very costly, so the default behavior is still to fail directly on incompatible changes.
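A minimal sketch of the kind of compatibility check described above; the rules shown cover only the widening conversions mentioned in the text and are not the actual Hudi implementation:
```java
// Illustrative only: a type change is accepted when it is a widening conversion.
public class TypeCompatibility {

    static boolean isCompatibleChange(String fromType, String toType) {
        if (fromType.equals(toType)) {
            return true;
        }
        switch (fromType) {
            case "int":
                return toType.equals("long") || toType.equals("float") || toType.equals("double");
            case "long":
                return toType.equals("double");   // long -> int would lose data, so it is rejected
            case "float":
                return toType.equals("double");
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isCompatibleChange("int", "long"));  // true: widening is allowed
        System.out.println(isCompatibleChange("long", "int"));  // false: fails by default (or triggers a full rewrite job)
    }
}
```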
Data write process
The overall writing process design is shown in the figure above:
- Preparing data for writing
- Check the schema of the data being written. If it is not compatible with the current table schema, stop writing.
- Execute the original Hudi write process (upsert, etc.) to write the data files.
- Obtain the latest schema information from the previous commit file, combine it with the current schema information, and write the result to the current commit file.
The steps to focus on are:
For normal write operations, there is no inconsistency between the written data and the current schema, and no schema change occurs; the schema in the commit file simply inherits the last committed schema. However, Hudi itself supports implicitly adding columns on write. In that scenario a schema change does occur during the write operation, and the schema is saved the same way as after a DDL change: the newly generated schema version is saved along with the historical versions.
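A sketch of this version bookkeeping at commit time, assuming (as in the comment thread below) that historical schemas are tracked as versions keyed by commit time; the types are hypothetical and the schema payload is represented as a plain JSON string:
```java
import java.util.TreeMap;

// Hypothetical sketch: historical schema versions keyed by commit time, carried
// forward from the previous commit file into the current one.
class CommitSchemaHistory {
    private final TreeMap<String, String> schemasByCommitTime;

    CommitSchemaHistory(TreeMap<String, String> fromPreviousCommit) {
        this.schemasByCommitTime = new TreeMap<>(fromPreviousCommit);
    }

    // Called when finalizing the commit: record a new version only if this write
    // changed the schema (e.g. an implicit column add), then apply the retention policy.
    TreeMap<String, String> onCommit(String commitTime, String currentSchemaJson,
                                     boolean schemaChanged, int retainedVersions) {
        if (schemaChanged) {
            schemasByCommitTime.put(commitTime, currentSchemaJson);
        }
        while (schemasByCommitTime.size() > retainedVersions) {
            schemasByCommitTime.remove(schemasByCommitTime.firstKey()); // drop the oldest versions
        }
        return schemasByCommitTime; // written into the current commit file
    }
}
```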
Data Query process
Introduction to the overall process
The whole reading process is shown in the figure above:
- Step 1: determine the query type from the SQL statement
- Step 2: hand the query type to the scan factory module. The scan factory has two core functions: obtain the table files to be scanned according to the query type (this logic is already implemented in the Hudi kernel), and build a query schema for the query type (see the table below for the detailed construction of the query schema)
Query method | Query schema construction |
---|---|
Snapshot query | Get the latest schema from the latest commit file |
Incremental query | Get the schema version from the commit time specified by the incremental query, then obtain the query schema from the commit file |
Read optimized query | Get the schema version ID from the file name of the base file, then obtain the query schema from the commit file |
- Step 3: the scan factory passes all the files involved in the query to the scanner module; the scanner builds a scan task for each file group; each scan task is responsible for reading the actual data.
- Step 4: the scan task constructs the file schema for each file to be read. The construction of the file schema is similar to finding the schema in step 2: use the file's commit time as the version ID to look up the corresponding schema version from the commit file (see the sketch after step 5).
- Step 5: the scan task gives the file schema generated in step 4 to the merge schema module. The merge schema module merges the file schema with the query schema generated in step 2, generates the read schema for the query engine, and also builds pushed filters to push down to the data source side to filter data.
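The schema lookups in step 2 and step 4 can be sketched as follows, assuming the versioned schemas are kept as a map from commit time to schema (hypothetical types; the schema payload is shown as a JSON string):
```java
import java.util.TreeMap;

// Hypothetical sketch of query-schema selection; the schema history is assumed to be
// available from the latest commit file as versions keyed by commit time.
class QuerySchemaResolver {
    private final TreeMap<String, String> schemaVersions; // commit time -> schema JSON

    QuerySchemaResolver(TreeMap<String, String> schemaVersions) {
        this.schemaVersions = schemaVersions;
    }

    // Snapshot query: use the latest schema version.
    String forSnapshotQuery() {
        return schemaVersions.lastEntry().getValue();
    }

    // Incremental query, read-optimized query, and per-file schema (step 4): use the
    // newest schema version at or before the given commit time / base file version ID.
    String forCommitTime(String commitTime) {
        return schemaVersions.floorEntry(commitTime).getValue();
    }
}
```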
Construction of the read schema
The generation of the read schema depends on the specific query engine, because different query engines' file readers use different schemas. Here is an example using Spark.
Let's walk through the execution process in the figure above.
First, prune the query schema according to the pruned columns produced by the query engine, then take the IDs in the pruned query schema one by one (a sketch summarizing the four cases follows this list):
- The first ID is 5. ID 5 does not exist in the file schema, but the name corresponding to id = 5 does exist in the file schema. This indicates that the column was deleted and then added back; the two columns are no longer the same column, so the data of this column in the file must not be read out and should be returned as null. Therefore, a suffix is added to the column name corresponding to this ID to prevent Spark from reading it.
- The second ID is 2. The file schema has a column named data with id 2, while in the query schema id = 2 corresponds to a column named data1. Since the IDs are the same, a rename column operation has occurred. In this case the data of this column should still be read, because the user only renamed the column. Why pass data rather than data1 into the Spark schema? Because the column name stored in the file (Parquet, for example) is data; trying to read it as data1 would be wrong.
- The third ID is 3. The column with ID 3 has the same name in the file schema and the query schema, and the order is the same, indicating this column has not changed. Just insert it into the Spark schema.
- The fourth ID is 4. This ID does not exist in the file schema, and neither does its column name, so this is a newly added column. Insert it directly into the Spark schema; when Spark reads it, it will automatically be filled with null.
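The four cases above can be summarized with the following sketch (hypothetical, simplified types; not the actual implementation), which builds the list of column names handed to the engine from the pruned query schema and the file schema:
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of building the engine-facing read schema from the pruned
// query schema and the file schema, following the four cases described above.
class ReadSchemaBuilder {

    // queryColumns: pruned query schema as (id -> name); fileColumns: file schema as (id -> name).
    static List<String> buildReadColumns(Map<Integer, String> queryColumns,
                                         Map<Integer, String> fileColumns) {
        List<String> readColumns = new ArrayList<>();
        for (Map.Entry<Integer, String> q : queryColumns.entrySet()) {
            int id = q.getKey();
            String queryName = q.getValue();
            String fileName = fileColumns.get(id);
            if (fileName != null) {
                // Same ID exists in the file: use the file's column name, whether the
                // column was renamed (case 2) or unchanged (case 3), so the reader can
                // actually find the data in the file.
                readColumns.add(fileName);
            } else if (fileColumns.containsValue(queryName)) {
                // ID not in the file but the name is: the column was dropped and re-added
                // (case 1); suffix the name so the reader returns null instead of old data.
                readColumns.add(queryName + "_suffix");
            } else {
                // Neither ID nor name is in the file: a newly added column (case 4); the
                // reader will fill it with null automatically.
                readColumns.add(queryName);
            }
        }
        return readColumns;
    }
}
```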
Construction of pushed filters
Expand the pushed filters generated by the query engine. For each simple sub-filter, such as EqualTo, EqualNullSafe, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, In, IsNull, IsNotNull, StringStartsWith, StringEndsWith, and StringContains (excluding And, Or, Not), turn it into a new filter according to the process in the figure below. For the complex filters And, Or, and Not, process their sub-filters recursively as shown in the figure below to generate new sub-filters, and then recombine them into And, Or, and Not filters.
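A sketch of this recursive rewrite, assuming the per-leaf work is renaming the filter's attribute to the column name used in the file; only a few leaf filter types are shown, and everything other than Spark's filter classes is illustrative:
```java
import java.util.Map;
import org.apache.spark.sql.sources.And;
import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.GreaterThan;
import org.apache.spark.sql.sources.Not;
import org.apache.spark.sql.sources.Or;

// Hypothetical sketch: rebuild pushed-down filters so that attribute names refer to the
// columns as they are named in the data file (e.g. after a rename). The real set of
// leaf filters covers all the types listed above.
class FilterRewriter {

    private final Map<String, String> queryToFileName; // query column name -> file column name

    FilterRewriter(Map<String, String> queryToFileName) {
        this.queryToFileName = queryToFileName;
    }

    Filter rewrite(Filter filter) {
        if (filter instanceof And) {
            And and = (And) filter;
            return new And(rewrite(and.left()), rewrite(and.right()));  // recurse into complex filters
        } else if (filter instanceof Or) {
            Or or = (Or) filter;
            return new Or(rewrite(or.left()), rewrite(or.right()));
        } else if (filter instanceof Not) {
            Not not = (Not) filter;
            return new Not(rewrite(not.child()));
        } else if (filter instanceof EqualTo) {
            EqualTo eq = (EqualTo) filter;
            return new EqualTo(rename(eq.attribute()), eq.value());
        } else if (filter instanceof GreaterThan) {
            GreaterThan gt = (GreaterThan) filter;
            return new GreaterThan(rename(gt.attribute()), gt.value());
        }
        // ... the remaining leaf filters (LessThan, In, IsNull, StringStartsWith, ...) follow the same pattern
        return filter;
    }

    private String rename(String queryName) {
        return queryToFileName.getOrDefault(queryName, queryName);
    }
}
```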
DDL concurrent operations
- Multi DDL Concurrency: distributed locking processing
- DDL and query Concurrency: reusing Hudi's isolation mechanism
- DDL and write Concurrency
An optimistic locking mechanism is adopted: DDL and write operations execute directly, and the lock is taken only at final commit.
- The DDL operation can be committed directly after obtaining the lock
- The write operation cannot be committed directly after obtaining the lock; it needs to check whether the schema has changed. If the change was caused by the implicit column-add capability, the write operation can be committed; if not, the operation is cancelled
The policy for checking whether the schema conflicts should be provided as a plug-in that users can customize.
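A minimal sketch of what such a plug-in could look like (a hypothetical interface, not an existing Hudi API), with the default behavior described above:
```java
// Hypothetical plug-in interface for the conflict check described above; users could
// supply their own implementation to decide whether a concurrent write may commit.
interface SchemaConflictResolver {

    /**
     * @param schemaAtWriteStart     the table schema when the write started
     * @param schemaAtCommitTime     the table schema observed under the lock at commit time
     * @param changeFromImplicitAdd  true if the difference comes only from implicit column adds
     * @return true if the write operation is allowed to commit
     */
    boolean canCommit(String schemaAtWriteStart, String schemaAtCommitTime, boolean changeFromImplicitAdd);
}

// Default behavior sketched from the text: allow the commit when nothing changed or when
// the only change is an implicit column add; otherwise cancel the write.
class DefaultSchemaConflictResolver implements SchemaConflictResolver {
    @Override
    public boolean canCommit(String schemaAtWriteStart, String schemaAtCommitTime, boolean changeFromImplicitAdd) {
        return schemaAtWriteStart.equals(schemaAtCommitTime) || changeFromImplicitAdd;
    }
}
```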
Various engine support capabilities
Some engines use the Hive metastore for metadata management, but the Hive metastore does not support multiple schema versions, so these engines cannot read data using a historical schema.
Old schema data compatibility
See the chapter "Data Query process" above.
Rollout/Adoption Plan
- <What impact (if any) will there be on existing users?>
- <If we are changing behavior how will we phase out the older behavior?>
- <If we need special migration tools, describe them here.>
- <When will we remove the existing behavior?>
Test Plan
<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>
8 Comments
Vinoth Chandar
scbai I don't think we are very far apart. I am convinced of your high level approach for supporting existing tables/old data. But the key question is whether we need to persist `ids` for each column to storage and change the data file layout.
I feel an approach where we deduce the IDs on the fly with a list of schema versions and the diffs between them is preferable for the following reasons, as long as we can achieve all the features.
I can imagine why other systems that don't have an ordered timeline would need ids, but we already have this. So we can treat them as database SCNs and adopt a model more closely aligned with databases than file management systems, IMO.
Sagar Sumit Can you please add concrete, clear examples of how we can resolve the different changes ADD, DROP, RENAME, REORDER with just a `TreeMap<String, HoodieSchema>` where each key is a commit time and `HoodieSchema` has the schema at that commit time and how it was derived from the entry before that? I think it's easier for us to align once we are there.
In the meantime, scbai let me know how married you are to the id persistence with data files and if there are other concrete examples for the alternative approach above.
Jack
Vinoth Chandar I think we almost reached an agreement. But there are some issues I need to explain.
1) I still cannot understand why we need to change the data file layout.
We only want to introduce an id-schema as HoodieInternalSchema to track all the column changes, and save those HoodieInternalSchemas in commit files. We do not need to modify any parquet/log files.
I have communicated with Sagar Sumit. The only difference between our two solutions is whether we need to give each column an id. But I think there is a problem with this programming method.
I just think that recording an id for each column makes it easier to trace the changes of a column.
2) Just ignore max-column-id, which is only needed to produce the next new id for add changes from a programming perspective. No lock is needed.
If you think max-column-id is superfluous, we can even delete it directly.
3) I agree that we can resolve the different changes ADD, DROP, RENAME with just a `TreeMap<String, HoodieSchema>` where each key is a commit time and `HoodieSchema` has the schema at that commit time.
In fact we also adopted this approach; the only difference is that our schema has an id (HoodieSchema is an id-schema).
4) At last, I still cannot understand what you are worried about over there. Is there no need to introduce a new internal schema (used by schema evolution) for Hudi?
Vinoth Chandar
Jack The picture under "Evolution of the data file" led me to believe we are changing the actual parquet/log file schema.
That was my major concern, since I felt this can be done without that. Based on what you said, the ids seem like just a logical tracking done with the metadata. I am good with that.
>> we only want to introduce an id-schema as HoodieInternalSchema to track all the column changes, and save those HoodieInternalSchemas in commit files. We do not need to modify any parquet/log files.
Jack
Sorry for causing the misunderstanding.
Sagar Sumit
Jack If I understand correctly, the `id` for each field in the schema will need to be mapped to the Parquet ID for the field for schema merging. Today, Hudi does not write any ID in the parquet file. So, we need to have IDs for old Parquet files, right?
Jack
Sagar Sumit No, the id does not need to be mapped to the Parquet field ID for schema merging. I know Iceberg does that, but Hudi does not need that mapping: since we have a view of all the historical schemas, we can get any commit file's id-schema directly.
Vinoth Chandar
More aspects we can flesh out in this RFC IMO
I suggest Sagar Sumit and you join forces on this. There is more work than there are hands, for sure.
Jack
Vinoth Chandar Sagar Sumit Balaji Varadarajan I agree that there are more aspects we can flesh out in this RFC.
This is a big feature. We can work together to complete it. I will add the integration design for Spark MOR/COW later.