Current state: Under Discussion



1. Motivation

...

As Flink stores shared and private states in different paths, file merging is performed separately for each scope. The table below summarizes how merging behaves for shared and private states. For further information, please consult the following sections.


| | Granularity of merging | Support merging across checkpoints | Need file copy after failover | Need file copy after restoring from retained checkpoint (a new job) |
| --- | --- | --- | --- | --- |
| Shared state | per subtask | yes | no | yes |
| Private state | per taskmanager | yes | no | no |
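The table above can also be read as a small lookup structure. The following is a minimal sketch only: `Scope`, `MergingBehavior`, `BEHAVIOR` and `needs_file_copy` are hypothetical names introduced for illustration and are not part of Flink or of the POC. The values are transcribed directly from the table.

```python
from dataclasses import dataclass
from enum import Enum


class Scope(Enum):
    """Hypothetical names for the two state scopes in the table."""
    SHARED = "shared"
    PRIVATE = "private"


@dataclass(frozen=True)
class MergingBehavior:
    granularity: str                   # unit within which files are merged
    merges_across_checkpoints: bool    # can one file hold several checkpoints?
    copy_after_failover: bool          # file copy needed after failover?
    copy_after_retained_restore: bool  # file copy needed when a new job restores
                                       # from a retained checkpoint?


# Values transcribed from the table; the structure itself is an assumption.
BEHAVIOR = {
    Scope.SHARED: MergingBehavior("subtask", True, False, True),
    Scope.PRIVATE: MergingBehavior("taskmanager", True, False, False),
}


def needs_file_copy(scope: Scope, restored_as_new_job: bool) -> bool:
    """Return whether the table says a file copy is needed in this situation."""
    behavior = BEHAVIOR[scope]
    if restored_as_new_job:
        return behavior.copy_after_retained_restore
    return behavior.copy_after_failover
```

Under this reading, the only case that requires a file copy is shared state restored from a retained checkpoint by a new job.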

4.1.1 Merge files within a checkpoint or across checkpoints

...

We have implemented a POC (in branch https://github.com/Zakelly/flink/tree/flip306_poc) that only supports merging state in the EXCLUSIVE scope (e.g. operator state and channel state), and tested it on a simple word-count job with a tumbling window, running with a parallelism of 4 in a single TM. We collected and compared the file creations and deletions recorded in our logs, with and without file merging enabled. Only files in the EXCLUSIVE scope are counted. The merging target size was set to 32MB and the job ran for 4 hours. Here's the result:


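| | | No merging | Merge within each checkpoint | Merge across checkpoints |
| --- | --- | --- | --- | --- |
| Aligned checkpoint | create | 5756 | 3295 | 1920 |
| Aligned checkpoint | delete | 5752 | 3292 | 1916 |
| Unaligned checkpoint | create | 10222 | 6081 | 1940 |
| Unaligned checkpoint | delete | 10214 | 6077 | 1932 |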

As shown above, this sample job demonstrates that merging only the operator state and channel state eliminates over 40% of file creation and deletion operations when merging within each checkpoint. When merging across checkpoints, the figures in the table give a reduction of about 67% for aligned checkpoints and about 81% for unaligned checkpoints. Unaligned checkpoints also produce more small files than aligned checkpoints, so our solution performs better when unaligned checkpoints are enabled.
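The reduction percentages can be recomputed from the counts in the result table. The sketch below assumes the reduction is measured against the "No merging" column; `COUNTS`, `reduction` and `reductions` are hypothetical names used only for this illustration.

```python
# File operation counts transcribed from the result table:
# (no merging, merge within each checkpoint, merge across checkpoints)
COUNTS = {
    ("aligned", "create"): (5756, 3295, 1920),
    ("aligned", "delete"): (5752, 3292, 1916),
    ("unaligned", "create"): (10222, 6081, 1940),
    ("unaligned", "delete"): (10214, 6077, 1932),
}


def reduction(baseline: int, merged: int) -> float:
    """Fraction of file operations eliminated relative to the baseline."""
    return 1.0 - merged / baseline


def reductions() -> dict:
    """Map (checkpoint mode, operation) to (within, across) reductions."""
    result = {}
    for key, (no_merge, within, across) in COUNTS.items():
        result[key] = (reduction(no_merge, within), reduction(no_merge, across))
    return result


if __name__ == "__main__":
    for (mode, op), (within, across) in reductions().items():
        print(f"{mode:9s} {op:6s} within={within:.1%} across={across:.1%}")
```

With the counts as transcribed, merging within each checkpoint removes roughly 41% to 43% of file operations, and merging across checkpoints removes roughly 67% (aligned) and 81% (unaligned).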

...