
Proposers

Approvers

Status

Current state: ACCEPTED

Discussion thread: here

JIRA: here

Released: 0.7

Abstract

In the current implementation, the HUDI Writer Client (in the write path) and HUDI queries (through the InputFormat in the read path) have to perform a “list files” operation on the file system to get its current view. In HDFS, listing all the files in the dataset is a NameNode-intensive operation for large datasets. For example, one of our HUDI datasets has thousands of date partitions, with each partition having thousands of data files.

...

In this document we provide two approaches on how to implement this functionality. We intend to finalize on a single approach after discussion and some workflow testing on a sample dataset.

Implementation

Basics

RFC-15 has been implemented using an internal HUDI MOR table to store the required metadata for the dataset. This table is internal to the dataset and will not be exposed to the user for direct writes or modifications.

...

In the rest of this section, “dataset” refers to the Hoodie dataset which holds the data, and “Metadata Table” refers to this internal table holding the file-listing metadata.


Schema

Being an actual Hoodie table, the Metadata Table needs to have a schema. The schema provides a way to store the type of the record (an enum) and a filename-to-size mapping.
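
As a rough illustration of that layout, each record can be thought of as a key, a type tag, and a map from file name to size. The Java class below is purely illustrative (the names and enum values are not the actual generated Avro classes):

    import java.util.Map;

    /** Illustrative shape of a metadata record, mirroring the schema described above. */
    public class MetadataRecordSketch {
      /** The kind of information the record carries (stored as an enum in the schema). */
      public enum Type { ALL_PARTITIONS, PARTITION_FILES }

      private final String key;                  // e.g. the partition this record describes
      private final Type type;                   // record type
      private final Map<String, Long> fileSizes; // file name -> size in bytes

      public MetadataRecordSketch(String key, Type type, Map<String, Long> fileSizes) {
        this.key = key;
        this.type = type;
        this.fileSizes = fileSizes;
      }

      public String getKey() { return key; }
      public Type getType() { return type; }
      public Map<String, Long> getFileSizes() { return fileSizes; }
    }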

...

When the Metadata Table is compacted, the various records with the same key will be compacted into a single record. With a CLEAN, the older versions will be removed.

Metadata Table partitioning scheme 

Each type of information will be stored in its own partition. The partition names will be hardcoded and chosen in advance. Within each partition, the number of base-files will be chosen based on the amount of information that needs to be committed. 

...

  1. Individual partitions can be onboarded and replaced without affecting other types of information
  2. Features can be onboarded in different versions (e.g. file-listing enhancements in 0.7 and column-indexes in 0.8)
  3. Specific features can be dropped without affecting file listing performance or incurring large IO to delete records (delete the partition)
  4. Can be re-indexed to deal with bugs / updates without affecting file listing performance (replace partition data)

First time initialization

Use of metadata for existing and new Hoodie datasets can be enabled using the HoodieWriteConfig supplied to the HoodieWriteClient.
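
For illustration, a minimal Java sketch of turning this on via the write config (assuming a 0.7+ release; HoodieMetadataConfig has moved packages between releases, and the builder option is equivalent to setting hoodie.metadata.enable=true):

    import org.apache.hudi.common.config.HoodieMetadataConfig; // package differs in older releases
    import org.apache.hudi.config.HoodieWriteConfig;

    public class EnableMetadataExample {
      public static HoodieWriteConfig buildConfig(String basePath, String avroSchema) {
        // The resulting config is handed to the write client as usual; the metadata table
        // lives under the dataset and is managed entirely by the client.
        return HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withSchema(avroSchema)
            // Same effect as hoodie.metadata.enable=true
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
            .build();
      }
    }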

...

This can be used during the testing period to ensure there are no bugs in the metadata implementation in production. 

Metadata updates and sync

All changes on the dataset (insert / upsert / compaction / etc.) cause changes to the data/log files stored. Hence, for each such change, the metadata stored in the Metadata Table needs to be updated. Changes need to be kept in sync so that correct metadata is always reported. This is the classic case of “transactions on multiple tables”, or multi-table commit.

...

The Metadata Table may have additional instants which do not occur on the dataset table. This is because the compaction and clean schedules of the Metadata Table are independent of those of the dataset.

Design V1 (pre 0.10.0):

Asynchronous updates to Metadata table: Until 0.10.0, we had an asynchronous way of applying updates to the metadata table. When the Metadata Table is opened for reads, there could be more completed instants in the data table which were not yet synced to the metadata table. We need to ensure that it is in sync with the dataset. This is done as follows:

...

The Metadata Table can be synced by reading the instants from the dataset timeline and using their metadata to apply the updates on the Metadata Table.
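
A hedged Java sketch of that sync loop, using the public timeline APIs; applyToMetadataTable() is a hypothetical stand-in for the actual update logic:

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;

    public class MetadataSyncSketch {
      // Replay dataset instants that completed after the latest instant already synced
      // to the metadata table.
      public static void syncMetadata(HoodieTableMetaClient datasetClient, String lastSyncedInstant) {
        HoodieTimeline toSync = datasetClient.getActiveTimeline()
            .filterCompletedInstants()
            .findInstantsAfter(lastSyncedInstant, Integer.MAX_VALUE);
        toSync.getInstants().forEach(instant -> applyToMetadataTable(instant));
      }

      private static void applyToMetadataTable(HoodieInstant instant) {
        // Hypothetical helper: read the instant's commit metadata and apply the corresponding
        // file additions / deletions as a delta commit on the metadata table.
      }
    }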

Handling Async Operations

HUDI allows some asynchronous operations (e.g. Compaction and Clean) to be scheduled in parallel with other operations like Commit. The Metadata Table cannot be synced with the results of the asynchronous operations before they complete. Furthermore, to ensure consistency, we cannot sync the completed instants which occur after the inflight asynchronous operations.

...

The in-memory merging is performed as long as the asynchronous operations have not completed. This ensures that the metadata table is always consistent and readers can get the latest view.
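
For illustration, the rule of not syncing past an inflight asynchronous operation amounts to capping the sync at the earliest pending compaction instant; a hedged sketch follows, where syncUpTo() is a hypothetical helper:

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.util.Option;

    public class AsyncAwareSyncSketch {
      // Completed instants after the earliest pending async operation are not synced;
      // the reader merges them in memory until the async operation completes.
      public static void syncBeforePendingAsyncOps(HoodieTableMetaClient datasetClient) {
        Option<HoodieInstant> earliestPending =
            datasetClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
        String syncBoundary =
            earliestPending.isPresent() ? earliestPending.get().getTimestamp() : null;
        // Hypothetical helper: sync completed dataset instants up to (but excluding) the
        // boundary, or everything when no async operation is pending (boundary == null).
        syncUpTo(syncBoundary);
      }

      private static void syncUpTo(String boundaryInstant) {
        // placeholder for the actual sync logic
      }
    }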

Metadata Table Modes

The metadata table can be opened in two modes:

  1. Read-only mode: In this mode the table is queried but no updates are applied to it. The "latest" synced instant on the table is used to read metadata information, which is accurate as of some point in time in the past. 
    1. Used for query side optimizations (e.g. hudi-hadoop-mr RecordReaders)
  2. Read-write mode: In this mode, the HoodieWriteClient writes updates to the Metadata Table and keeps it in sync with the dataset. 
    1. To prevent write conflicts, only a single writer may open the Metadata Table in read-write mode. Since HUDI is single writer, this means that the Metadata Table should only be opened in read-write mode through the HoodieWriteClient. 

Metadata Table Reads

The metadata MOR table is made up of two types of files: the base file (HFile format) and the log files containing the delta updates. 

...

Performance: Lookups from base HFile are very fast. Delta updates are only merged once and are kept in memory. Periodic compaction will reduce the number of log file blocks to be read, merged and kept in memory. 
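
As a simplified illustration of that read path, a lookup merges the base HFile contents for a key with the (in-memory) merged log records for the same key; the interfaces below are hypothetical stand-ins for the actual HFile reader and log record scanner:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    public class MetadataLookupSketch {
      /** Hypothetical point lookup into the HFile base file. */
      interface BaseFileReader { Optional<Map<String, Long>> getFileSizes(String key); }
      /** Hypothetical view of the merged log records, kept in memory after a single merge. */
      interface MergedLogRecords { Optional<Map<String, Long>> getFileSizes(String key); }

      // Base file contents first, then delta entries from the log records layered on top.
      static Map<String, Long> listFiles(String partitionKey,
                                         BaseFileReader baseFile,
                                         MergedLogRecords logRecords) {
        Map<String, Long> merged = new HashMap<>();
        baseFile.getFileSizes(partitionKey).ifPresent(merged::putAll);
        logRecords.getFileSizes(partitionKey).ifPresent(merged::putAll); // newer entries win
        return merged;
      }
    }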

Design V2 (0.10.0 onwards): 

We made a foundational fix here, wherein all commits are first applied to the metadata table and then to the data table. This makes the design a lot simpler and easier to manage in the long run, as it avoids a lot of corner cases. When the Metadata Table is opened for reads, there could be committed instants in the metadata table which were never committed to the data table. So, when instantiating the reader, we read the data table timeline and filter only for those instants which are completed. 
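
A hedged Java sketch of that reader-side filtering: the set of instants a metadata reader will trust is restricted to those completed on the data table timeline:

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    public class MetadataReaderFilterSketch {
      // The metadata table may contain commits for instants that never completed on the data
      // table; readers only trust metadata instants whose timestamp is completed there.
      public static Set<String> validMetadataInstants(HoodieTableMetaClient dataTableClient) {
        Set<String> completedOnDataTable = new HashSet<>();
        dataTableClient.getActiveTimeline()
            .filterCompletedInstants()
            .getInstants()
            .forEach(instant -> completedOnDataTable.add(instant.getTimestamp()));
        return completedOnDataTable;
      }
    }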

...

Single writer with async table services: This is an interesting case. As of today, users do not have to configure a lock provider for this setup, and everything works smoothly with just the data table (if not for the metadata table). With the metadata table, however, we have to ask users to configure a lock provider, because a regular write on the data table and an async table service on the data table could concurrently update the metadata table. Without locking, this could lead to data loss. Consider a scenario where writer 1 applies updates to the metadata table and triggers a compaction after finishing its delta commit, while another writer concurrently applies updates to the metadata table. There is a chance that the second writer applies its delta commit to an old file slice which the compaction plan has missed. To cater to this, we ask users to configure a lock provider for the data table if they wish to enable the metadata table. 
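
For reference, a hedged sketch of such a lock-provider configuration expressed as Spark DataSource write options (the ZooKeeper-based provider is shown; the table name, record key / precombine fields and ZooKeeper settings are placeholders):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    public class LockProviderConfigSketch {
      public static void writeWithLocking(Dataset<Row> df, String basePath) {
        df.write().format("hudi")
            .option("hoodie.table.name", "my_table")                         // placeholder
            .option("hoodie.datasource.write.recordkey.field", "uuid")       // placeholder
            .option("hoodie.datasource.write.precombine.field", "ts")        // placeholder
            .option("hoodie.metadata.enable", "true")
            // Lock provider, needed once async table services can race with regular writes
            .option("hoodie.write.lock.provider",
                "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
            .option("hoodie.write.lock.zookeeper.url", "zk-host")            // placeholder
            .option("hoodie.write.lock.zookeeper.port", "2181")
            .option("hoodie.write.lock.zookeeper.lock_key", "my_table")
            .option("hoodie.write.lock.zookeeper.base_path", "/hudi/locks")  // placeholder
            .mode(SaveMode.Append)
            .save(basePath);
      }
    }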

Metadata Table Modes

The metadata table can be opened in two modes:

  1. Read-only mode: In this mode the table is queried but no updates are applied to it. As mentioned earlier, any additional commits in the metadata table which were not yet completed in the data table will be ignored while reading. 
    1. Used for query side optimizations (e.g. hudi-hadoop-mr RecordReaders)
  2. Read-write mode: In this mode, the HoodieWriteClient writes updates to the Metadata Table and keeps it in sync with the dataset. 
    1. To prevent write conflicts, only a single writer may open the Metadata Table in read-write mode. 

Metadata Table Reads

The metadata MOR table is made up of two types of files: the base file (HFile format) and the log files containing the delta updates. 

...

Performance: Lookups from base HFile are very fast. Delta updates are only merged once and are kept in memory. Periodic compaction will reduce the number of log file blocks to be read, merged and kept in memory. 

Query Engine Integration

Spark

How does file listing work for Hudi Spark DataSource and Spark SQL queries?

To execute queries, Spark first needs to get a list of files, which is then converted to partitions/splits and passed on to the executors so that tasks can operate on those files. Here is how the file listing is obtained for different query types:

  • Copy on Write DataSource queries: Hudi relies on Spark’s parquet datasource internally to query the dataset, since copy-on-write tables can be queried directly like parquet tables. The parquet datasource internally performs file listing through Spark’s InMemoryFileIndex. However, to filter out only the latest commit files, Hudi passes a path filter called HoodieROTablePathFilter, which helps InMemoryFileIndex keep files from the latest commit only. Spark internally invokes the listFiles method of the super class, which returns files per partition directory in the PartitionDirectory format expected by Spark. The listing is parallelized across the cluster using the Spark context, and a FileStatusCache is maintained to cache file listings across queries. The File Index also performs partition pruning, which prunes out unnecessary files during query planning if partition filters are passed in the query, helping to reduce I/O and hence query execution time. However, for Hudi this partition pruning does not really work, because Spark is not able to understand Hudi partitions and derive the partition schema needed for pruning.
  • Merge on Read DataSource queries: Hudi has an in-built Spark Relation called MergeOnReadSnapshotRelation for querying Merge on Read tables for this case, because these tables cannot be queried directly through Spark’s parquet datasource (needs merge of log files and parquet files). However, this has been integrated to directly create and invoke InMemoryFileIndex for listing files so we are able to make use of its benefits like parallelized listing and caching out of the box.
  • Copy on Write Spark SQL queries: In this case the Hudi dataset is registered as a Hive table and can be queried using SQL via Spark. There are two scenarios here:
    • spark.sql.hive.convertMetastoreParquet = true (default): In this scenario, Spark SQL queries go through Spark’s parquet datasource implementation, because Spark identifies it as a parquet table based on the parquet serde registered in the Hive table created in the metastore, and uses it to query. Again, customers need to pass the path filter, i.e. HoodieROTablePathFilter, for Spark to filter out the latest commit files when performing the listing via InMemoryFileIndex (a configuration sketch is shown after this list).
    • spark.sql.hive.convertMetastoreParquet = false: In this scenario, Spark SQL queries go through the Hive InputFormat implementation for Hudi COW tables, i.e. HoodieParquetInputFormat. This is directly able to use Hudi’s filesystem view to get the latest commit files without needing a path filter. However, Hive is not able to parallelize the listing across cluster nodes; at most it can do multi-threaded listing at the Hive driver, which is slower than Spark’s file listing.
  • Merge on Read Spark SQL queries: To query Merge on Read tables via Spark SQL, customers need to use spark.sql.hive.convertMetastoreParquet = false, because Merge on Read tables cannot be directly queried using Spark’s parquet datasource (the log and parquet files need to be merged). These queries go through Hudi’s Hive InputFormat implementation for merge on read tables that is capable of doing the merge, i.e. HoodieRealtimeInputFormat. Hence, the file listing also happens through Hive in this case, instead of through Spark.
  • Bootstrapped Copy on Write DataSource queries: In case of DataSource based queries, the bootstrapped COW tables are queried using a custom Spark Relation i.e. HoodieBootstrapRelation which performs listing by directly creating Spark’s InMemoryFileIndex.
  • Bootstrapped Copy on Write Spark SQL queries: In case of Spark SQL, the bootstrapped COW tables are queried using the HoodieParquetInputFormat implementation via Hive.
  • Copy on Write Incremental queries: In this case listing happens by reading the commit metadata files for commits touched by the incremental query, since we need only the files updated in those commits.
  • Merge on Read Incremental queries: In this case listing happens by reading the commit metadata files for commits touched by the incremental query, since we need only the files updated in those commits.
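
A hedged Java sketch of wiring in that path filter before a read-optimized query through Spark's parquet datasource (the table path and glob depth are placeholders that depend on the partitioning):

    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hudi.hadoop.HoodieROTablePathFilter;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class PathFilterQuerySketch {
      public static Dataset<Row> readOptimizedQuery(SparkSession spark, String tablePath) {
        // Register the path filter so Spark's InMemoryFileIndex keeps only files from the
        // latest commit when it lists the table directories.
        spark.sparkContext().hadoopConfiguration().setClass(
            "mapreduce.input.pathFilter.class",
            HoodieROTablePathFilter.class,
            PathFilter.class);
        // Copy-on-write tables can then be queried through Spark's regular parquet datasource.
        return spark.read().parquet(tablePath + "/*/*"); // glob depth depends on the partitioning
      }
    }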

Proposal

Proposal For V1 (Hudi 0.7.0)

Modify HoodieROTablePathFilter so that it can use the Metadata Table to list files under the parent folder of the path passed to the filter, so that it can determine whether the file belongs to the latest commit. While the file listing of the root path still happens internally within Spark through InMemoryFileIndex, there will be some performance benefit from making the listing that happens in the PathFilter go through the Metadata Table (since we have control over the path filter in Hudi code but not over InMemoryFileIndex). This will benefit the following cases:

  • Read Optimized queries via Spark DataSource: These queries are directed to Spark's Parquet DataSource, which internally does file listing using its InMemoryFileIndex. However, Hudi needs to pass the HoodieROTablePathFilter here while directing the query to the DataSource so that InMemoryFileIndex can internally use the path filter to keep only the latest commit files. Since we are making the path filter more performant with the metadata, these queries will benefit.
  • Read Optimized queries via Spark SQL: Here is the guide to query via Spark SQL, which details that Spark SQL queries go through the Hive InputFormat implementations that Hudi has. However, read optimized queries can be made to go through Spark's parquet datasource, as they just involve reading parquet files, by letting spark.sql.hive.convertMetastoreParquet remain true (default). The user needs to pass HoodieROTablePathFilter so Spark can filter out the latest commit files from all the files. Since we are making the path filter more performant with the metadata, these queries will benefit.

Proposal For V2 (Hudi 0.9.0)

We need a way to hook a custom file listing implementation into Hudi, so that we can have Spark list the root and partition paths using the Metadata Table in the first place. If metadata is not enabled, it should fall back to listing in a distributed way using the Spark context. The proposal is to implement a custom Spark FileIndex for the Hudi DataSource, which provides a good integration point for implementing our own listing solution (a simplified sketch follows the list of benefits below). At a high level this is how we can use the FileIndex APIs for implementing this:

...

  • One centralized file listing solution for all query types via Spark DataSource or via Spark SQL
  • Best efficiency in both cases:
    • Metadata enabled => O(1) reads of metadata table for N partitions and avoids O(N) list calls
    • Metadata disabled => Spark context to distribute listing across the cluster
  • It will vastly improve Spark SQL query performance, as those queries will also be able to use the Hudi DataSource and make use of Spark's native implementations and optimizations, which are far more performant than the Hive readers.
  • The custom FileIndex can be used to support partition pruning for Spark DataSource queries, which currently does not work.
  • The File Index can later be extended for other features like pruning based on column range indexes (column statistics), time travel queries, etc.
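
To make the listing decision above concrete, here is a hedged Java sketch; listUsingMetadataTable() and listInParallelUsingSparkContext() are hypothetical helpers standing in for the metadata-table scan and the Spark-parallelized fallback, and the real implementation would sit behind Spark's FileIndex API:

    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.fs.FileStatus;

    public class HudiFileIndexSketch {
      // Core decision made by the custom FileIndex: a single metadata-table read covering all
      // partitions when metadata is enabled, otherwise listing distributed over the cluster.
      public static Map<String, List<FileStatus>> listPartitions(boolean metadataEnabled,
                                                                 List<String> partitionPaths) {
        if (metadataEnabled) {
          // O(1) reads of the metadata table for N partitions
          return listUsingMetadataTable(partitionPaths);
        }
        // Fallback: O(N) list calls, parallelized with the Spark context
        return listInParallelUsingSparkContext(partitionPaths);
      }

      private static Map<String, List<FileStatus>> listUsingMetadataTable(List<String> partitions) {
        throw new UnsupportedOperationException("hypothetical helper");
      }

      private static Map<String, List<FileStatus>> listInParallelUsingSparkContext(List<String> partitions) {
        throw new UnsupportedOperationException("hypothetical helper");
      }
    }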

Hive

How does file listing work for Hudi Hive queries?

Proposal

Presto

How does file listing work for Hudi Presto queries?

Here is how Hudi queries work for PrestoDb:

  • Read optimized queries: These queries only require reading of parquet files and hence are read using Presto's Hive connector with its native parquet readers to be able to use all the optimizations. The split generation and file listing are also handled using Presto's native logic. For Presto to be able to filter out the latest commit files after listing, it uses HoodieROTablePathFilter here. This is similar to Spark's file listing in case of read optimized queries where it needs the path filter.
  • Real time queries: These queries require reading of parquet and log files to provide the complete snapshot. This is supported using Hudi's InputFormat for real time tables, i.e. HoodieRealtimeInputFormat. It allows Presto to re-use the InputFormat implementation without having to implement native read support, although it is not as efficient, since Presto's native parquet readers have several optimizations. The split generation and file listing also happen via the InputFormat, by calling getSplits on the InputFormat here.

Proposal

Proposal for Read optimized queries

Approach 1: Support via HoodieParquetInputFormat

...

Overall, I am more in favor of Approach 2 because we are able to maintain Presto's native optimizations and also read metadata just once for N partitions. This would offer the best performance enhancements.

Proposal for Real time queries

Support via HoodieRealtimeInputFormat

...

In the future we will probably want to implement native real time splits and reading support within Presto for Hudi. If we move towards that, the same DirectoryLister can be used to get the file listing. However, a native reader implementation for real time queries is out of scope for this feature.

Error Handling for Design V1 (pre 0.10.0):

Due to two tables being involved in each operation, we need to ensure that they are in sync and errors during dataset operation are handled correctly. Two types of errors need to be handled:

...

  1. When an async compaction is scheduled on the dataset, it generates a CompactionPlan which is written to the compaction.requested instant. New base files are created at the end of the compaction process but they are not visible on the dataset view until the compaction process completes.
  2. Metadata Table is updated before the compaction process completes. The log writer already supports multi-writer so we do not have a problem with async / parallel updates to the log.
  3. Any active Metadata reader may not see the background compaction completion until the next refresh. This should not cause any data consistency issues, as the whole File Slice design ensures that data is still visible through the older versions of the base / log files. 

Error Handling for Design V2 (0.10.0 onwards):

Due to two tables being involved in each operation, we need to ensure that they are in sync and errors during dataset operation are handled correctly. Two types of errors need to be handled:

...

  1. When an async clean is scheduled on the dataset, it generates a CleanPlan which is written to the clean.requested instant
  2. Async clean will delete the files in the background and complete the clean operation
  3. Just before committing to the data table, the commit is applied to the metadata table, so the data should be intact. Readers will make sure to skip this commit if the writer failed before it was applied to the data table. 

Query handling (read path)

From a reader / query engine's perspective, the InputFormat's “listStatus” requests will be served with the current view of the file system, without the expensive distributed file system queries.

...

  • Hudi will inspect the master manifest file to prune all partitions that don’t match the query range (for example, prune all partitions that don’t satisfy (x1 < C1 < y1)). It will then apply the second and subsequent parts of the query to prune the partition list further (a simplified sketch follows this list).
  • Once list of “candidate partitions” have been identified, Hudi will inspect the corresponding partition manifest files, to prune the data files that don’t match the query range to generate “candidate data files” where the data is most likely to reside. 
  • For the most frequently used queries (learned over time for each dataset), Hudi could optimize further by maintaining a list of “hot columns” and maintaining column-specific metadata (like range / index files).
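
A hedged sketch of that two-level pruning; the Range type and manifest maps below are hypothetical stand-ins for the real column-range metadata:

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class RangePruningSketch {
      /** Hypothetical min/max range of one column for a partition or data file. */
      public static class Range {
        final long min;
        final long max;
        Range(long min, long max) { this.min = min; this.max = max; }
        boolean overlaps(long lo, long hi) { return max >= lo && min <= hi; }
      }

      // Level 1: keep only partitions whose range can satisfy lo < C1 < hi (master manifest).
      // Level 2 applies the same filter per data file using the partition manifest.
      public static List<String> prune(Map<String, Range> ranges, long lo, long hi) {
        return ranges.entrySet().stream()
            .filter(e -> e.getValue().overlaps(lo, hi))
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
      }
    }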

Rollout/Adoption Plan

  • What impact (if any) will there be on existing users?
    • None. This functionality will be an add-on to existing datasets. If the metadata or indexes are not available, the functionality will not be enabled. 
  • If we are changing behavior how will we phase out the older behavior?
    • This new feature is backwards-compatible. If the metadata or indexes are not available, the existing way of listing files through the NameNode will be used.

  • If we need special migration tools, describe them here.
    • NA

  • When will we remove the existing behavior?
    • This is not required.

Test Plan

Extensive unit tests will test every aspect of the metadata implementation. 

How to test using HUDI CLI

HUDI-CLI has been updated to include the following metadata related commands. These commands require the HUDI-CLI to be connected to the dataset (connect --path /path/to/hudi-dataset)

...

metadata list-partitions and metadata list-files can be used for performance measurements. If these commands are issued before the metadata table is created, they show the time taken using file listings. When issued after the metadata table is created, they show the time for lookups from the metadata table.
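
For example, a session could look like the following (the path and partition value are placeholders; the --partition option is an assumption and not verified output):

    connect --path /path/to/hudi-dataset
    metadata list-partitions
    metadata list-files --partition <partition-path>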


Metadata Table Metrics

The following metrics have been added for the metadata 

...