Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

This is a sub-FLIP of the disaggregated state management effort and its related work. Please read FLIP-423 first for the full picture.

Background and Motivation

As outlined in FLIP-423 [1] and FLIP-427 [2], we proposed disaggregating state management and introduced a disaggregated state store named ForSt, which evolved from RocksDB. Within the new framework, where the primary storage resides on a remote file system, several challenges emerge when attempting to reuse the existing fault-tolerance mechanisms of local RocksDB:

  • Because most remote file systems do not support hard links, ForSt cannot use hard links to capture a consistent snapshot during the synchronous phase of a checkpoint, as RocksDB currently does.
  • The existing file transfer mechanism within RocksDB is inefficient during checkpoints: it first downloads the remote working state data to local memory and then uploads it to the checkpoint directory. Likewise, restore and rescale face similar problems due to superfluous data transmission.

To solve the above problems and improve the checkpoint/restore/rescaling performance of disaggregated storage, this FLIP proposes:

  1. A new checkpoint strategy for disaggregated state storage: leverage RocksDB's low-level API to retain a consistent snapshot during the synchronous phase of a checkpoint, and then transfer the snapshot files to the checkpoint directory during the asynchronous phase;
  2. Accelerating checkpoint/restore/rescaling by leveraging the fast-duplication capability of the remote file system, which bypasses the local TaskManager when transferring data between the remote working directory and the checkpoint directory.

High-level Overview

Within the disaggregated state storage, ForSt temporarily holds state data in memory to create immutable state files, and subsequently flushes them to the distributed file system (the working directory). The remote state files and the checkpoint files may reside in different directories, but they share the same underlying distributed file system. Based on this premise, checkpoint/restore/rescaling essentially involves data transfer between two directories (the working directory and the checkpoint directory) on one remote file system.


Checkpoint: During checkpointing, ForSt freezes the set of snapshot files in the synchronous phase to prevent their deletion, then fast-duplicates these files to the checkpoint directory in the asynchronous phase, and finally unlocks them so that they can be deleted.


Restore & Rescale: Similarly, restore and rescaling can also employ fast duplication to minimize network traffic and optimize recovery performance. Note that because state files need not be downloaded to local storage, local disk space is no longer a constraint for downscaling.

Proposed Changes

Checkpoint

The RocksDB (the foundation of ForSt) native core provides several ways to build a checkpoint for ForSt:

  • Option-1 Checkpoints feature [3]: Synchronously hard-link all DB files to a specified directory.
  • Option-2 BackupableDB feature [4]: Synchronously create another backup DB and hard-link/copy all files to the backup DB directory. It supports replicating files across different file systems with different 'Env' implementations.
  • Option-3 Low-level API combination: Use GetLiveFiles() in combination with DisableFileDeletion() and EnableFileDeletion() to make a physical copy of the DB [5]. GetLiveFiles() can be used to capture a consistent snapshot of the DB files, and these files can then be duplicated asynchronously.

For Option-1, most remote file systems do not support hard links and typically work around this with a duplication API (e.g. S3 copyObject [6], Azure Blob Storage copyBlob [7], Aliyun OSS copyObject [8]), which is significantly less efficient than a hard link on the local file system. Consequently, for ForSt on DFS, the synchronous execution of Option-1 would block the task thread for an extended duration during the synchronous phase of the snapshot, negatively impacting the task thread's processing performance.

Option-2 faces a dilemma similar to Option-1, as synchronously copying or duplicating DB files blocks task threads for an extended time during the synchronous phase of the snapshot. Moreover, with this option the checkpointed SST files are managed by BackupableDB (on the TM side), which differs from the current approach in which checkpoint files are managed by the JM.

In Option-3, GetLiveFiles() is a very lightweight operation that can be executed during the synchronous phase of the snapshot to take a consistent snapshot; the I/O-intensive file duplication can be deferred to the asynchronous phase. Additionally, the management of checkpointed files (file creation, deletion, incremental handling, etc.) can stay aligned with the existing mechanism.

Thus, we would like to choose Option-3 to support ForSt checkpointing, which offers greater flexibility and could integrate seamlessly with the existing checkpoint mechanisms.

In comparing snapshot processes between remote ForSt (Option-3) and local RocksDBStateBackend, the main differences are:

  • synchronous phase: RocksDBStateBackend needs to hard-link all DB files into a local snapshot directory, while ForSt uses the GetLiveFiles() API to capture a consistent snapshot;
  • asynchronous phase: RocksDBStateBackend uploads the local files to the remote checkpoint directory, while ForSt uses fast duplication to replicate the remote DB files to the checkpoint directory.
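
The difference in the asynchronous phase can be illustrated with a minimal Java sketch. Everything in it is an assumption made for illustration: `RemoteFs`, `InMemoryFs`, and `transfer` are hypothetical names, not Flink or ForSt interfaces, and the in-memory map merely stands in for a remote file system. The sketch only shows the idea that fast duplication moves no bytes through the client (the TaskManager), whereas the copy fallback downloads and re-uploads every byte.

```java
import java.io.FileNotFoundException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

class FastDuplicateSketch {

    /** Hypothetical minimal file-system abstraction; NOT a Flink or ForSt interface. */
    interface RemoteFs {
        boolean supportsFastDuplicate();
        void fastDuplicate(String src, String dst); // copy performed inside the storage service
        byte[] download(String path);               // bytes travel to the client
        void upload(String path, byte[] data);      // bytes travel from the client
    }

    /** In-memory stand-in that counts how many bytes pass through the client. */
    static final class InMemoryFs implements RemoteFs {
        final Map<String, byte[]> files = new HashMap<>();
        final boolean fast;
        long bytesThroughClient = 0;

        InMemoryFs(boolean fast) { this.fast = fast; }

        @Override public boolean supportsFastDuplicate() { return fast; }

        @Override public void fastDuplicate(String src, String dst) {
            if (!fast) throw new UnsupportedOperationException("fast duplication not supported");
            files.put(dst, lookup(src).clone());
        }

        @Override public byte[] download(String path) {
            byte[] data = lookup(path);
            bytesThroughClient += data.length;
            return data.clone();
        }

        @Override public void upload(String path, byte[] data) {
            bytesThroughClient += data.length;
            files.put(path, data.clone());
        }

        private byte[] lookup(String path) {
            byte[] data = files.get(path);
            if (data == null) throw new UncheckedIOException(new FileNotFoundException(path));
            return data;
        }
    }

    /** Prefer fast duplication; otherwise fall back to download followed by upload. */
    static void transfer(RemoteFs fs, String src, String dst) {
        if (fs.supportsFastDuplicate()) {
            fs.fastDuplicate(src, dst);
        } else {
            fs.upload(dst, fs.download(src));
        }
    }

    public static void main(String[] args) {
        for (boolean fast : new boolean[] {true, false}) {
            InMemoryFs fs = new InMemoryFs(fast);
            fs.files.put("working/000012.sst", new byte[10]);
            transfer(fs, "working/000012.sst", "shared/000012.sst");
            System.out.println("fast=" + fast + " bytesThroughClient=" + fs.bytesThroughClient);
        }
    }
}
```

With a 10-byte file, the fast path reports 0 bytes through the client and the fallback reports 20 (10 downloaded plus 10 uploaded).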

Key steps in checkpoint sync & async phase

  • synchronous phase:
    1. Snapshot state MetaInfo;
    2. DB.disableFileDeletions(); // prevent RocksDB file deletions
    3. LiveFiles liveFiles = DB.getLiveFiles(true); // capture a consistent snapshot and save the info of the live files in the current version
  • asynchronous phase:
    1. Materialize the state MetaInfo;
    2. Calculate the set of files to be duplicated based on the incremental/full policy;
    3. Fast-duplicate/copy the checkpoint files in parallel;
    4. Generate the KeyedStateHandle and register it with the JM;
    5. DB.enableFileDeletions(false); // allow RocksDB to delete obsolete files
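
The interplay of the two phases can be sketched in Java as follows. This is a simplified in-memory model under stated assumptions, not the RocksDB or ForSt API: `FakeDb`, `syncPhase`, and `asyncPhase` are hypothetical names, the MetaInfo steps and handle registration are omitted, and the incremental policy is reduced to "skip files already present in the checkpoint directory". The point it illustrates is that a file dropped by compaction after the synchronous phase stays on storage until file deletions are re-enabled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class SnapshotSketch {

    /** Hypothetical model of a DB that purges obsolete files only while deletions are enabled. */
    static final class FakeDb {
        private final Set<String> liveFiles = new TreeSet<>();
        private final Set<String> obsoleteFiles = new TreeSet<>();
        private int disableCount = 0;

        void addFile(String name) { liveFiles.add(name); }

        /** Compaction removes a file from the current version; physical deletion may be deferred. */
        void compactAway(String name) {
            if (liveFiles.remove(name)) {
                obsoleteFiles.add(name);
                purgeIfAllowed();
            }
        }

        void disableFileDeletions() { disableCount++; }

        void enableFileDeletions() {
            if (disableCount > 0) disableCount--;
            purgeIfAllowed();
        }

        List<String> getLiveFiles() { return new ArrayList<>(liveFiles); }

        boolean existsOnStorage(String name) {
            return liveFiles.contains(name) || obsoleteFiles.contains(name);
        }

        private void purgeIfAllowed() {
            if (disableCount == 0) obsoleteFiles.clear();
        }
    }

    /** Synchronous phase: pin the files, then capture the list of live files. */
    static List<String> syncPhase(FakeDb db) {
        db.disableFileDeletions();
        return db.getLiveFiles();
    }

    /**
     * Asynchronous phase: duplicate only files not yet in the checkpoint directory
     * (the set is updated in place), then re-enable deletions even on failure.
     */
    static List<String> asyncPhase(FakeDb db, List<String> snapshot, Set<String> inCheckpointDir) {
        List<String> duplicated = new ArrayList<>();
        try {
            for (String file : snapshot) {
                if (!db.existsOnStorage(file)) {
                    throw new IllegalStateException("snapshot file vanished: " + file);
                }
                if (inCheckpointDir.add(file)) {
                    duplicated.add(file); // a real backend would fast-duplicate or copy here
                }
            }
        } finally {
            db.enableFileDeletions();
        }
        return duplicated;
    }

    public static void main(String[] args) {
        FakeDb db = new FakeDb();
        db.addFile("000001.sst");
        db.addFile("000002.sst");
        Set<String> inCheckpointDir = new TreeSet<>(List.of("000001.sst"));

        List<String> snapshot = syncPhase(db);
        db.compactAway("000002.sst"); // happens while the asynchronous phase is pending
        System.out.println("still on storage: " + db.existsOnStorage("000002.sst"));
        System.out.println("duplicated: " + asyncPhase(db, snapshot, inCheckpointDir));
        System.out.println("on storage after unpin: " + db.existsOnStorage("000002.sst"));
    }
}
```

Re-enabling deletions in a `finally` block is a design choice of this sketch: it keeps an aborted checkpoint from pinning obsolete files indefinitely.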

File management

With disaggregated state storage, two distinct sets of files are located within the remote file system:

  • Working Files: These are the live files actively utilized by ForSt during its runtime operations.
  • Checkpoint Files: These files are employed to restore the system state to a particular moment in time.

It would be advantageous to reconsider and reorganize the file structure in the remote file system.

Remote directory layout

In the current Flink architecture, the checkpoint files and directory are owned by the JobManager, while the working files and directory are owned by the TaskManager. The checkpoint directory and ForSt's working directory will therefore remain separate directories for disaggregated state storage. This means that the remote state files still need to be copied or fast-duplicated to the checkpoint directory during a checkpoint (fast duplication being the more lightweight option), even though they already reside on the DFS. In addition, to simplify user configuration, the working directory can default to a subdirectory of the checkpoint taskowned directory, with no extra configuration required. The resulting remote directory layout of disaggregated state storage is as follows:

|--- Job-RemoteState-Checkpointing-Dir
      |--- chk-xxx
      |--- shared
             |--- checkpoint files
      |--- taskowned
             |--- remote-state-working-dir
                    |--- subTask-state-sub-dir
                           |--- db
                                  |--- working files
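
As a rough illustration of the layout above, the following Java sketch derives the default working directory from the checkpoint directory. The class and method names (`RemoteLayoutSketch`, `defaultWorkingDir`, `sharedDir`) and the example URI are hypothetical, and the directory names `remote-state-working-dir` and the per-subtask directory are taken from the diagram as placeholders; the actual naming in an implementation may differ.

```java
class RemoteLayoutSketch {

    /** Strip a trailing slash so that path segments can be appended uniformly. */
    private static String normalize(String dir) {
        return dir.endsWith("/") ? dir.substring(0, dir.length() - 1) : dir;
    }

    /** Directory holding the shared checkpoint files (owned by the JM). */
    static String sharedDir(String checkpointDir) {
        return normalize(checkpointDir) + "/shared";
    }

    /**
     * Default working directory of one subtask: a subdirectory of the
     * taskowned directory, so that no extra user configuration is needed.
     */
    static String defaultWorkingDir(String checkpointDir, String subtaskDirName) {
        return normalize(checkpointDir)
                + "/taskowned/remote-state-working-dir/"
                + subtaskDirName
                + "/db";
    }

    public static void main(String[] args) {
        String checkpointDir = "hdfs://namenode/flink/checkpoints/job-1/"; // example value only
        System.out.println(sharedDir(checkpointDir));
        System.out.println(defaultWorkingDir(checkpointDir, "subtask-0"));
    }
}
```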

Cleanup of remote working files: the basic cleanup mechanism for the remote working directory is the same as in the existing RocksDB state backend, that is, when the TM exits, the ForSt state backend deletes the entire working directory. Cleanup of orphaned files after a TM crash should be addressed together with the TM ownership mechanism in a future FLIP (see the "Mid/long term follow up work" section).

Checkpoint complete/subsume/abort

The behavior of ForSt for these operations is the same as that of the local RocksDBStateBackend. The StreamStateHandle (checkpointed files) generated by the TM is registered with the JM, which manages the lifecycle of the checkpoint files as it does currently.
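
To make the JM-side lifecycle concrete, here is a deliberately simplified Java model of subsumption. It is not Flink's actual shared state registry; `CheckpointFileRegistrySketch`, `register`, and `subsumeOlderThan` are hypothetical names. The sketch assumes each checkpoint registers the set of files it references and that a file becomes deletable once no retained checkpoint references it.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class CheckpointFileRegistrySketch {

    /** Files referenced by each retained checkpoint, ordered by checkpoint id. */
    private final TreeMap<Long, Set<String>> filesByCheckpoint = new TreeMap<>();

    /** Called when a checkpoint's handles are registered. */
    void register(long checkpointId, Set<String> files) {
        filesByCheckpoint.put(checkpointId, new HashSet<>(files));
    }

    /**
     * Subsume every checkpoint with an id lower than the given one and return
     * the files that no retained checkpoint references any more.
     */
    Set<String> subsumeOlderThan(long checkpointId) {
        Set<String> candidates = new TreeSet<>();
        Iterator<Map.Entry<Long, Set<String>>> it =
                filesByCheckpoint.headMap(checkpointId, false).entrySet().iterator();
        while (it.hasNext()) {
            candidates.addAll(it.next().getValue());
            it.remove(); // drops the subsumed checkpoint from the registry
        }
        for (Set<String> retained : filesByCheckpoint.values()) {
            candidates.removeAll(retained); // files shared with a newer checkpoint must stay
        }
        return candidates;
    }

    public static void main(String[] args) {
        CheckpointFileRegistrySketch registry = new CheckpointFileRegistrySketch();
        registry.register(1L, Set.of("000001.sst", "000002.sst"));
        registry.register(2L, Set.of("000002.sst", "000003.sst"));
        System.out.println("deletable: " + registry.subsumeOlderThan(2L));
    }
}
```

In the example, `000002.sst` is kept because checkpoint 2 still references it, which is what makes incremental checkpoints safe to subsume.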

Restore

The key steps of restoring an instance of ForSt:

  1. Rebuild the LSM-tree of ForSt: initialize the working directory as configured by the user, then fast-duplicate (or copy) all files from the corresponding subdirectory of the restored checkpoint;
  2. Subsume the last checkpoint when the first checkpoint completes: this works the same way as the existing policies in both claim and no-claim modes.

Rescaling

ForSt will also leverage ClipDB/IngestDB[9] to accelerate rescaling and utilize the fast-duplication capabilities of the remote filesystem to enhance the transfer speed of state files. The key steps of ForSt rescaling are as follows:

  1. Each subtask fast-duplicates/copies the checkpoint files to its working directory, which may contain multiple ForSt instances;
  2. Use ClipDB to clear invalid data of multiple ForSt instances based on KeyGroupRange;
  3. Apply IngestDB to merge data from multiple ForSt instances.
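
Steps 2 and 3 can be pictured with a small in-memory Java sketch. It does not call ClipDB or IngestDB; `RescaleSketch`, `clip`, and `ingest` are hypothetical names, and each restored instance is modeled as a sorted map from key-group id to a value, which is a strong simplification of real keyed state. The sketch assumes the restored instances cover disjoint key-group ranges.

```java
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class RescaleSketch {

    /** "Clip": keep only entries whose key group lies in [start, end], both inclusive. */
    static NavigableMap<Integer, String> clip(
            NavigableMap<Integer, String> instance, int start, int end) {
        return new TreeMap<>(instance.subMap(start, true, end, true));
    }

    /** "Ingest": merge the clipped instances into one; overlapping key groups are an error. */
    static NavigableMap<Integer, String> ingest(List<NavigableMap<Integer, String>> clipped) {
        NavigableMap<Integer, String> target = new TreeMap<>();
        for (NavigableMap<Integer, String> part : clipped) {
            for (Map.Entry<Integer, String> entry : part.entrySet()) {
                if (target.putIfAbsent(entry.getKey(), entry.getValue()) != null) {
                    throw new IllegalStateException("overlapping key group " + entry.getKey());
                }
            }
        }
        return target;
    }

    public static void main(String[] args) {
        // Two restored instances that covered key groups [0, 3] and [4, 7] before rescaling.
        NavigableMap<Integer, String> first = new TreeMap<>();
        NavigableMap<Integer, String> second = new TreeMap<>();
        for (int kg = 0; kg <= 3; kg++) first.put(kg, "a" + kg);
        for (int kg = 4; kg <= 7; kg++) second.put(kg, "b" + kg);

        // The new subtask owns key groups [2, 5].
        NavigableMap<Integer, String> merged =
                ingest(List.of(clip(first, 2, 5), clip(second, 2, 5)));
        System.out.println(merged); // {2=a2, 3=a3, 4=b4, 5=b5}
    }
}
```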

Embedded RocksDB must download all DFS files to local disk before clipping or ingesting, while remote ForSt can use fast duplication to speed up this process with the support of the remote file system. Moreover, during rescaling of a Flink job, which involves intensive I/O operations, fast duplication offloads the I/O work to the remote file system, which typically results in better performance than downloading the files.

Savepoint

  • Native savepoint: follow the existing mechanism, that is, take a full checkpoint into a separate savepoint directory;
  • Canonical savepoint: follow the existing mechanism (build a full key-value iterator for ForSt).

Related Features Support

  • Unaligned checkpoint: The unaligned checkpoint behavior would not be affected by ForSt checkpoint strategy.
  • Changelog: The enhancements that ChangelogStateBackend brings to checkpointing are akin to the improvements afforded by disaggregated state storage. We therefore suggest that disaggregated ForSt not support ChangelogStateBackend.
  • Local recovery: In the context of disaggregated storage, the semantics of local recovery will change and need to be combined with a local disk cache. Future FLIPs will introduce the local disk cache to improve disaggregated storage performance. Subsequently, the local recovery feature could speed up restoration of the local disk cache.
  • FLIP-306 [10]/Small File Merging Mechanism: the proposed basic checkpoint strategy would be compatible with the small file merging mechanism.

Mid/long term follow up work

TM file ownership: Instead of letting the JM manage the checkpoint files while the TM manages the runtime state SSTs, the TM takes responsibility for managing all files, considering the lifecycles of both checkpoints and live SSTs. The DFS files can then be easily reused when checkpointing and restoring, and no file re-uploading or fast copying is needed during checkpoint or state recovery. This work may break some of Flink's existing file management mechanisms, and we will continue to advance it in future FLIPs.

Compatibility, Deprecation, and Migration Plan

The checkpointing/restoring/rescaling strategies will be implemented on top of the newly introduced ForSt and will follow the existing checkpoint/restore mechanism as far as possible. This will require some refactoring of the RocksDBStateBackend/checkpointing/restoring code, but it will not break the compatibility of any existing public interface (including the RocksDBStateBackend public interface).

Test Plan

  • New UT/ITs will be introduced for checkpointing/restoring/rescaling described in section 'Proposed Changes'.
  • New E2E tests of jobs that perform checkpointing/restoring/rescaling with ForSt will also be delivered.

Rejected Alternatives

  • Use RocksDB BackupableDB [4] to build a checkpoint for ForStStateBackend

      The RocksDB BackupableDB feature offers a straightforward method to back up ForSt. It also supports creating multiple incremental backups and deleting specified backups.

      Reason for rejection: 1) Flink must capture a consistent snapshot in the synchronous phase of checkpointing, so BackupableDB.createNewBackup() would have to be executed in the synchronous phase of the snapshot, which would block the task thread for a long time; 2) with this approach, the checkpointed files are fully managed by BackupableDB (on the TM side), which is inconsistent with the current approach in which checkpoint files are managed by the JM.

Reference

[1] FLIP-423 https://cwiki.apache.org/confluence/x/R4p3EQ

[2] FLIP-427 https://cwiki.apache.org/confluence/x/T4p3EQ

[3] https://rocksdb.org/blog/2015/11/10/use-checkpoints-for-efficient-snapshots.html

[4] https://rocksdb.org/blog/2014/03/27/how-to-backup-rocksdb.html

[5] https://github.com/facebook/rocksdb/wiki/Replication-Helpers#functions-for-full-db-physical-copy

[6] https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html

[7] https://learn.microsoft.com/en-us/rest/api/storageservices/copy-blob

[8] https://www.alibabacloud.com/help/en/oss/developer-reference/copyobject

[9] https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files#ingesting-sst-files

[10] https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints
