Proposers

Approvers

  • @<approver1 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]
  • @<approver2 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]

Status

Current State

  • UNDER DISCUSSION
  • IN PROGRESS
  • ABANDONED
  • COMPLETED (tick)
  • INACTIVE
Discussion thread: here

JIRA: here

Released: <Hudi Version>

Abstract

Hudi tables support many operations, a very popular one being upsert(). To support upserts, Hudi depends on an indexing scheme to route each incoming record to the correct file.
Currently, the Hudi index implementation is pluggable and provides two options:

  • Bloom filters stored in each data file footer: this is the preferred default option.
  • Apache HBase: efficient lookups for small batches of keys, but it requires an external system and consistency issues may occur.

When a Hudi table becomes large, the performance of the bloom filter index degrades drastically due to the increase in false-positive probability. This proposal implements an efficient, lightweight index approach to address the performance issue of bloom filters, and also clusters Hudi table data by record key, which dramatically improves query performance in some scenarios without additional overhead.

Background

Ingestion workflow of bloom filter

Hudi's storage abstraction is composed of two main components: 1) the actual data and 2) the index data. When upserting with the default configuration, Hudi:

  1. First, extracts the affected partitions from the input batch.
  2. Second, loads the bloom filter index from all parquet files in these partitions.
  3. Then, determines which file group every record belongs to:
    1. Checks whether the record key exists in the bloom filter index (efficient).
    2. If it might, since bloom filters can return false positives, Hudi has to load the data file to check whether the key really exists (large I/O and poor performance).
  4. Writes the data to the corresponding file groups.

When the Hudi table becomes larger (lots of data under each partition directory), the false-positive probability increases and more data files have to be read. In the worst case, Hudi has to read all data files to join with the input batch, which makes near-real-time processing impossible.
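To make the cost of false positives concrete, the following is a self-contained sketch that simulates this check-then-verify flow with Guava's BloomFilter and in-memory key sets standing in for parquet files; it is an illustration of the mechanism, not Hudi's actual code. With a batch of entirely new keys, every filter hit is a false positive and would translate into a wasted data-file read.

```java
// Illustration only: approximates the bloom-filter lookup step described above
// using Guava's BloomFilter instead of Hudi's internal index. File contents are
// simulated with in-memory sets; in Hudi, a positive filter hit forces a real
// parquet read to rule out false positives.
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

import java.nio.charset.StandardCharsets;
import java.util.*;

public class BloomTaggingSketch {
  public static void main(String[] args) {
    // Simulated data files in one partition: fileId -> record keys stored in it.
    Map<String, Set<String>> files = new HashMap<>();
    Map<String, BloomFilter<CharSequence>> footers = new HashMap<>();
    for (int f = 0; f < 10; f++) {
      String fileId = "file-" + f;
      Set<String> keys = new HashSet<>();
      BloomFilter<CharSequence> bf =
          BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 10_000, 0.01);
      for (int k = 0; k < 10_000; k++) {
        String key = "key-" + f + "-" + k;
        keys.add(key);
        bf.put(key);
      }
      files.put(fileId, keys);
      footers.put(fileId, bf);
    }

    // Incoming batch: entirely new keys, so every filter hit is a false positive.
    Random rnd = new Random(42);
    int filterHits = 0, realHits = 0;
    for (int i = 0; i < 10_000; i++) {
      String incoming = "new-key-" + rnd.nextInt(1_000_000);
      for (Map.Entry<String, BloomFilter<CharSequence>> e : footers.entrySet()) {
        if (e.getValue().mightContain(incoming)) {
          filterHits++;                                   // candidate file, must be read
          if (files.get(e.getKey()).contains(incoming)) { // simulated "read data file"
            realHits++;
          }
        }
      }
    }
    System.out.printf("filter hits: %d, real hits: %d, wasted file reads: %d%n",
        filterHits, realHits, filterHits - realHits);
  }
}
```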

Bucketing table and hash index

Bucketing is a way to decompose table data sets into more manageable parts by clustering together records whose keys have the same hash value under a fixed hash function. A bucket in Hive is determined by applying a hash function to the bucketed column (the index key field) and taking the result modulo the total number of buckets. Each bucket is stored in one file (Hive bucketing) or in several files with similar names (Spark bucketing).

Bucketed tables offer efficient sampling. A primary reason to use bucketed tables is to support joins between two large datasets using a bucket join. For example, suppose table1 and table2 are both bucketed on the index key column emp_id, into 8 and 4 buckets respectively. If table1 joins table2 on emp_id, a bucket map join can be used and no shuffle is required. This pre-distribution of the data enables a good amount of optimization in later queries.
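As a small illustration of why those bucket counts are compatible, the sketch below (hypothetical helper `bucket`, using Java's default integer hash) checks that any emp_id landing in table1's bucket i can only land in table2's bucket i % 4, which is what lets the bucket map join proceed without a shuffle.

```java
// Illustration only: with the same hash function and modulo bucketing, every
// emp_id in table1's bucket i (of 8) can only appear in table2's bucket i % 4
// (of 4), so each pair of buckets can be joined locally without a shuffle.
public class BucketJoinSketch {
  static int bucket(int empId, int numBuckets) {
    // Hive-style bucketing: hash the key, then take it modulo the bucket count.
    return Math.floorMod(Integer.hashCode(empId), numBuckets);
  }

  public static void main(String[] args) {
    for (int empId = 0; empId < 100_000; empId++) {
      int b8 = bucket(empId, 8); // table1 bucket
      int b4 = bucket(empId, 4); // table2 bucket
      if (b4 != b8 % 4) {
        throw new AssertionError("bucket mismatch for emp_id " + empId);
      }
    }
    System.out.println("every table1 bucket i maps onto table2 bucket i % 4");
  }
}
```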

Implementation

Write Implementation

How hash index works

In order to obtain the mapping between files and buckets, the file name is extended to `${bucketId}_${filegroupId}_${UUID}_${timestamp}.parquet/log`, e.g. `00000002-0000-4035-a392-22a91eafd130_0-2-0_20210527160855.parquet`. This way, no extra storage is needed to persist the mapping.

For the mapping between buckets and Hoodie records, a record finds its bucket by hashing its index key and taking it modulo the bucket number, `hash(recordKey) % N`, where N is the bucket number of the Hudi table. If the table is partitioned, each partition is divided into N buckets, and the record is first routed to its partition and then to its bucket within that partition.
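A minimal sketch of this routing and naming scheme is shown below. Only the `${bucketId}_${filegroupId}_${UUID}_${timestamp}` layout comes from this proposal; the concrete hash function, zero-padding, and helper names are assumptions for illustration.

```java
// A minimal sketch of bucket resolution and bucket-aware file naming.
// The hash and padding details are illustrative assumptions, not Hudi's code.
import java.util.UUID;

public class BucketIdSketch {
  /** hash(recordKey) % N, kept non-negative so it is a valid bucket id. */
  static int bucketId(String recordKey, int numBuckets) {
    return Math.floorMod(recordKey.hashCode(), numBuckets);
  }

  /** Builds a base file name that encodes the bucket id as its prefix. */
  static String baseFileName(int bucketId, String fileGroupId, String commitTime) {
    return String.format("%08d_%s_%s_%s.parquet", bucketId, fileGroupId, UUID.randomUUID(), commitTime);
  }

  /** Recovers the bucket id from a file name, so no extra mapping has to be stored. */
  static int bucketIdFromFileName(String fileName) {
    return Integer.parseInt(fileName.substring(0, fileName.indexOf('_')));
  }

  public static void main(String[] args) {
    int n = 256; // bucket number of the table (or of each partition, if partitioned)
    int bucket = bucketId("uuid-000123", n);
    String name = baseFileName(bucket, "0-2-0", "20210527160855");
    System.out.println(name + " -> bucket " + bucketIdFromFileName(name));
  }
}
```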

There are two usage patterns for the relationship between bucket and file group:

  1. One file group per bucket: this is the simplest and most efficient way.

The bucketId calculated from the hash-mod value is assigned as the fileGroupId, similar to other index methods. The only difference is the `tag location` process.

  2. Multiple file groups per bucket: this is useful if the written data is skewed or grows a lot.

(todo)


Comparison


|                                  | Pattern 1 | Pattern 2 |
| Number of file groups per bucket | 1         | >1        |
| Can avoid random access          | yes       | no        |
| Implementation complexity        | simple    | complex   |
| Can avoid data skew when writing | no        | yes       |
| Good support for data growth     | bad       | great     |

This proposal will implement pattern 1.

Ingestion workflow of hash index

When upserting with the hash index (pattern 1 mentioned above), Hudi:

  1. First, extracts the affected partitions from the input batch.
  2. Second, tags the location of each record, obtaining all involved file group ids via `hash(recordKey) % N`, where N is the bucket number.
  3. Then, shuffles the input by the index key field.
  4. Writes a batch of clustered data to the corresponding bucket/file group per task.

In the process, no data files are read.
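Below is an illustration of the tag-location step using plain Java collections in place of the engine's distributed collections; the record stub and partition handling are simplified assumptions. The point is that the target bucket/file group is derived purely from the record key and partition, so no data file needs to be opened.

```java
// Illustration only: the tag-location step of the hash-index write path,
// sketched with in-memory collections rather than the engine's distributed ones.
import java.util.*;
import java.util.stream.Collectors;

public class HashIndexTagLocation {
  // Simplified stand-in for a Hudi record (hypothetical, for illustration).
  record HoodieRecordStub(String partitionPath, String recordKey, String payload) {}

  public static void main(String[] args) {
    int numBuckets = 4; // hoodie.hash.index.bucket.num
    List<HoodieRecordStub> batch = List.of(
        new HoodieRecordStub("2021/05/27", "id-1", "a"),
        new HoodieRecordStub("2021/05/27", "id-2", "b"),
        new HoodieRecordStub("2021/05/28", "id-1", "c"),
        new HoodieRecordStub("2021/05/28", "id-9", "d"));

    // Step 2: tag each record with its (partition, bucket) purely from the key.
    Map<String, List<HoodieRecordStub>> byFileGroup = batch.stream().collect(
        Collectors.groupingBy(r ->
            r.partitionPath() + "/bucket-" + Math.floorMod(r.recordKey().hashCode(), numBuckets)));

    // Steps 3 and 4: in the real flow the records are shuffled by this key and
    // each task writes one bucket/file group; here we just print the grouping.
    byFileGroup.forEach((fg, rs) -> System.out.println(fg + " <- " + rs));
  }
}
```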

Limitation and Solution

  • Scalability

Because the number of buckets is calculated from the estimated amount of data, rapid data growth can make individual buckets too large, which reduces read and write performance.

Overwrite

A naive way is to reshuffle and rewrite the whole table when the bucket number changes. It is suitable for small tables and does not care how the bucket number changed.

Multiple Rehash

Expanding the number of buckets by an integer multiple is recommended. With multiple expansion, the data is clustered by rehashing, so the existing data can be redistributed in a lightweight manner without a full shuffle.

For example, expanding from 2 buckets to 4 splits the 1st bucket and rehashes its data into two smaller buckets, the 1st and the 3rd, while the former 2nd bucket is split into the new 2nd and 4th buckets.
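The following sketch checks this splitting property for doubling from 2 to 4 buckets (0-indexed here, so the text's 1st and 2nd buckets are buckets 0 and 1): a record in old bucket b can only land in new bucket b or b + 2, so each old bucket splits cleanly in two without a cross-bucket shuffle.

```java
// Illustration only: doubling the bucket count from 2 to 4. Because
// hash % 4 is congruent to hash % 2 (mod 2), the only legal destinations
// for a record in old bucket b are new buckets b and b + 2.
public class MultipleRehashSketch {
  public static void main(String[] args) {
    int oldN = 2, newN = 4;
    for (int h = -100_000; h < 100_000; h++) { // h stands for hash(recordKey)
      int oldBucket = Math.floorMod(h, oldN);
      int newBucket = Math.floorMod(h, newN);
      if (newBucket != oldBucket && newBucket != oldBucket + oldN) {
        throw new AssertionError("record left its bucket family: " + h);
      }
    }
    System.out.println("bucket 0 splits into {0, 2}, bucket 1 splits into {1, 3}");
  }
}
```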

Extendable Hash

(todo)

  • Data skew

Data skew means that some buckets become too small or too large, resulting in a long tail of reads and writes and increasing end-to-end time.
It is difficult to find a good solution for this on the compute engine side.

Configuration

hoodie.index.type=BUCKET_HASH_INDEX
hoodie.hash.index.bucket.num=1024
hoodie.datasource.write.indexkey.field=colA (the index key should be a superset of the record key)
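A hedged usage sketch of these options with the Spark datasource writer is shown below. `hoodie.index.type=BUCKET_HASH_INDEX`, `hoodie.hash.index.bucket.num`, and `hoodie.datasource.write.indexkey.field` are the configurations proposed in this RFC and may differ in the released implementation; the other options are standard Hudi datasource options, and the input and table paths are placeholders.

```java
// Sketch: writing a DataFrame to a Hudi table with the proposed bucket hash index options.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class BucketIndexWriteExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("bucket-index-write")
        .master("local[*]")
        .getOrCreate();

    Dataset<Row> df = spark.read().parquet("/tmp/input");  // placeholder input path

    df.write().format("hudi")
        .option("hoodie.table.name", "t1")
        .option("hoodie.datasource.write.recordkey.field", "colA")
        .option("hoodie.datasource.write.partitionpath.field", "dt")
        .option("hoodie.index.type", "BUCKET_HASH_INDEX")          // proposed in this RFC
        .option("hoodie.hash.index.bucket.num", "1024")            // proposed in this RFC
        .option("hoodie.datasource.write.indexkey.field", "colA")  // proposed in this RFC
        .mode(SaveMode.Append)
        .save("/tmp/hudi/t1");                                     // placeholder table path

    spark.stop();
  }
}
```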

Read Optimization

Query Plan Optimization

For SQL engines like Spark, which generate a query plan to schedule the job, the data distribution produced by a sub-plan often fails to meet the requirements of its parent node in the executed plan, and Spark solves this by inserting an exchange node (shuffle) between the two. When reading from a bucketed Hudi table, the input already has a well-defined data distribution: it is clustered and ordered by the index field. Spark can therefore use this distribution instead of shuffling, by implementing a new optimizer rule that recognizes the pattern. For example, if t1 is a Hudi table bucketed by colA:

  • sql1: select colA from t1 group by colA. The optimized SQL plan is as follows:

  • sql2: select count(*) from t1 join t2 using (colA). The optimized SQL plan is as follows:

(Details will be introduced in the future)

Performance Evaluation

Todo: Read and write performance comparison with bloom filter

Rollout/Adoption Plan

  • No impact on existing users, because this adds a new index type.
  • New configurations will be added to the documentation.
  • HoodieKey serialization compatibility issues.

With the hash index, the index key has to be stored in `HoodieKey` so that records can be clustered and then written to the corresponding file group. Hudi's default `FieldSerializer` does not handle compatibility when fields are added to or deleted from `HoodieKey`, so `HoodieDeleteBlock`s whose `HoodieKey`s were persisted with the old serializer version fail to deserialize.
To solve this compatibility issue, implement a new serializer version that skips serializing the newly added field (marked with the `Optional` annotation), and increase the version of the `HoodieLogBlock`. The log block version corresponds to the serializer version: old-version blocks are deserialized with the old serializer and new-version blocks with the new one.
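A minimal sketch of the version-dispatch idea follows, with hypothetical stand-in types rather than Hudi's actual `HoodieKey`, `FieldSerializer`, or `HoodieLogBlock` classes: the block version recorded in the log block header selects the serializer, so delete blocks written before the new field existed still deserialize correctly.

```java
// Sketch only: version-dispatched key (de)serialization with hypothetical types.
import java.io.*;

public class VersionedKeySerializerSketch {
  // Simplified stand-in for HoodieKey with the newly added index key field.
  static final class KeyStub {
    final String recordKey, partitionPath, indexKey; // indexKey is the new, optional field
    KeyStub(String r, String p, String i) { recordKey = r; partitionPath = p; indexKey = i; }
  }

  interface KeySerializer {
    void write(KeyStub key, DataOutputStream out) throws IOException;
    KeyStub read(DataInputStream in) throws IOException;
  }

  // Version 1: the layout old delete blocks were written with (no index key).
  static final KeySerializer V1 = new KeySerializer() {
    public void write(KeyStub k, DataOutputStream out) throws IOException {
      out.writeUTF(k.recordKey); out.writeUTF(k.partitionPath);
    }
    public KeyStub read(DataInputStream in) throws IOException {
      return new KeyStub(in.readUTF(), in.readUTF(), null);
    }
  };

  // Version 2: also writes the new field; used only for new-version blocks.
  static final KeySerializer V2 = new KeySerializer() {
    public void write(KeyStub k, DataOutputStream out) throws IOException {
      out.writeUTF(k.recordKey); out.writeUTF(k.partitionPath);
      out.writeUTF(k.indexKey == null ? "" : k.indexKey);
    }
    public KeyStub read(DataInputStream in) throws IOException {
      return new KeyStub(in.readUTF(), in.readUTF(), in.readUTF());
    }
  };

  // The block version stored in the log block header picks the serializer.
  static KeySerializer forBlockVersion(int version) {
    return version >= 2 ? V2 : V1;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    V1.write(new KeyStub("id-1", "2021/05/27", "colA-value"), new DataOutputStream(buf));
    // An old (version 1) block is still readable with the matching serializer.
    KeyStub k = forBlockVersion(1).read(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
    System.out.println(k.recordKey + " / " + k.partitionPath + " / indexKey=" + k.indexKey);
  }
}
```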

Test Plan

Similar to the bloom filter index tests:

  • Unit tests
  • Integration testing