...

This FLIP proposes a unified file merging framework to solve the file flood problem during Flink checkpointing. The framework applies to all types of checkpoint files, including keyed state, non-keyed state, channel state and changelog state. The primary objective is to minimize file open and close operations, which reduces metadata access and modification in the DFS. The proposed approach lets the TM write several data segments into a single open file, and the TM owns the files it writes. Ownership of individual checkpoint files is transferred to the TM, while the JM manages the parent directory of these files. The TM uses checkpoint notifications (aborted, subsumed) from the JM to manage (delete) specific files. The implementation is straightforward and does not require modifications on the JM side. It introduces a new code path in the TM, which can be safely enabled and disabled as needed.


Fig.1 Overview of file ownership
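
The idea of writing several data segments into one open file can be sketched as follows. This is a minimal illustration in Python, not the FLIP's implementation: the names `SegmentHandle`, `PhysicalFileWriter` and `read_segment` are hypothetical, and a local temporary directory stands in for the DFS.

```python
import os
import tempfile
from dataclasses import dataclass


@dataclass(frozen=True)
class SegmentHandle:
    """Hypothetical handle: where one logical file lives inside a physical file."""
    path: str
    offset: int
    length: int


class PhysicalFileWriter:
    """Keeps one file open and appends many data segments to it."""

    def __init__(self, path):
        self.path = path
        self._file = open(path, "wb")  # a single open for many segments

    def write_segment(self, data):
        offset = self._file.tell()  # start of this segment in the physical file
        self._file.write(data)
        return SegmentHandle(self.path, offset, len(data))

    def close(self):
        self._file.close()  # a single close for many segments


def read_segment(handle):
    """Read back exactly one segment using its offset and length."""
    with open(handle.path, "rb") as f:
        f.seek(handle.offset)
        return f.read(handle.length)


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        writer = PhysicalFileWriter(os.path.join(tmp, "merged-file"))
        segments = (b"keyed", b"operator", b"channel")
        handles = [writer.write_segment(s) for s in segments]
        writer.close()
        return [read_segment(h) for h in handles]
```

Three logical files are written with one open and one close, and each one is still individually addressable through its handle.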

...

  • SEGMENTED_WITHIN_CP_BOUNDARY: Only merge checkpoint files from the same checkpoint.
  • SEGMENTED_ACROSS_CP_BOUNDARY: Merge checkpoint files across different checkpoints. This option is only supported on some file systems.

Fig.2 File visibility when checkpoint finished and two merging modes in time dimension
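
The difference between the two modes can be illustrated with a small sketch. The enum values mirror the mode names above; the function `assign_physical_files` is hypothetical, and the sketch deliberately ignores other reasons to start a new file (for example a size limit), which a real implementation would likely need.

```python
from enum import Enum


class MergingMode(Enum):
    SEGMENTED_WITHIN_CP_BOUNDARY = "within"
    SEGMENTED_ACROSS_CP_BOUNDARY = "across"


def assign_physical_files(mode, checkpoint_ids):
    """Map each segment (given by its checkpoint ID, in write order)
    to the index of the physical file it would be appended to."""
    assignments = []
    file_index = -1
    last_checkpoint = None
    for cp in checkpoint_ids:
        crosses_boundary = last_checkpoint is not None and cp != last_checkpoint
        must_start_new_file = file_index < 0 or (
            mode is MergingMode.SEGMENTED_WITHIN_CP_BOUNDARY and crosses_boundary
        )
        if must_start_new_file:
            file_index += 1
        assignments.append(file_index)
        last_checkpoint = cp
    return assignments
```

With segments from checkpoints 1, 1, 2, 2 and 3, the within-boundary mode uses one physical file per checkpoint, while the across-boundary mode keeps appending to the same file.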

...

When merging files, there are two levels of granularity to consider: subtask or TM. Generally, merging at the TM level is more effective since there are typically more files to merge. However, when restarting a job, the limitations of merging at the TM level become apparent. During restoration, checkpoint files may still be managed by the new job (see 'Claiming the snapshot' in FLIP-193). Given that TMs are reassigned after restoration, it is difficult to manage physical files that contain data from multiple subtasks scattered across different TMs (as depicted in Fig.3). There is no synchronization mechanism between TMs, making file management in this scenario challenging. One possibility is to make multiple copies of the file and have each TM manage a unique copy, but this would result in significant data transfer over the network.


Fig.3 The shared state file management problem when merging files at the TM level after a job is restored.

...

Restarting a job with a new configuration is a common operation for Flink users. As previously discussed, for shared states in CLAIM recovery mode, the TMs of the new job must take responsibility for managing the old checkpoint files. As files are merged within each subtask for shared states, a physical file belongs to one subtask. If the user does not change the job's parallelism, the old and new subtasks correspond one-to-one, making it a simple take-over scenario. In this case, the new subtask only manages files from the corresponding old subtask. If the user does change the parallelism, an old physical file will be retained by several new subtasks. To avoid communication between subtasks in file management, multiple copies of old files are created, with each new subtask managing a unique copy. The original files are deleted by the JM when the old checkpoint is subsumed, since no TM (or subtask) will claim ownership of these files. Fig.4 and 5 show the checkpoint file management after restoring the job without and with a change in parallelism, respectively.


Fig.4 Shared state file management after restoring with same parallelism

Fig.5 Shared state file management after restoring with a different parallelism
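
The two restore cases can be summarized as a small planning function. This is a sketch under stated assumptions: `plan_restore` and the action names `take_over` and `copy` are hypothetical, and the input mapping (which new subtasks need data from which old physical file) is assumed to be known from the restored state handles.

```python
def plan_restore(old_parallelism, new_parallelism, needed_by):
    """Plan how new subtasks obtain old physical files in CLAIM mode.

    needed_by: old physical file path -> list of new subtask indices
               that need data from that file.
    Returns a list of (action, path, new_subtask) tuples.
    """
    plan = []
    for path, subtasks in sorted(needed_by.items()):
        if old_parallelism == new_parallelism:
            # One-to-one correspondence: the new subtask takes over the file.
            plan.append(("take_over", path, subtasks[0]))
        else:
            # Parallelism changed: each new subtask manages its own copy,
            # so no coordination between subtasks is needed. The original
            # is left for the JM to delete once the old checkpoint is subsumed.
            for subtask in subtasks:
                plan.append(("copy", path, subtask))
    return plan
```

For example, scaling from 2 to 3 subtasks where an old file is needed by new subtasks 0 and 1 yields two `copy` actions for that file, while an unchanged parallelism yields a single `take_over`.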

...

Section 4.2 noted that the JM sometimes deletes old checkpoint files entirely when a new checkpoint finishes after a restore. To simplify file deletion and file ownership transfer, this FLIP proposes a new physical file layout for checkpoint storage. The directories for exclusive scope, shared scope, and task-owned states remain the same. For shared states, a sub-directory is created in the shared directory for each subtask. For private states, a taskmanager-owned directory is introduced under the checkpoint storage, and a sub-directory is created in it for each TM. These sub-directories are not placed under the chk-x directory because chk-x directories do not survive across checkpoints, whereas physical files may contain states from different checkpoints. Shared and private states are treated differently because their file merging granularities differ, as discussed in Section 4.1.2. Fig.6 illustrates the newly proposed checkpoint storage file layout.


Fig.6 Proposed checkpoint storage layout.
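
The layout described above can be sketched as a path-resolution helper. The directory names `shared` and `taskmanager-owned` follow the text; the function name `merged_file_directory` and the sub-directory naming (one opaque identifier per subtask or per TM) are assumptions made for illustration only.

```python
from pathlib import PurePosixPath


def merged_file_directory(checkpoint_base, scope, owner_id):
    """Return the directory that holds merged physical files.

    scope:    "shared"  -> one sub-directory per subtask
              "private" -> one sub-directory per TM
    owner_id: hypothetical identifier of the subtask or the TM.
    """
    base = PurePosixPath(checkpoint_base)
    if scope == "shared":
        return base / "shared" / owner_id
    if scope == "private":
        return base / "taskmanager-owned" / owner_id
    raise ValueError(f"unknown scope: {scope}")
```

Neither result lies under a chk-x directory, which matches the requirement that physical files may outlive any single checkpoint.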

...

In the case of shared states, when a job failover occurs, the subtask-owned sub-directory remains the same. The JM receives the same DirectoryStateHandles and does not delete any directories or files. However, when a user restarts a job manually, the sub-directory for each subtask changes due to a change in job parallelism or checkpoint storage base path. The JM receives different DirectoryStateHandles and deletes the original directories later. Fig.7 and 8 describe these two cases.


Fig.7 Subtask-owned folder consistent across job restarts (failover).

...


Fig.8 Subtask-owned folder migration after restoring from a retained checkpoint.

...

The TM records the references between checkpoint data and the underlying physical files. When a checkpoint is subsumed or aborted, the TM deletes the files that are no longer needed. Fig.9 shows the overall process.

Fig.9 Overview of the file management

...

To determine which checkpoint data is no longer needed, the TM keeps a watermark of the subsumed checkpoint ID. Because the watermark covers earlier checkpoints as well, a lost checkpoint notification does not leave data behind. When a checkpoint is removed, the related data segments or state handles are also deleted, and their references to the underlying files are released. As a result, the TM can safely delete the files that are no longer in use.

Fig.10 Reference counting between logical and physical files.
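
The combination of a subsumed-checkpoint watermark and reference counting can be sketched as below. This is a minimal model, not the FLIP's implementation: the class `FileReferenceTracker` and its methods are hypothetical, the watermark is assumed to be the highest subsumed checkpoint ID seen so far, and appending to `deleted` stands in for deleting a file on the DFS.

```python
class FileReferenceTracker:
    """Tracks which checkpoints reference which physical files."""

    def __init__(self):
        self.subsumed_watermark = -1
        self.files_by_checkpoint = {}  # checkpoint ID -> list of file paths
        self.ref_counts = {}           # file path -> number of references
        self.deleted = []              # stand-in for deletions on the DFS

    def register_segment(self, checkpoint_id, path):
        """Record that a segment of this checkpoint lives in the given file."""
        self.files_by_checkpoint.setdefault(checkpoint_id, []).append(path)
        self.ref_counts[path] = self.ref_counts.get(path, 0) + 1

    def notify_subsumed(self, checkpoint_id):
        """Advance the watermark and release every checkpoint at or below it."""
        self.subsumed_watermark = max(self.subsumed_watermark, checkpoint_id)
        covered = sorted(
            cp for cp in self.files_by_checkpoint if cp <= self.subsumed_watermark
        )
        for cp in covered:
            self._release(cp)

    def notify_aborted(self, checkpoint_id):
        """Release only the aborted checkpoint."""
        if checkpoint_id in self.files_by_checkpoint:
            self._release(checkpoint_id)

    def _release(self, checkpoint_id):
        for path in self.files_by_checkpoint.pop(checkpoint_id):
            self.ref_counts[path] -= 1
            if self.ref_counts[path] == 0:
                del self.ref_counts[path]
                self.deleted.append(path)
```

In this model, if the notification for checkpoint 1 is lost, the notification for checkpoint 2 still releases the references held by checkpoint 1, because both IDs fall at or below the watermark.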

...