Background

Currently Hudi uses the InMemoryFileIndex combined with HoodieROTablePathFilter to list files when querying a COW table or a read-optimized MOR table. Here is the main flow:

Step 1: Resolve all the root paths for the path pattern (e.g. /tmp/h0/*/*).

Step 2: List all the leaf files under the root paths. If rootPaths.size > PARALLEL_PARTITION_DISCOVERY_THRESHOLD (default value: 32), InMemoryFileIndex launches a Spark job to list the leaf files concurrently, with one task per root path.

Step 3: After the file listing finishes for each root path, HoodieROTablePathFilter is applied to filter out the files that should not be read.
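From the user's side, the current flow can be sketched as follows (assuming a SparkSession `spark` and a hypothetical table under /tmp/h0 with two partition levels):

```scala
// Current read path: the user must encode the partition depth in a glob.
// Spark's InMemoryFileIndex expands the glob into root paths, lists their
// leaf files (with a Spark job once the number of root paths exceeds the
// parallel-partition-discovery threshold of 32), and finally
// HoodieROTablePathFilter drops the files that should not be read.
val df = spark.read
  .format("hudi")
  .load("/tmp/h0/*/*")
```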

Main Problems

1. Not friendly for querying a Hudi table with stars


Users need to specify a path pattern containing stars to query the Hudi table, which means they must know the exact partition path depth before the query. This is not user-friendly. For other data sources such as Delta Lake or Parquet, users only need to specify the base path.

Also, if we store a path containing stars in the Hive metastore, some Spark SQL commands break. For example, the Refresh command will crash if the path contains "*".

2. Missing partition pruning

Currently we cannot do partition pruning for the Hudi data source with HoodieROTablePathFilter, because Spark does not push the filter conditions down to the PathFilter.
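For example (with a hypothetical partition column `dt`), the partition predicate below never reaches HoodieROTablePathFilter, so every partition matched by the glob is still listed:

```scala
// The filter is applied after listing; it does not reduce the set of
// partitions and files that the InMemoryFileIndex enumerates.
spark.read
  .format("hudi")
  .load("/tmp/h0/*/*")
  .filter("dt = '2021-03-01'")
```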

Solution

FileIndex is an interface provided by Spark for listing the files of a data source table.

Implementing a FileIndex for Hudi can solve the problems above.

class HoodieFileIndex extends FileIndex {

  // List files of the Hudi table; partition pruning is done here
  // using the partitionFilters.
  override def listFiles(partitionFilters: Seq[Expression],
                         dataFilters: Seq[Expression]): Seq[PartitionDirectory] = ???

  // Invalidate any cached file listing, e.g. after a write to the table.
  override def refresh(): Unit = ???

  override def inputFiles: Array[String] = ???

  // The remaining members of the FileIndex trait must also be implemented:
  override def rootPaths: Seq[Path] = ???

  override def sizeInBytes: Long = ???

  override def partitionSchema: StructType = ???
}
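As a minimal, self-contained sketch of the pruning idea inside listFiles — all names here (PartitionPath, prunePartitions) are invented for illustration and are not taken from the actual implementation:

```scala
// Hypothetical, simplified model of partition pruning: partition values
// are evaluated against the partition filters before any file listing,
// so non-matching partitions never reach the (expensive) listing step.
case class PartitionPath(path: String, values: Map[String, String])

def prunePartitions(
    partitions: Seq[PartitionPath],
    matches: Map[String, String] => Boolean): Seq[PartitionPath] =
  partitions.filter(p => matches(p.values))

val partitions = Seq(
  PartitionPath("/tmp/h0/dt=2021-03-01", Map("dt" -> "2021-03-01")),
  PartitionPath("/tmp/h0/dt=2021-03-02", Map("dt" -> "2021-03-02")))

// Only dt=2021-03-01 survives; its files would then be listed and
// returned to Spark as PartitionDirectory instances.
val pruned = prunePartitions(partitions, _.get("dt").contains("2021-03-01"))
```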


1. User-friendly queries on Hudi tables

We can load the files needed for the query from the base path and the partition filters. The "*" no longer needs to be specified in the path.
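With HoodieFileIndex, the same read would only need the base path (sketch, assuming a SparkSession `spark`):

```scala
// No glob required: HoodieFileIndex discovers the partitions from the
// base path, at whatever depth they live.
val df = spark.read
  .format("hudi")
  .load("/tmp/h0")
```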

2. Partition pruning support for Spark

We can support partition pruning for Hudi tables via the partitionFilters passed to FileIndex.
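For example (again with a hypothetical partition column `dt`), the predicate below would reach HoodieFileIndex.listFiles as a partition filter, so only the matching partition needs to be listed:

```scala
// Spark hands "dt = '2021-03-01'" to listFiles as a partitionFilter;
// the index can skip every other partition before touching storage.
spark.read
  .format("hudi")
  .load("/tmp/h0")
  .filter("dt = '2021-03-01'")
```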

3. Push-down of other data filters

We can also push down other data filters to the Hudi data source. For example, we can push down a filter on _hoodie_commit_time to query an older version of the data.

And if the table has been optimized by data clustering, we can push down filters on the sorted columns to skip files.
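The commit-time push-down could look like this (sketch, assuming a SparkSession `spark`; _hoodie_commit_time is Hudi's metadata column holding the commit timestamp of each record, and the timestamp value is illustrative):

```scala
// With data filters pushed down to the file index, files whose commit
// time falls outside the requested range could be skipped entirely
// instead of being read and filtered row by row.
spark.read
  .format("hudi")
  .load("/tmp/h0")
  .filter("_hoodie_commit_time <= '20210301000000'")
```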






2 Comments

  1. This is definitely a good addition to hudi (smile) thanks for the proposal!

  2. Thanks for your attention sivabalan narayanan. I have submitted a PR for this: PR-2651. Can you take a look? Thanks!