Proposers
Approvers
- Vinoth Chandar : [APPROVED/REQUESTED_INFO/REJECTED]
- Balaji Varadarajan : [APPROVED/REQUESTED_INFO/REJECTED]
- Nishith Agarwal : APPROVED (only file listing phase)
Status
Current state: ACCEPTED
Discussion thread: here
JIRA: here
Released: 0.7
Abstract
In the current implementation, the HUDI Writer Client (in the write path) and HUDI queries (through the InputFormat in the read path) have to perform a “list files” operation on the file system to get the current view of the file system. In HDFS, listing all the files in a dataset is a NameNode-intensive operation for large data sets. For example, one of our HUDI datasets has thousands of date partitions, each with thousands of data files.
With this effort, we want to:
- Eliminate the requirement of “list files” operation
- This will be done by proactively maintaining metadata about the list of files
- Reading the file list from a single file should be faster than a large number of NameNode operations
- Create Column Indexes for better query planning and faster lookups by Readers
- For a column in the dataset, min/max range per Parquet file can be maintained.
- Just by reading this index file, the query planning system should be able to get the view of potential Parquet files for a range query.
- Reading Column information from an index file should be faster than reading the individual Parquet Footers.
This should provide the following benefits:
- Reducing the number of file listing operations improves NameNode scalability and reduces NameNode burden.
- Query planning is optimized since it only needs to read one metadata file, so its cost stays mostly bounded regardless of the size of the dataset
- Can allow for performing partition path agnostic queries in a performant way
In this document we present two approaches to implementing this functionality. We intend to finalize on a single approach after discussion and some workflow testing on a sample dataset.
Implementation
Basics
RFC-15 has been implemented by using an internal HUDI MOR Table to store the required metadata for the dataset. This table will be internal to a dataset and will not be exposed directly to the user to write / modify.
The Metadata Table has the following features:
- Will be located in the .hoodie/metadata directory
- Is a MOR Table with HFile base / log file formats
- HFile format has been chosen as it allows point lookups of specific records based on record key
- Defines its own custom payload, HoodieMetadataPayload
- Does not use date based partitioning scheme
- Each partition of this table is reserved for various types of metadata information. E.g.
- "files" partition saves fil-listing information
- "record_index" saves a record-key-level index
- etc.
- Follows all the conventions of a HUDI Table (has a schema, can be queried using Hive/Spark, can be cleaned, compacted)
In the rest of this section, “dataset” refers to the Hoodie dataset which holds the data and “Metadata Table” refers to this internal table holding the file listing metadata.
Schema
Being an actual Hoodie Table, the Metadata Table needs to have a schema. The schema provides a way to save the type of the record and a filename-to-size mapping.
Hoodie only allows a single Schema for the entire dataset. But we want to save different types of information in the various partitions of the Metadata Table. Hence, the below schema is being used which contains an integer "type" representing the type of the record and an appropriate record which contains the information. The record key will also be chosen based on the type of information being stored.
{
    "namespace": "org.apache.hudi.avro.model",
    "type": "record",
    "name": "HoodieMetadataRecord",
    "doc": "A record saved within the Metadata Table",
    "fields": [
        { "name": "key", "type": "string" },
        { "name": "type", "doc": "Type of the metadata record", "type": "int" },
        { "name": "filesystemMetadata", "doc": "Contains information about partitions and files within the dataset", "type": ["null", {
            "type": "map",
            "values": {
                "type": "record",
                "name": "HoodieMetadataFileInfo",
                "fields": [
                    { "name": "size", "type": "long", "doc": "Size of the file" },
                    { "name": "isDeleted", "type": "boolean", "doc": "True if this file has been deleted" }
                ]
            }
        }]
        }
    ]
}
For file listing information, the schema has a record field named filesystemMetadata which allows saving the list of files:
- To save the list of all partitions, we save a record whose key = "_all_partitions_", type=1, and whose filesystemMetadata contains the list of partition names.
- To save the list of files in a partition, we save a record whose key = <name-of-partition>, type=2, and whose filesystemMetadata contains the list of file names present in the partition, with their sizes.
Since files can be added as well as deleted, we use the isDeleted field to distinguish the two cases. During a CLEAN, when files are deleted, we save a record of type=2 with isDeleted=true for each deleted file. During compaction, all the records with the same key are merged and the information is normalized (HoodieMetadataPayload.combineAndGetInsertValue) by removing the deleted files. Hence, record sizes are eventually proportional only to the number of files in the partition. This also means that the size of the metadata table is eventually proportional to the number of files and partitions, and not to the number of operations on the dataset.
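To make the merge rule concrete, here is a minimal, self-contained Java sketch. The FileInfo class and combine method are illustrative stand-ins and not the actual HoodieMetadataPayload implementation; they only encode the rule described above (a deletion marker removes an earlier addition for the same file).

import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for one file entry: size plus a deletion marker.
class FileInfo {
    final long size;
    final boolean isDeleted;
    FileInfo(long size, boolean isDeleted) { this.size = size; this.isDeleted = isDeleted; }
}

class MetadataMergeSketch {
    // Merge an older record's file map with a newer one for the same key.
    // Deletion markers remove earlier additions, so a compacted record only
    // retains files that still exist in the partition.
    static Map<String, FileInfo> combine(Map<String, FileInfo> older, Map<String, FileInfo> newer) {
        Map<String, FileInfo> merged = new HashMap<>(older);
        newer.forEach((file, info) -> {
            if (info.isDeleted) {
                merged.remove(file);    // file was removed by a CLEAN
            } else {
                merged.put(file, info); // new file, or a newer size for an existing file
            }
        });
        return merged;
    }

    public static void main(String[] args) {
        Map<String, FileInfo> commit = new HashMap<>();
        commit.put("file10.parquet", new FileInfo(12345, false));

        Map<String, FileInfo> clean = new HashMap<>();
        clean.put("file10.parquet", new FileInfo(0, true));

        // After merging, only surviving files remain (here: none).
        System.out.println(combine(commit, clean).keySet()); // []
    }
}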
Example of how the records are added:
- Insert into partition 2020/01/01 creates 1 parquet file
  - Insert a new file record {"key": "2020/01/01", "type": 2, "filesystemMetadata": {"file10.parquet": {"size": 12345, "isDeleted": false}}}
  - Insert a new partition record {"key": "_all_partitions_", "type": 1, "filesystemMetadata": {"2020/01/01": {"size": 0, "isDeleted": false}}}
- Insert into partition 2020/01/02 creates 1 parquet file
  - Insert a new file record {"key": "2020/01/02", "type": 2, "filesystemMetadata": {"file20.parquet": {"size": 65432, "isDeleted": false}}}
  - Insert a new partition record {"key": "_all_partitions_", "type": 1, "filesystemMetadata": {"2020/01/02": {"size": 0, "isDeleted": false}}}
- Upsert into the two partitions updates some records. For a COW table this creates a new version of the parquet file; for a MOR table this writes to a LOG file.
  - Insert a new file record {"key": "2020/01/01", "type": 2, "filesystemMetadata": {"file11.parquet": {"size": 12345, "isDeleted": false}, "file11.log": {"size": 5678, "isDeleted": false}}}
  - Insert a new file record {"key": "2020/01/02", "type": 2, "filesystemMetadata": {"file21.parquet": {"size": 12345, "isDeleted": false}, "file21.log": {"size": 5678, "isDeleted": false}}}
  - Insert a new partition record {"key": "_all_partitions_", "type": 1, "filesystemMetadata": {"2020/01/01": {"size": 0, "isDeleted": false}, "2020/01/02": {"size": 0, "isDeleted": false}}}
When the Metadata Table is compacted, the various records sharing the same key are merged into single records. A CLEAN then removes the older versions.
Metadata Table partitioning scheme
Each type of information will be stored in its own partition. The partition names will be hardcoded and chosen in advance. Within each partition, the number of base-files will be chosen based on the amount of information that needs to be committed.
For saving file-listing information, we have chosen the partition name "files". Even for the largest of datasets (1000s of partitions and millions of files), we expect the size of the encoded/compressed metadata information to be in the 100s of MBs and not in GBs. Hence, we assume that the entire file-listing metadata can fit within a single <1GB base file. Having a single base file is also more performant: the base file always needs to be read while querying the Metadata Table, so the fewer the base files, the faster the reads (the HFile format has some fixed overhead when opening a file).
The Metadata Table has the following hard-coded partitions:
- files: Saves file-listing information in a single base file.
- column_xxxxx_partition: (Future extensibility) For each column being indexed, a specific partition will be created.
Having well-known partitions also simplifies the indexing and tagging of Metadata Table updates and makes those operations faster; no external index or Bloom Index is required. Keeping each type of information in its own partition provides further benefits:
- Individual partitions can be onboarded and replaced without affecting other types of information
- Features can be onboarded in different versions (e.g. file-listing enhancements in 0.7 and column-indexes in 0.8)
- Specific features can be dropped without affecting file listing performance or incurring large IO to delete records (just delete the partition)
- Can be re-indexed to deal with bugs / updates without affecting file listing performance (replace partition data)
First time initialization
Use of metadata for existing and new Hoodie datasets can be enabled through the HoodieWriteConfig supplied to the HoodieWriteClient.
HoodieWriteConfig.newBuilder()
    // Provide a Metadata Config
    .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).validate(true).build())
    ...
For an existing dataset, first time initialization will use recursive file listing within all the partitions to initialize the Metadata Table. For a new dataset, the Metadata Table is initialized but is empty.
When metadata verification (validate) is enabled:
- File listings are still used to get list of partition and files within the partition
- Metadata Table is updated as well as queried for the partitions and files within the partition
- Results of #1 and #2 are compared for accuracy
- Results from file listing are returned (data from metadata table is ONLY used for verification purpose)
This can be used during the testing period to ensure there are no bugs in the metadata implementation in production.
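A rough sketch of the verification flow, assuming the partition lists obtained both ways are already in hand (the helper below is hypothetical and not part of the Hudi API):

import java.util.HashSet;
import java.util.List;

class MetadataValidationSketch {
    // Compare the partition list obtained via file listing with the one read
    // from the Metadata Table, report mismatches, and always return the
    // file-listing result so queries never depend on the metadata table
    // during the testing period.
    static List<String> listPartitionsWithValidation(List<String> fromFileListing,
                                                     List<String> fromMetadataTable) {
        if (!new HashSet<>(fromFileListing).equals(new HashSet<>(fromMetadataTable))) {
            System.err.println("Metadata validation error: file listing=" + fromFileListing
                + ", metadata table=" + fromMetadataTable);
        }
        // Data from the metadata table is used ONLY for verification.
        return fromFileListing;
    }
}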
Metadata updates and sync
All changes on the dataset (insert / upsert / compaction / etc.) cause changes to the data/log files stored. Hence, for each such change, the metadata stored in the Metadata Table needs to be updated. Changes need to be kept in sync for correct metadata to be always reported. This is the classic case of “transactions on multiple tables” or multi-table-commit.
A change on the dataset is marked using a Hoodie Instant which has three stages - Requested, Inflight and Completed. A change is considered completed only when the Hoodie Instant has been transitioned to the Completed stage. The general idea of updating the metadata is to apply the changes to the Metadata Table right before the transition of Hoodie Instant to the Completed stage (error handling is covered in a later section). All information is derived directly from the HoodieXXXMetadata (HoodieCommitMetadata, HoodieRestoreMetadata, etc)
which is generated as part of the change itself and saved within the Instant files. Hence, no more file listing is required to update the Metadata Table.
The following table details how the various operations are handled.
Operation on dataset | Source of information (Instant) | Operation on Metadata Table |
---|---|---|
Insert | HoodieCommitMetadata | DeltaCommit |
Upsert | HoodieCommitMetadata | DeltaCommit |
Compact | HoodieCommitMetadata | DeltaCommit |
Clean | HoodieCleanMetadata | DeltaCommit |
Restore | HoodieRestoreMetadata | DeltaCommit |
The Metadata Table is kept in sync with the dataset by reading the completed instants, processing the changes (files added, files deleted), converting this information into HoodieMetadataPayload records, and finally committing this information to the Metadata Table (a sketch of this conversion follows the table below). The instantTime on the Metadata Table is chosen to be the same as that on the dataset, which helps in debugging as it makes it easy to reason about differences between the data timeline and the metadata timeline.
Operation on dataset | Instant on dataset Timeline | Corresponding Operation on Metadata Table | Instant on Metadata Table Timeline |
---|---|---|---|
Insert | 20201122.commit | Deltacommit | 20201122.deltacommit |
Upsert | 20201123.commit | Deltacommit | 20201123.deltacommit |
Compact | 20201124.commit | Deltacommit | 20201124.deltacommit |
Clean | 20201125.clean | Deltacommit | 20201125.deltacommit |
Restore | 20201126.restore | Deltacommit | 20201126.deltacommit |
Metadata Table may have additional instants which do not occur on the dataset table. This is because the compaction and clean schedules of Metadata Table are independent of those of the dataset.
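As a rough illustration of the sync step described above, the sketch below converts simplified per-partition write statistics (a stand-in for what HoodieCommitMetadata carries) into the key/value shape stored in the "files" partition. The WriteStatSketch type and helper method are hypothetical, not the actual Hudi classes.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the per-file write statistics in HoodieCommitMetadata.
class WriteStatSketch {
    final String partition;
    final String fileName;
    final long fileSize;
    WriteStatSketch(String partition, String fileName, long fileSize) {
        this.partition = partition; this.fileName = fileName; this.fileSize = fileSize;
    }
}

class CommitToMetadataRecordsSketch {
    // Convert the write stats of a completed commit into metadata records:
    // one record per touched partition (file name -> size) plus one
    // "_all_partitions_" record listing the touched partitions (size 0).
    static Map<String, Map<String, Long>> toMetadataRecords(List<WriteStatSketch> writeStats) {
        Map<String, Map<String, Long>> records = new HashMap<>();
        Map<String, Long> allPartitions = new HashMap<>();
        for (WriteStatSketch stat : writeStats) {
            records.computeIfAbsent(stat.partition, p -> new HashMap<>())
                   .put(stat.fileName, stat.fileSize);
            allPartitions.put(stat.partition, 0L);
        }
        records.put("_all_partitions_", allPartitions);
        // These records would then be delta-committed to the Metadata Table
        // using the same instantTime as the commit on the dataset.
        return records;
    }
}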
Design V1 (pre 0.10.0):
Asynchronous updates to the Metadata Table: Until 0.10.0, updates were applied to the metadata table asynchronously. When the Metadata Table is opened for reads, there could be completed instants on the data table which had not yet been synced to the metadata table. We need to ensure that it is in sync with the dataset. This is done as follows:
- List all the completed instants on the dataset
- Do all instant times have corresponding deltacommit instant on Metadata Timeline?
- Yes: Then the Metadata Table is in sync
- No: Metadata Table is out of sync
The Metadata Table can be synced by reading the instants from the dataset timeline and using their metadata to apply the updates on the Metadata Table.
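A minimal sketch of this sync check, assuming instant times are available as plain strings from both timelines (the helper is illustrative, not the actual Hudi implementation):

import java.util.HashSet;
import java.util.List;
import java.util.Set;

class MetadataSyncCheckSketch {
    // Design V1: the Metadata Table is in sync only if every completed instant
    // time on the dataset timeline has a matching deltacommit on the metadata
    // timeline.
    static boolean isInSync(List<String> completedDatasetInstants,
                            List<String> metadataDeltaCommitInstants) {
        Set<String> synced = new HashSet<>(metadataDeltaCommitInstants);
        return synced.containsAll(completedDatasetInstants);
    }
}

If the check fails, the missing instants are read from the dataset timeline and their metadata is applied to the Metadata Table as described above.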
Handling Async Operations
HUDI allows some asynchronous operations (e.g. Compaction and Clean) to be scheduled in parallel with other operations like Commit. The Metadata Table cannot be synced with the results of the asynchronous operations before they complete. Furthermore, to ensure consistency, we cannot sync the completed instants which occur after the inflight asynchronous operations.
To deal with this issue, when asynchronous operations are detected on the dataset:
- Metadata Table will not be synced any further
- All the un-synced but completed instants are read and merged in memory
- The in-memory information combined with the already committed information provides the correct and latest view of the file-listing information.
The in-memory merging is performed until the asynchronous operations complete. This ensures that the metadata table is always consistent and readers can get the latest view.
Metadata Table Modes
The metadata table can be opened in two modes:
- Read-only mode: In this mode the table is queried but no updates are applied to it. The "latest" synced instant on the table is used to read metadata information which is accurate at some point of time in the past.
- Used for query side optimizations (e.g. hudi-hadoop-mr RecordReaders)
- Read-write mode: In this mode, the HoodieWriteClient writes updates to the Metadata Table and keeps it in sync with the dataset.
- To prevent write conflicts, only a single writer may open the Metadata Table in read-write mode. Since HUDI is single writer, this means that Metadata Table should only be opened in read-write mode through HoodieWriteClient.
Metadata Table Reads
The metadata MOR table is made up of two types of files - the base file (HFile format) and the log files containing the delta updates.
Suppose we want to read the list of partitions from the Metadata Table. This involves the following steps:
- Open the Metadata Table and find the latest synced instant
- Find the latest File Slice in the "files" partition
- Create a HoodieMergedLogRecordScanner over the log files of the latest slice. This reads all updates and merges them in memory.
- Determine the key to be read - in this case we need the list of partitions, which is saved with the key _all_partitions_
- Read the key from the base file (if present). This is a point lookup in the base HFile and returns a HoodieRecordPayload if the key was found.
- Read the key from the HoodieMergedLogRecordScanner. This is an in-memory lookup in the merged records and returns a HoodieRecordPayload if the key was found.
- Merge the two payloads found in #5 and #6. Either or both of them may be null.
Performance: Lookups from base HFile are very fast. Delta updates are only merged once and are kept in memory. Periodic compaction will reduce the number of log file blocks to be read, merged and kept in memory.
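The read path can be summarized with a small sketch, using plain maps as stand-ins for the base HFile lookup and the merged log records (the real code merges HoodieRecordPayloads; this only shows the merge rule where log records win over the base file):

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class MetadataReadSketch {
    // Point lookup against the base file, lookup against the in-memory merged
    // log records, then merge the two payloads; either side may be absent.
    static Optional<Map<String, Long>> readKey(String key,
                                               Map<String, Map<String, Long>> baseFileRecords,
                                               Map<String, Map<String, Long>> mergedLogRecords) {
        Map<String, Long> fromBase = baseFileRecords.get(key);
        Map<String, Long> fromLogs = mergedLogRecords.get(key);
        if (fromBase == null) {
            return Optional.ofNullable(fromLogs);
        }
        Map<String, Long> merged = new HashMap<>(fromBase);
        if (fromLogs != null) {
            merged.putAll(fromLogs); // log records are newer and take precedence
        }
        return Optional.of(merged);
    }
}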
Design V2 (0.10.0 onwards):
We made a foundational fix here, wherein all commits are first applied to the metadata table and then to the data table. This makes the design a lot simpler and easier to manage in the long run, as it avoids a lot of corner cases. When the Metadata Table is opened for reads, there could be committed instants in the metadata table which were never committed to the data table. So, when instantiating the reader, we read the data table timeline and filter only for those instants which are completed.
In this design, all writers and table services apply their changes to the metadata table before committing to the data table, so no catch-up is required. But with synchronous updates, we need to solve how concurrent writers on the data table can apply changes to the metadata table without any issues.
Single writer: In the single-writer use-case, with all inline table services, there are no concurrent operations trying to update the metadata table simultaneously.
Strictly multi-writer: In multi-writer scenarios, users will have configured lock providers, so any write to the metadata table is guarded by the data table lock. We ensure only one write gets applied to the metadata table at any point in time.
Single writer with async table services: This is an interesting one. Today, users do not have to configure a lock provider for this case and everything works smoothly with just the data table (if not for the metadata table). But with the metadata table, we have to ask users to configure a lock provider, because a regular write to the data table and an async table service on the data table could concurrently update the metadata table. Without locking, there could be data loss. Consider a scenario where writer1 applies updates to the metadata table and triggers a compaction after finishing its delta commit, while another writer concurrently applies its own updates to the metadata table. The second writer could apply its delta commit to an old file slice which the compaction plan has missed. To cater to this, we ask users to configure lock providers for the data table if they wish to enable the metadata table.
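For illustration, a writer enabling the metadata table together with a lock provider might carry properties along the following lines. This is a sketch: the ZooKeeper values are placeholders, and the property keys reflect recent Hudi releases and should be verified against the version in use.

import java.util.Properties;

class MetadataLockingConfigSketch {
    // Single writer with async table services: enable the metadata table and
    // a lock provider so that concurrent updates to the metadata table from
    // the regular write and the async table service are serialized.
    static Properties writerProps() {
        Properties props = new Properties();
        props.setProperty("hoodie.metadata.enable", "true");
        props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
        props.setProperty("hoodie.write.lock.provider",
            "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
        props.setProperty("hoodie.write.lock.zookeeper.url", "zk-host");           // placeholder
        props.setProperty("hoodie.write.lock.zookeeper.port", "2181");             // placeholder
        props.setProperty("hoodie.write.lock.zookeeper.base_path", "/hudi-locks"); // placeholder
        return props;
    }
}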
Metadata Table Modes
The metadata table can be opened in two modes:
- Read-only mode: In this mode the table is queried but no updates are applied to it. As mentioned earlier, any additional commits in the metadata table which are not yet complete in the data table will be ignored while reading.
- Used for query side optimizations (e.g. hudi-hadoop-mr RecordReaders)
- Read-write mode: In this mode, the HoodieWriteClient writes updates to the Metadata Table and keeps it in sync with the dataset.
- To prevent write conflicts, only a single writer may open the Metadata Table in read-write mode.
Metadata Table Reads
The metadata MOR table is made up of two types of files - the base file (HFile format) and the log files containing the delta updates.
Suppose we want to read the list of partitions from the Metadata Table. This involves the following steps:
- Open the Metadata Table and find the latest File Slice in the "files" partition
- Create a HoodieMergedLogRecordScanner over the log files of the latest slice by filtering for valid (completed) commits from the data timeline. This reads all updates and merges them in memory.
- Determine the key to be read - in this case we need the list of partitions, which is saved with the key _all_partitions_
- Read the key from the base file (if present). This is a point lookup in the base HFile and returns a HoodieRecordPayload if the key was found.
- Read the key from the HoodieMergedLogRecordScanner. This is an in-memory lookup in the merged records and returns a HoodieRecordPayload if the key was found.
- Merge the two payloads found in #4 and #5. Either or both of them may be null.
Performance: Lookups from base HFile are very fast. Delta updates are only merged once and are kept in memory. Periodic compaction will reduce the number of log file blocks to be read, merged and kept in memory.
Query Engine Integration
Spark
How File listing works for Hudi Spark DataSource and Spark SQL queries?
To execute queries, Spark first needs to get the list of files, which is then converted to partitions/splits and passed on to the executors so that tasks can perform their operations on those files. Here is how the file listing is obtained for different query types:
- Copy on Write DataSource queries: Hudi relies on Spark’s parquet datasource internally to query the dataset, since copy-on-write tables can be queried directly like parquet tables. The parquet datasource internally performs file listing through Spark’s InMemoryFileIndex. However, to filter out only the latest commit files, Hudi passes a path filter called HoodieROTablePathFilter which helps InMemoryFileIndex keep only the files from the latest commit. Spark internally invokes the listFiles method of the super class, which returns files per partition directory in the PartitionDirectory format expected by Spark. The listing is parallelized across the cluster using the spark context, and a FileStatusCache is maintained to cache file listings across queries. The File Index also performs partition pruning, which prunes out unnecessary files during query planning if partition filters are passed in the query; this helps reduce I/O and hence query execution time. However, for Hudi this partition pruning does not really work, because Spark is not able to understand Hudi partitions and derive a partition schema to do the pruning.
- Merge on Read DataSource queries: Hudi has an in-built Spark Relation called MergeOnReadSnapshotRelation for querying Merge on Read tables for this case, because these tables cannot be queried directly through Spark’s parquet datasource (needs merge of log files and parquet files). However, this has been integrated to directly create and invoke InMemoryFileIndex for listing files so we are able to make use of its benefits like parallelized listing and caching out of the box.
- Copy on Write Spark SQL queries: In this case the Hudi dataset is registered as a Hive table and can be queried using SQL via Spark. There are two scenarios here:
- spark.sql.hive.convertMetastoreParquet = true (default): In this scenario, Spark SQL queries go through Spark’s parquet datasource implementation because Spark identifies it as a parquet table based on its registered parquet serde in the Hive table created in the metastore, and uses it to query. Again, customers need to pass the path filter i.e. HoodieROTablePathFilter for Spark to filter out the latest commit files when performing the listing via InMemoryFileIndex.
- spark.sql.hive.convertMetastoreParquet = false: In this scenario, Spark SQL queries go through the Hive InputFormat implementation for Hudi COW tables, i.e. HoodieParquetInputFormat. This is directly able to use Hudi’s filesystem view to get the latest commit files without needing a path filter. But Hive is not able to parallelize the listing across cluster nodes; at most it can do multi-threaded listing at the Hive driver, which is slower than Spark’s file listing.
- Merge on Read Spark SQL queries: To query Merge on Read tables via Spark SQL customers need to use spark.sql.hive.convertMetastoreParquet = false because Merge on Read tables cannot be directly queried using Spark’s parquet datasource (because of merging of log and parquet files needed). These queries go through Hudi’s Hive InputFormat implementation for merge on read tables capable of doing the merge i.e. HoodieRealtimeInputFormat. Hence, the file listing also happens through Hive for this case, instead of through spark.
- Bootstrapped Copy on Write DataSource queries: In case of DataSource based queries, the bootstrapped COW tables are queried using a custom Spark Relation i.e. HoodieBootstrapRelation which performs listing by directly creating Spark’s InMemoryFileIndex.
- Bootstrapped Copy on Write Spark SQL queries: In case of Spark SQL, the bootstrapped COW tables are queried using the HoodieParquetInputFormat implementation via Hive.
- Copy on Write Incremental queries: In this case listing happens by reading the commit metadata files for commits touched by the incremental query, since we need only the files updated in those commits.
- Merge on Read Incremental queries: In this case listing happens by reading the commit metadata files for commits touched by the incremental query, since we need only the files updated in those commits.
Proposal
Proposal For V1 (Hudi 0.7.0)
Modify HoodieROTablePathFilter so that it can use the Metadata Table to list files under the parent folder of the path passed to the filter, so that it can determine whether the file belongs to the latest commit. While the file listing of the root path still happens internally within Spark through InMemoryFileIndex, there will be some performance benefit from making the listing that happens in the PathFilter go through the Metadata Table (since we have control over the path filter in Hudi code but not over InMemoryFileIndex). This will benefit the following cases:
- Read Optimized queries via Spark DataSource: These queries are directed to Spark's Parquet DataSource which internally does file listing using its InMemoryFileIndex. However, Hudi needs to pass the HoodieROTablePathFilter here while directing the query to DataSource so that InMemoryFileIndex can internally use the Path Filter to filter out the latest commit files. Since we are making the path filter more performant with the metadata these queries will benefit.
- Read Optimized queries via Spark SQL: Here is the guide to query via Spark SQL, which details that Spark SQL queries go through the Hive InputFormat implementations that Hudi has. However, read optimized queries can be made to go through Spark's parquet datasource, as they just involve reading parquet files, by letting spark.sql.hive.convertMetastoreParquet be true (default). However, the user needs to pass HoodieROTablePathFilter so Spark can filter out the latest commit files out of all the files (a sketch of registering the path filter follows this list). Since we are making the path filter more performant with the metadata, these queries will benefit.
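For illustration, registering the path filter before issuing such queries could look roughly like the following sketch (using the Spark Java API and the standard mapreduce.input.pathFilter.class Hadoop configuration key):

import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.hadoop.HoodieROTablePathFilter;
import org.apache.spark.sql.SparkSession;

class PathFilterSetupSketch {
    // Register HoodieROTablePathFilter so that Spark's InMemoryFileIndex keeps
    // only files belonging to the latest commit. With this proposal, the filter
    // itself lists the parent folder through the Metadata Table instead of the
    // NameNode.
    static void registerPathFilter(SparkSession spark) {
        spark.sparkContext().hadoopConfiguration().setClass(
            "mapreduce.input.pathFilter.class",
            HoodieROTablePathFilter.class,
            PathFilter.class);
    }
}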
Proposal For V2 (Hudi 0.9.0)
We need a way to hook a custom file listing implementation into Hudi, so that Spark can list the root and partition paths using the Metadata Table in the first place. If metadata is not enabled, it should fall back to listing in a distributed way using the spark context. The proposal is to implement a custom Spark FileIndex for the Hudi DataSource, which provides a good integration point for implementing our own listing solution. At a high level, this is how we can use the FileIndex APIs to implement this:
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, PartitionDirectory}
import org.apache.spark.sql.types.StructType

class HoodieFileIndex(spark: SparkSession, fileStatusCache: FileStatusCache)
  extends FileIndex {

  override def refresh(): Unit = {
    // This is called internally by Spark every time a Spark relation is created
    // or if Spark's FileStatusCache has expired.
    //
    // Perform the file listing of the root path here:
    // 1. If the file listing is already present in FileStatusCache, return it
    // 2. If metadata is enabled, list using the metadata table
    // 3. If metadata is not enabled, list using the spark context in a
    //    distributed way
    // 4. Build the Hudi FileSystemView and filter out the latest commit files
    // 5. Store the file listing in Spark's FileStatusCache
  }

  override def rootPaths: Seq[Path] = {
    // Returns the base path of the table or a sub-path (if the user has
    // specified one in the query)
    ???
  }

  override def listFiles(partitionFilters: Seq[Expression],
                         dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
    // 1. Perform partition pruning
    // 2. Return the latest commit files from the FileSystemView for the
    //    filtered partitions
    ???
  }

  // Remaining FileIndex members, omitted from this sketch:
  override def inputFiles: Array[String] = ???
  override def sizeInBytes: Long = ???
  override def partitionSchema: StructType = ???
}
Additional details for integrating Spark DataSource queries with FileIndex:
- For several query types, such as real time queries and bootstrap queries, which directly rely on Spark's InMemoryFileIndex (in their Spark Relation implementations) to perform the file listing, we can switch to directly using the custom FileIndex implementation.
- For read optimized queries, the queries are directed to Spark's Parquet DataSource, which internally does file listing using the same InMemoryFileIndex. Thus we need a way to make Spark's Parquet DataSource use the custom FileIndex implementation. The proposal is to create the Parquet DataSource via HadoopFsRelation instead, which provides a way to pass a FileIndex implementation:
HadoopFsRelation(
// custom file index,
// partition schema,
// data schema,
None,
new ParquetFileFormat,
optParams)(sqlContext.sparkSession)
This proposal works well for Spark DataSource based queries, which go via the Hudi DataSource implementation and will use the FileIndex. However, this is not the case for Spark SQL, which has the following caveats:
- Read Optimized: Here is the guide to query via Spark SQL, which details that Spark SQL queries go through the Hive InputFormat implementations that Hudi has. However, read optimized queries can be made to go through Spark's parquet datasource, as they just involve reading parquet files, by letting spark.sql.hive.convertMetastoreParquet be true (default). However, the user needs to pass HoodieROTablePathFilter so Spark can filter out the latest commit files out of all the files.
- Real Time: These queries are executed via HoodieRealtimeInputFormat, and users need to set spark.sql.hive.convertMetastoreParquet to false as mentioned in the guide.
- Bootstrap: The bootstrap queries also go through HoodieParquetInputFormat and HoodieRealtimeInputFormat for bootstrapped COW and MOR tables respectively.
However, since we now have native Spark Relation implementations for MOR Real Time queries and Bootstrap queries, all Spark SQL queries can ideally be made to go through the Hudi DataSource itself instead of relying on the InputFormat. This will help improve Spark SQL query performance both through listing, which can happen via the FileIndex, and through query execution, as it will be able to use Spark's native readers and built-in optimizations which offer a vast improvement over Hive readers. To make Spark SQL queries use the Hudi DataSource, the Hudi tables need to be registered as DataSource tables in the metastore, so that Spark can internally understand that it needs to use the DataSource for querying and not the Serde/InputFormat registered for the table. This can be done by registering the following table properties in the metastore when doing Hive Sync for Hudi tables:
- spark.sql.sources.provider => hudi (This would make Spark use Hudi DataSource implementation for the query)
- spark.sql.sources.schema.part.0...N => The data schema broken into parts
- spark.sql.sources.schema.numParts => N (Number of parts the schema is broken into)
- spark.sql.sources.schema.numPartCols => Number of partition columns
- spark.sql.sources.schema.partCol.0...N => The partition column field names (0 to N for N partition columns)
With the above, all Spark SQL queries will use the Hudi DataSource and hence end up using the custom FileIndex for doing the listing. Although the InputFormat can also do listing with the metadata table because of the Hive integration, that listing is not as efficient via Spark SQL, because Spark instantiates a new InputFormat instance for each partition and does the listing on different executors for each partition. This would end up reading the metadata table O(N) times for N partitions and hence would not offer that much performance benefit. By integrating with the FileIndex, however, it will perform O(1) reads of the metadata table for N partitions and offer much better performance. Also, even in the fallback case the listing happens using the Spark context and is distributed across the cluster, whereas in Hive the listing happens only at the Driver. Thus, whether the metadata table is enabled or not, this is the best way forward.
To summarize the advantages of this approach:
- One centralized file listing solution for all query types via Spark DataSource or via Spark SQL
- Best efficiency in both cases:
- Metadata enabled => O(1) reads of metadata table for N partitions and avoids O(N) list calls
- Metadata disabled => Spark context to distribute listing across the cluster
- It will vastly improve Spark SQL query performance, as those queries will also be able to use the Hudi DataSource and make use of Spark's native implementations and optimizations, which are far more performant than Hive readers.
- The custom FileIndex can be used to support partition pruning for Spark DataSource queries, which currently does not work.
- File Index can later be extended for other features like pruning based on column range index (column statistics), time travel queries etc.
Hive
How File listing works for Hudi Hive queries?
Proposal
Presto
How File listing works for Hudi Presto queries?
Here is how Hudi queries work for PrestoDb:
- Read optimized queries: These queries only require reading of parquet files and hence are read using Presto's Hive connector with its native parquet readers to be able to use all the optimizations. The split generation and file listing are also handled using Presto's native logic. For Presto to be able to filter out the latest commit files after listing, it uses HoodieROTablePathFilter here. This is similar to Spark's file listing in case of read optimized queries where it needs the path filter.
- Real time queries: These queries require reading of parquet and log files to provide the complete snapshot. This is supported using Hudi's InputFormat for real time tables, i.e. HoodieRealtimeInputFormat. It allows Presto to re-use the InputFormat implementation without having to implement native read support, although it is not as efficient, since Presto's native parquet readers have several optimizations. The split generation and file listing also happen via the InputFormat, by calling getSplits on the InputFormat here.
Proposal
Proposal for Read optimized queries
Approach 1: Support via HoodieParquetInputFormat
We switch to using Hudi's InputFormat split generation logic for read optimized tables, i.e. HoodieParquetInputFormat, instead of using Presto's native split generation logic (the current approach). This would be similar to how real time queries work in Presto via HoodieRealtimeInputFormat, where it uses the split generation and, inherently, its file listing logic. Thus, if HoodieParquetInputFormat is integrated to list using the metadata table, it would work via Presto as well and we won't have to implement anything specific to Presto.
Advantages:
- Able to re-use the InputFormat integration with metadata listing without implementing anything native for Presto.
- Decouples Hudi code from the Presto code base, as we don't have to write more Hudi-specific integration code within the Presto Hive connector (although it's not that big an advantage, since the Presto community has already accepted Hudi as a direct dependency and there is already Hudi-specific code in Presto to use Hudi's PathFilter and InputFormat for split generation and record reading in the case of bootstrap and real time queries).
Disadvantages:
- Using InputFormat's split generation via Presto has the overheads of serialization/de-serialization from InputFormat splits to Presto's native splits and back.
- Presto's native file listing is more optimized than when the InputFormat is used via Presto. This is because of the way Presto invokes the InputFormat: it calls the getSplits API individually on each partition in separate threads, and it also creates separate InputFormat instances in each thread. This would result in the metadata table being read N times for N partitions because of the separate InputFormat instances, when ideally it could be read just once to get the list for all partitions.
Approach 2: Implement a DirectoryLister for Hudi within Presto
Presto has a DirectoryLister interface, with an implementation HadoopDirectoryLister which it uses to list files via the hadoop FileSystem APIs. The DirectoryLister is invoked for each partition to do the listing. We can implement a custom DirectoryLister for Hudi with logic that does the following:
- If metadata listing is enabled, get the file listing for the partition using metadata table.
- If metadata is not enabled, get the file listing using FileSystem API.
- Filter out the latest commit files using Hudi's FileSystemView APIs.
The DirectoryLister is called to list every partition, but there is just one instance of the DirectoryLister. This lets us ensure that the metadata table reader is opened just once, when the DirectoryLister instance is created. Individual calls for partitions use the same reader to do seeks on the HFile to get the list for those partitions. This reduces it to O(1) reads for N partitions and uses the metadata table efficiently.
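A very rough sketch of where the shared metadata reader would sit. The real Presto DirectoryLister interface and Hudi's metadata reader API are not reproduced here; both interfaces below are simplified stand-ins.

import java.util.Collections;
import java.util.List;

// Stand-in for a metadata table reader that supports point lookups per partition.
interface MetadataTableReaderSketch {
    List<String> getFilesInPartition(String partitionPath);
}

class HudiDirectoryListerSketch {
    // The reader is opened once when the lister is created, so listing N
    // partitions costs O(1) metadata table opens (one point lookup per
    // partition) instead of O(N) FileSystem LIST calls.
    private final MetadataTableReaderSketch metadataReader; // null when metadata is disabled

    HudiDirectoryListerSketch(MetadataTableReaderSketch metadataReader) {
        this.metadataReader = metadataReader;
    }

    List<String> list(String partitionPath) {
        if (metadataReader != null) {
            return metadataReader.getFilesInPartition(partitionPath);
        }
        // Fallback: a plain FileSystem listing of the partition would go here.
        return Collections.emptyList();
    }
}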
Advantages:
- The metadata table will be used efficiently and read just once. It reduces O(N) LIST calls to O(1) GET calls for N partitions.
- It further optimizes the filtering of the latest commit files by getting rid of the PathFilter, which is applied to each and every file and has some cost because of the internal listing and initialization it does. Instead, we directly use Hudi's FileSystemView to get the latest commit files in just one call.
Disadvantages:
- Additional Hudi-specific code in Presto. But this seems okay because Hudi is already tightly coupled with Presto; Presto has already adopted Hudi as a direct dependency.
Overall, I am more in favor of Approach 2 because we are able to maintain Presto's native optimizations and also read metadata just once for N partitions. This would offer the best performance enhancements.
Proposal for Real time queries
Support via HoodieRealtimeInputFormat
We continue with the existing approach for real time query support of using HoodieRealtimeInputFormat for split generation, which internally does the file listing. Thus, HoodieRealtimeInputFormat's integration with the metadata table would work out of the box for Presto without having to implement anything native to Presto.
We cannot use Presto's native listing or the DirectoryLister here, because Presto depends upon HoodieRealtimeInputFormat to generate HoodieRealtimeFileSplits, which need additional information about log files (apart from the base parquet file). Presto does not have native logic to generate this type of file split, and we re-use HoodieRealtimeInputFormat's split generation logic to do it for us, so we do not have to write native Presto code for generating and handling real time file splits (specific to Hudi).
Advantage:
- Able to re-use the InputFormat integration with metadata listing without implementing anything native for Presto.
- Decouples Hudi code from the Presto code base, as we don't have to write more Hudi-specific integration code within the Presto Hive connector (although it's not that big an advantage, since the Presto community has already accepted Hudi as a direct dependency and there is already Hudi-specific code in Presto to use Hudi's PathFilter and InputFormat for split generation and record reading in the case of bootstrap and real time queries).
- Builds on the existing solution we have for real time query support.
Disadvantage:
- The metadata table will not be used in the most efficient way. As mentioned earlier, getSplits is called for each partition by Presto, each time on a separate InputFormat instance. This would cause the metadata table reader to be opened N times for N partitions, with one seek per partition. However, we will look into whether some kind of caching can be implemented with the InputFormat to help avoid this.
In the future we probably want to implement native real time splits and reading support within Presto for Hudi. If we move in that direction, the same DirectoryLister can be used to get the file listing. However, a native reader implementation for real time queries is out of scope for this feature.
Error Handling for Design V1 (pre 0.10.0):
Due to two tables being involved in each operation, we need to ensure that they are in sync and that errors during dataset operations are handled correctly. The following cases need to be handled:
- Metadata Table updated (deltacommit completed) but dataset instant not completed
- Example: Crash after Metadata Table update but before dataset Instant could be transitioned from inflight to completed
- Metadata Table update fails
- Example: Crash during Metadata Table update
- Example: Crash during Metadata instant transition from inflight to completed
- Async updates to the dataset (async clean, async compaction)
- Multi-writer scenario causing async updates to the dataset (FUTURE)
Handling error case #1:
- Determine the latest instant on the dataset
- Read and merge the Metadata Table logs ONLY before the latest instant
- This in turn ignores the update performed on the metadata table but not yet visible on the dataset
- Next update on the dataset will rollback the failed change (orphan inflight instant)
- The rollback will trigger a rollback on the metadata table bringing it into sync
Handling error case #2:
- Since the Metadata Table update failed, the dataset itself will not have transitioned to complete on the action. Hence, the action on the dataset also failed.
- Next action on the dataset should rollback the failed change (orphan inflight instant)
- The rollback will trigger a rollback on the metadata table bringing it into sync
Handling error case #3 - Async Clean
- When an async clean is scheduled on the dataset, it generates a CleanPlan which is written to the clean.requested instant
- Metadata Table is updated before the creation of clean.requested instant
- Async clean will delete the files in the background and complete the clean operation
- Metadata Table does not need to be updated after/during the async clean operation is in progress as the changes have been pre-applied.
Handling error case #3 - Async Compaction
- When an async compaction is scheduled on the dataset, it generates a CompactionPlan which is written to the compaction.requested instant. New base files are created at the end of the compaction process but they are not visible on the dataset view until the compaction process completes.
- Metadata Table is updated before the compaction process completes. The log writer already supports multi-writer so we do not have a problem with async / parallel updates to the log.
- Any active Metadata reader may not see the background compaction complete operation until next refresh. This should not have any data consistency issues as the whole File Slice design ensures that data is still visible through the older versions of the base / log files.
Error Handling for Design V2 (0.10.0 onwards):
Due to two tables being involved in each operation, we need to ensure that they are in sync and that errors during dataset operations are handled correctly. The following cases need to be handled:
- Metadata Table updated (deltacommit completed) but dataset instant not completed
- Example: Crash after Metadata Table update but before dataset Instant could be transitioned from inflight to completed
- Metadata Table update fails
- Example: Crash during Metadata Table update
- Example: Crash during Metadata instant transition from inflight to completed
- Metadata table compaction/cleaning fails mid-way.
- Async updates to the dataset (async clean, async compaction)
Handling error case #1:
- Readers handle this via the log record reader, wherein we filter only for completed commits from the data table while reading the metadata table.
- On the next write to the data table, the partial commit will be rolled back. This will result in a new delta commit in the metadata table, which takes care of ignoring files already synced.
Handling error case #2:
- Since the Metadata Table update failed, the dataset itself will not have transitioned to complete on the action. Hence, the action on the dataset also failed.
- Next action on the dataset should rollback the failed change (orphan inflight instant)
- The rollback will trigger a delta commit on the metadata table which will apply the list of files to be deleted as part of rollback.
Handling compaction/cleaning failure in metadata table #3:
- If a compaction in the metadata table fails, we need to ensure readers are still able to read data from the metadata table without any issues and that the compaction is re-attempted as well. So, we expose a runAnyPendingCompaction() API in the write client, and before attempting to trigger a compaction, we first try to re-attempt any pending compaction from the last round. Until the compaction completes, we merge the latest file slice with the previous file slice when serving readers. Writers to the metadata table can just poll the latest file slice, since all they need to do is append delta logs. Let's illustrate the scenario.
- There are 5 delta commits in the data table, and 1 more delta commit will trigger a compaction in the metadata table. The current file slice is, say, FS2.
- The writer tries to apply C6 and adds deltacommit6 to the metadata table. This triggers a compaction in the metadata table which will have 6 delta logs in the compaction plan, but the compaction fails midway. Say the file slice for the compaction is FS3.
- The writer is restarted and tries to roll back C6 since it failed. This is yet another delta commit in the metadata table, but since a compaction has been scheduled, it goes into FS3. After finishing the delta commit, we try to complete any pending compactions, which re-attempts the failed compaction from the last round.
- Once it succeeds, we proceed as usual.
Handling error case #4 - Async Clean/Compaction in data table
- When an async clean is scheduled on the dataset, it generates a CleanPlan which is written to the clean.requested instant
- Async clean will delete the files in the background and complete the clean operation
- Just before committing to the data table, the commit is applied to the metadata table, so the data stays intact. Readers will skip the commit if the writer failed before it was applied to the data table.
Query handling (read path)
From a reader/query engines perspective, Inputformat’s “listStatus” requests will be served with the current view of the file system without the expensive distributed file system queries.
Maintaining additional metadata in the manifest file would help Hudi support faster queries and optimize the dataset to handle complex queries. During the planning phase of a query from Hive/Presto, complex queries can be optimized through the supported Hudi input formats using the metadata available in the master manifest and partition manifest files.
For example, with “SELECT * FROM table WHERE x1 < C1 < y1 AND x2 < C2 < y2 …. ;”, when handling complex queries,
- Hudi will inspect the master manifest file to prune all partitions that don’t match the query range (for example, prune all partitions that don’t satisfy (x1 < C1 < y1)). Apply the second and subsequent parts of the query to prune the partitions list further.
- Once list of “candidate partitions” have been identified, Hudi will inspect the corresponding partition manifest files, to prune the data files that don’t match the query range to generate “candidate data files” where the data is most likely to reside.
- For the most frequently used queries (learned over time for each dataset), Hudi could optimize further by maintaining a list of “hot columns” and column-specific metadata (like range/index files). A sketch of the basic range-pruning check follows this list.
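For example, the per-file pruning check reduces to a range-overlap test. The sketch below is purely illustrative and assumes numeric min/max values are stored per file:

class ColumnRangePruningSketch {
    // A file can be skipped for the predicate "x < C < y" when its stored
    // [min, max] range for column C cannot overlap the open interval (x, y).
    static boolean canSkipFile(long fileMin, long fileMax, long queryLow, long queryHigh) {
        return fileMax <= queryLow || fileMin >= queryHigh;
    }

    public static void main(String[] args) {
        // A file with C in [100, 200] cannot satisfy 300 < C < 400, so it is pruned.
        System.out.println(canSkipFile(100, 200, 300, 400)); // true
    }
}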
Rollout/Adoption Plan
- What impact (if any) will there be on existing users?
  - None. This functionality will be an add-on to existing datasets. If the metadata or indexes are not available, the functionality will not be enabled.
- If we are changing behavior, how will we phase out the older behavior?
  - This new feature is backwards-compatible. If the metadata or indexes are not available, the existing way of listing files through the NameNode will be used.
- If we need special migration tools, describe them here.
  - NA
- When will we remove the existing behavior?
  - Not required.
Test Plan
Extensive unit tests will test every aspect of the metadata implementation.
How to test using HUDI CLI
The HUDI CLI has been updated to include the following metadata-related commands. These commands require the HUDI CLI to be connected to the dataset (connect --path /path/to/hudi-dataset).
Command | Description | Output |
---|---|---|
metadata set --metadataDir /some/writable/dir/ | Sets the directory to use for writing the metadata table (default .hoodie/metadata). This is useful if you do not want to write or don't have write permission to the dataset directory. | |
metadata create | Creates the metadata table | ..... logs ... Created Metadata Table in /some/writable/dir/ (duration=15.24sec) |
metadata delete | Deletes the metadata table and all its files. | |
metadata stats | Shows stats for the metadata. Relevant only after the table has been created. | Base path: /path/to/hudi-dataset partitionCount: 493 lastCompactionTimestamp: none isInSync: true logFileCount: 1 totalBaseFileSizeInBytes: 0 totalLogFileSizeInBytes: 35404 baseFileCount: 0 |
metadata list-partitions | Lists all the partitions. If the metadata table exists then it is used, otherwise file listing is used. | |
metadata list-files --partition 2019/05/15 | Lists all the files in the given partition. If the metadata table exists then it is used, otherwise file listing is used. | |
metadata init | Updates the metadata table to reflect updates to the dataset since its creation. | |
metadata list-partitions and metadata list-files can be used for performance measurements. If these commands are issued before the metadata table is created, they show the time taken using file listings. When issued after the metadata table is created, they show the time for lookups from the metadata table.
Metadata Table Metrics
The following metrics have been added for the metadata table:
Metric | Description |
---|---|
HoodieMetadata.initialize.duration | Time to initialize metadata table (first time creation) |
HoodieMetadata.initialize.count | Count of initializations |
HoodieMetadata.sync.duration | Time to sync the metadata table |
HoodieMetadata.sync.count | Count of sync |
HoodieMetadata.validate_errors.count | Number of errors while validating the metadata table contents |
HoodieMetadata.validate_partitions.duration | Time to get list of partitions using file listing (for purpose of validation) |
HoodieMetadata.lookup_partitions.count | Number of times partition list was fetched from metadata table |
HoodieMetadata.validate_files.duration | Time to get list of files in partitions using file listing (for purpose of validation) |
HoodieMetadata.lookup_files.count | Number of times list of files was fetched from metadata table |
HoodieMetadata.scan.duration | Time to read and merge all the LOG files for metadata table |
HoodieMetadata.scan.count | Number of times the metadata LOG files were read |
11 Comments
Vinoth Chandar
Prashant Wason please start a mailing list DISCUSS thread when ready
Vinoth Chandar
One high level comment is that : could we try to design this using the same base/log file concepts that we have for data?
indexing is being rethought in a similar fashion.. and I think we can implement a metastore using a similar approach as well..
(may be this is implementation detail, that we can work out down the line)
Prashant Wason
>> could we try to design this using the same base/log file concepts that we have for data?
The idea is very similar in Approach #1 though the file formats chosen for the base/log files will be optimized for key-value lookups rather than columnar lookup.
The Base Manifest (called the Master Manifest) will have information up to the last-creation-time, and delta manifests will have the updates at each commit time. Periodically we will merge the deltas into the base. This can use the familiar timestamp-as-fileid approach used to "tag" the HUDI Actions with a valid time and ID.
>> indexing is being rethought in a similar fashion.
Indexing is a different beast altogether with the following differences from this RFC:
Vinoth Chandar
Prashant Wason I know you folks are blocked on this.. You will have a detailed review by monday..
Vinoth Chandar
Here's my take on the two approaches. Between the two, I do favor the approach 2's way of tracking the file level master manifest, without having two levels of hierarchy implicitly.
However, I do feel there are lots more to address there.
In general, I would love if we designed this in a log structured manner i.e updates to manifest are logged and can be compacted. (This is merely trying to formalize the notion of delta files you have in approach 2).
What if we had the following as three file groups
file_metadata : Base HFile, with key: data_fileID, value : latest file slice for the fileID. Whenever new file slice is added (compaction, merge handle) or new log files are added we log it to a log file, again with inline HFile (as we are exploring in RFC-08).
file_stats_metadata: Base HFile with key : <column_name>+<fileID> , value : min, max, nulls etc... Each time this changes (either via update/insert/delete), we log that and compactor will know to merge these into final stats.
column_stats_metadata : This is all the column indexes that you originally had as separate files , with base HFile key : <column_name> + <range> : (fileids) ... similarly we can log changes to this..
My thinking is we can leverage the existing machinery (I and others can help provide these foundations for you to build on) we have for logging and compaction and design something incremental and scalable from the start.
Vinoth Chandar
More discussions between Nishith Agarwal, Prashant Wason, Balaji Varadarajan, Vinoth Chandar + Balajee Nagasubramaniam
Agree on :
Open items :
Vinoth Chandar
sivabalan narayanan can you share a pointer to your hfile benchmarks here?
Vinoth Chandar
Open items
cc Nishith Agarwal, Balaji Varadarajan, Balajee Nagasubramaniam, Prashant Wason
Vinoth Chandar
For implementing phase 1 (just eliminating listing using partition index)
Nishith Agarwal
Summarizing discussion among folks from Uber, AWS and OSS on how to implement record level & column indexes.
Udit Mehrotra, Vinoth Chandar, sivabalan narayanan, Satish Kotha, Prashant Wason
Nishith Agarwal
Vinoth Chandar Prashant Wason Updated the title of the RFC to reflect the actual work and details, PTAL