• @<approver1 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]
  • @<approver2 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]
  • ...


Current state

Discussion thread: NA

JIRA: HUDI-2450

Released: <Hudi Version>


Currently, the Flink writer pipeline has a bucket assigner stage that indexes records in Flink state and a writer stage that flushes the data to disk (RFC-24).
For a very large dataflow, a checkpoint can take a significant amount of time and may time out if it hits a slow HDFS datanode. We therefore need a pipeline that writes data in a truly streaming fashion.


We will share the hash mechanism of RFC-29, so that Flink and Spark use the same hash mechanism.
This feature is designed for Flink Merge On Read tables in upsert mode only.


Each partition should have the same number of buckets, so partition+bucketID is a unique key for every file group in a table. When bootstrapping, we can scan the existing filesystem to build a Map(partition+bucketID, FileID) mapping and append the data to the existing file groups.

Generate the bucketID with the pre-defined hash function and shuffle records by bucketID
To make sure all the data of one bucket lands on the same parallel writer task, we need to shuffle the records by bucketID; each file group is then touched by only one writer.
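
As a minimal sketch of this step, the snippet below derives a bucketID from a record key and groups keys by bucket. The hash shown here (`String.hashCode()` masked to a non-negative value, modulo the bucket count) is an assumption for illustration only; the actual hash function is the one shared with RFC-29. `BucketShuffleSketch`, `bucketId` and `groupByBucket` are hypothetical names, and the in-memory grouping merely stands in for the shuffle that Flink would perform between operators.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class BucketShuffleSketch {

    // Assumed hash: mask the sign bit so the result is always in [0, numBuckets).
    static int bucketId(String recordKey, int numBuckets) {
        return (recordKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
    }

    // Stand-in for the shuffle: every key of one bucket ends up in the same group,
    // so a single writer would own that bucket's file group.
    static Map<Integer, List<String>> groupByBucket(List<String> recordKeys, int numBuckets) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (String key : recordKeys) {
            groups.computeIfAbsent(bucketId(key, numBuckets), b -> new ArrayList<>()).add(key);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("uuid-1", "uuid-2", "uuid-3", "uuid-4");
        System.out.println(groupByBucket(keys, 4));
    }
}
```

Because the bucketID depends only on the record key and the fixed bucket count, the same key always maps to the same bucket, which is what lets the writer skip the state-based index lookup.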

Assign FileID based on bucketID in StreamWriteFunction
Because we already have the Map(partition+bucketID, FileID) mapping, we know the fileID of each record. If the bucket is new, we create a new file group. We can tagLocation() here.
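
The lookup described above could look roughly like the following sketch. It is not Hudi code: `BucketFileIdAssignerSketch`, `register` and `fileIdFor` are hypothetical names, the `partition#bucketID` string key is an assumed encoding of partition+bucketID, and a random UUID stands in for however a new file group ID would actually be generated.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

class BucketFileIdAssignerSketch {

    // Map(partition+bucketID, FileID), filled at bootstrap and extended for new buckets.
    private final Map<String, String> fileIdByBucket = new HashMap<>();

    // Assumed key encoding; any unambiguous combination of partition and bucketID works.
    private static String key(String partition, int bucketId) {
        return partition + "#" + bucketId;
    }

    // Called while scanning existing file groups during bootstrap.
    void register(String partition, int bucketId, String fileId) {
        fileIdByBucket.put(key(partition, bucketId), fileId);
    }

    // Returns the known fileID, or creates a file group ID for a bucket seen for the first time.
    String fileIdFor(String partition, int bucketId) {
        return fileIdByBucket.computeIfAbsent(
            key(partition, bucketId), k -> UUID.randomUUID().toString());
    }

    public static void main(String[] args) {
        BucketFileIdAssignerSketch assigner = new BucketFileIdAssignerSketch();
        assigner.register("2021/09/01", 3, "existing-file-group");
        System.out.println(assigner.fileIdFor("2021/09/01", 3)); // bucket found at bootstrap
        System.out.println(assigner.fileIdFor("2021/09/01", 5)); // new bucket, new file group
    }
}
```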

Create HoodieAppendHandle for each FileID
For each checkpoint, we create a new set of HoodieAppendHandle instances to actually write the records. Each fileID has one HoodieAppendHandle. The mapping from fileID to handle will be stored in HoodieFlinkClient.

Buffer records in HoodieAppendHandle
In the Flink StreamWriteFunction.processElement(T value) method, we can buffer the HoodieRecord by calling HoodieAppendHandle.write(hoodieRecord).

Flush to disk with the existing memory monitoring
HoodieAppendHandle has a memory control mechanism to flush data to the disk, so we don't need to worry about OOM.

Close the handle to get the commit status when the checkpoint is triggered
Once the checkpoint is triggered, we call HoodieAppendHandle.close() to flush all the handles and collect all the WriteStatus.
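
The handle lifecycle in the last four steps (one handle per fileID, buffering on write, flushing under memory pressure, closing on checkpoint) can be sketched as below. This is a simplified model under stated assumptions, not the real classes: `HandleSketch` only imitates the role of HoodieAppendHandle, a record-count threshold stands in for its memory monitoring, and a `fileId:recordCount` string stands in for WriteStatus. All names in the snippet are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CheckpointedBucketWriterSketch {

    // Hypothetical stand-in for HoodieAppendHandle.
    static final class HandleSketch {
        private final String fileId;
        private final int maxBuffered;
        private final List<String> buffer = new ArrayList<>();
        private int flushedRecords = 0;

        HandleSketch(String fileId, int maxBuffered) {
            this.fileId = fileId;
            this.maxBuffered = maxBuffered;
        }

        // Buffer the record; flush early once the (assumed) threshold is reached.
        void write(String record) {
            buffer.add(record);
            if (buffer.size() >= maxBuffered) {
                flush();
            }
        }

        private void flush() {
            flushedRecords += buffer.size(); // a real handle would append a log block here
            buffer.clear();
        }

        // Flush the remainder and return a summary standing in for WriteStatus.
        String close() {
            flush();
            return fileId + ":" + flushedRecords;
        }
    }

    private final Map<String, HandleSketch> handles = new LinkedHashMap<>();
    private final int maxBuffered;

    CheckpointedBucketWriterSketch(int maxBuffered) {
        this.maxBuffered = maxBuffered;
    }

    // Mirrors processElement: route the record to the handle of its fileID.
    void processElement(String fileId, String record) {
        handles.computeIfAbsent(fileId, id -> new HandleSketch(id, maxBuffered)).write(record);
    }

    // Mirrors the checkpoint trigger: close every handle, collect statuses, start a fresh set.
    List<String> onCheckpoint() {
        List<String> statuses = new ArrayList<>();
        for (HandleSketch handle : handles.values()) {
            statuses.add(handle.close());
        }
        handles.clear();
        return statuses;
    }

    public static void main(String[] args) {
        CheckpointedBucketWriterSketch writer = new CheckpointedBucketWriterSketch(2);
        writer.processElement("fg-1", "r1");
        writer.processElement("fg-1", "r2");
        writer.processElement("fg-2", "r3");
        System.out.println(writer.onCheckpoint());
    }
}
```

Since most data has already been flushed by the time the checkpoint fires, closing the handles only has to write the remaining buffered records, which is the reason the checkpoint itself stays short.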

Performance Evaluation

Compared with the state index and upsert(List<HoodieRecord>), the checkpoint time dropped from minutes to less than one minute for a dataflow of more than 2 million QPS.

Rollout/Adoption Plan

No impact on existing pipelines, because it is a new indexing mechanism.

Test Plan

Unit test
Integration test
User participation is welcomed.


  1. >A writer handle that supports RowData or InternalRow directly is more efficient.

    Raymond Xu and Y Ethan Guo are also thinking along these lines: avoiding the Avro conversion.

    My initial thoughts are to introduce a new API `MergeHook` to replace `RecordPayload` (which assumes Avro).

    It looks something like the below, with methods for each engine's POJO.

    interface MergeHook<T> {
      // oldValue is empty for inserts
      // return empty for deletes, merged value for updates/deletes
      Option<T> merge(Option<T> oldValue, T newValue);
    }

    Then we have `SparkMergeHook implements MergeHook<Row>` and `FlinkMergeHook implements MergeHook<RowData>`. We write these for all built-in payloads. We will keep supporting record payloads for backwards compatibility, plus a MergeHook<GenericRecord> in case a user does not implement it for a given engine.

    I think we should do this and own the schema pieces fully. A lot of complexity can be avoided with this.

  2. Are Raymond Xu and Y Ethan Guo planning some refactoring here?