| Discussion thread | https://lists.apache.org/thread/xx3bk8fb0sp81p8mttqjvrckn2p6c3mm |
|---|---|
| Vote thread | https://lists.apache.org/thread/ndoxr5m6f7dn64gc36boyrgg9wfdyscm |
| JIRA | https://issues.apache.org/jira/browse/FLINK-37021 |
| Release | <Flink Version> |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
This is a sub-FLIP of the disaggregated state management effort and its related work. Please read FLIP-423 first for the full picture.
Background and Motivation
FLIP-428 introduces fault-tolerance mechanisms for the disaggregated state storage system ForSt. It uses the fast-duplication capability of remote file systems to transfer files between the working directory and the checkpoint directory. While this approach effectively minimizes local-to-remote transfer overhead, it still pays the cost of remote-to-remote file copies, especially when fast duplication is not supported. In these cases it falls short of the goal of fast fault tolerance.
In fact, because the working directory and checkpoint directory of ForSt typically reside within the same file system, there is no need to maintain multiple copies of the same file. Consequently, the previously mentioned overhead associated with file copying can be entirely eliminated by reusing the DB/Checkpoint files. The primary challenge lies in properly managing the files handled by Flink (checkpoint files) and ForStDB (runtime state SST files).
Hence, this FLIP proposes an improved approach for faster fault tolerance in ForSt. Specifically, during the Snapshot/Restore process:
- We reuse files as much as possible to avoid copying them between the DB path and the Checkpoint path.
- Once a file is reused (either during Snapshot or Restore), it is no longer owned by ForStDB, but is owned by Flink instead.
- Consequently, the file will not be deleted by ForSt but solely by Flink.
- Since Flink only deletes checkpoint files when a checkpoint is subsumed, a deleted file is no longer required by any subsequent checkpoint and is therefore no longer needed by ForStDB. Flink can thus delete these files safely.
Overview
The principle of File Reusing: Once a file is reused, it is owned by Flink, which means it cannot be deleted by ForStDB.
Requirements for File Reuse:
- For Checkpointing: ForSt's working directory and the checkpoint directory should reside in the same path within the same filesystem.
- For Restoration/Rescaling: In addition to the above requirement, the recovery should be in CLAIM mode. This means that the previous checkpoints are still managed by the JobManager of the new Flink job.
Below is an example of Checkpointing and Restoration.
- Restoration from CHK-1 in CLAIM mode (Figures 1~2). ForSt links the DB files to the checkpoint files. This is an in-memory operation without any file copying. The restored files are now owned by Flink.
- Running the Flink job (Figures 3~5). ForSt creates new files (owned by ForSt) and disposes some files (only those owned by ForSt are deleted).
- Generating a new Checkpoint CHK-2 (Figures 6~8). ForSt reuses files to generate state handles and registers them with Flink JM. No file copying occurs. The reused files are now owned by Flink. CHK-1 is subsumed and the orphan file is deleted by Flink.
Below is another example of rescaling from parallelism 2 to 1.
- Restoration from CHK-1 in CLAIM mode (Figures 1~2). Temporary DBs for each subtask reuse files from the checkpoint.
- Performing ClipDB (Figure 3). Each ForSt instance clears invalid data. Similar to the previous example, new files are created, and some files are disposed but not deleted by ForSt.
- Performing IngestDB (Figure 4). Data from multiple ForSt instances are merged and files are reused.
End-to-end User Case
To use ForSt state backend in a disaggregated compute-storage environment, users only need to specify the remote checkpoint path by default. ForSt automatically uses the shared folder under the checkpoint path as its working directory. Once the job starts, Flink subtasks will begin reading and writing state, and users can observe SST files being created or deleted in the remote path.
During job execution, Flink periodically triggers incremental checkpoints. In this process, ForSt reuses files from the DB to generate snapshots, meaning no SST files are copied when Flink generates checkpoints. However, if a native savepoint is triggered or full-checkpoints are enabled, the files will be copied from the database's working directory to the target savepoint/checkpoint folder.
If a Flink job triggers a failover during runtime or recovers from a checkpoint in CLAIM mode, no SST files will be copied, and ForSt will simply restart in the original working directory. However, if the job restarts in NO_CLAIM mode or with a different checkpoint path, the recovery process will copy the checkpoint files to the database's working directory.
Proposed Changes
All the following changes happen under the ForStStateBackend.
File Management
We introduce a FileManager for ForSt. It manages all DB files, including the newly created ones and those restored from CP, using a HashMap. For each entry in the map:
- Key: The absolute path of the file within the DB, denoted as DBFilePath. This includes information like the DB Remote Path and file extension. This is a logical path and might not correspond to the actual storage path.
- Value: The information of the physical file, including the actual storage path and the file ownership status. Filenames are UUIDs to avoid naming conflicts among multiple subtasks.
Soft Linking: Two different DBFilePaths may point to the same physical file, essentially linking the two logical files.
Specifically, the information of each physical file contains two key attributes:
- FileSource: Indicates where the file can be accessed. It can store either an actual file address or a StateHandle. The latter case occurs when the file was recovered from a checkpoint or has already been reused during a previous snapshot.
- FileOwnership: Determines whether the file belongs to ForSt or Flink. There are three possible values:
  - Owned by Flink: The file does not belong to ForSt. When disposing this file, ForSt will not actually delete it.
  - Shareably Owned by ForSt: The file currently belongs to ForSt and can be deleted by ForSt, but may be transferred to Flink in the future.
  - Privately Owned by ForSt: The file always belongs to ForSt. This attribute applies to files that shall never be reused, such as the options file, which ForSt always stores in the local file system.
Cleanup for Remote working files: The core logic for file cleanup remains unchanged. When TM exits, the ForSt-StateBackend deletes all files underneath the working directory. The only difference is that files owned by Flink will be skipped during deletion.
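The bookkeeping described in this section can be sketched roughly as follows. This is a minimal illustration rather than ForSt's actual implementation: the class name ForStFileManagerSketch, its method names, and the use of plain strings for paths are all assumptions made for the example, and the FileSource is reduced to a storage path. The rule that a soft-linked file is only physically deleted once its last logical path is removed is likewise an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical sketch of the bookkeeping described above; not ForSt's actual classes. */
final class ForStFileManagerSketch {

    /** The three ownership states named in the text. */
    enum FileOwnership { OWNED_BY_FLINK, SHAREABLY_OWNED_BY_FORST, PRIVATELY_OWNED_BY_FORST }

    /** Physical file info: the real storage path plus the current owner. */
    static final class PhysicalFile {
        final String storagePath;
        FileOwnership ownership;

        PhysicalFile(String storagePath, FileOwnership ownership) {
            this.storagePath = storagePath;
            this.ownership = ownership;
        }
    }

    private final String workingDir;
    /** Key: logical DBFilePath. Value: the physical file it resolves to. */
    private final Map<String, PhysicalFile> files = new HashMap<>();

    ForStFileManagerSketch(String workingDir) {
        this.workingDir = workingDir;
    }

    /** Registers a newly created DB file under a UUID-based physical name. */
    PhysicalFile create(String dbFilePath, boolean shareable) {
        FileOwnership ownership = shareable
                ? FileOwnership.SHAREABLY_OWNED_BY_FORST
                : FileOwnership.PRIVATELY_OWNED_BY_FORST;
        PhysicalFile file = new PhysicalFile(workingDir + "/" + UUID.randomUUID(), ownership);
        files.put(dbFilePath, file);
        return file;
    }

    /** Soft link: a second logical path resolves to the same physical file. */
    void link(String existingDbFilePath, String newDbFilePath) {
        PhysicalFile file = files.get(existingDbFilePath);
        if (file == null) {
            throw new IllegalArgumentException("Unknown DB file: " + existingDbFilePath);
        }
        files.put(newDbFilePath, file);
    }

    /** Ownership handover when a file is reused by a snapshot or a restore. */
    void markOwnedByFlink(String dbFilePath) {
        PhysicalFile file = files.get(dbFilePath);
        if (file == null) {
            throw new IllegalArgumentException("Unknown DB file: " + dbFilePath);
        }
        if (file.ownership == FileOwnership.PRIVATELY_OWNED_BY_FORST) {
            throw new IllegalStateException("Private files are never reused: " + dbFilePath);
        }
        file.ownership = FileOwnership.OWNED_BY_FLINK;
    }

    /**
     * Drops the logical entry and reports whether ForSt should physically delete the file.
     * Files owned by Flink, or still reachable through another soft link, are kept.
     */
    boolean delete(String dbFilePath) {
        PhysicalFile file = files.remove(dbFilePath);
        if (file == null || file.ownership == FileOwnership.OWNED_BY_FLINK) {
            return false;
        }
        return !files.containsValue(file);
    }
}
```

In this sketch, delete only answers whether a physical deletion is due; the actual file system call is left to the caller, which keeps the ownership logic easy to unit test.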
File Transfer Strategy
Conceptually, Flink needs to "transfer" files between its working directory and checkpoint directory. In the preceding discussion, we focused on scenarios where "file reuse" is used for file transfer. However, in some cases, we might have to resort to "copying files" for file transfer. Therefore, we introduce the FileTransferStrategy to determine how files should be transferred. It handles requests for "File Transfer" from the SnapshotStrategy and RestoreOperation of the ForSt StateBackend.
We generally choose the following methods for file transfer in order of priority from highest to lowest:
- Directly reuse the files: If the transfer 'source' is identical to the 'destination'.
- Fast duplicate the files (Paths Copy): If the transfer 'source' and 'destination' are in different directories within the same file system. It also requires support for Paths Copy in the remote FS.
- Through data transmission (Bytes Copy): If the transfer 'source' and 'destination' are in different storage systems, or the remote FS does not support Paths Copy.
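The priority order above can be expressed as a small decision function. The following is only a sketch under stated assumptions: the names TransferMethodSketch, TransferMethod, and choose are hypothetical, and "same file system" is approximated here by comparing the scheme and authority of two URIs, which may differ from how the real implementation identifies a file system.

```java
import java.net.URI;
import java.util.Objects;

/** Hypothetical sketch of the priority order above; names are illustrative only. */
final class TransferMethodSketch {

    enum TransferMethod { REUSE, PATHS_COPY, BYTES_COPY }

    /** Picks the cheapest applicable method, from highest to lowest priority. */
    static TransferMethod choose(URI source, URI destination, boolean fsSupportsPathsCopy) {
        if (source.equals(destination)) {
            return TransferMethod.REUSE;
        }
        if (sameFileSystem(source, destination) && fsSupportsPathsCopy) {
            return TransferMethod.PATHS_COPY;
        }
        return TransferMethod.BYTES_COPY;
    }

    /** Assumption: two URIs share a file system if scheme and authority match. */
    static boolean sameFileSystem(URI a, URI b) {
        return Objects.equals(a.getScheme(), b.getScheme())
                && Objects.equals(a.getAuthority(), b.getAuthority());
    }
}
```

Bytes Copy acts as the fallback in this sketch because it is the only method that works regardless of where the two paths live.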
The following subsections describe details about how FileTransferStrategy works in different cases.
Snapshot
In this case, the transfer 'source' is the file in ForSt's working directory, while the 'destination' is the file in the checkpoint directory.
DB files can be reused as checkpoint files if all of the following hold:
- This snapshot allows reuse of previous files, i.e., SharingFilesStrategy = FORWARD_BACKWARD.
- ForSt's working directory is under the Checkpoint's shared directory.
- The file is not privately owned by ForSt.
- The file is not privately owned by ForSt.
For reusable files:
- Query the HashMap to find its FileSource. Write the Path into a StateHandle, or directly reuse the StateHandle and report it to JM.
- Mark the file as OWNED_BY_FLINK; henceforth, ForSt will not delete this file.
For non-reusable files:
- Copy the file to the corresponding Checkpoint path.
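The three snapshot-side conditions can be checked as sketched below. This is an illustration only: SnapshotReuseSketch and its members are hypothetical names, the two enums are local stand-ins mirroring the values mentioned in this document, and the "working directory is under the shared directory" test is approximated with a URI prefix comparison.

```java
import java.net.URI;
import java.util.Objects;

/** Hypothetical sketch of the snapshot-side reuse check; not the actual ForSt code. */
final class SnapshotReuseSketch {

    /** Local stand-in for the sharing strategy referenced in the text. */
    enum SharingFilesStrategy { FORWARD_BACKWARD, FORWARD, NO_SHARING }

    /** Local stand-in for the ownership states from the File Management section. */
    enum FileOwnership { OWNED_BY_FLINK, SHAREABLY_OWNED_BY_FORST, PRIVATELY_OWNED_BY_FORST }

    /** A DB file is reusable only if all three conditions from the text hold. */
    static boolean canReuse(
            SharingFilesStrategy strategy,
            URI workingDir,
            URI checkpointSharedDir,
            FileOwnership ownership) {
        return strategy == SharingFilesStrategy.FORWARD_BACKWARD
                && isUnder(workingDir, checkpointSharedDir)
                && ownership != FileOwnership.PRIVATELY_OWNED_BY_FORST;
    }

    /** Assumption: "under" means same scheme and authority, and a path-prefix match. */
    static boolean isUnder(URI dir, URI parent) {
        if (!Objects.equals(dir.getScheme(), parent.getScheme())
                || !Objects.equals(dir.getAuthority(), parent.getAuthority())) {
            return false;
        }
        // Append a trailing slash so that "/shared-2" is not treated as under "/shared".
        String parentPath = withTrailingSlash(parent.getPath());
        String dirPath = withTrailingSlash(dir.getPath());
        return dirPath.startsWith(parentPath);
    }

    private static String withTrailingSlash(String path) {
        return path.endsWith("/") ? path : path + "/";
    }
}
```

When the check fails, the file falls into the non-reusable case and is copied to the checkpoint path instead.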
Restore
In this case, the transfer 'source' is the file in the checkpoint directory, while the 'destination' is the file in ForSt's working directory.
Checkpoint files can be reused as DB files if all of the following hold:
- This recovery is in CLAIM mode.
- The StateHandle is not a ByteStreamHandle.
- The target file is not privately owned by ForSt.
For reusable files:
- Create a corresponding entry in the HashMap and save the StateHandle in the FileSource.
- Mark it as OWNED_BY_FLINK.
For non-reusable files:
- Copy the file to the DB path.
- Create a corresponding entry in the HashMap, marking it as 'Shareably Owned by ForSt' or 'Privately Owned by ForSt'.
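The restore-side decision, including the ownership each HashMap entry ends up with, might look like the sketch below. All names here (RestoreReuseSketch, RestoreMode, Action, Plan, planRestore) are hypothetical, and the state handle is reduced to a boolean flag saying whether it holds inline bytes rather than a file.

```java
/** Hypothetical sketch of the restore-side decision; not the actual ForSt code. */
final class RestoreReuseSketch {

    /** Local stand-in for the recovery modes discussed in the text. */
    enum RestoreMode { CLAIM, NO_CLAIM }

    enum FileOwnership { OWNED_BY_FLINK, SHAREABLY_OWNED_BY_FORST, PRIVATELY_OWNED_BY_FORST }

    enum Action { REUSE, COPY }

    /** What to do with one checkpoint file and how to record it in the file map. */
    static final class Plan {
        final Action action;
        final FileOwnership ownership;

        Plan(Action action, FileOwnership ownership) {
            this.action = action;
            this.ownership = ownership;
        }
    }

    static Plan planRestore(RestoreMode mode, boolean isByteStreamHandle, boolean privateToForSt) {
        boolean reusable = mode == RestoreMode.CLAIM && !isByteStreamHandle && !privateToForSt;
        if (reusable) {
            // The checkpoint file is linked into the DB and stays under Flink's control.
            return new Plan(Action.REUSE, FileOwnership.OWNED_BY_FLINK);
        }
        // Otherwise the file is copied to the DB path and the copy belongs to ForSt.
        FileOwnership ownership = privateToForSt
                ? FileOwnership.PRIVATELY_OWNED_BY_FORST
                : FileOwnership.SHAREABLY_OWNED_BY_FORST;
        return new Plan(Action.COPY, ownership);
    }
}
```

A copied file that is shareably owned can still be handed over to Flink by a later snapshot, which is why the copy path distinguishes the two ForSt ownership states.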
Rescale
The Rescale process is almost identical to the Restore process, except during the ClipDB operation where there may be deletion and creation of files:
- Deletion: Reused files are owned by Flink, so they will not actually be deleted by ForSt.
- Creation: Newly created files are owned by ForSt.
Relationship with Incremental Checkpoints
There is a related concept: Incremental checkpoints also "reuse files" through SnapshotStrategy. This is orthogonal to the "file reuse" proposed in this FLIP. To put it simply, SnapshotStrategy decides which files need to be transferred, while FileTransferStrategy then determines how to transfer these files.
Specifically, when ForSt takes a snapshot, it first invokes the SnapshotStrategy. In the case of incremental checkpoints, for any DB file that was already registered with JM during a prior snapshot, ForSt directly uses the existing StateHandle rather than attempting to "transfer" the file. Otherwise, it invokes the FileTransferStrategy to "transfer" the file. The strategy then determines whether the transfer should be done via file-copying or file-reusing.
Therefore, the improvements proposed by this FLIP do not affect the behavior of incremental checkpointing. Similarly, this conclusion also applies to full checkpoints and savepoints. For now, we only support reusing files for incremental checkpoints.
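The division of labor between the two strategies can be illustrated with a small sketch. It assumes a simplified model in which DB files and handles are plain strings and previously registered handles are kept in a map; the class and method names are hypothetical. Only the files returned here would be passed on to the FileTransferStrategy, which then picks reuse or copy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the "which files" step of an incremental snapshot. */
final class IncrementalSnapshotSketch {

    /**
     * Returns the DB files that still need to be transferred. Files already registered
     * with the JM in a prior snapshot keep their existing handle and are skipped.
     */
    static List<String> filesToTransfer(
            List<String> currentDbFiles, Map<String, String> registeredHandles) {
        List<String> toTransfer = new ArrayList<>();
        for (String file : currentDbFiles) {
            if (!registeredHandles.containsKey(file)) {
                toTransfer.add(file);
            }
        }
        return toTransfer;
    }
}
```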
Related Features Support
- Concurrent Checkpoint: As ForSt's SnapshotStrategy handles concurrent checkpoints similarly to RocksDB, the new FileTransferStrategy has no impact on it.
- As discussed in FLIP-428, unaligned checkpoints stay unaffected, while Changelog and local recovery are currently not supported in disaggregated ForSt.
- FLIP-306/Small File Merging Mechanism:
  - Snapshot: File merging in a checkpoint is achieved by using CheckpointStateOutputStream to write multiple files into a single physical file. This means that files are transferred via "bytes copy", which is contradictory to "file reuse". Therefore, file reuse and file merging mechanisms cannot be enabled simultaneously. Given that too many small files may still pose potential risks, it might be necessary to introduce a file merging mechanism at the DB level in the future to eliminate this issue.
  - Restore: When recovering from a checkpoint, however, both file merging and file reuse can take effect simultaneously. We can directly reuse the corresponding SegmentFileStateHandle and mark the associated physical file as owned by Flink.
Compatibility, Deprecation, and Migration Plan
The aforementioned changes are implemented on top of ForSt's existing fault-tolerance mechanisms. The optimized file transfer mechanism takes effect automatically without requiring additional user configuration. No existing interfaces are modified and nothing is deprecated.
Test Plan
Tests include:
- New UTs for the FileManager of ForSt
- New UTs for the FileTransferStrategy
Rejected Alternatives
None for now.




