Proposers

Approvers

  • TBD (this is a WIP RFC; approvers to be added once it is ready for review)

Status

Current State

  • UNDER DISCUSSION
  • IN PROGRESS
  • ABANDONED
  • COMPLETED
  • INACTIVE (tick)

Discussion thread: here


JIRA: here

Released: <Hudi Version>


Abstract

Query engines typically scan large amounts of irrelevant data during query planning and execution. Some workarounds are available to reduce the amount of irrelevant data scanned. These include:

  • Partition pruning
  • File pruning
    • Some data file formats contain metadata, including range information for certain columns (for parquet, this metadata is stored in the footer).
    • As part of query planning, the range information is read from all data files.
    • Irrelevant data files are then pruned based on the predicates and the available range information.

Partition pruning typically puts the burden on users to select the partitions where the data may exist. The file pruning approach is expensive and does not scale when there are a large number of partitions and data files to scan. So we propose a new solution: store additional information as part of the Hudi metadata table to implement a data skipping index. The goals of the data skipping index are to provide:

  • Global index: Users query for the information they need without having to specify partitions. The index can efficiently locate the relevant data files in the table.
  • Improved query planning: Efficiently find the data files that may contain data for the specified query predicates.
  • Support for multiple types of indexes: The initial implementation may provide a range index, but the goal is to provide a flexible framework for implementing other types of indexes (e.g., bloom filters).

Background

<Introduce any background context which is relevant or necessary to understand the feature and design choices.>

TBD

  • Note: the index is only effective if data is organized (using clustering, for example). If every file contains data for a commonly specified query predicate, the index may not be effective.


Implementation

At a high level, there are 3 components to implementing index support: metadata generation, storage format, and query engine integration. The next 3 subsections discuss these in detail.

Index metadata generation

We want to support multiple types of indexes (range, bloom, etc.), so it is important to be able to generate different types of records for different columns.

class IndexCreator<O> { 
  // Note that 'O' above refers to collection type. For example, for spark, this could be JavaRDD

  /** Generates metadata from data files written */
  def collectMetadata( O<WriteStatus> filesWritten ) : O<HoodieMetadataRecord> /* returns metadata record */
}


// for int column
class IntRangeMetadataRecord extends HoodieMetadataRecord {
	columnName1, partition, fileId, commitTime, min: Int, max: Int
}

// for string column
class StringRangeMetadataRecord extends HoodieMetadataRecord {
	columnName2, partition, fileId, commitTime, min: String, max: String
}
// other type of index for column3
class BloomFilterRecord extends HoodieMetadataRecord {
	columnName3, partition, fileId, commitTime, bloomFilter
}


class CombinedMetadataRecord(List<HoodieMetadataRecord> allColumnsMetadata) extends HoodieMetadataRecord
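
For illustration, here is a minimal sketch of how a writer could turn a single WriteStatus into the per-column range records above. The readColumnRanges() helper, the ColumnRange type with its accessors, and the record constructors are assumptions for this sketch, not the final API.

// Illustrative sketch only (not the final API).
// readColumnRanges(...) is a hypothetical helper that extracts per-column min/max
// from the written data file (e.g. from the parquet footer statistics).
List<HoodieMetadataRecord> collectMetadata(WriteStatus writeStatus, String commitTime) {
  String partition = writeStatus.getPartitionPath();
  String fileId = writeStatus.getFileId();
  List<HoodieMetadataRecord> records = new ArrayList<>();
  for (ColumnRange range : readColumnRanges(writeStatus.getStat().getPath())) {
    switch (range.getType()) {
      case INT:
        records.add(new IntRangeMetadataRecord(range.getColumnName(), partition, fileId,
            commitTime, range.getMinAsInt(), range.getMaxAsInt()));
        break;
      case STRING:
        records.add(new StringRangeMetadataRecord(range.getColumnName(), partition, fileId,
            commitTime, range.getMinAsString(), range.getMaxAsString()));
        break;
      default:
        // other column types, and other index types (e.g. bloom), would be handled here
        break;
    }
  }
  return records;
}

A CombinedMetadataRecord could then group all of a file's per-column records into a single row, as in the example below.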


Example CombinedMetadataRecords generated:

partition | filePath      | c1      | c1_min | c1_max | c2          | c2_min | c2_max
p1        | f1-c1.parquet | city_id | 20     | 30     | commit_time | “a”    | “g”
p1        | f2-c1.parquet | city_id | 25     | 100    | commit_time | “b”    | “g”
p2        | f3-c1.parquet | city_id | 40     | 60     | commit_time | “i”    | “w”
p3        | f4-c1.parquet | city_id | 300    | 400    | commit_time | “x”    | “z”

A few notes (TBD: organize this better):

  • We store the full file path (not the fileId). So if we create a new data file (say f1-c2.parquet), we add a new row to the metadata table for this file.
  • The schema can get complex if there are a lot of columns.
  • The schema will differ from table to table, based on the columns in each table.


Index storage layout

We considered HFile and Parquet for storing the metadata records generated above. See the comparison below.

HFile

HFile contains a multi-layered index. At a high level, this looks like a persisted B+ tree. Keys are required to be inserted in increasing order. Data is stored in blocks (typically 64KB blocks).

Layers of the index:

  1. Each block has its own leaf index (min/max key of the block, etc.)
  2. The last key of each block is put in an intermediate index
  3. The root index in the trailer points to the intermediate index

Because keys are expected to be inserted in increasing order, typically enough data is accumulated in memory; once a certain size is reached, the data is sorted and written to the HFile in order.

HFile Advantages:

  • Fits well with the existing Hudi metadata table
  • Fast, constant-time key lookup (~2ms)
  • Keeps data sorted by key

HFile Disadvantages:

  • Doesn’t have predicate pushdown/filtering logic
  • Expensive to look up many keys from the same file sequentially (5000 keys * 2ms = 10 sec)
  • Range scans also seem slower (~15 seconds)
  • Uses a lot of storage because there is no columnar compression

Parquet

Parquet stores data in a columnar format.


Parquet Advantages:

  • Efficient predicate pushdown. All columns' metadata can be stored together, and only the desired columns are queried.
  • Parallel processing of splits is natively supported.
  • Can provide UDF support if needed.
    • This may be useful for commonly used geo queries.
    • Example: a table has latitude/longitude columns, but we can query data in a hexagon/quad-tree efficiently using the data skipping index; using ranges for latitude and longitude alone may not be as effective.
  • Better storage compression.
  • We can try different layouts by sorting data on different parameters (partition/fileId/column being indexed, etc.)

Parquet Disadvantages:

  • Doesn’t work well with the Hudi metadata table (the metadata table base file format is HFile, and a Hudi table cannot support different file formats for different partitions)
  • No fast single-key lookup, so it may not be ideal for other types of indexes such as UUID lookup

HFile vs Parquet Comparison for Range Index

Input: 

One partition of a popular production table.

  • Total # of files: 1288 (metadata stored for only the latest snapshot)
  • File size: 150-250MB
  • Total data size: ~250GB
  • # of columns: 4156
  • # of columns with metadata: 2078

Results:

                      | Time to scan full file | Time to query 10 rows | Time to query a large range (5K rows) | Storage space
HFile                 | 15 seconds             | 51 ms                 | 17 seconds                            | 100MB
Parquet               | 6.1 seconds            | 1.9 seconds           | 2.1 seconds                           | 43MB
Parquet (Spark SQL)   | 7 seconds              | 440 ms                | 1.5 seconds                           | 43MB

Index integrations with query engines

How to apply query predicates in Hudi?

Query predicates are normally constructed in a tree-like structure, so this will follow the same pattern. The proposal is to create a mapping utility from “engine” query predicates to a HudiExpression. This way the filtering logic is engine agnostic.

For the AND and OR operators, we can translate to a tree node with left and right expressions. An example of what the structure would look like is shown below.


public class HudiExpressionParentNode implements HudiExpression {
   HudiExpression left;
   HudiExpression right;

   @Override
   public boolean evaluate() {
        // AND example; an OR node would use left.evaluate() || right.evaluate()
        return left.evaluate() && right.evaluate();
   }
}


For LEAF nodes we can create an expression which contains the operator and the value we are comparing, to determine whether the file group may have data relevant to this query. The common search expressions for the leaf nodes are:

  1. Equal to - if the value in the search expression is greater than or equal to the lower bound and less than or equal to the upper bound in the file’s column statistics, then true, else false
  2. Less than - if the value in the search expression is greater than the lower bound in the file’s column statistics, then true, else false
  3. Less than or equal to - if the value in the search expression is greater than or equal to the lower bound in the file’s column statistics, then true, else false
  4. Greater than - if the value in the search expression is lower than the upper bound in the file’s column statistics, then true, else false
  5. Greater than or equal to - if the value in the search expression is lower than or equal to the upper bound in the file’s column statistics, then true, else false

True tells us that there is a possibility that the file contains data which matches the search expression, so it should be included in the result set. False tells us that there is no possibility this file contains any data which matches the search expression, so it can be excluded from the results.

public class HudiExpressionLeafNode<T> implements HudiExpression {

   Operator op; // (EQ, LT, LTEQ, GT, GTEQ)
   T literal; // (INT, DOUBLE, FLOAT value)
   String column;

   @Override
   public boolean evaluate(); // applies the rules above against the file's column statistics
}


This way we can call evaluate on the root HudiExpression tree and it will determine whether the entire expression is satisfied for the file group.
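
As a concrete illustration of the five rules above, here is a minimal sketch of the range check a leaf node could perform. The Operator enum is the (EQ, LT, LTEQ, GT, GTEQ) one referenced in HudiExpressionLeafNode; the min/max values are assumed to have been looked up from the column range metadata for the file being evaluated.

// Sketch only: returns true if the file *may* contain rows matching "column <op> literal",
// given the file's column statistics [min, max] read from the metadata table.
static <T extends Comparable<T>> boolean mayContain(Operator op, T literal, T min, T max) {
  switch (op) {
    case EQ:   return literal.compareTo(min) >= 0 && literal.compareTo(max) <= 0;
    case LT:   return literal.compareTo(min) > 0;   // some value smaller than 'literal' may exist
    case LTEQ: return literal.compareTo(min) >= 0;
    case GT:   return literal.compareTo(max) < 0;   // some value larger than 'literal' may exist
    case GTEQ: return literal.compareTo(max) <= 0;
    default:   return true;                         // unknown operator: do not prune the file
  }
}

HudiExpressionLeafNode.evaluate() would delegate to a check like this once the statistics for its column have been fetched from the index.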

Hive

In order to implement predicate pushdown in Hive, we need access to the query predicate. The query predicate is not passed to the InputFormat by default. The HiveStoragePredicateHandler interface needs to be implemented in order to provide the query predicate to the InputFormat, and for this we need to create a custom HiveStorageHandler. Therefore we will be creating a new storage handler, HudiStorageHandler.

public interface HiveStorageHandler extends Configurable {
  public Class<? extends InputFormat> getInputFormatClass();
  public Class<? extends OutputFormat> getOutputFormatClass();
  public Class<? extends SerDe> getSerDeClass();
  public HiveMetaHook getMetaHook();
  public void configureTableJobProperties(
    TableDesc tableDesc,
    Map<String, String> jobProperties);
}

Everything will remain the same, with the input format, output format, and serde classes used by existing Hudi tables registered in Hive (HoodieParquetInputFormat is still being used). HudiStorageHandler would implement HiveStorageHandler and HiveStoragePredicateHandler.
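
A rough sketch of what this could look like (not the final implementation). It assumes Hive's DefaultStorageHandler and HiveStoragePredicateHandler APIs; pushing the whole predicate while also keeping it as the residual is a conservative choice so Hive still re-evaluates the predicate on the rows it reads.

// Sketch only: a storage handler that reuses HoodieParquetInputFormat and hands the
// query predicate back to Hive so it lands in the job configuration for the input format.
public class HudiStorageHandler extends DefaultStorageHandler implements HiveStoragePredicateHandler {

  @Override
  public Class<? extends InputFormat> getInputFormatClass() {
    return HoodieParquetInputFormat.class; // existing Hudi input format is reused
  }

  // getOutputFormatClass(), getSerDeClass(), etc. would similarly return the classes
  // already used by Hudi tables registered in Hive.

  @Override
  public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer,
                                                ExprNodeDesc predicate) {
    if (!(predicate instanceof ExprNodeGenericFuncDesc)) {
      return null; // nothing to push down
    }
    DecomposedPredicate decomposed = new DecomposedPredicate();
    // The pushed predicate is serialized by Hive into the job conf (TableScanDesc.FILTER_EXPR_CONF_STR).
    decomposed.pushedPredicate = (ExprNodeGenericFuncDesc) predicate;
    // Keep the full predicate as the residual too, so Hive still filters rows after file pruning.
    decomposed.residualPredicate = (ExprNodeGenericFuncDesc) predicate;
    return decomposed;
  }
}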


Hive adds the query predicate returned by the storage handler to the job configuration. This job configuration is then supplied to the InputFormat. It can be fetched and deserialized using the following:

String hiveFilter = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
if (hiveFilter != null) {
  ExprNodeGenericFuncDesc exprNodeDesc = SerializationUtilities
      .deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
  SearchArgument sarg = ConvertAstToSearchArg.create(jobConf, exprNodeDesc);
}

The SearchArgument contains an ExpressionTree and a list of PredicateLeaf objects. The ExpressionTree is a tree structure used to define the query predicate. If the operator is defined as OR, AND, or NOT, this indicates there are child expressions, normally LEAFs.

public class ExpressionTree {
  public enum Operator {OR, AND, NOT, LEAF, CONSTANT}
  private final Operator operator;
  private final List<ExpressionTree> children;
  private int leaf;
  // ...
}

If the operator in the ExpressionTree is defined as LEAF, it corresponds to a PredicateLeaf defined in the SearchArgument. The PredicateLeaf contains information about the query predicate such as the operator, the column name, and the literal being compared.

private final org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.Operator operator;
private final Type type;
private String columnName;
private final Object literal;
private final List<Object> literalList;


We can use this information and the SearchArgument to generate our HudiExpression. Then, in HoodieParquetInputFormat.listStatus(), after fetching files from the FileSystemView, we can apply the HudiExpression to the remaining file groups using the column metadata.
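
A minimal sketch of that conversion, assuming the HudiExpression classes above plus a hypothetical HudiExpressions.alwaysTrue() for predicates we cannot use for pruning (note that Hive's SearchArgument normalizes greater-than predicates into NOT/LESS_THAN forms, so only a subset of leaf operators is mapped directly here):

// Sketch only: map Hive's ExpressionTree/PredicateLeaf into the HudiExpression tree above.
// The node constructors and HudiExpressions.alwaysTrue() are assumed for illustration.
static HudiExpression toHudiExpression(ExpressionTree node, List<PredicateLeaf> leaves) {
  switch (node.getOperator()) {
    case AND:
    case OR:
      // Fold an n-ary AND/OR into the binary parent nodes described above.
      HudiExpression result = toHudiExpression(node.getChildren().get(0), leaves);
      for (int i = 1; i < node.getChildren().size(); i++) {
        result = new HudiExpressionParentNode(node.getOperator(), result,
            toHudiExpression(node.getChildren().get(i), leaves));
      }
      return result;
    case LEAF:
      PredicateLeaf leaf = leaves.get(node.getLeaf());
      switch (leaf.getOperator()) {
        case EQUALS:
          return new HudiExpressionLeafNode(Operator.EQ, leaf.getColumnName(), leaf.getLiteral());
        case LESS_THAN:
          return new HudiExpressionLeafNode(Operator.LT, leaf.getColumnName(), leaf.getLiteral());
        case LESS_THAN_EQUALS:
          return new HudiExpressionLeafNode(Operator.LTEQ, leaf.getColumnName(), leaf.getLiteral());
        default:
          return HudiExpressions.alwaysTrue(); // cannot prune on this predicate
      }
    default:
      return HudiExpressions.alwaysTrue(); // NOT/CONSTANT: skip pruning to stay correct
  }
}

listStatus() can then evaluate the resulting expression against each candidate file group's column ranges and drop the file groups that evaluate to false.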

Spark


Presto

Rollout/Adoption Plan

  • <What impact (if any) will there be on existing users?>
  • <If we are changing behavior how will we phase out the older behavior?>
  • <If we need special migration tools, describe them here.>
  • <When will we remove the existing behavior?>


Appendix

The code below can be run in spark-shell (or a Jupyter notebook) to quickly iterate on different formats (TODO: integrate with the Hudi metadata table writer code).

Generating Parquet metadata table

generate range metadata in parquet format
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.Properties

val tableName="test.metadata_test_parquet"
val props = new Properties()
props.put("hoodie.table.name", tableName)
val destPath="hdfs://tables/my_metadata_table_parquet"
HoodieTableMetaClient.initTableAndGetMetaClient(spark.sparkContext.hadoopConfiguration, destPath, props)

import org.apache.hudi.common.table.HoodieSnapshotMetadataClient
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.Objects
import java.util.stream.Collectors
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData
import org.apache.parquet.column.statistics.Statistics
import scala.collection.JavaConverters._
import spark.implicits._


case class FileStats(val minVal: String, val maxVal: String)

case class ColumnFileStats(val partitionPath: String, val fileName: String, val colName: String, val minVal: String, val maxVal: String)

val basePath = "hdfs://tables/my_table1"
val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath)
val snapshotClient = new HoodieSnapshotMetadataClient(metaClient)

val partition="2021/02/03"
val files = snapshotClient.getLatestSnapshotFiles(partition).collect(Collectors.toList()).asScala.toSeq


val allmetadata = files.flatMap(filePath => {
    val blocks = ParquetFileReader.readFooter(spark.sparkContext.hadoopConfiguration, filePath).getBlocks().asScala

    blocks.flatMap(b => b.getColumns().asScala.
               map(col => (col.getPath().toDotString(), 
                           FileStats(Objects.toString(col.getStatistics().minAsString()),
                                     Objects.toString(col.getStatistics().maxAsString()))))).toSeq.
               groupBy(x => x._1).mapValues(v => v.map(vv => vv._2).toSeq).
               mapValues(value => FileStats(value.map(_.minVal).min, value.map(_.maxVal).max)).toSeq.
               map(x => ColumnFileStats(partition, filePath.getName(), x._1, x._2.minVal, x._2.maxVal))
})

val sqlContext = new org.apache.spark.sql.SQLContext(spark.sparkContext) 
val rdd = spark.sparkContext.parallelize(allmetadata)
rdd.toDF().write.format("org.apache.hudi").
    option("hoodie.datasource.write.recordkey.field", "partitionPath,fileName,colName").
    option("hoodie.datasource.write.precombine.field", "colName").
    option("hoodie.table.name", "test.metadata_test").
    option("hoodie.datasource.write.operation", "insert").
    option("hoodie.parquet.small.file.limit", "107374182400").
    option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
    mode("Append").
    save(destPath)


Querying Parquet metadata table

Querying parquet metadata
import System.nanoTime
def profile[R](code: => R, t: Long = nanoTime) = (code, ((nanoTime - t)/1000).toInt)

// additional imports used by this snippet
import org.apache.hadoop.fs.Path
import org.apache.avro.generic.GenericRecord
import org.apache.parquet.avro.{AvroParquetReader, AvroReadSupport}
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.io.api.Binary
import org.apache.hudi.common.util.ParquetUtils
import org.apache.hudi.avro.HoodieAvroUtils

val parquetBasePath = new Path("hdfs://tables/my_metadata_table1/default/d95d3fbe-fe66-48fc-8bdc-7db923496916-0_0-55-98_20210327202848.parquet")
val fullSchema = ParquetUtils.readAvroSchema(spark.sparkContext.hadoopConfiguration, parquetBasePath)
val schema = HoodieAvroUtils.generateProjectionSchema(fullSchema,
    java.util.Arrays.asList("partitionPath", "fileName", "colName", "minVal", "maxVal"))

val  keys = new scala.collection.mutable.ListBuffer[String]()

val (count, time) = profile {
    
    AvroReadSupport.setAvroReadSchema(spark.sparkContext.hadoopConfiguration, schema)
    AvroReadSupport.setRequestedProjection(spark.sparkContext.hadoopConfiguration, schema)
    val filterToUse = FilterCompat.get(
        FilterApi.or(
            FilterApi.or(
                  FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("BASE.city_id")),
                  FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("ACTIONS.actions.array.source.type"))
            ),
            FilterApi.or(
                FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("ACTIONS.actions.array.contactType")),
                FilterApi.eq(FilterApi.binaryColumn("colName"), Binary.fromString("ACTIONS.actions.array.destination.type"))
            )
                
        )
    )
    val parquetReader = AvroParquetReader.builder(parquetBasePath).
       withConf(spark.sparkContext.hadoopConfiguration).
       withFilter(filterToUse).
       build()

    var record1: GenericRecord = parquetReader.read().asInstanceOf[GenericRecord]
    var count: Long = 0
    while (record1 != null) {
        count += 1
        record1 = parquetReader.read().asInstanceOf[GenericRecord]
    }
    count
}
"" + (time/1000) + " ms to READ column info from parquet " + count


Generating HFile metadata table


generate HFile metadata format
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.Properties

val tableName=""test.metadata_test_hfile"
val destPath="hdfs://tables/my_metadata_table_hfile"
val props = new Properties()
props.put("hoodie.table.name", tableName)
props.put("hoodie.table.base.file.format", "HFILE")
HoodieTableMetaClient.initTableAndGetMetaClient(spark.sparkContext.hadoopConfiguration, destPath, props)

import java.text.DecimalFormat
import org.apache.hudi.common.table.HoodieSnapshotMetadataClient
import org.apache.hudi.common.table.HoodieTableMetaClient
import java.util.stream.Collectors
import java.util.Objects
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData
import org.apache.parquet.column.statistics.Statistics
import scala.collection.JavaConverters._
import spark.implicits._


val basePath = "hdfs://tables/my_table1"

case class FileStats(val minVal: String, val maxVal: String)

case class ColumnFileStats(val partitionPath: String, val fileName: String, val colName: String, val minVal: String, val maxVal: String)
    
val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath)
val snapshotClient = new HoodieSnapshotMetadataClient(metaClient)

val partition="2021/02/03"
val files = snapshotClient.getLatestSnapshotFiles(partition).collect(Collectors.toList()).asScala.toSeq

val allmetadata = files.flatMap(filePath => {
    val blocks = ParquetFileReader.readFooter(spark.sparkContext.hadoopConfiguration, filePath).getBlocks().asScala

    blocks.flatMap(b => b.getColumns().asScala.
               map(col => (col.getPath().toDotString(), 
                           FileStats(Objects.toString(col.getStatistics().minAsString()),
                                     Objects.toString(col.getStatistics().maxAsString()))))).toSeq.
               groupBy(x => x._1).mapValues(v => v.map(vv => vv._2).toSeq).
               mapValues(value => FileStats(value.map(_.minVal).min, value.map(_.maxVal).max)).toSeq.
               map(x => ColumnFileStats(partition, filePath.getName(), x._1, x._2.minVal, x._2.maxVal))
})

val sqlContext = new org.apache.spark.sql.SQLContext(spark.sparkContext) 
val rdd = spark.sparkContext.parallelize(allmetadata)
rdd.toDF().write.format("org.apache.hudi").
    option("hoodie.datasource.write.recordkey.field", "partitionPath,fileName,colName").
    option("hoodie.datasource.write.precombine.field", "colName").
    option("hoodie.table.name", tableName).
    option("hoodie.datasource.write.operation", "bulk_insert").
    option("hoodie.parquet.small.file.limit", "1073741824").
    option("hoodie.parquet.max.file.size", "1073741824").
    option("hoodie.bulkinsert.shuffle.parallelism", "1").
    option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
    mode("Append").
    save(destPath)


Querying HFile Metadata

Note that the HFile key could be stored in different formats. This is a simple example to measure the time taken to read 'N' keys; N could be different for different key formats. (For example, if we choose key = partitionPath and value = range_for_all_files_all_columns_in_the_partition, then we only have to read a few keys compared to key = partition+file_name, value = range_for_all_columns_in_file.)
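
For example, two possible key layouts (illustrative only; the '#' separator is just for this example):

key = "2021/02/03"                -> value = ranges for all files and all columns in the partition (fewer, larger entries)
key = "2021/02/03#f1-c1.parquet"  -> value = ranges for all columns in that one file (more, smaller entries)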

querying hfile range metadata
import System.nanoTime
def profile[R](code: => R, t: Long = nanoTime) = (code, (((nanoTime - t)/1000)).toInt)

import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.hadoop.fs.Path
import scala.collection.JavaConverters._


val cacheConfig = new CacheConfig(spark.sparkContext.hadoopConfiguration);
cacheConfig.setCacheDataInL1(false);

val hfilePath = new Path("hdfs://tables/my_metadata_table_hfile/default/53dd4e23-012c-4e3e-91fc-9d5ff6a3bf83-0_0-48-91_20210327201656.hfile")
val reader = new HoodieHFileReader(spark.sparkContext.hadoopConfiguration, hfilePath, cacheConfig);

val keys = reader.readAllRecords().asScala.map(x => x.getFirst())
val keyRange = keys.slice(95000, 100000) // pick desired number of keys

var totalTime: Long = 0
var totalRecords: Long = 0
// create a fresh reader for the point lookups (the reader above was used for the full scan)
val lookupReader = new HoodieHFileReader(spark.sparkContext.hadoopConfiguration, hfilePath, cacheConfig)
(keyRange).map { k =>
    val (record, time) = profile { lookupReader.getRecordByKey(k) }
    totalTime += time
    if (record.isPresent()) { totalRecords += 1L }
}
"" + (totalTime/1000) + " ms to LOOKUP HFile, #records: " + totalRecords


Test Plan

<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>








18 Comments

  1. Satish Kotha Ryan Pifer Udit Mehrotra

    This is a great discussion; we also plan to use the clustering feature to collect data statistics for file pruning. I am glad to join this great work.

    I have some thoughts, though they are not fully formed yet. I want to discuss them with you.

    1. About the index

    For big data analytics scenarios, which usually scan large files, a range index (min/max) for every column will be efficient. Snowflake uses min/max statistics after clustering and stores them in an independent metaservice. Today, reading a large amount of min/max information from parquet footers performs poorly on cloud storage. Having a unified meta table to store these statistics would be better.

    But for Hudi upsert scenarios, such as the Hudi record key index or adding partitions, using HFile will be better.

    2. About the format choice

    I think parquet will be better for big data analytics scenarios. The metadata table now uses HFile to support partition updates. We can add parquet metadata and also retain the HFile metadata for the different scenarios.

    3. About the data layout engine

    We can make clustering more intelligent in how it merges files and generates statistics, because in cloud-native infrastructure, using serverless resources to run clustering asynchronously is very suitable, just like Snowflake clustering.

    4. About the metatable

    I think the metatable can solve more of the cloud storage disadvantages. It now supports partition updates. Can it remove all the file listing actions in Hudi, such as for rollback marker files or others (I have also done some summary)? cc Udit Mehrotra Vinoth Chandar


  2. As discussed in HUDI-1138, listing marker files for rollback is currently expensive and slow on cloud storage such as S3 / Alibaba Cloud OSS.

    1. I have an idea: can we write the marker file info to the metatable via the timeline server? We can unify the meta info in the metatable.

    2. As discussed in RFC-27, I think we can unify metadata such as partitions, marker files, statistics, indexes, and others. Delta Lake uses the delta log to store this, and Snowflake uses a metaservice. A unified metatable can resolve cloud storage's poor metadata management and improve compute and storage query performance. I think RFC-27, RFC-15, and RFC-08 have some overlaps. I want to discuss this with you. Thanks.

    Vinoth Chandar Satish Kotha Ryan Pifer Udit Mehrotra



  3. Today the metadata table has a single partition, `files`, which contains the file listings.

    Once this work is done, we want to add a partition, `col_ranges`, which will contain ranges for each column in the files.

    Are you suggesting adding marker files to the metadata table as well? liwei


    1. Vinoth Chandar

      Yes, can we unify these concepts in the metadata table? That would resolve the performance issues, such as:

      1. resolving the poor listing performance of cloud storage (partition listing and marker file listing)

      2. adding indexes such as zone maps (min & max, etc.), bitmaps, and a record index.

      Then the query engine can improve performance.


    2. I am not sure if marker files belong there. Once a commit is over, marker files have no value. The metadata table should just have ongoing maintenance of metadata like `point 2`. This RFC adds the min, max zones you refer to.

      Hope that makes sense.

  4. option("hoodie.datasource.write.recordkey.field", "partitionPath,fileName,colName")
    we need to sort by `colName` right?

    1. Even without sorting, parquet was doing better. I also tried sorting using bulk_insert 

          option("hoodie.datasource.write.operation", "bulk_insert").

          option("hoodie.datasource.write.recordkey.field", "colName,fileName,partitionPath") // default sort is global based on key

          option("hoodie.parquet.small.file.limit", "1073741824").
          option("hoodie.parquet.max.file.size", "1073741824").
          option("hoodie.bulkinsert.shuffle.parallelism", "1").

          

  5. A few things I thought of.

    1) I think this is touched upon in Prashant Wason's original design for RFC-15. After we query the `column_ranges` metadata table partition (as described in this RFC) for the files matching a given set of predicates, wouldn't we need to fetch the fileStatus related information from the `files` partition? We should account for that lookup as well when trying to benchmark/reason about performance.

    2) Some kind of dictionary encoding of file paths may be necessary to avoid re-encoding full file paths in the `column_ranges` partition as well?

    3) Do we have everything we need from the `files` partition for query planning? I can think of these additional fields: split information (number of splits, with begin/end offsets), total number of records, number of blocks, block size.

    4) Are we including everything we need in the `column_ranges` partition for column-level stats, e.g. null_counts, size of each column in a file, etc.?

    Might be good to close these out as well. cc Satish Kotha Udit Mehrotra Ryan Pifer Nishith Agarwal

  6. ath="hdfs://tables/my_metadata_table_parquet"
    Satish Kotha Can we also evaluate this on object storage such as S3, since object storage seek and file-open operations are slower? cc Udit Mehrotra
    1. Yes, the scripts in the appendix can be used to measure performance on cloud stores. I think Vinoth and a few others are planning to run this over the next week.

  7. I wrote a JMH benchmark (reusing parts of Satish's code from the appendix) that encodes ranges for 65536 files (1TB of data, stored as 16MB small files) across 100 columns, against S3. So a total of 6,553,600 rows stored, sorted by column name. (I don't have as large a prod dataset as Uber, per se.)

    Benchmark                                                     (numColsToRead)  (numFiles)            (runId)  Mode  Cnt      Score      Error  Units
    MetadataBenchmark.fullReadAvroBackedMetadata                              N/A       65536  run-1621408015253  avgt    5  26328.478 ± 6352.333  ms/op
    MetadataBenchmark.fullReadHFileBackedMetadata                             N/A       65536  run-1621408015253  avgt    5  18242.803 ± 4149.712  ms/op
    MetadataBenchmark.fullReadParquetBackedMetadata                           N/A       65536  run-1621408015253  avgt    5  33942.539 ± 3977.936  ms/op
    MetadataBenchmark.readMatchingFilesHFileBackedMetadata                     10       65536  run-1621408015253  avgt    5   1791.368 ±  263.773  ms/op
    MetadataBenchmark.readMatchingFilesParquetBackedMetadata                   10       65536  run-1621408015253  avgt    5   5158.325 ± 1038.642  ms/op

    The first three benchmarks compare the time taken to read the full avro, hfile, and parquet encoded data. A full read of parquet should be slower than a row-based file format, and it is, as expected.

    (Note: I think avro should be similar to hfile; not sure why it isn't.) The last two read just 10 columns' ranges out of 100 total, and HFile outperforms parquet + filter pushdown (1MB row group size). I tried going as low as 256KB for the row group size, and the parquet numbers became worse (`6-7 secs`).


    1. This seems to be different from the results I observed. A couple of questions:

      1) Are you using HoodieHFileReader, or something else, to read the HFile metadata?

      2) The benchmark shows 'numColsToRead'. Are you reading data for the entire column on the main table? Or is it only fetching part of the data for the column based on the range being queried?

      3) Do you mind sharing the HFile/Parquet schema? i.e., for HFile: what is the key, and what do we store in the value? For parquet: the schema, and did we sort the data based on column and value_min/value_max?

    2. I wrote some code on top of HFile.Reader, reusing what HoodieHFileReader does. It's not that complicated actually. I have barely tuned anything; I just kept the block size at 1MB.

      Yes, both parquet and hfile are sorted by column name and file name. They are not sorted by min/max (that's something I would like to explore as well).

      Reading all columns. I am not actually sure about just a few columns being needed. I think all of these are in fact useful for planning, right?

      this.schema = SchemaBuilder.record("range_metadata").fields()
                .name("column").type().stringType().noDefault()
                .name("file").type().stringType().noDefault()
                .name("min").type().stringType().noDefault()
                .name("max").type().stringType().noDefault()
                .name("size").type().longType().noDefault()
                .name("count").type().longType().noDefault()
                .name("nonNullCount").type().longType().noDefault()
                .endRecord();
    3. Vinoth Chandar Satish Kotha

      Hello,

      1. About parquet performance: I see the MetadataBenchmark for parquet uses ParquetReader, which reads record by record. But the Spark SQL datasource uses VectorizedParquetRecordReader, which reads data by parquet column page. So I think using VectorizedParquetRecordReader to test performance would be fairer for parquet, since it can make efficient use of the columnar storage features. I think for a full read with 10 numColsToRead, parquet will be better.

      2. About "sorted by column name": I see the metatable schema in the design is ColumnFileStats, where every column's statistics is a separate record in the metatable:

      "ColumnFileStats(val partitionPath: String, val fileName: String, val colName: String, val minVal: String, val maxVal: String)"

      Why do we use a schema like this, instead of:

      "FileStats(val partitionPath: String, val fileName: String, val statistics: struct)", where statistics is a struct containing the min/max info for all columns?

      A. About the scenario: In most analytics scenarios, the query will have pushdown conditions on multiple columns. If we use ColumnFileStats sorted by column name, it needs multiple column-statistics queries against the metadata, whereas with FileStats one query can satisfy all the columns' pushdown conditions. So I think ColumnFileStats with parquet will be better.

      B. About the metatable format: ColumnFileStats sorted by column name may be better suited to a row format.

      C. About scalability: Using FileStats with a struct of statistics can support more columns, such as adding a column.

      Delta Lake and Iceberg use the FileStats mode:

      https://github.com/delta-io/delta/blob/master/PROTOCOL.md#per-file-statistics

      https://iceberg.apache.org/spec/#sorting

      3. Statistics index: right now we just collect min & max. Are nullCount, value_counts, or others needed?



  8. liwei 

    >>But in spark sql datasource use VectorizedParquetRecordReader which will read data by parquet column page

    Fair point, but I still feel parquet will be slower, given it cannot really index within a single page. Worth testing out. Are you able to enhance the benchmark?


    >>"FileStats(val partitionPath: String, val fileName: String, val statistics:struct)", statistics is a struct will contain all columns min, max info.

    A few things become simpler and more efficient by flattening into `ColumnFileStats` for each column (as opposed to using a struct). We ran a survey on the community where users mention that they typically have 50-100 columns and only a few are used in a query. In the `ColumnFileStats` model, we only read values for the columns actually present in the query. In the `FileStats`/struct approach, we also need to manage the schema of the struct and keep it in line with data evolution.

    We are consciously trying to avoid keeping just file-level statistics for N files and making the lookup O(N).

    Hope that makes sense.

    Latest update here: Satish Kotha has since verified the numbers I posted above as well. I am still trying to explore other structures that can further optimize this. Our goal should be < 1 sec of query planning time.


    1. Vinoth Chandar

      >>But in spark sql datasource use VectorizedParquetRecordReader which will read data by parquet column page

         >> Fair point. but I still feel parquet will be slower, given it cannot really index within a single page. Worth testing out. Are you able to enhance the benchmark?

      Hello, Parquet 1.11.1 supports the "ColumnIndex Layout to Support Page Skipping", which makes both range scans and point lookups I/O efficient. Spark 3.0 has added support for it, with a large performance increase.


      >>"FileStats(val partitionPath: String, val fileName: String, val statistics:struct)", statistics is a struct will contain all columns min, max info.

        >> Few things become simpler, efficient by flattening like in `ColumnFileStats` for each column, (as opposed to using a struct). We ran a survey on the community where users mention that typically they have 50-100 columns and only a few are used in a query. In the `ColumnFileStats` model, we only read values for the columns actually present in query. In the `FileStats`/struct approach, we also need to manage the schema of the struct and keep it in line with data evolution. 

        >> We are consciously trying to just avoiding just keeping file level statistics for N files, and make the lookup O(N) 

        >> Hope that makes sense. 

      Using Parquet nested types, Spark's parquet reader supports pushdown for nested fields. If we use HFile we will not have that pushdown optimization. If we had a metatable service such as HBase, that would be better for a row format.


      1. liwei Even with parquet version 1.11.1, the benchmark shows similar performance. HFile is doing better than Parquet for this specific benchmark. I tried tuning some of the parquet settings, but I couldn't get any substantial gains. Do you mind trying out the benchmark or giving any suggestions on parameters to tune?

        Note that this specific benchmark test is limited to matching on 'columnName'. We are not doing predicate pushdown on the column range. Also, this is not large scale, so everything fits into 1 file. Maybe when we have more files, and if we can push down column ranges, parquet will do better.

        Either way, this has been in discussion for some time. As a next step, I'm planning to do a proof-of-concept on this. I'm thinking of writing a pluggable data file format so we can switch the backend storage format easily in the future. Let me know what you think.


        1. Satish Kotha With predicate pushdown and large files, parquet will be better.

          A pluggable data file format would be much better. The metatable could then be stored in different formats or services; for example, Snowflake has a metaservice to store the index statistics.
          pluggable data file format  will be so better. The metatable can store in different format or service , such as snowflake have a metaservice to store the index statistics.