Abstract
Currently, Hudi is widely used as a table format in the data warehouse. There is a lack of central metastore server to manage the metadata of data lake table. Hive metastore as a commonly used catalog service in the data warehouse cannot store the unique metadata like timeline of the hudi table.
The proposal is to implement an unified metadata management system called hudi metastore server to store the metadata of the hudi table, and be compatible with hive metastore so that other engines can access it without any changes.
Backgroud
How Hudi metadata is stored
The metadata of hudi are table location and schema, timeline generated by instants, metadata of each commit / instant, which records files created / updated, new records num and so on in this commit. Besides, the information of files in a hudi table is also a part of hudi metadata.
Different from instant or schema recorded by a separate file that is stored under `${tablelocation}/.hoodie` on the HDFS or object store, files info are managed by the HDFS directly. Hudi gets all files of a table by file listing. File listing is a costly operation and its performance is limited by namenode. In addition, there will be a few invalid files on the file system, which are created by spark speculative tasks(for example) and are not deleted successfully. Getting files by listing will result in inconsistency, so hudi has to store the valid files from each commit metadata, the metadata about files is usually referred to snapshot.
RFC-15 metadata table is a proposal that can solve these problems. However, it only manages the metadata of one table. There is a lack of a unified view.
The integration of Hive metastore and Hudi metadata lacks a single source of truth.
Hive metastore server is widely used as a metadata center in the data warehouse. It stores the metadata for hive tables like their schema, location and partitions. Currently, almost all of the storage or computing engines support registering table information to it, discovering and retrieving metadata from it. It seems that hive metastore has become a standard in the data warehouse.
Different from the traditional table format like hive table, the data lake table not only has schema, partitions and other hive metadata, but also has timeline, snapshot which is unconventional. Hence, the metadata of data lake cannot be managed by HMS directly.
Hudi just syncs the schema and partitions to HMS by now, and other metadata still stores at HDFS or object store. Metadata synchronization between different metadata management systems will result in inconsistency.
Overview
The hudi metastore server is for metadata management of the data lake table, to support metadata persistency, efficient metadata access and other extensions for data lake. The metadata server managed includes the information of databases and tables, partitions, schemas, instants, instants' meta and files' meta.
The metastore server has two main components: service and storage. The storage is for metadata persistency and the service is to receive the get / put requests from client and return / store the processing result after doing some logical operations on metadata.
The hudi metastore server is / has
A metastore server for data lake
Different from the traditional table format, the metadata of the data lake has timeline and snapshot concepts, in addition to schema and partitions.
The metastore server is an unified metadata management system for data lake table.
Pluggable storage
The storage is only responsible for metadata presistency. Therefore, it's doesn't matter what the storage engine is used to store the data, it can be a RDBMS, kv system or file system.
Easy to be expanded
The service is stateless, so it can be scaled horizontally to support higher qps. The storage can be split vertically to store more data.
Compatible with multiple computing engines
The server has an adapter to be compatible with hive metastore server.
Design
This part has four sections: what the service does, what and how the metadata stores, how the service interacts with the storage when reading and writing a hudi table with an example, and the extension of the metastore server which will be implemented in the next version.
Service
Service is to receive requests from clients and return results according to the metadata in the storage combined with some processing logic. According to the functional division, the service consists of four parts:
table service
is for table related request. To client, it exposes API about database and table CRUD.
partition service
is for partition related requests. To client, it exposes API about CRUD:
support multiple ways of reading, like checking the partition's existence, getting partition info, getting partitions statisfys a specific condition(partition pruning).
creating or updating API cannot be invoked directly, only a new commit completion can trigger it.
dropping a partition not only deletes the partition and files at metadata level, but also triggers a clean action to do the physical clean that deletes the data on the file system.
timeline service
is for timeline related request. The basic functions are the same as the `HoodieActiveTimeline`.
To client, it exposes API that
the instant CRUD
get the metadata of an instant
snapshot service
is for file level related request. The old way is that a write client gets all files under the path by file listing, and then builds the file system view by organizing the files as file slices in file groups. After that, get the latest file slice in each file group to read and write. Snapshot service just gives the client the result of the last step.
To client, it exposes API that
get the latest snapshot of a partition without multiple file version
get the incremental files after a specified timestamp, for incremental reading
get all files of a partition, the same as the files under the filesystem, which is used to clean
update the snapshot when a new commit completes, it cannot be invoked by client directly.
The specific interface is defined in the `Alpha Version` part.
Storage
According to the type of the metadata stored, the storage is divided into four parts: table, partition, timeline, snapshot. Table consists of the metadata of database, table, schema. Partition is just partition metadata. Timeline contains instant and the metadata of each instant. Snapshot has file statuses.
Storage Schema
Table
db
unique key: name
name
type
comment
db_id
bigint
auto_increment
desc
string
database description
location_uri
string
database storage path
name
string
database name
owner_name
string
database owner
owner_type
string
database owner type
db_params
unique key: param_key, db_id
name
type
comment
db_id
bigint
auto_increment
param_key
string
database parameter key
param_value
string
database parameter value
tbl
unique key: name, db_id
name
type
comment
tbl_id
bigint
auto_increment
db_id
bigint
database the table belongs to
name
string
table name
create_time
timestamp
table created time
owner
string
the table owner
location
string
table location
tbl_params
is used to record table level statistics like file num, total size.
unique key: param_key, tbl_id
name
type
comment
tbl_id
bigint
auto_increment
param_key
string
table parameter key
param_value
string
table parameter value
schema
unique key: name, type, version, tbl_id
name
type
comment
col_id
bigint
auto_increment
tbl_id
bigint
table identifier
version
bigint
a completed commit identifier
name
string
partition parameter key
type
string
partition parameter value
is_partition_col
tinyint
whether the column is a partition column
comment
string
column description
Partition
partitions
unique key: part_name, tbl_id
name
type
comment
part_id
bigint
auto_increment
tbl_id
bigint
table the partition belongs to
part_name
string
partition name
create_time
timestamp
partition created time
update_time
timestamp
partition updated time
is_deleted
tinyint
whether the partition is deleted
partition_parmas
unique key: param_key, part_id
name
type
comment
part_id
bigint
partition identifier
tbl_id
bigint
table the partition belongs to
param_key
string
partition parameter key
param_value
string
partition parameter value
partition_key_val
is used to support partition pruning.
unique key: part_key_name, part_id
name
type
comment
part_id
bigint
partition identifier
tbl_id
bigint
table the partition belongs to
part_key_name
string
the partition name
part_key_val
string
the partition value
Timeline
tbl_timestamp
is used to get a self-increasing timestamp for each table
unique key: part_key_name, part_id
name
type
comment
tbl_id
bigint
table identifier
ts
string
timestamp for instant
instant
unique key: ts, action, tbl_id
name
type
comment
instant_id
bigint
auto_increment
tbl_id
bigint
table identifier
ts
string
instant timestamp
action
tinyint
instant action, commit, deltacommit and etc.
state
tinyint
instant state, requested, inflight and etc.
duration
int
for heartbeat
start_ts
timestamp
for heartbeat
instant_meta
unique key: ts, action, state, tbl_id
name
type
comment
commit_id
bigint
auto_increment
tbl_id
bigint
table identifier
ts
string
instant timestamp
action
tinyint
instant action
state
tinyint
instant state
data
binary
instant meta data
Snapshot
tbl_version
unique key: ts, action, state, tbl_id
name
type
comment
tbl_id
bigint
table identifier
version
bigint
auto_increment
instant_id
bigint
instant identifier
snapshot_status
tinyint
whether the version of snapshot has been synchronized
create_time
timestamp
instant completion time
update_time
timestamp
snapshot synchronization completion time
files
unique key: name, part_id
name
type
comment
id
bigint
auto_increment
tbl_id
bigint
table identifier
part_id
bigint
partition identifier
version
bigint
a completed commit identifier
name
string
file name
size
bigint
file size
is_deleted
boolean
whether the file has been deleted
create_time
timestamp
file created time
update_time
timestamp
file updated time
Service Integration
Insert into a MOR table, 'test_db.test_table' with schema
{ "type": "records", "name": "schema", "fields": [ { "name": "record_key" # record key field "type": "string" }, { "name": "ts" # precombine field "type": "bigint" }, { "name": "date" # first level partition field "type": "string" }, { "name": "hour" # second level partition field "type": "string" } ] }
Basic Process
1. create a db and table
Recods in the storage are:
db
db_id | desc | location_uri | name | owner_name | owner_type |
0 | "example" | test_db | tester | user |
tb
tbl_id | db_id | name | create_time | owner | location |
0 | 0 | test_table | 1970-01-01 00:00:00' | tester |
schema
col_id | tbl_id | version | name | type | is_partition_col | comment |
0 | 0 | 0 | record_key | string | 0 | record key field |
1 | 0 | 0 | ts | bigint | 0 | precombine field |
2 | 0 | 0 | date | string | 1 | first level partition field |
3 | 0 | 0 | hour | string | 1 | second level partition field |
2. insert into table and create a delta commit
The changes in the timeline when insert into a hudi table are divided into three times:
create a new instant which is requested.
change the requested instant to inflight and record the work profile.
change the inflight instant to completed and record the commit meta data.
after the commit is completed, files changes (like file size, a new file) recorded in metadata are stored too. This part is described in the `Snapshot Synchronization` part.
Recods in storage are:
instant
instant_id | tbl_id | ts | action | state | duration(s) | start_ts |
0 | 0 | 20220101000000 | deltacommit |
completed | 120 | 20220101000000 |
Step1 inserts a new instant record, step2 updates the instant state from requested to inflight, and step3 is to completed.
Heartbeat: When insert, write client has to send the heartbeat to the server, corresponding to the changes of `duration` and `start_ts`. Each heartbeat updates the two values.
instant_meta
commit_id | tbl_id | ts | action | state | data |
0 | 0 | 20220101000000 | deltacommit | requested | (requested meta data) |
1 | 0 | 20220101000000 | deltacommit | inflight | (work profil) |
2 | 0 | 20220101000000 | deltacommit | completed | (commit meta data) |
tbl_version
tbl_id | version | instant_id | snapshot_status | create_time | update_time |
0 | 1 | 0 | 0 | '2022-01-01 00:01:00' | '2022-01-01 00:01:00' |
Column `snapshot_status` changes to 1 when sync snapshot successfully.
3. Read the table with a specified partition
A hudi table reading process consists of three main parts, excluding the actual file reading:
partition pruning: decide that the partitions needed to be read. The following section describes the implementation.
build file system view: get file status from metastore instead of file listing with distributed file system, and server only return the latest snapshot without multiple version.
build timeline
Asnyc Snapshot Synchronization
Synapshot synchronization is to store file related info in metastore, to speed up the file system view building, instead of using file listing which may cost more.
When an instant is completed, commit metadata contains all files / partitions updated or created in this commit. Server processes commit metadata:
for new partition, add a new record to `partition` table
for existing partition, update the record in the table
for new file, add a new record to `files` table
for existing file, update its size in the table
for deleted file in the clean metadata, delete the record logically by setting status to `is_deleted`
Finally, mark the completion of this commit synchronization by changing status in the table `tbl_version`.
partition
part_id | tbl_id | part_name | create_time | update_time | is_deleted |
0 | 0 | date=20220101/hour=00 | '2022-01-01 00:01:00' | '2022-01-01 00:01:00' | 0 |
partition_key_val
part_id | tbl_id | part_key_name | part_key_val |
0 | 0 | date | 20220101 |
0 | 0 | hour | 00 |
files
id | tbl_id | part_id | version | name | size | is_deleted | create_time | update_time |
0 | 0 | 0 | 1 | file1.log |
| 0 | '2022-01-01 00:00:00' | '2022-01-01 00:01:00' |
1 | 0 | 0 | 1 | file2.log | 1024 | 0 | '2022-01-01 00:01:00' | '2022-01-01 00:01:00' |
Partition pruning
Read a hudi table by sql like
select count(*) from test_db.test_table where date='20220101' and hour = '00' and ts = '0'
and we can get the filter `where date='20220101' and hour = '00' and ts = '0'`.
After the partition service receives the `filter sql`
1. build an expression tree, analyze the sql and get the partition column related conditions
where date='20220101' and hour = '00'
2.generate a new sql filter
where part_key_name='date' and part_key_val='20220101' where part_key_name='hour' and part_key_val='00'
3.query the partition related table with the new sql filter to get the eligible partitions
Extension
1. Hive Metastore Adapter
The adapter in hudi metatore will implement some commonly used API in hive metastore server to do the compatibility so that there are no changes for other engines to read the hudi table.
2.Lock Provider & Cocurrency Management
[TBD]
3. Schema Evalution
[TBD]
4. Cache
Cache the frequently accessed data like table, timeline related information. For larger metadata, cache them with LRU strategy.
The main points are:
what to cahe
how to cache
when to update cache data to keep consistency with the data in the storage.
[TBD]
5. Authorization Management
[TBD]
Implementation
Alpha Version
use RDBMS as metastore storage
support all actions in hoodie
extension part is not in
Interface Definition
[TBD]
client / server interface
server / storage interface
3 Comments
Vinoth Chandar
XiaoyuGeng Can you please create the RFC via the new process here as a GH PR?
https://hudi.apache.org/contribute/rfc-process
XiaoyuGeng
I will move it to github when I finish the rfc. Is it ok?
Pratyaksh Sharma
XiaoyuGeng Are you still working on this?