Proposers

Approvers

Status

Current state: ACCEPTED

Discussion thread: here

JIRA: here

Released: 0.7

Abstract

In the current implementation, the HUDI Writer Client (in the write path) and HUDI queries (through the InputFormat in the read path) have to perform a “list files” operation on the file system to get the current view of the file system. In HDFS, listing all the files in a dataset is a NameNode-intensive operation for large datasets. For example, one of our HUDI datasets has thousands of date partitions, each containing thousands of data files.

With this effort, we want to:

  1. Eliminate the need for the “list files” operation
    1. This will be done by proactively maintaining metadata about the list of files
    2. Reading the file list from a single file should be faster than a large number of NameNode operations
  2. Create Column Indexes for better query planning and faster lookups by Readers
    1. For a column in the dataset, the min/max range per Parquet file can be maintained.
    2. Just by reading this index file, the query planning system should be able to get the view of potential Parquet files for a range query.
    3. Reading column information from an index file should be faster than reading the individual Parquet footers.

This should provide the following benefits:

  1. Reducing the number of file listing operations improves NameNode scalability and reduces NameNode burden.
  2. Query planning is optimized because it is done by reading a single metadata file, so its cost is mostly bounded regardless of the size of the dataset.
  3. Partition-path-agnostic queries can be performed in a performant way.


In this document we provide two approaches to implementing this functionality. We intend to settle on a single approach after discussions and some workflow testing on a sample dataset.

Implementation

Basics

RFC-15 has been implemented by using an internal HUDI MOR Table to store the required metadata for the dataset. This table will be internal to a dataset and will not be exposed directly to the user to write / modify.

The Metadata Table has the following features:

  1. Will be located in the .hoodie/metadata directory
  2. Is a MOR Table with HFile base / log file formats
    1. HFile format has been chosen as it allows point lookups of specific records based on record key
  3. Defines its own custom payload HoodieMetadataPayload
  4. Does not use date based partitioning scheme
  5. Each partition of this table is reserved for various types of metadata information. E.g.
    1. "files" partition saves file-listing information
    2. "record_index" saves a record-key-level index
    3. etc.
  6. Follows all the conventions of a HUDI Table (has a schema, can be queried using Hive/Spark, can be cleaned, compacted)


In the rest of this section, “dataset” refers to the Hoodie dataset which holds the data and “Metadata Table” refers to this internal table holding the file listing metadata.


Schema

Being an actual Hoodie Table, the Metadata Table needs to have a schema. The schema provides a way to save the type of the record (enum) and a filename to size mapping.

Hoodie only allows a single Schema for the entire dataset. But we want to save different types of information in the various partitions of the Metadata Table. Hence, the below schema is being used which contains an integer "type" representing the type of the record and an appropriate record which contains the information. The record key will also be chosen based on the type of information being stored.

{

    "namespace": "org.apache.hudi.avro.model",
    "type": "record",
    "name": "HoodieMetadataRecord",
    "doc": "A record saved within the Metadata Table",
    "fields": [
        {   "name": "key", "type": "string" },
        {   "name": "type", "doc": "Type of the metadata record", "type": "int"},

        {   "name": "filesystemMetadata", "doc": "Contains information about partitions and files within the dataset", "type": ["null", {

               "type": "map",
               "values": {
                    "type": "record",
                    "name": "HoodieMetadataFileInfo",
                    "fields": [
                        { "name": "size", "type": "long", "doc": "Size of the file"},
                        { "name": "isDeleted", "type": "boolean",  "doc": "True if this file has been deleted" }
                    ]
                }
            }]
        }
    ]
}


For file listing information, the Schema has a record field named filesystemMetadata which allows saving the list of files

  1. To save the list of all partitions, we save a record whose key = "_all_partitions_", type="1", and whose filesystemMetadata contains the list of partition names
  2. To save the list of files in a partition, we save a record whose key = <name-of-partition>, type="2", and whose filesystemMetadata contains the list of file names present in the partition with their sizes.

Since files can be added as well as deleted, we use the isDeleted field to distinguish the two cases. During a CLEAN, when files are deleted, we save a record of type=2 with isDeleted=True for each deleted file. During compaction, all the records with the same key are merged and the information is normalized (HoodieMetadataPayload.combineAndGetInsertValue) by removing the deleted files. Hence, eventually the record sizes are only proportional to the number of files in the partition. This also means that the size of the Metadata Table will eventually be proportional to the number of files and partitions and not to the number of operations on the dataset.


Example of how the records are added:

  1. Insert into partition 2020/01/01 and 1 parquet file created
    1. Insert a new record {"key": "2020/01/01", "type": 2, "filenameToSizeMap": {"file10.parquet": 12345, isDeleted: false} }
    2. Insert a new partition record {"key": "_all_partitions_", "type": 1, "filenameToSizeMap": {"2020/01/01": 0, isDeleted: false} }
  2. Insert into partition 2020/01/02 and 1 parquet file created
    1. Insert a new record {"key": "2020/01/02", "type": 2, "filenameToSizeMap": {"file20.parquet": 65432, isDeleted: false} }
    2. Insert a new partition record {"key": "_all_partitions_", "type": 1, "filenameToSizeMap": {"2020/01/02": 0, isDeleted: false} }
  3. Upsert into the two partitions and some records updated. For a COW table this will create a new version of the parquet file; for a MOR table this will write to a LOG file.
    1. Insert a new record {"key": "2020/01/01", "type": 2, "filenameToSizeMap": {"file11.parquet": 12345, "file11.log": 5678, isDeleted: false} }
    2. Insert a new record {"key": "2020/01/02", "type": 2, "filenameToSizeMap": {"file21.parquet": 12345, "file21.log": 5678, isDeleted: false} }
    3. Insert a new partition record {"key": "_all_partitions_", "type": 1, "filenameToSizeMap": {"2020/01/01": 0, "2020/01/02": 0, isDeleted: false} }

When the Metadata Table is compacted, the various records with the same key are merged into a single record. A CLEAN then removes the older file versions.
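The merge of records with the same key can be sketched as follows. This is a minimal illustration and not the actual HoodieMetadataPayload code: the class FileListingMergeSketch, the nested FileInfo type and the merge method are hypothetical names, and the sketch assumes that records are applied in commit order, starting from the oldest.

```java
import java.util.HashMap;
import java.util.Map;

class FileListingMergeSketch {

    /** Hypothetical stand-in for HoodieMetadataFileInfo: a size plus a deletion marker. */
    static final class FileInfo {
        final long size;
        final boolean isDeleted;

        FileInfo(long size, boolean isDeleted) {
            this.size = size;
            this.isDeleted = isDeleted;
        }
    }

    /**
     * Merges a newer file map into an older one for the same record key.
     * Newer entries override older ones, and entries marked as deleted are
     * dropped, so the result only contains files that still exist.
     * Assumption: the older map is the fully merged state of all earlier records.
     */
    static Map<String, FileInfo> merge(Map<String, FileInfo> older, Map<String, FileInfo> newer) {
        Map<String, FileInfo> merged = new HashMap<>(older);
        merged.putAll(newer);
        merged.values().removeIf(info -> info.isDeleted);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, FileInfo> insert = new HashMap<>();
        insert.put("file10.parquet", new FileInfo(12345L, false));

        Map<String, FileInfo> upsert = new HashMap<>();
        upsert.put("file11.parquet", new FileInfo(12345L, false));

        // A CLEAN removes the older file version by writing a deletion marker.
        Map<String, FileInfo> clean = new HashMap<>();
        clean.put("file10.parquet", new FileInfo(0L, true));

        Map<String, FileInfo> compacted = merge(merge(insert, upsert), clean);
        System.out.println(compacted.keySet()); // only file11.parquet remains
    }
}
```

After the merge, the record for a partition holds one entry per live file, which is why the record size follows the number of files rather than the number of operations.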

Metadata Table partitioning scheme 

Each type of information will be stored in its own partition. The partition names will be hardcoded and chosen in advance. Within each partition, the number of base-files will be chosen based on the amount of information that needs to be committed. 

For saving file-listing information, we have chosen the partition name "files". Even for the largest of datasets (1000s of partitions and millions of files), we expect the size of the encoded/compressed metadata information to be in the 100s of MBs and not in GBs. Hence, we assume that the entire file listing metadata can fit within a single <1GB base file. Having a single base file is also more performant. The base file always needs to be read while querying the Metadata Table, so the fewer the base files, the faster the reads (the HFile format has some fixed overhead when opening a file).

The Metadata Table has the following hard coded partitions:

  1. files: The partition which saves file listing information in a single base file
  2. column_xxxxx_partition: (Future Extensibility) For each column being indexed, a specific partition will be created.

Having well-known partitions also simplifies the indexing and tagging of Metadata Table updates and makes the operations faster. No external index or Bloom Index is required. Keeping each type of information in a separate partition has further benefits:

  1. Individual partition can be onboarded and replaced without affecting other types of information
  2. Features can be onboarded in different versions (e.g. file-listing enhancements in 0.7 and column-indexes in 0.8)
  3. Specific features can be dropped without affecting file listing performance or requiring large IO to delete records (delete the partition)
  4. Can be re-indexed to deal with bugs / updates without affecting file listing performance (replace partition data)

First time initialization

Use of metadata for existing and new Hoodie datasets can be enabled using the HoodieWriteConfig supplied to the HoodieWriteClient.

HoodieWriteConfig.newBuilder()
    // Provide a Metadata Config
    .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).validate(true).build())
    ...


For an existing dataset, first time initialization will use recursive file listing within all the partitions to initialize the Metadata Table. For a new dataset, the Metadata Table is initialized but is empty.

When metadata verification (validate) is enabled:

  1. File listings are still used to get the list of partitions and the files within each partition
  2. The Metadata Table is updated as well as queried for the partitions and the files within each partition
  3. The results of #1 and #2 are compared for accuracy
  4. Results from file listing are returned (data from the Metadata Table is ONLY used for verification purposes)

This can be used during the testing period to ensure there are no bugs in the metadata implementation in production. 
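The verification flow above can be sketched as follows. This is only an illustration under stated assumptions: MetadataValidationSketch, mismatches and listWithValidation are hypothetical names, and the two listings are passed in as plain lists of file names rather than obtained from a file system or from the Metadata Table.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class MetadataValidationSketch {

    /** Returns the entries that appear in exactly one of the two listings. */
    static Set<String> mismatches(List<String> fromFileSystem, List<String> fromMetadataTable) {
        Set<String> inEither = new TreeSet<>(fromFileSystem);
        inEither.addAll(fromMetadataTable);
        Set<String> inBoth = new TreeSet<>(fromFileSystem);
        inBoth.retainAll(fromMetadataTable);
        inEither.removeAll(inBoth);
        return inEither;
    }

    /**
     * Validation mode: both listings are compared, but the file-system
     * listing is always the one returned to the caller.
     */
    static List<String> listWithValidation(List<String> fromFileSystem, List<String> fromMetadataTable) {
        Set<String> diff = mismatches(fromFileSystem, fromMetadataTable);
        if (!diff.isEmpty()) {
            System.err.println("Metadata Table listing differs from file listing: " + diff);
        }
        return fromFileSystem;
    }

    public static void main(String[] args) {
        List<String> fileSystem = Arrays.asList("file10.parquet", "file11.parquet");
        List<String> metadata = Arrays.asList("file10.parquet");
        // Reports file11.parquet as a mismatch and still returns the file-system listing.
        System.out.println(listWithValidation(fileSystem, metadata));
    }
}
```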

Metadata updates and sync

All changes on the dataset (insert / upsert / compaction / etc.) cause changes to the data/log files stored. Hence, for each such change, the metadata stored in the Metadata Table needs to be updated. Changes need to be kept in sync for correct metadata to be always reported. This is the classic case of “transactions on multiple tables” or multi-table-commit.

A change on the dataset is marked using a Hoodie Instant which has three stages - Requested, Inflight and Completed. A change is considered completed only when the Hoodie Instant has been transitioned to the Completed stage. The general idea of updating the metadata is to apply the changes to the Metadata Table right before the transition of Hoodie Instant to the Completed stage (error handling is covered in a later section). All information is derived directly from the HoodieXXXMetadata (HoodieCommitMetadata, HoodieRestoreMetadata, etc) which is generated as part of the change itself and saved within the Instant files. Hence, no more file listing is required to update the Metadata Table.

The following table details how the various operations are handled.


Operation on dataset    Source of information (Instant)    Operation on Metadata Table
--------------------    -------------------------------    ---------------------------
Insert                  HoodieCommitMetadata               DeltaCommit
Upsert                  HoodieCommitMetadata               DeltaCommit
Compact                 HoodieCommitMetadata               DeltaCommit
Clean                   HoodieCleanMetadata                DeltaCommit
Restore                 HoodieRestoreMetadata              DeltaCommit


The Metadata Table is kept in sync with the dataset by reading the completed instants, processing the changes (files added, files deleted), converting this information into HoodieMetadataPayload records and finally committing these records into the Metadata Table. The instantTime on the Metadata Table is chosen to be the same as that on the dataset. This helps in debugging, as it makes it easier to reason about differences between the data timeline and the metadata timeline.


Operation on dataset    Instant on dataset Timeline    Corresponding Operation on Metadata Table    Instant on Metadata Table Timeline
--------------------    ---------------------------    -----------------------------------------    ----------------------------------
Insert                  20201122.commit                Deltacommit                                  20201122.deltacommit
Upsert                  20201123.commit                Deltacommit                                  20201123.deltacommit
Compact                 20201124.commit                Deltacommit                                  20201124.deltacommit
Clean                   20201125.clean                 Deltacommit                                  20201125.deltacommit
Restore                 20201126.restore               Deltacommit                                  20201126.deltacommit

The Metadata Table may have additional instants which do not occur on the dataset. This is because the compaction and clean schedules of the Metadata Table are independent of those of the dataset.

Design V1 (pre 0.10.0):

Asynchronous updates to Metadata Table: Until 0.10.0, updates were applied to the Metadata Table asynchronously. When the Metadata Table is opened for reads, there could be completed instants in the data table which have not yet been synced to the Metadata Table. We need to ensure that it is in sync with the dataset. This is done as follows:

  1. List all the completed instants on the dataset
  2. Do all instant times have corresponding deltacommit instant on Metadata Timeline?
    1. Yes: Then the Metadata Table is in sync
    2. No: Metadata Table is out of sync

The Metadata Table can be synced by reading the Instants from the dataset Timeline and using their metadata to apply the updates on the Metadata Table.
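The sync check can be sketched as a comparison of instant times. This is a minimal illustration, assuming instant times are plain strings. MetadataSyncCheckSketch, findUnsyncedInstants and isInSync are hypothetical names and not part of the Hudi API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class MetadataSyncCheckSketch {

    /** Returns the completed dataset instants that have no deltacommit on the metadata timeline. */
    static List<String> findUnsyncedInstants(List<String> completedDatasetInstants,
                                             Set<String> metadataDeltaCommits) {
        List<String> unsynced = new ArrayList<>();
        for (String instantTime : completedDatasetInstants) {
            if (!metadataDeltaCommits.contains(instantTime)) {
                unsynced.add(instantTime);
            }
        }
        return unsynced;
    }

    /** The Metadata Table is in sync when every completed dataset instant has been applied. */
    static boolean isInSync(List<String> completedDatasetInstants, Set<String> metadataDeltaCommits) {
        return findUnsyncedInstants(completedDatasetInstants, metadataDeltaCommits).isEmpty();
    }

    public static void main(String[] args) {
        List<String> dataset = Arrays.asList("20201122", "20201123", "20201124", "20201125", "20201126");
        Set<String> metadata = new HashSet<>(Arrays.asList("20201122", "20201123", "20201124"));
        System.out.println(findUnsyncedInstants(dataset, metadata)); // prints [20201125, 20201126]
    }
}
```

The instants returned by the check are the ones whose metadata still has to be applied to the Metadata Table, in timeline order.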

Handling Async Operations

HUDI allows some asynchronous operations (e.g. Compaction and Clean) to be scheduled in parallel with other operations like Commit. The Metadata Table cannot be synced with the results of the asynchronous operations before they complete. Furthermore, to ensure consistency, we cannot sync the completed instants which occur after the inflight asynchronous operations.

To deal with this issue, when asynchronous operations are detected on the dataset:

  1. Metadata Table will not be synced any further
  2. All the un-synched but completed instants are read and merged in memory
  3. The in-memory information combined with the already committed information provides the correct and latest view of the file-listing information. 

The in-memory merging is performed until the asynchronous operations complete. This ensures that the Metadata Table is always consistent and readers can get the latest view.

Metadata Table Modes

The metadata table can be opened in two modes:

  1. Read-only mode: In this mode the table is queried but no updates are applied to it. The "latest" synced instant on the table is used to read metadata information which is accurate at some point of time in the past. 
    1. Used for query side optimizations (e.g. hudi-hadoop-mr RecordReaders)
  2. Read-write mode: In this mode, the HoodieWriteClient writes updates to the Metadata Table and keeps it in sync with the dataset.
    1. To prevent write conflicts, only a single writer may open the Metadata Table in read-write mode. Since HUDI is single writer, this means that Metadata Table should only be opened in read-write mode through HoodieWriteClient. 

Metadata Table Reads

The metadata MOR table is made up of two types of file - the base file (HFile format) and the log files containing the delta updates. 

Suppose we want to read the list of partitions from the Metadata Table. This involves the following steps:

  1. Open the Metadata Table and find the latest synced instant
  2. Find the latest File Slice in the "files" partition. 
  3. Create a HoodieMergedLogRecordScanner over the log files of the latest slice. This reads all updates and merges them in memory.
  4. Determine the key to be read - in this case we need the list of partitions which is saved  with the key __all_partitions_. 
  5. Read the key from the base file (if present). This is a point lookup in the base HFile and returns a HoodieRecordPayload if the key was found.
  6. Read the key from the HoodieMergedLogRecordScanner. This is an in-memory lookup in the merged records and returns a HoodieRecordPayload if the key was found.
  7. Merge the two payloads found in #5 and #6. Either or both of them may be null.

Performance: Lookups from base HFile are very fast. Delta updates are only merged once and are kept in memory. Periodic compaction will reduce the number of log file blocks to be read, merged and kept in memory. 

Design V2 (0.10.0 onwards): 

We made a foundational change here: every commit is first applied to the metadata table and then to the data table. This makes the design much simpler and easier to manage in the long run, as it avoids many corner cases. When the Metadata Table is opened for reads, there could be committed instants in the metadata table that were never committed to the data table. So, when instantiating the reader, we read the data table timeline and consider only those instants which are completed.

In this design, all writers and table services apply their changes to the metadata table before committing to the data table, so no catch-up is required. But with synchronous updates, we need to ensure that concurrent writers on the data table can apply changes to the metadata table safely.

Single writer: With a single writer and all table services inline, there are no concurrent operations that try to update the metadata table simultaneously.

Strictly multi-writer: In multi-writer scenarios, users will already have configured lock providers, so any write to the metadata table is guarded by the data table lock. We ensure that only one write is applied to the metadata table at any point in time.

Single writer with async table services: This case is more subtle. As of today, users do not have to configure a lock provider for this setup, and everything works smoothly on the data table alone. With the metadata table enabled, however, we need to ask users to configure a lock provider, because a regular write and an async table service on the data table could update the metadata table concurrently. Without locking, there could be data loss. Consider a scenario where writer1 applies updates to the metadata table and triggers a compaction after finishing its delta commit. Concurrently, another writer also applies updates to the metadata table. The second writer could then apply its delta commit to the old file slice, which the compaction plan would miss. To cater to this, we ask users to configure a lock provider for the data table if they wish to enable the metadata table.
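The effect of guarding metadata updates with a lock can be sketched as follows. This is only an illustration under stated assumptions: an in-process ReentrantLock stands in for a configured lock provider, the two timelines are plain in-memory lists, and GuardedCommitSketch, commit and runConcurrently are hypothetical names rather than Hudi APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class GuardedCommitSketch {
    // Stand-in for the data table lock obtained from a lock provider (assumption).
    private final ReentrantLock tableLock = new ReentrantLock();
    private final List<String> metadataTimeline = new ArrayList<>();
    private final List<String> dataTimeline = new ArrayList<>();

    /** Applies a change to the metadata table first, then completes it on the data table. */
    void commit(String instantTime) {
        tableLock.lock();
        try {
            metadataTimeline.add(instantTime); // 1. apply to metadata table
            dataTimeline.add(instantTime);     // 2. complete the instant on the data table
        } finally {
            tableLock.unlock();
        }
    }

    List<String> metadataTimeline() {
        tableLock.lock();
        try {
            return new ArrayList<>(metadataTimeline);
        } finally {
            tableLock.unlock();
        }
    }

    List<String> dataTimeline() {
        tableLock.lock();
        try {
            return new ArrayList<>(dataTimeline);
        } finally {
            tableLock.unlock();
        }
    }

    /** Runs a regular writer and an async table service against the same table. */
    static GuardedCommitSketch runConcurrently(int commitsPerWriter) {
        GuardedCommitSketch table = new GuardedCommitSketch();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < commitsPerWriter; i++) {
                table.commit("write-" + i);
            }
        });
        Thread tableService = new Thread(() -> {
            for (int i = 0; i < commitsPerWriter; i++) {
                table.commit("compaction-" + i);
            }
        });
        writer.start();
        tableService.start();
        try {
            writer.join();
            tableService.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return table;
    }

    public static void main(String[] args) {
        GuardedCommitSketch table = runConcurrently(1000);
        // Both timelines contain the same instants in the same order.
        System.out.println(table.metadataTimeline().equals(table.dataTimeline())); // prints true
    }
}
```

Because each commit holds the lock across both steps, the two writers never interleave inside a single commit, which is the property the lock provider is expected to give.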

Metadata Table Modes

The metadata table can be opened in two modes:

  1. Read-only mode: In this mode the table is queried but no updates are applied to it. As mentioned earlier, any additional commits in the metadata table which are not yet complete in the data table are ignored while reading.
    1. Used for query side optimizations (e.g. hudi-hadoop-mr RecordReaders)
  2. Read-write mode: In this mode, the HoodieWriteClient writes updates to the Metadata Table and keeps it in sync with the dataset.
    1. To prevent write conflicts, only a single writer may open the Metadata Table in read-write mode. 

Metadata Table Reads

The metadata MOR table is made up of two types of file - the base file (HFile format) and the log files containing the delta updates. 

Suppose we want to read the list of partitions from the Metadata Table. This involves the following steps:

  1. Open the Metadata Table and find the latest File Slice in the "files" partition. 
  2. Create a HoodieMergedLogRecordScanner over the log files of the latest slice by filtering for valid (completed) commits from data timeline. This reads all updates and merges them in memory.
  3. Determine the key to be read - in this case we need the list of partitions which is saved  with the key __all_partitions_. 
  4. Read the key from the base file (if present). This is a point lookup in the base HFile and returns a HoodieRecordPayload if the key was found.
  5. Read the key from the HoodieMergedLogRecordScanner. This is an in-memory lookup in the merged records and returns a HoodieRecordPayload if the key was found.
  6. Merge the two payloads found in #4 and #5. Either or both of them may be null.

Performance: Lookups from base HFile are very fast. Delta updates are only merged once and are kept in memory. Periodic compaction will reduce the number of log file blocks to be read, merged and kept in memory. 
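The read steps above can be sketched as follows. This is a simplified illustration and not the actual reader: the base file is modelled as an in-memory map, MetadataLookupSketch, LogRecord and lookup are hypothetical names, and the log records are applied directly to the base entry instead of first being merged by a HoodieMergedLogRecordScanner.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class MetadataLookupSketch {

    /** Hypothetical delta update for one key, written by one instant. */
    static final class LogRecord {
        final String instantTime;
        final String key;
        final Map<String, Long> addedFiles;
        final Set<String> deletedFiles;

        LogRecord(String instantTime, String key, Map<String, Long> addedFiles, Set<String> deletedFiles) {
            this.instantTime = instantTime;
            this.key = key;
            this.addedFiles = addedFiles;
            this.deletedFiles = deletedFiles;
        }
    }

    /**
     * Looks up one key: starts from the base file entry (if any) and applies,
     * in order, the log records written by instants that are completed on the
     * data timeline. Records from other instants are ignored.
     */
    static Map<String, Long> lookup(String key,
                                    Map<String, Map<String, Long>> baseFile,
                                    List<LogRecord> logRecords,
                                    Set<String> completedInstants) {
        Map<String, Long> result = new TreeMap<>();
        Map<String, Long> fromBase = baseFile.get(key); // point lookup, may be null
        if (fromBase != null) {
            result.putAll(fromBase);
        }
        for (LogRecord record : logRecords) {
            if (record.key.equals(key) && completedInstants.contains(record.instantTime)) {
                result.putAll(record.addedFiles);
                result.keySet().removeAll(record.deletedFiles);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Long>> baseFile = new HashMap<>();
        baseFile.put("2020/01/01", Collections.singletonMap("file10.parquet", 12345L));

        List<LogRecord> log = Arrays.asList(
            new LogRecord("20201123", "2020/01/01",
                Collections.singletonMap("file11.parquet", 5678L), Collections.<String>emptySet()),
            // Written to the Metadata Table but not completed on the data timeline.
            new LogRecord("20201124", "2020/01/01",
                Collections.singletonMap("file12.parquet", 999L), Collections.<String>emptySet()));

        Set<String> completed = new HashSet<>(Arrays.asList("20201123"));
        // file12.parquet is ignored because instant 20201124 is not completed.
        System.out.println(lookup("2020/01/01", baseFile, log, completed).keySet());
    }
}
```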

Query Engine Integration

Spark

How does file listing work for Hudi Spark DataSource and Spark SQL queries?

To execute queries, Spark first needs to get the list of files, which are then converted to partitions/splits and passed on to the executors for tasks to perform their operations on those files. Here is how the file listing is obtained for different query types:

  • Copy on Write DataSource queries: Hudi relies on Spark’s parquet datasource internally to query the dataset, since copy-on-write tables can be directly queried like parquet tables. The parquet datasource internally performs file listing through Spark’s InMemoryFileIndex. However, to keep only the latest commit files, Hudi passes a path filter called HoodieROTablePathFilter, which helps InMemoryFileIndex select files from the latest commit only. Spark internally invokes the listFiles method of the super class, which returns files per partition directory in the PartitionDirectory format expected by Spark. The listing is parallelized across the cluster using the Spark context, and a FileStatusCache is maintained to cache file listings across queries. The File Index also performs partition pruning, which prunes out unnecessary files during query planning if partition filters are passed in the query; this helps in reducing I/O and hence query execution time. For Hudi, however, this partition pruning does not really work, because Spark is not able to understand Hudi partitions and derive the partition schema needed for pruning.
  • Merge on Read DataSource queries: Hudi has an in-built Spark Relation called MergeOnReadSnapshotRelation for querying Merge on Read tables for this case, because these tables cannot be queried directly through Spark’s parquet datasource (needs merge of log files and parquet files). However, this has been integrated to directly create and invoke InMemoryFileIndex for listing files so we are able to make use of its benefits like parallelized listing and caching out of the box.
  • Copy on Write Spark SQL queries: In this case the Hudi dataset is registered as a Hive table and can be queried using SQL via Spark. There are two scenarios here:
    • spark.sql.hive.convertMetastoreParquet = true (default): In this scenario, Spark SQL queries go through Spark’s parquet datasource implementation because Spark identifies it as a parquet table based on its registered parquet serde in the Hive table created in the metastore, and uses it to query. Again, customers need to pass the path filter i.e. HoodieROTablePathFilter for Spark to filter out the latest commit files when performing the listing via InMemoryFileIndex.
    • spark.sql.hive.convertMetastoreParquet = false: In this scenario, Spark SQL queries go through Hive InputFormat implementation for Hudi COW tables i.e. HoodieParquetInputFormat. This is directly able to use Hudi’s filesystem view to get latest commit files without needing path filter. But Hive is not able to parallelize the listing across cluster nodes, and at most can do multi-threaded listing at the Hive driver and is slower than spark’s file listing.
  • Merge on Read Spark SQL queries: To query Merge on Read tables via Spark SQL customers need to use spark.sql.hive.convertMetastoreParquet = false because Merge on Read tables cannot be directly queried using Spark’s parquet datasource (because of merging of log and parquet files needed). These queries go through Hudi’s Hive InputFormat implementation for merge on read tables capable of doing the merge i.e. HoodieRealtimeInputFormat. Hence, the file listing also happens through Hive for this case, instead of through spark.
  • Bootstrapped Copy on Write DataSource queries: In case of DataSource based queries, the bootstrapped COW tables are queried using a custom Spark Relation i.e. HoodieBootstrapRelation which performs listing by directly creating Spark’s InMemoryFileIndex.
  • Bootstrapped Copy on Write Spark SQL queries: In case of Spark SQL, the bootstrapped COW tables are queried using the HoodieParquetInputFormat implementation via Hive.
  • Copy on Write Incremental queries: In this case listing happens by reading the commit metadata files for commits touched by the incremental query, since we need only the files updated in those commits.
  • Merge on Read Incremental queries: In this case listing happens by reading the commit metadata files for commits touched by the incremental query, since we need only the files updated in those commits.

Proposal

Proposal For V1 (Hudi 0.7.0)

Modify HoodieROTablePathFilter so that it can use the Metadata Table to list files under the parent folder of the path passed to the filter, so that it can help determine whether the file belongs to the latest commit. So while the file listing of the root path still happens internally within Spark through InMemoryFileIndex, there will be some performance benefit from making the listing that happens in the PathFilter go through the Metadata Table (since we have control over the path filter in Hudi code but not over InMemoryFileIndex). This will benefit the following cases:

  • Read Optimized queries via Spark DataSource: These queries are directed to Spark's Parquet DataSource which internally does file listing using its InMemoryFileIndex. However, Hudi needs to pass the HoodieROTablePathFilter here while directing the query to DataSource so that InMemoryFileIndex can internally use the Path Filter to filter out the latest commit files. Since we are making the path filter more performant with the metadata these queries will benefit.
  • Read Optimized queries via Spark SQL: Here is the guide to query via Spark SQL which details that for Spark SQL the queries go through Hive InputFormat implementations that Hudi has. However, read optimized queries can be made to go through Spark's parquet datasource, as they just involve reading parquet files, by letting spark.sql.hive.convertMetastoreParquet be true (default). The user then needs to pass HoodieROTablePathFilter so Spark can select the latest commit files out of all the files. Since we are making the path filter more performant with the metadata, these queries will benefit.

Proposal For V2 (Hudi 0.9.0)

We need a way to hook in a custom file listing implementation for Hudi, so that we can have Spark list the root and partition paths using the Metadata Table in the first place. If metadata is not enabled, it should fall back to listing in a distributed way using the Spark context. The proposal is to implement a custom Spark FileIndex for the Hudi DataSource, which provides a good integration point for implementing our own listing solution. At a high level this is how we can use the FileIndex APIs for implementing this:


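import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, PartitionDirectory}
import org.apache.spark.sql.types.StructType

class HoodieFileIndex(spark: SparkSession, fileStatusCache: FileStatusCache)
  extends FileIndex {

  override def refresh(): Unit = {
    // This is called internally by Spark every time a Spark relation is created
    // or if Spark's FileStatusCache is expired.
    //
    // Perform the file listing of the root path here:
    // 1. If file listing is already present in FileStatusCache return it
    // 2. If metadata is enabled list using metadata listing
    // 3. If metadata is not enabled list using spark context in a distributed way
    // 4. Build the Hudi FileSystemView and filter out latest commit files
    // 5. Store file listing in Spark's FileStatusCache
  }

  override def rootPaths: Seq[Path] = {
    // Returns the base path of the table or a sub path (if user has
    // specified one in the query)
    ???
  }

  override def listFiles(partitionFilters: Seq[Expression],
                         dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
    // 1. Perform partition pruning
    // 2. Return latest commit files from the FileSystemView for the
    //    partitions that remain after pruning
    ???
  }

  // Remaining FileIndex members, left unimplemented in this outline
  override def inputFiles: Array[String] = ???
  override def sizeInBytes: Long = ???
  override def partitionSchema: StructType = ???
}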

Additional details for integrating Spark DataSource queries with FileIndex:

  • Several query types, such as real time queries and bootstrap queries, directly rely on Spark's InMemoryFileIndex (in their Spark Relation implementations) to perform the file listing. For these we can switch to directly using the custom FileIndex implementation.
  • Read optimized queries are directed to Spark's Parquet DataSource, which internally does file listing using the same InMemoryFileIndex. Thus we need a way to make Spark's Parquet DataSource use the custom FileIndex implementation. The proposal is to create the Parquet DataSource via HadoopFsRelation instead, which allows passing a FileIndex implementation:

                  HadoopFsRelation(
                                  // custom file index,
                                  // partition schema,
                                  // data schema,
                                  None,
                                  new ParquetFileFormat,
                                  optParams)(sqlContext.sparkSession)

This proposal works well for Spark DataSource based queries, which go via the Hudi DataSource implementation and will use the FileIndex. However, this is not the case for Spark SQL, which has the following caveats:

  • Read Optimized: Here is the guide to query via Spark SQL which details that for Spark SQL the queries go through Hive InputFormat implementations that Hudi has. However, read optimized queries can be made to go through Spark's parquet datasource, as they just involve reading parquet files, by letting spark.sql.hive.convertMetastoreParquet be true (default). The user then needs to pass HoodieROTablePathFilter so Spark can select the latest commit files out of all the files.
  • Real Time: These queries are executed via HoodieRealtimeInputFormat and users need to set spark.sql.hive.convertMetastoreParquet to false as mentioned in the guide.
  • Bootstrap: The bootstrap queries also go through HoodieParquetInputFormat and HoodieRealtimeInputFormat for bootstrapped COW and MOR tables.

However, since we now have native Spark Relation implementations for MOR Real Time queries and Bootstrap queries, all Spark SQL queries can ideally be made to go through the Hudi DataSource itself instead of relying on InputFormat. This will improve Spark SQL query performance both through listing, which can happen via FileIndex, and through query execution, as it will be able to use Spark's native readers and built-in optimizations, which offer a vast improvement over Hive readers. To make Spark SQL queries use the Hudi DataSource, the Hudi tables need to be registered as DataSource tables in the metastore, so that Spark understands that it needs to use the DataSource for querying and not the Serde/InputFormat registered for the table. This can be done by registering the following table properties in the metastore when doing Hive Sync for Hudi tables:

  • spark.sql.sources.provider => hudi (This would make Spark use Hudi DataSource implementation for the query)
  • spark.sql.sources.schema.part.0...N => The data schema broken into parts
  • spark.sql.sources.schema.numParts => N (Number of parts the schema is broken into)
  • spark.sql.sources.schema.numPartCols => Number of partition columns
  • spark.sql.sources.schema.partCol.0...N => The partition column field names (0 to N for N partition columns)

With the above, all Spark SQL queries will use the Hudi DataSource and hence end up using the custom FileIndex for doing the listing. InputFormat can also do listing with the metadata table because of the Hive integration, but that listing is not as efficient via Spark SQL, because Spark instantiates a new InputFormat instance for each partition and does the listing at different executors for each partition. This would end up reading the metadata O(N) times for N partitions and hence would not offer much performance benefit. By integrating it with FileIndex, however, it will perform O(1) reads of the metadata table for N partitions and offer much better performance. Also, even in the fallback case the listing happens using the Spark context and is distributed across the cluster, whereas in Hive the listing happens only at the driver. Thus, whether metadata is enabled or not, this is the best way forward.

To summarize the advantages of this approach:

  • One centralized file listing solution for all query types via Spark DataSource or via Spark SQL
  • Best efficiency in both cases:
    • Metadata enabled => O(1) reads of metadata table for N partitions and avoids O(N) list calls
    • Metadata disabled => Spark context to distribute listing across the cluster
  • It will vastly improve Spark SQL query performance, as Spark SQL queries will also be able to use the Hudi DataSource and make use of Spark's native implementations and optimizations, which are far more performant than Hive readers.
  • The custom FileIndex can be used to support partition pruning for Spark DataSource queries, which currently does not work.
  • File Index can later be extended for other features like pruning based on column range index (column statistics), time travel queries etc.

Hive

How file listing works for Hudi Hive queries?

Proposal

Presto

How file listing works for Hudi Presto queries?

Here is how Hudi queries work for PrestoDB:

  • Read optimized queries: These queries only require reading parquet files and hence are read using Presto's Hive connector with its native parquet readers, to be able to use all the optimizations. The split generation and file listing are also handled using Presto's native logic. For Presto to be able to filter out the latest commit files after listing, it uses HoodieROTablePathFilter. This is similar to Spark's file listing for read optimized queries, where it needs the path filter.
  • Real time queries: These queries require reading parquet and log files to provide the complete snapshot. They are supported using Hudi's InputFormat for real time tables, i.e. HoodieRealtimeInputFormat. This allows Presto to re-use the InputFormat implementation without having to implement native read support, although it is not as efficient, since Presto's native parquet readers have several optimizations. The split generation and file listing also happen via the InputFormat, by calling getSplits on the InputFormat.

Proposal

Proposal for Read optimized queries

Approach 1: Support via HoodieParquetInputFormat

We switch to using Hudi's InputFormat split generation logic for read optimized tables, i.e. HoodieParquetInputFormat, instead of using Presto's native split generation logic (the current approach). This would be similar to how real time queries work for Presto via HoodieRealtimeInputFormat, where Presto uses the InputFormat's split generation and, inherently, its file listing logic. Thus, if HoodieParquetInputFormat is integrated to list using the metadata table, it would work via Presto as well and we won't have to implement anything specific for Presto.

Advantages:

  • Able to re-use the InputFormat integration with metadata listing without implementing anything native for Presto.
  • Decouples Hudi code from the Presto codebase, as we don't have to write more Hudi specific integration code within the Presto Hive connector (although it's not that big an advantage, since the Presto community has already accepted Hudi as a direct dependency and there is already Hudi specific code in Presto to use Hudi's PathFilter and InputFormat for split generation and record reading in case of bootstrap and real time queries).

Disadvantages:

  • Using InputFormat's split generation via Presto has the overheads of serialization/de-serialization from InputFormat splits to Presto's native splits and back.
  • Presto's native file listing is more optimized than when InputFormat is used via Presto. This is because of the way Presto invokes InputFormat, where it calls getSplits API individually on each partition in separate threads. It also creates separate instances of InputFormat in each thread. This would result in metadata table being read N times for N partitions because of separate InputFormat instances, when ideally it could be read just once to get list for all partitions.

Approach 2: Implement a DirectoryLister for Hudi within Presto

Presto has a DirectoryLister interface, and an implementation, HadoopDirectoryLister, which it uses to list files via the Hadoop FileSystem APIs. This DirectoryLister is invoked for each partition to do the listing. We can implement a custom DirectoryLister for Hudi with our own logic that does the following:

  • If metadata listing is enabled, get the file listing for the partition using metadata table.
  • If metadata is not enabled, get the file listing using FileSystem API.
  • Filter out the latest commit files using Hudi's FileSystemView APIs.

The DirectoryLister is called to list every partition, but there is just one instance of the DirectoryLister. This lets us ensure that the metadata table reader is opened just once, when the DirectoryLister instance is created. Individual calls for partitions use the same reader to do seeks on the HFile to get the list for those partitions. This reduces it to O(1) reads for N partitions and uses the metadata efficiently.
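
The following is a minimal sketch of that idea in Python, not Presto's actual DirectoryLister interface. The class names, the `(file_id, commit_time, name)` tuple layout and the `fs_list` callback are all hypothetical; the point is only that one shared reader serves every partition and that the latest file per file group is picked in a single pass.

```python
class MetadataTableReader:
    """Hypothetical stand-in for a reader over the metadata table's HFile."""

    def __init__(self, partition_to_files):
        self._index = partition_to_files
        self.open_count = 1   # the reader is opened once, at construction
        self.seek_count = 0   # one seek per partition lookup

    def files_in(self, partition):
        self.seek_count += 1
        return list(self._index.get(partition, []))


def latest_file_per_group(files):
    """Keep the newest file of each file group; files are (file_id, commit_time, name)."""
    latest = {}
    for file_id, commit_time, name in files:
        if file_id not in latest or commit_time > latest[file_id][0]:
            latest[file_id] = (commit_time, name)
    return sorted(name for _, name in latest.values())


class HudiDirectoryLister:
    """Lists one partition at a time, re-using a single metadata reader if present."""

    def __init__(self, metadata_reader=None, fs_list=None):
        self._reader = metadata_reader   # shared across all partitions
        self._fs_list = fs_list          # fallback: plain file system listing

    def list(self, partition):
        if self._reader is not None:
            files = self._reader.files_in(partition)
        else:
            files = self._fs_list(partition)
        return latest_file_per_group(files)
```

In this model, listing N partitions performs N seeks on a reader that was opened once, which is the behaviour the approach relies on. Commit times are compared as strings here, which assumes fixed-width, sortable instant times.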

Advantages:

  • The metadata table will be used efficiently and read just once. It reduces O(N) LIST calls to O(1) GET calls for N partitions.
  • It even further optimizes the filtering of latest commit files by getting rid of the PathFilter which is applied to each and every file and has some cost because of the internal listing and initializations it does. Instead, we directly use Hudi's FileSystemView to get the latest commit files in just one call.

Disadvantages:

  • Additional Hudi specific code in Presto. But this seems okay because Hudi is already tightly coupled with Presto. Presto already has adopted Hudi as a direct dependency.

Overall, I am more in favor of Approach 2 because we are able to maintain Presto's native optimizations and also read metadata just once for N partitions. This would offer the best performance enhancements.

Proposal for Real time queries

Support via HoodieRealtimeInputFormat

We continue with the existing approach for real time query support, using HoodieRealtimeInputFormat for split generation, which internally does the file listing. Thus, HoodieRealtimeInputFormat's integration with the metadata table would work out of the box for Presto without having to implement anything native to Presto.

We cannot use Presto's native listing or the DirectoryLister here, because Presto depends upon HoodieRealtimeInputFormat to generate HoodieRealTimeFileSplit instances, which need additional information about log files (apart from the base parquet file). Presto does not have native logic to generate this type of file split, so we re-use HoodieRealtimeInputFormat's split generation logic and do not have to write native Presto code for generating and handling real time file splits (specific to Hudi).

Advantages:

  • Able to re-use the InputFormat integration with metadata listing without implementing anything native for Presto.
  • Decouples Hudi code from the Presto codebase, as we don't have to write more Hudi specific integration code within the Presto Hive connector (although it's not that big an advantage, since the Presto community has already accepted Hudi as a direct dependency and there is already Hudi specific code in Presto to use Hudi's PathFilter and InputFormat for split generation and record reading in case of bootstrap and real time queries).
  • Builds on the existing solution we have for real time query support.

Disadvantage:

  • The metadata table will not be used in the most efficient way. As mentioned earlier, getSplits is called for each partition by Presto, each time on a separate InputFormat instance. This would cause the metadata table reader to be opened N times for N partitions, with 1 seek for that partition. However, we will be looking to see if some kind of caching can be implemented with InputFormat to help avoid this.

In the future we probably want to implement native real time splits and reading support within Presto for Hudi. If we move towards that, then the same DirectoryLister can be used to get the file listing. However, a native reader implementation for real time queries is out of scope for this feature.

Error Handling for Design V1 (pre 0.10.0):

Because two tables are involved in each operation, we need to ensure that they stay in sync and that errors during dataset operations are handled correctly. The following error cases need to be handled:

  1. Metadata Table updated (deltacommit completed) but dataset instant not completed 
    1. Example: Crash after Metadata Table update but before dataset Instant could be transitioned from inflight to completed
  2. Metadata Table update fails
    1. Example: Crash during Metadata Table update 
    2. Example: Crash during Metadata instant transition from inflight to completed
  3. Async updates to the dataset (async clean, async compaction)
  4. Multi-writer scenario causing async updates to the dataset (FUTURE)

Handling error case #1:

  1. Determine the latest instant on the dataset
  2. Read and merge the Metadata Table logs ONLY before the latest instant
    1. This in turn neglects the update performed on the metadata table but not yet visible on the dataset
  3. Next update on the dataset will rollback the failed change (orphan inflight instant)
  4. The rollback will trigger a rollback on the metadata table bringing it into sync
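
A minimal sketch of step 2, under the assumption that "before the latest instant" means up to and including the latest completed instant on the dataset. The function name and the shape of a delta commit (`added` and `deleted` file lists keyed by instant time) are illustrative, not Hudi's log format.

```python
def merge_metadata_logs(metadata_deltacommits, latest_dataset_instant):
    """Merge metadata delta commits, ignoring any that are newer than the dataset.

    metadata_deltacommits: list of (instant_time, {"added": [...], "deleted": [...]}).
    latest_dataset_instant: latest completed instant on the dataset (assumed inclusive).
    """
    files = set()
    for instant_time, change in sorted(metadata_deltacommits, key=lambda c: c[0]):
        if instant_time > latest_dataset_instant:
            # Update reached the metadata table but is not visible on the dataset yet.
            continue
        files.update(change.get("added", []))
        files.difference_update(change.get("deleted", []))
    return files
```

With this filter, a metadata update written just before a crash stays invisible to readers until the dataset instant itself completes or is rolled back.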

Handling error case #2:

  1. Since the Metadata Table update failed, the dataset itself will not have transitioned to complete on the action. Hence, the action on the dataset also failed.
  2. Next action on the dataset should rollback the failed change (orphan inflight instant)
  3. The rollback will trigger a rollback on the metadata table bringing it into sync

Handling error case #3 - Async Clean

  1. When an async clean is scheduled on the dataset, it generates a CleanPlan which is written to the clean.requested instant
  2. Metadata Table is updated before the creation of clean.requested instant
  3. Async clean will delete the files in the background and complete the clean operation
  4. The Metadata Table does not need to be updated during or after the async clean operation, as the changes have been pre-applied.

Handling error case #3 - Async Compaction

  1. When an async compaction is scheduled on the dataset, it generates a CompactionPlan which is written to the compaction.requested instant. New base files are created at the end of the compaction process but they are not visible on the dataset view until the compaction process completes.
  2. Metadata Table is updated before the compaction process completes. The log writer already supports multi-writer so we do not have a problem with async / parallel updates to the log.
  3. Any active Metadata reader may not see the background compaction complete operation until next refresh. This should not have any data consistency issues as the whole File Slice design ensures that data is still visible through the older versions of the base / log files. 

Error Handling for Design V2 (0.10.0 onwards):

Because two tables are involved in each operation, we need to ensure that they stay in sync and that errors during dataset operations are handled correctly. The following error cases need to be handled:

  1. Metadata Table updated (deltacommit completed) but dataset instant not completed 
    1. Example: Crash after Metadata Table update but before dataset Instant could be transitioned from inflight to completed
  2. Metadata Table update fails
    1. Example: Crash during Metadata Table update 
    2. Example: Crash during Metadata instant transition from inflight to completed
  3. Metadata table compaction/cleaning fails mid-way.
  4. Async updates to the dataset (async clean, async compaction)

Handling error case #1:

  1. Readers should handle this via the log record reader, wherein we filter only for completed commits from the data table while reading the metadata table.
  2. On the next write in the data table, the partial commit will be rolled back. This will result in a new delta commit in the metadata table, which will take care of ignoring files already synced.
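
The reader-side filter in step 1 can be pictured as below. This is a simplified sketch: `metadata_log_blocks` and `completed_data_instants` are hypothetical inputs standing in for the metadata table's log blocks and the data table's completed timeline.

```python
def visible_metadata_records(metadata_log_blocks, completed_data_instants):
    """Return records only from metadata log blocks whose instant completed on the data table.

    metadata_log_blocks: list of (instant_time, [records]) in write order.
    completed_data_instants: instant times of completed commits on the data table.
    """
    completed = set(completed_data_instants)
    return [record
            for instant_time, records in metadata_log_blocks
            if instant_time in completed
            for record in records]
```

Because the check is a membership test against the completed commits rather than a comparison with the latest instant, a partially failed commit is skipped even when later commits have already completed.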

Handling error case #2:

  1. Since the Metadata Table update failed, the dataset itself will not have transitioned to complete on the action. Hence, the action on the dataset also failed.
  2. Next action on the dataset should rollback the failed change (orphan inflight instant)
  3. The rollback will trigger a delta commit on the metadata table which will apply the list of files to be deleted as part of rollback.

Handling compaction/cleaning failure in metadata table #3:

  1. If a compaction in the metadata table fails, we need to ensure that readers are still able to read the data from the metadata table without any issues, and that the compaction is re-attempted. So, we expose a runAnyPendingCompaction() API in the writeClient, and before attempting to trigger a new compaction we first re-attempt any pending compactions from the last round. Until the compaction completes, we merge the latest file slice with the previous file slice to serve readers. Writers to the metadata table can just use the latest file slice, since all they need to do is append delta logs. Let's illustrate the scenario.
  2. There are 5 delta commits in the data table. 1 more delta commit will trigger a compaction in the metadata table. The current file slice is, for example, FS2.
  3. The writer tries to apply C6 and adds deltacommit6 to the metadata table. This triggers a compaction in the metadata table, which will have 6 delta logs in the compaction plan. But the compaction fails midway. Let's say the new file slice for the compaction is FS3.
  4. The writer is restarted and tries to roll back C6 since it failed. This is yet another delta commit in the metadata table. But since the compaction has been scheduled, this will go into FS3. After finishing the delta commit, we try to complete any pending compactions, so this re-attempts the failed compaction from the last round.
  5. Once it succeeds, we proceed as usual.
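
The scenario above can be traced with a toy model such as the one below. It is only a sketch: the class, its fields and the `compaction_succeeds` flag are invented for illustration, and only the method name `run_any_pending_compaction` mirrors the runAnyPendingCompaction() API mentioned in the text.

```python
class MetadataTableWriter:
    """Toy model of delta commits and compaction on the metadata table (hypothetical)."""

    def __init__(self, compaction_trigger=6):
        self.compaction_trigger = compaction_trigger
        self.base = []            # delta commits already folded into a base file
        self.pending_plan = None  # delta commits in a scheduled but incomplete compaction
        self.latest_slice = []    # delta logs appended to the latest file slice

    def run_any_pending_compaction(self, compaction_succeeds=True):
        """Re-attempt a previously scheduled compaction, if there is one."""
        if self.pending_plan is None:
            return
        if not compaction_succeeds:
            raise RuntimeError("compaction failed midway")
        self.base.extend(self.pending_plan)
        self.pending_plan = None

    def apply_delta_commit(self, instant, compaction_succeeds=True):
        # Writers only append to the latest file slice.
        self.latest_slice.append(instant)
        # Finish any compaction left over from the last round before scheduling a new one.
        self.run_any_pending_compaction(compaction_succeeds)
        if len(self.latest_slice) >= self.compaction_trigger:
            # Schedule: the plan covers the current logs, new writes go to a new slice.
            self.pending_plan, self.latest_slice = self.latest_slice, []
            self.run_any_pending_compaction(compaction_succeeds)

    def readable_commits(self):
        # Readers merge the previous slice (pending plan) with the latest slice.
        return self.base + (self.pending_plan or []) + self.latest_slice
```

Applying C1 to C5, then C6 with a failing compaction, leaves a pending plan of six delta logs that readers can still see through `readable_commits()`. The next delta commit (the rollback of C6) lands in the new slice and completes the pending compaction.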

Handling error case #4 - Async Clean/Compaction in data table

  1. When an async clean is scheduled on the dataset, it generates a CleanPlan which is written to the clean.requested instant
  2. Async clean will delete the files in the background and complete the clean operation
  3. Just before committing to the data table, the commit is applied to the metadata table, so the data stays intact. Readers will skip this metadata commit if the operation failed before being committed to the data table.

Query handling (read path)

From a reader/query engine's perspective, InputFormat's “listStatus” requests will be served with the current view of the file system without the expensive distributed file system queries.

Maintaining additional metadata in the manifest file would help Hudi support faster queries and optimize the dataset to handle complex queries. During the planning phase of a query from Hive/Presto, complex queries can be optimized through the supported Hudi input formats, using the metadata available in the master manifest and partition manifest files.

For example, with “SELECT * FROM table WHERE  x1 < C1 < y1 AND x2 < C2 < y2 …. ;”, when handling complex queries, 

  • Hudi will inspect the master manifest file to prune all partitions that don’t match the query range (for example, prune all partitions that don’t satisfy (x1 < C1 < y1)).   Apply the second and subsequent parts of the query to prune the partitions list further.
  • Once list of “candidate partitions” have been identified, Hudi will inspect the corresponding partition manifest files, to prune the data files that don’t match the query range to generate “candidate data files” where the data is most likely to reside. 
  • For the most frequently used queries (learned over time for each dataset), Hudi could optimize further by maintaining a list of “hot columns” and maintaining column specific metadata (like range/index files).
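
The two-level pruning described in the first two bullets can be sketched as follows. The dictionaries standing in for the master manifest (`partition_stats`) and the partition manifests (`file_stats`) are assumed shapes chosen for this example, not the actual manifest layout.

```python
def overlaps(stats, column, low, high):
    """True if a [min, max] range may hold a value v with low < v < high."""
    col_min, col_max = stats[column]
    return col_max > low and col_min < high


def candidate_files(partition_stats, file_stats, predicates):
    """Prune partitions first, then files, using per-column min/max ranges.

    partition_stats: {partition: {column: (min, max)}}         (master manifest stand-in)
    file_stats:      {partition: {file: {column: (min, max)}}} (partition manifest stand-in)
    predicates:      {column: (low, high)} meaning low < column < high
    """
    def matches(stats):
        return all(overlaps(stats, column, low, high)
                   for column, (low, high) in predicates.items())

    result = []
    for partition, stats in partition_stats.items():
        if not matches(stats):
            continue  # whole partition pruned without reading its manifest
        for name, per_file in file_stats[partition].items():
            if matches(per_file):
                result.append(f"{partition}/{name}")
    return sorted(result)
```

The range check is conservative: it may keep a file that turns out to hold no matching rows, but it never drops a file whose range overlaps the predicate.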

Rollout/Adoption Plan

  • What impact (if any) will there be on existing users?
    • None. This functionality will be add-on to existing datasets. If the metadata or indexes are not available, the functionality will not be enabled. 
  • If we are changing behavior how will we phase out the older behavior?
    • This new feature is backwards-compatible. If the metadata or indexes are not available, the existing way of listing files through the NameNode will be used.

  • If we need special migration tools, describe them here.
    • NA

  • When will we remove the existing behavior?
    • Not required.

Test Plan

Extensive unit tests will test every aspect of the metadata implementation. 

How to test using HUDI CLI

HUDI-CLI has been updated to include the following metadata related commands. These commands require the HUDI-CLI to be connected to the dataset (connect --path /path/to/hudi-dataset).

| Command | Description | Output |
|---|---|---|
| metadata set --metadataDir /some/writable/dir/ | Sets the directory to use for writing the metadata table (default .hoodie/metadata). This is useful if you do not want to write or don't have write permission to the dataset directory. | |
| metadata create | Creates the metadata table. | ..... logs ... Created Metadata Table in /some/writable/dir/ (duration=15.24sec) |
| metadata delete | Deletes the metadata table and all its files. | |
| metadata stats | Shows stats for the metadata. Relevant only after the table has been created. | Base path: /path/to/hudi-dataset; partitionCount: 493; lastCompactionTimestamp: none; isInSync: true; logFileCount: 1; totalBaseFileSizeInBytes: 0; totalLogFileSizeInBytes: 35404; baseFileCount: 0 |
| metadata list-partitions | Lists all the partitions. If the metadata table exists then it is used, otherwise file listing is used. | |
| metadata list-files --partition 2019/05/15 | Lists all the files in the given partition. If the metadata table exists then it is used, otherwise file listing is used. | |
| metadata init | Updates the metadata table to reflect updates to the dataset since the creation. | |

metadata list-partitions and metadata list-files can be used for performance measurements. If these commands are issued before the metadata table is created, they show the time taken using file listings. When issued after the metadata table is created, they show the time for lookups from the metadata table.

Metadata Table Metrics

The following metrics have been added for the metadata table:

| Metric | Description |
|---|---|
| HoodieMetadata.initialize.duration | Time to initialize metadata table (first time creation) |
| HoodieMetadata.initialize.count | Count of initializations |
| HoodieMetadata.sync.duration | Time to sync the metadata table |
| HoodieMetadata.sync.count | Count of sync |
| HoodieMetadata.validate_errors.count | Number of errors while validating the metadata table contents |
| HoodieMetadata.validate_partitions.duration | Time to get list of partitions using file listing (for purpose of validation) |
| HoodieMetadata.lookup_partitions.count | Number of times partition list was fetched from metadata table |
| HoodieMetadata.validate_files.duration | Time to get list of files in partitions using file listing (for purpose of validation) |
| HoodieMetadata.lookup_files.count | Number of times list of files was fetched from metadata table |
| HoodieMetadata.scan.duration | Time to read and merge all the LOG files for metadata table |
| HoodieMetadata.scan.count | Number of times the metadata LOG files were read |

11 Comments

  1. Prashant Wason please start a mailing list DISCUSS thread when ready

  2. One high level comment is that : could we try to design this using the same base/log file concepts that we have for data? 

    indexing is being rethought in a similar fashion.. and I think we can implement a metastore using a similar approach as well..  


    (may be this is implementation detail, that we can work out down the line) 


  3. >> could we try to design this using the same base/log file concepts that we have for data? 

    The idea is very similar in Approach #1 though the file formats chosen for the base/log files will be optimized for key-value lookups rather than columnar lookup.

    Base Manifest (called Master Manifest) will have information till the last-creation-time and delta-manifest will have the updates at each Commit time. Periodically we will merge the delta into the Base. This can use the familiar timestamp-as-fileid approach used to "tag" the HUDI Actions with a valid time and ID.


    >> indexing is being rethought in a similar fashion.

    Indexing is a different beast altogether with the following differences from this RFC:

    1. The very large number of UUIDs to be indexed which will lead to a very large index size. 
    2. The need for fast searches across this index which cannot be loaded in memory
    3. The need for updates without the option of re-writing the entire index.
  4. Prashant Wason I know you folks are blocked on this.. You will have a detailed review by monday..  

  5. Here's my take on the two approaches. Between the two, I do favor the approach 2's way of tracking the file level master manifest, without having two levels of hierarchy implicitly. 


    However, I do feel there are lots more to address there. 

    1. RFC, in general reasons about files and file names. This feels very COW centric.. Ideally we should design using `FileSlice`s in mind and how we manage metadata around file slices (i.e a base file and a set of log files), not just files..  It would be great if we can rewrite sections to make that clear. 
    2. I am not sure if we need to introduce another integer mapping for files? could we try designing around fileIds that we already have.  
    3. On the column ranges to file mappings, I feel we also need to think through interesting situations due to updates/deletes.e.g, if say for column c1 : (10, 20) → f1, f2 and we deleted a bunch of records in f2, now f2 may no longer hold data for the range but its not possible to decide if f1's range should shrink or not. Specifically, we may need to track/persist ranges per fileId and then derive the aggregated/collapsed ranges from that..
    4. On approach 2, I am concerned about the amount of individual column index files that need to be maintained.. 
    5. We do need to make key implementation details like bootstrapping the stats/manifests, pushing this under the FileSystemView abstractions, with at least one concrete query engine integration fleshed out and prototyped before we can finalize I feel.  

    In general, I would love if we designed this in a log structured manner i.e updates to manifest are logged and can be compacted. (This is merely trying to formalize the notion of delta files you have in approach 2). 

    What if we had the following as three file groups 


    file_metadata : Base HFile, with key: data_fileID, value : latest file slice for the fileID. Whenever new file slice is added (compaction, merge handle) or new log files are added we log it to a log file, again with inline HFile (as we are exploring in RFC-08).

    file_stats_metadata: Base HFile with key :  <column_name>+<fileID> , value : min, max, nulls etc... Each time this changes (either via update/insert/delete), we log that and compactor will know to merge these into final stats. 

    column_stats_metadata : This is all the column indexes that you originally had as separate files , with base HFile key : <column_name> + <range> : (fileids) ... similarly we can log changes to this.. 


    My thinking is we can leverage the existing machinery (I and others can help provide these foundations for you to build on) we have for logging and compaction and design something incremental and scalable from the start. 





  6. More discussions between Nishith Agarwal Prashant Wason Balaji Varadarajan Vinoth Chandar Balajee Nagasubramaniam

    Agree on :

    • In any approach, we will try to implement mutations, deletes, cleaning , compaction using the same code/components as the actual data..

    Open items : 

    • If we have multiple file groups for manifest metadata, don't we have to list `.hoodie/metastore` ... 
      • A: we can have another level of indirection if needed..
    • If we have uuid fileGroupID in column indexes, compression won't be good.. bitwise filtering operations not possible
      • A: Maintain an integer to FileGroup mapping? But then, the column indexes need to read first to look up the `file_metadata` or master manifest. 
      • What if we made a fileNumber as key.. 
  7. sivabalan narayanan can you share a pointer to your hfile benchmarks here? 

  8. Open items 

    • FileID generation in the face of concurrent writers: they may try to allocate the same fileIDs. Can we benchmark the performance benefits from fileID? 
    • In general we need to nail down how we handle async operations like cleaning/compaction.. and let them publish to the consolidated metadata 
    • Can we implement the delta manifest using the existing log format writer? 


    cc Nishith Agarwal Balaji Varadarajan Balajee Nagasubramaniam Prashant Wason

  9. For implementing phase 1 (just eliminating listing using partition index) 

    1. HoodieTable abstraction has pluggable base and log formats 
      1. helps rfc-08, rfc-02 (orc/avro)...rfc-15 phase 2 column indexes..
    2. New HoodieFileGroupIO  between IOHandles and ParquetWriters/ParquetReader
      1. HoodieFileGroupIO.withBaseFileFormat(parquet).withLogFileFormat(avro).withWriteRecordFormat(HoodieRecord.class)... 
      2. rfc-08/rfc-15 - HoodieFileGroupIO.withBaseFileFormat(HFile).withLogFileFormat(HFile).withWriteRecordFormat(HoodieRecord.class)
      3. rfc-02 -  HoodieFileGroupIO.withBaseFileFormat(Orc).withLogFileFormat(Orc).withWriteRecordFormat(HoodieRecord.class)
      4. for input formats : HoodieFileGroupIO.....withReadRecordFormat(ArrayWritable.class) ...  
      5. for spark real time view : HoodieFileGroupIO.....withReadRecordFormat(Row.class) ..
    3. Multi Table Commits
      1. Goal : atomically commit to both data table and metadata table. 
      2. Implementation : Each HoodieTable has its own timeline..  notion of parent timeline.. i.e dataset level timeline.
      3. Three timelines on each dataset
        1. data timeline : current timeline in .hoodie ... 
        2. metadata timeline : added under .hoodie/metadata.. 
        3. dataset timeline : added under .hoodie/global-timeline
      4. Single commit on a dataset 
        1. Start a commit on dataset timeline (c1) (requested state)
        2. Start a commit on data timeline at same instant time (c1) (requested state)
        3. Start a commit on metadata timeline at same instant time (c1)
        4. Perform data write
        5. Perform metadata write.
        6. Complete data commit
        7. Complete metadata commit
        8. Complete global commit.. 
      5. Key change on query side : Additionally check a "parent" timeline for instant state
        1. C1 may be complete on data timeline, but if c1 is not completed on parent/global timeline..
      6. Other benefits 
        1. Multi hudi dataset, rfc-08 will just add index hoodietable.
      7. Alternative : One single timeline shared across 2 tables (data and metadata).. 
        1. We have same number of files on timelines.. (1 less list call)
        2. Archival runs independently.. 
        3. Share .commit file will have writeStatuses for both data and metadata
    4. Multi writer concurrency 
      1. Compaction/Cleaning may change metadata in parallel with ingest
      2. Logging in parallel into same file group and merging based on instant time should work.. 
  10. Summarizing discussion among folks from Uber, AWS and OSS on how to implement record level & column indexes.

    1. Discussed what we do as part of RFC-15
      1. Identify work streams
      2. Merge these 2 records together
    2. Open ended things to figure out first
      1. [Uber] What design choices do we take ?
        1. RFC-15 initial proposal design choices
          1. Reading from the metadata tables is faster than 
          2. Map column_ranges to file ids (column-values to file ids)
          3. Large tables takes 20s and more for file to column-range mapping
          4. Index on multiple columns, how should the layout look like ?
            1. Possible options
              1. One partition per column
              2. Single column
                1. column_name + column_range
                2. Special key (indexed_columns)
                3. Dropping adding is easy if we just have it for separate columns but can we do it in a logical fashion for all columns within a partition
              3. Partition Level statistics
        2. Benchmarking for different parts of the code or solutions
        3. [Satish] Are we stuck to HFile or should we have a persisted B+Tree ?
          1. Custom RocksDB engine ?
        4. Should we track at row-group level or file level ?
      2. Infrastructure side
        1. Read the entire log block, can we do point lookups within a portion of the log
          1. Inline file system can help here
        2. [Uber/Vinoth] How do we write to different tables ?
          1. The way we sync is different (today we commit to writer and metadata separately that can get out of sync)
            1. Lag between metadata table and actual operation
          2. Predicate this effort on the multi-writer across data and metadata tables
          3. We merge this in-memory during writer client
            1. We don’t know range information in commit files as opposed to just files (RFC-15)
            2. Solutions
              1. Add the range to the metadata table
              2. Add range to clean, commit metadata files and then seek into these every time we create a view of the range for each column
          4. No infra to collect this information from the table
            1. For file listing is very easy
            2. For large datasets, having billion keys etc can take significantly longer time
          5. Single write to the metadata table
            1. Parallelize the metadata writer so we can use the power of executors
              1. Data and Metadata timeline out of sync
              2. Current design is bound by not being able to commit to the metadata table by actions such as clean, compaction
        3. Query engine 
          1. [Udit] Document for interfaces ? 
            1. Spark on Hive & Spark Datasource 
            2. Presto on Hive tables
            3. Hive QL ??
          2. How do the query engines really read data ?
      3. Given a where clause, minimum number of files to return to the query engines

    Udit Mehrotra Vinoth Chandar sivabalan narayanan Satish Kotha Prashant Wason 

  11. Vinoth Chandar Prashant Wason Updated the title of the RFC to reflect the actual work and details, PTAL