Proposers

Approvers

Status

Current state: COMPLETED

Discussion thread: here

JIRA: here

Released: 0.8.0

Current Pipeline

The current Flink writer pipeline is, in outline:

input operator => instant generator => write process => commit sink

It has several bottlenecks when adapting to high-throughput dataset writing:

  1. The InstantGeneratorOperator has a parallelism of 1, which limits high-throughput consumption; because all the split inputs drain to a single thread, the network IO also comes under pressure
  2. The WriteProcessOperator handles inputs by partition, which means that within each partition's write process the BUCKETs are written one by one, so the FILE IO cannot keep up with high-throughput inputs
  3. Currently we buffer the data per checkpoint, which is hard to make robust in production: the checkpoint function is blocking and should not perform IO operations
  4. The FlinkHoodieIndex is only valid within a single job's scope; it does not work for existing bootstrap data or across different Flink jobs

The Improvement Proposal

# STEP1: Remove the Single Parallelism Operator

To solve bottleneck 1.

First, we can avoid the singleton task operator InstantGeneratorOperator by implementing an operator coordinator for the write operator. The coordinator always starts its checkpoint first, and it starts a new commit on each new checkpoint.

Work Flow

The write function first buffers the data as a batch of HoodieRecords.

It flushes (writes) the record batch when a Flink checkpoint starts. After a batch has been written successfully, the function notifies its operator coordinator StreamWriteOperatorCoordinator to mark the write as successful.
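A minimal sketch of this buffer-and-flush loop (hedged: the class and helper names below are illustrative placeholders, not the actual Hudi implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the checkpoint-driven write function (STEP1).
// Records are buffered between checkpoints and flushed as one batch
// when a checkpoint starts; the helper methods are placeholders.
public class StreamWriteFunctionSketch<T> {
  private final List<T> buffer = new ArrayList<>();

  /** Buffers each incoming record between checkpoints. */
  public void processElement(T record) {
    buffer.add(record);
  }

  /** Called when a Flink checkpoint starts. */
  public void snapshotState() {
    flushBuffer();
  }

  private void flushBuffer() {
    writeBatch(buffer);    // write the buffered records as one batch
    notifyWriteSuccess();  // report success to StreamWriteOperatorCoordinator
    buffer.clear();        // start buffering for the next checkpoint
  }

  private void writeBatch(List<T> records) { /* delegate to the Hudi write client */ }

  private void notifyWriteSuccess() { /* operator event to the coordinator */ }
}
```

Flushing inside the checkpoint callback is what aligns the buffer contents exactly with checkpoint boundaries.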

Exactly-once Semantics

The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator starts a new instant on the timeline when a checkpoint triggers; the coordinator's checkpoint always starts before its operator's, so when this function starts a checkpoint, a REQUESTED HoodieInstant already exists.

The function's processing thread then blocks data buffering while the checkpoint thread flushes the existing data buffer. Once the buffer has been written successfully, the processing thread unblocks and starts buffering again for the next checkpoint round.

Because any checkpoint failure triggers a rollback of the write, this achieves exactly-once semantics.
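On the coordinator side, the instant lifecycle could look roughly like the sketch below (the timeline helpers are placeholders; the real class is the StreamWriteOperatorCoordinator mentioned above):

```java
// Simplified coordinator logic for STEP1: one Hudi instant per checkpoint.
public class StreamWriteCoordinatorSketch {
  private String inflightInstant;

  /** Coordinator checkpoints always run before the operator tasks' checkpoints. */
  public void checkpointCoordinator(long checkpointId) {
    // A REQUESTED instant therefore already exists when a task starts flushing.
    inflightInstant = startNewInstant();
  }

  /** Called after all write tasks reported a successful flush for this checkpoint. */
  public void notifyCheckpointComplete(long checkpointId) {
    commitInstant(inflightInstant); // retried several times on failure
  }

  /** Any checkpoint or task failure rolls back the written data and fails over. */
  public void handleFailure(Throwable cause) {
    rollbackInstant(inflightInstant);
  }

  private String startNewInstant() { return null; /* timeline call, placeholder */ }
  private void commitInstant(String instant) { /* timeline call, placeholder */ }
  private void rollbackInstant(String instant) { /* timeline call, placeholder */ }
}
```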

Fault Tolerance

The operator coordinator checks the validity of the last instant when it starts a new one. The operator rolls back the written data and throws when any error occurs, so any checkpoint or task failure triggers a failover. The operator coordinator retries several times when committing the write status.

Note: the write task requires the input stream to be partitioned by the partition fields, so that different write tasks never write to the same file group and conflict. The common case for the partition path is a datetime field, so a sink task is quite likely to hit an IO bottleneck; the more flexible solution is to shuffle the data by the file group IDs (fixed by #step2).
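For example, the input can be keyed by the partition path before the write operator (a DataStream API fragment; `input` stands for the incoming DataStream<HoodieRecord> and the surrounding job setup is omitted):

```java
// Shuffle by partition path so that no two write tasks share a file group.
// STEP2 replaces this with a shuffle by file group ID.
DataStream<HoodieRecord> shuffled =
    input.keyBy(record -> record.getPartitionPath());
```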

# STEP2: Make the Write Task Scalable

To solve bottleneck 2.

For each partition, the write client of the WriteProcessOperator handles all the logic for indexing/bucketing/data-write:

  1. index the records to distinguish INSERTs from UPDATEs
  2. use the PARTITIONER for bucketing, deciding each record's bucket (fileId)
  3. write the buckets one by one

These three steps are processed by a single task, which makes it impossible to consume a big dataset by scaling horizontally.

In order to solve bottleneck 2, we propose to split the StreamWriteOperator into two operators: the BucketAssigner and the BucketWriter.

The BucketAssigner

The BucketAssigner (FileIdAssigner) does the following for each record (see the sketch after this list):

  1. It builds a per-checkpoint partition write profile; this is the core component that assigns a bucket ID (partition path + fileID)
  2. It looks up the index to see whether the record is an UPDATE. If the record is an UPDATE, it patches the record up with the existing fileID; if the record is an INSERT, it assigns a new (or already-assigned) fileID based on the configured bucket size.
  3. It sends the record downstream, patched up with its fileId.
  4. The bucket assigning happens per record, and we can assign buckets in parallel as long as there is no conflict where two tasks write to one bucket (a simple hash algorithm solves the problem)
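A sketch of the per-record assignment (hedged: the index lookup, the small-file handling, and the "U" instant-time marker are placeholders, not the actual Hudi code):

```java
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;

// Illustrative per-record bucket assignment: an UPDATE keeps its existing
// file group, an INSERT gets a new (or partially filled) file group.
public class BucketAssignerSketch {

  public HoodieRecord assign(HoodieRecord record) {
    // steps 1 + 2: look up the index to decide UPDATE vs. INSERT
    String existingFileId =
        lookupIndex(record.getRecordKey(), record.getPartitionPath());
    String fileId = existingFileId != null
        ? existingFileId                                  // UPDATE: keep the existing fileID
        : assignInsertFileId(record.getPartitionPath());  // INSERT: new or assigned fileID

    // step 3: patch the record up with its fileId and send it downstream
    record.unseal();
    record.setCurrentLocation(new HoodieRecordLocation("U", fileId)); // "U" is a placeholder marker
    record.seal();
    return record;
  }

  // step 4: the input is hash-shuffled by record key before this operator,
  // so two parallel assigner tasks never pick the same bucket.

  private String lookupIndex(String recordKey, String partitionPath) {
    return null; // placeholder for the index lookup
  }

  private String assignInsertFileId(String partitionPath) {
    return java.util.UUID.randomUUID().toString(); // placeholder for small-file/bucket-size handling
  }
}
```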


The BucketAssigner's output records are then shuffled by fileId and sent to the BucketWriter.

The BucketWriter

The BucketWriter takes a batch of HoodieRecords (tagged with file IDs) and writes the assigned buckets one by one.

Generally, #step2 requires refactoring of the Flink client, especially the HoodieFlinkWriteClient. The current code base assumes that the HoodieFlinkWriteClient handles all the tasks within #step2, which is suitable for Spark but not for Flink. We should abstract the indexing/bucketing work out of the client to make it more lightweight, so that it only writes the data based on the records' bucket IDs.
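Putting the two operators together, the STEP2 pipeline could be wired roughly as follows (a sketch: `input` is the incoming DataStream<HoodieRecord>, BucketWriterSinkSketch is hypothetical, and the operator names are illustrative):

```java
// STEP2 wiring: assign buckets in parallel, then shuffle by fileId so that
// each file group is written by exactly one BucketWriter task.
DataStream<HoodieRecord> tagged = input
    .keyBy(record -> record.getRecordKey())                   // conflict-free parallel assignment
    .map(record -> new BucketAssignerSketch().assign(record))
    .name("bucket_assigner");

tagged
    .keyBy(record -> record.getCurrentLocation().getFileId()) // shuffle by fileId
    .addSink(new BucketWriterSinkSketch())                    // writes buckets batch by batch
    .name("bucket_writer");
```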

# STEP3: Write as Mini-batch

To solve bottleneck 3.

  • We start a new instant when the coordinator starts (instead of starting one on each new checkpoint as in #step1 and #step2)
  • The BucketWriter blocks and flushes the pending buffer data when a new checkpoint starts; it has an asynchronous thread to consume the buffer (for the first version, we flush the data directly in #processElement) and writes batches based on the buffer data size
  • In the coordinator, if the data within one checkpoint was written successfully (i.e. a checkpoint success notification was received), check and commit the inflight instant and start a new instant

That means that if a checkpoint succeeds but we do not receive the success event, the inflight instant will span two (or more) checkpoints.
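In the coordinator sketch from STEP1, notifyCheckpointComplete would then change roughly as follows (helper names remain placeholders):

```java
// Mini-batch mode: the instant only rolls over once the checkpoint success
// notification arrives, so an inflight instant may span several checkpoints.
public void notifyCheckpointComplete(long checkpointId) {
  if (allWriteTasksSucceeded(checkpointId)) {
    commitInstant(inflightInstant);      // commit the inflight instant
    inflightInstant = startNewInstant(); // and immediately start a new one
  }
  // otherwise keep the instant inflight; it spans into the next checkpoint
}
```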

Work Flow

The function first buffers the data as a batch of {@link HoodieRecord}s. It flushes (writes) a record bucket when the bucket size exceeds the configured threshold {@link FlinkOptions#WRITE_BUCKET_SIZE}, when the whole data buffer size exceeds the configured threshold {@link FlinkOptions#WRITE_BUFFER_SIZE}, or when a Flink checkpoint starts. After a batch has been written successfully, the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark the write as successful.
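A sketch of this threshold-driven flushing (hedged: the thresholds are counted in records for simplicity, and the flush/notify plumbing is a placeholder):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hudi.common.model.HoodieRecord;

// Illustrative mini-batch buffering for the BucketWriter (STEP3); the
// thresholds mirror FlinkOptions#WRITE_BUCKET_SIZE / #WRITE_BUFFER_SIZE.
public class BucketWriterBufferSketch {
  private final Map<String, List<HoodieRecord>> buckets = new HashMap<>();
  private long totalBufferedRecords = 0;
  private final long writeBucketSize;
  private final long writeBufferSize;

  public BucketWriterBufferSketch(long writeBucketSize, long writeBufferSize) {
    this.writeBucketSize = writeBucketSize;
    this.writeBufferSize = writeBufferSize;
  }

  /** First version: flush decisions are made inline in processElement. */
  public void processElement(HoodieRecord record) {
    String fileId = record.getCurrentLocation().getFileId();
    List<HoodieRecord> bucket = buckets.computeIfAbsent(fileId, k -> new ArrayList<>());
    bucket.add(record);
    totalBufferedRecords++;

    if (bucket.size() >= writeBucketSize) {
      flushBucket(fileId, bucket); // a single bucket exceeded its threshold
    } else if (totalBufferedRecords >= writeBufferSize) {
      flushAll();                  // the whole buffer exceeded its threshold
    }
  }

  /** A checkpoint flushes everything that is still pending. */
  public void snapshotState() {
    flushAll();
  }

  private void flushBucket(String fileId, List<HoodieRecord> bucket) {
    // write the batch, notify the coordinator, then clean the buffer
    totalBufferedRecords -= bucket.size();
    buckets.remove(fileId);
  }

  private void flushAll() {
    new ArrayList<>(buckets.keySet()).forEach(id -> flushBucket(id, buckets.get(id)));
  }
}
```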

The Semantics

The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator starts a new instant on the timeline when a checkpoint triggers; the coordinator's checkpoint always starts before its operator's, so when this function starts a checkpoint, a REQUESTED instant already exists.

In order to improve throughput, the function's processing thread does not block data buffering after the checkpoint thread starts flushing the existing data buffer, so part of the next checkpoint's batch may be written within the current checkpoint. When a checkpoint failure triggers a write rollback, there may be some duplicate records (e.g. from an eagerly written batch); the semantics are still correct when using the UPSERT operation.

Fault Tolerance

When a checkpoint finishes successfully, the operator coordinator checks and commits the last instant and then starts a new one. The operator rolls back the written data and throws to trigger a failover when any error occurs. This means one Hoodie instant may span one or more checkpoints (some checkpoint notifications may be skipped). If a checkpoint times out, the next checkpoint helps to rewrite the leftover buffer data (the buffer is cleaned in the last step of the #flushBuffer method).

The operator coordinator retries several times when committing the write status.

Note: the write task requires the input stream to be shuffled by the file IDs.

# STEP4: Add a New Index

To solve bottleneck 4.

The new index is based on the BloomFilter index; a state is used as a cache of the underlying file-based index:

  • check the state to find out whether a record is an UPDATE; if it is, do nothing
  • if the record is an INSERT, use the BloomFilter index to find the candidate files, look up these files, and put all the index entries into the state

When all the files have been checked, mark the index as being in pure state mode; we can then check only the state for the records that come in later.

The new index does not need to bootstrap the existing/history dataset, and it also works across different Flink job writes.
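A sketch of the state-backed index check, assuming the stream is keyed by the Hoodie record key (the BloomFilter lookup helper is a placeholder, not the actual Hudi code):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hudi.common.model.HoodieRecord;

// Keyed state caches recordKey -> fileId. Until all base files are loaded,
// fall back to the file-based BloomFilter index for candidate file lookup.
public class StateIndexSketch
    extends KeyedProcessFunction<String, HoodieRecord, HoodieRecord> {

  private transient ValueState<String> locationState; // fileId of this key, if known

  @Override
  public void open(Configuration parameters) {
    locationState = getRuntimeContext().getState(
        new ValueStateDescriptor<>("record-location", String.class));
  }

  @Override
  public void processElement(HoodieRecord record, Context ctx, Collector<HoodieRecord> out)
      throws Exception {
    String fileId = locationState.value();
    if (fileId == null) {
      // possible INSERT: consult the BloomFilter index for candidate files,
      // look them up, and cache every index entry found into the state
      fileId = lookupBloomFilterIndexAndFillState(record);
    }
    // fileId == null now means a genuine INSERT, otherwise an UPDATE
    out.collect(record);
  }

  private String lookupBloomFilterIndexAndFillState(HoodieRecord record) {
    return null; // placeholder for the file-based index lookup
  }
}
```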

The Compatibility

The operator coordinator was introduced in the Flink 1.11 release. In order to be compatible with Flink versions lower than 1.11, we need to add a pipeline that does not use the operator coordinator:

input operator => instant generator => fileID assigner => bucket writer => commit sink.

That is, we replace the coordinator with the instant generator and commit sink, just like the original pipeline.


Note that this pipeline cannot work in mini-batch mode because there is no component to coordinate the mini-batches, and we cannot control the sequence of the operators' checkpoint success notifications. So it is impossible to write eagerly and start an instant immediately when a checkpoint finishes.

Implementation Plan

  1. Implement #step1 on the current code base and add a test framework to the hoodie-flink module, including UTs and ITs
  2. Refactor the HoodieFlinkWriteClient to abstract out the indexing/bucketing work; in hoodie-spark these are all sub-pipelines, but in Flink we should abstract them as an interface/operator
  3. Implement the code for #step2
  4. Implement the code for #step3
  5. Add the new index
  6. Add the compatible pipeline


Each step would have a separate JIRA issue.


3 Comments

  1. Danny Chen vinoyang The problems outlined and the fixes here make sense to me. I am not a Flink expert by any means, but please let me know if you need any inputs from me or need to accommodate any other larger changes. 


    Overall I had a question on Table API vs DataStream API. It seems like it's a choice like DataFrame vs RDD. Do we plan to limit this to the DataStream API and the deltastreamer-like utility for now?

    1. Generally speaking, Flink provides two levels of programming abstraction (DataStream / Table & SQL). We try to provide both of them for Hudi users. Currently, the main work is for DataStream users, and the form in Hudi is very similar to the deltastreamer (for Spark users).

    2. Sounds good. We need to abstract this a little better over time, but once you all are happy with how the Flink integration is, we can make a second pass over the Table APIs as well. The current approach sounds good.