Proposers
Approvers
- @<approver1 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]
- @<approver2 JIRA username> : [APPROVED/REQUESTED_INFO/REJECTED]
- ...
Status
Current state: <one of UNDER DISCUSSION / IN PROGRESS / ABANDONED / COMPLETED / INACTIVE>
Discussion thread: NA
JIRA: HUDI-2450
Released: <Hudi Version>
Abstract
Currently, the Flink writer pipeline has a bucket assigner stage that indexes records in Flink state and a writer stage that actually flushes the data to disk (see RFC-24).
For a very large dataflow, the checkpoint could consume a significant amount of time and might time out if we hit a slow HDFS DataNode. So we need a pipeline that writes data in a truly streaming fashion.
Background
We will share the hash mechanism of RFC-29, so that Flink and Spark use the same hashing.
This feature is designed for the Flink Merge On Read table type and upsert mode only.
Implementation
Bootstrap
Each partition should have the same number of buckets, so partition+bucketID is a unique key for all the file groups in one table. When bootstrapping, we can scan the existing file system to build a Map(partition+bucketID, FileID) mapping and append new data to the existing file groups.
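Below is a minimal sketch of the bootstrap scan, assuming a hypothetical FileGroupInfo view of the existing file groups; the class and method names are illustrative, not the actual Hudi model.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative bootstrap sketch: build Map(partition + bucketId -> fileId) by scanning
// the existing file groups once, so later records can be appended to the right file group.
public class BucketBootstrapSketch {

  /** Minimal view of an existing file group; fields are illustrative, not Hudi's model. */
  public static class FileGroupInfo {
    public final String partition;
    public final int bucketId;
    public final String fileId;

    public FileGroupInfo(String partition, int bucketId, String fileId) {
      this.partition = partition;
      this.bucketId = bucketId;
      this.fileId = fileId;
    }
  }

  /** Builds the (partition + bucketId) -> fileId index used by the writer tasks. */
  public static Map<String, String> buildBucketIndex(List<FileGroupInfo> existingFileGroups) {
    Map<String, String> bucketToFileId = new HashMap<>();
    for (FileGroupInfo fg : existingFileGroups) {
      bucketToFileId.put(fg.partition + "/" + fg.bucketId, fg.fileId);
    }
    return bucketToFileId;
  }
}
```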
Generate the bucketID with the pre-defined hash function and shuffle it by bucketID
In order to make sure all the data of one bucket is processed by the same parallel subtask, we shuffle the records by bucketID, so that one file group is only touched by one writer.
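The sketch below illustrates the idea with a simple CRC32-modulo hash purely for illustration; the real pipeline must share the RFC-29 hash function so Spark and Flink agree on the bucket of a key.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Illustrative only: derive a stable bucketId from the record key.
// The real implementation must use the same hash as RFC-29 so Spark and Flink agree.
public class BucketIdSketch {

  public static int bucketId(String recordKey, int numBuckets) {
    CRC32 crc = new CRC32();
    crc.update(recordKey.getBytes(StandardCharsets.UTF_8));
    return (int) (crc.getValue() % numBuckets);
  }

  // The shuffle key combines partition and bucketId, so keying the stream by it
  // sends all records of one bucket to the same writer subtask.
  public static String shuffleKey(String partitionPath, String recordKey, int numBuckets) {
    return partitionPath + "/" + bucketId(recordKey, numBuckets);
  }
}
```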
Assign FileID based on bucketID on StreamWriteFunction
Because we already have the Map(partition+bucketID, FileID) mapping, we know the fileID of each record. If the bucket has no file group yet, we create a new one. We can tagLocation() here.
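A minimal sketch of the fileID assignment, reusing the bootstrap mapping above; the UUID-based file id creation is illustrative, not the exact Hudi call.

```java
import java.util.Map;
import java.util.UUID;

// Illustrative: look up (or create) the fileId for a record's bucket.
public class FileIdAssignSketch {

  public static String assignFileId(Map<String, String> bucketToFileId,
                                    String partitionPath,
                                    int bucketId) {
    String key = partitionPath + "/" + bucketId;
    // Existing bucket -> append to its file group; new bucket -> create a new file group id.
    return bucketToFileId.computeIfAbsent(key, k -> UUID.randomUUID().toString());
  }
}
```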
Create HoodieAppendHandle for each FileID
For each checkpoint, we create a new set of HoodieAppendHandles to actually write the records. Each fileID has one HoodieAppendHandle, and the fileID-to-handle mapping is stored in the HoodieFlinkClient.
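A hedged sketch of the per-checkpoint handle cache; the generic handle type stands in for HoodieAppendHandle and the factory is illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative: one append handle per fileId, created lazily for the current checkpoint
// and dropped after the handles are closed on checkpoint.
public class HandleCacheSketch<H> {

  private final Map<String, H> handlesByFileId = new HashMap<>();
  private final Function<String, H> handleFactory;

  public HandleCacheSketch(Function<String, H> handleFactory) {
    this.handleFactory = handleFactory;
  }

  public H getOrCreate(String fileId) {
    return handlesByFileId.computeIfAbsent(fileId, handleFactory);
  }

  public Map<String, H> currentHandles() {
    return handlesByFileId;
  }

  public void reset() {
    handlesByFileId.clear();
  }
}
```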
Buffering records in HoodieAppendHandle
In the Flink StreamWriteFunction.processElement(T value) method, we buffer the HoodieRecord via HoodieAppendHandle.write(hoodieRecord).
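The write path could then look roughly like the sketch below, which reuses the helper sketches above; all types are stand-ins for the real Hudi/Flink classes, not the actual API.

```java
import java.util.Map;

// Illustrative write-path sketch: buffer each record into the append handle of its file group.
public class StreamWriteSketch {

  /** Stand-in for HoodieRecord. */
  public interface RecordLike {
    String getRecordKey();
    String getPartitionPath();
  }

  /** Stand-in for HoodieAppendHandle: buffers records until flushed. */
  public interface AppendHandleLike {
    void write(RecordLike record);
  }

  private final int numBuckets;
  private final Map<String, String> bucketToFileId;
  private final HandleCacheSketch<AppendHandleLike> handles;

  public StreamWriteSketch(int numBuckets,
                           Map<String, String> bucketToFileId,
                           HandleCacheSketch<AppendHandleLike> handles) {
    this.numBuckets = numBuckets;
    this.bucketToFileId = bucketToFileId;
    this.handles = handles;
  }

  /** Mirrors StreamWriteFunction.processElement: no I/O here, just buffering. */
  public void processElement(RecordLike record) {
    int bucket = BucketIdSketch.bucketId(record.getRecordKey(), numBuckets);
    String fileId = FileIdAssignSketch.assignFileId(
        bucketToFileId, record.getPartitionPath(), bucket);
    handles.getOrCreate(fileId).write(record);
  }
}
```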
Flush to disk with the existing memory monitoring
HoodieAppendHandle has a memory control mechanism that flushes buffered data to disk, so we don't need to worry about OOM.
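For illustration only, the kind of size-bounded buffering meant here looks like the sketch below; the actual thresholds and block format are HoodieAppendHandle internals.

```java
// Illustrative size-threshold flush, not the actual HoodieAppendHandle logic:
// track the bytes buffered so far and spill a block once a limit is reached.
public class SizeBoundedBufferSketch {

  private long bufferedBytes = 0;
  private final long maxBufferBytes;

  public SizeBoundedBufferSketch(long maxBufferBytes) {
    this.maxBufferBytes = maxBufferBytes;
  }

  public void onRecordBuffered(long recordSizeBytes) {
    bufferedBytes += recordSizeBytes;
    if (bufferedBytes >= maxBufferBytes) {
      flushBlock();
    }
  }

  private void flushBlock() {
    // write the buffered records as one log block, then reset the counter
    bufferedBytes = 0;
  }
}
```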
Close the handle to get the commit status when the checkpoint is triggered
Once the checkpoint is triggered, we call HoodieAppendHandle.close() on each handle to flush its data and collect all the WriteStatus results.
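A hedged sketch of the checkpoint-time flush, again with stand-in types; in the real operator this would run in the snapshot/commit path.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative checkpoint hook: close every open handle, gather the write statuses,
// and start a fresh set of handles for the next checkpoint interval.
public class CheckpointFlushSketch {

  /** Stand-in for HoodieAppendHandle#close returning its WriteStatus list. */
  public interface ClosableHandle<S> {
    List<S> close();
  }

  public static <S> List<S> flushOnCheckpoint(
      HandleCacheSketch<? extends ClosableHandle<S>> handles) {
    List<S> writeStatuses = new ArrayList<>();
    for (ClosableHandle<S> handle : handles.currentHandles().values()) {
      writeStatuses.addAll(handle.close());
    }
    handles.reset();
    return writeStatuses;
  }
}
```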
Performance Evaluation
Compared with the state index and upsert(List<HoodieRecord>), the checkpoint time dropped from minutes to less than one minute for a dataflow of more than 2 million QPS.
Rollout/Adoption Plan
There is no impact on the existing pipeline because this is a new indexing mechanism.
Test Plan
- Unit tests
- Integration tests
- User participation is welcome.
2 Comments
Vinoth Chandar
>A writer handle that supports RowData or InternalRow directly is more efficient.
Raymond Xu and Y Ethan Guo are also thinking along these lines, i.e. avoiding the Avro conversion.
My initial thoughts are to introduce a new API `MergeHook` to replace `RecordPayload` (which assumes Avro).
It looks something like the sketch below, with methods for each engine's POJO.
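A minimal illustrative sketch of such an interface; the method names and signatures here are assumptions, not the concrete proposal.

```java
// Illustrative only: a generic merge hook parameterized on the engine's row type,
// so merging can happen on Row / RowData / GenericRecord without an Avro round-trip.
public interface MergeHook<T> {

  /** Combine an incoming record with the current persisted value during upsert/compaction. */
  T merge(T older, T newer);

  /** Produce the value to write when there is no existing record. */
  T insert(T newer);
}
```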
Then we have `SparkMergeHook implements MergeHook<Row>` and `FlinkMergeHook implements MergeHook<RowData>`. We write these for all built-in payloads. We will keep supporting record payloads for backwards compatibility, plus a `MergeHook<GenericRecord>` in case a user does not implement the hook for a given engine.
I think we should do this and own the schema pieces fully. A lot of complexity can be avoided with this.
Danny Chen
Are Raymond Xu and Y Ethan Guo planning some refactoring here?