Current state: Approved

1. Motivation

Currently, there are many types of checkpoint file in Flink:

  • keyed state files
  • operator (non-keyed) state files
  • channel state files (when unaligned checkpoint enabled)
  • changelog state files (when generic incremental checkpoint enabled)

However, the creation of multiple checkpoint files can lead to a 'file flood' problem, in which a large number of files are written to the checkpoint storage in a short amount of time. This can cause issues in large clusters with high workloads: the creation and deletion of many files increases the amount of file metadata modification on the DFS, leading to single-machine hotspot issues for metadata maintainers (e.g. the NameNode in HDFS). Additionally, the performance of object storage (e.g. Amazon S3 and Alibaba OSS) decreases significantly when listing objects, which is necessary for object name de-duplication before creating an object, further hurting the performance of directory manipulation from the file system's point of view (see the hadoop-aws module documentation, section 'Warning #2: Directories are mimicked').

While many solutions have been proposed for individual types of state files (e.g. FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel state), the file flood problems caused by each type of checkpoint file are similar and lack a systematic view and solution. Therefore, the goal of this FLIP is to establish a unified file merging mechanism to address the file flood problem during checkpoint creation for all types of state files, including keyed, non-keyed, channel, and changelog state. This will significantly improve the stability and availability of fault tolerance in Flink.

2. Existing Solutions

There are some existing solutions targeting the file flood problem during checkpoints:

  • Store checkpoint data in state handle (in JM memory) and persist in one file —— for keyed, non-keyed, channel state
  • Merge small files for keyed state (FLINK-11937) —— for keyed state
  • Batch uploading in Generalized Incremental Checkpoints —— for changelog state
  • Merge small channel state files (FLINK-26803) —— for channel state

Each solution handles one or several types of state files. Let's go through them to see what they can do and their limitations.

Store checkpoint data in state handle (in JM memory) and persist in one file

In early versions of Flink, ByteStreamStateHandle was introduced to collect small checkpoint data segments in the heap memory of the JM and serialize them into the checkpoint meta file. This approach eliminates files that are smaller than a certain threshold and helps alleviate the file flood problem.

Limitation: It may consume too much memory on the JM while merging files. If the threshold is set too high, the JM may fall into full garbage collection or even run out of memory. On the other hand, if the threshold is set too low, the number of files is not significantly decreased.

Merge small files for keyed state (FLINK-11937)

This is a solution to the problem of many small files in checkpoints of keyed state (RocksDB). Whenever the TM needs to write an sst file to the checkpoint storage, a previously opened stream is reused to output the data. Thus, the TM merges several checkpoint files into one and reports back to the JM. The JM is responsible for recording the relationship between the original sst files and the underlying files, and for managing the life cycle of these underlying files.

Limitation: File management is quite complicated in this solution. The original shared registry mechanism works on the prerequisite that a state file will no longer be written to once its state handle is reported to the JM. Therefore, the JM can decide when to delete a file based on the existing checkpoint subsumption mechanism. However, in this solution, the TMs reuse the opened stream to write multiple data files (segments) into one file, and a state handle is reported to the JM right after a data segment is written to the stream. This means that even if all the data in a file is no longer used, the JM cannot delete it until it ensures that the TM will not write new data into that file. This leads to complicated synchronization between the JM and the TMs. The potential corner cases and their solutions are described in the design doc (see section '2.4 Concurrent Checkpoint'). The PR also modifies the shared state registry on the JM side, which is complicated and hard for maintainers to understand.

Batch uploading in Generalized Incremental Checkpoints

The Generalized Incremental Checkpoints (FLIP-158) introduced a state changelog, which periodically persists recent changes in state into checkpoint storage independently of checkpoints. The frequency of checkpoints does not affect when the state backend (such as RocksDB) takes a snapshot, resulting in a significant reduction in the number of small flushed files generated by the state backend.

However, the state changelogs themselves can become numerous and small, leading to a 'file flood' problem. Whenever a checkpoint is triggered, a changelog file is created for each subtask. Therefore, if checkpoints occur frequently in highly parallel jobs, numerous small changelog files will be generated, putting immense pressure on the DFS. To mitigate this issue, a batch uploading mechanism is introduced that can combine small changes within a TM (across subtasks) into a single file.

Limitation: The batch uploading is a solution specific to the current changelog storage implementation, as it creates its own output stream instead of using the CheckpointStreamFactory as other states do. It would be better to use a unified framework to manipulate the checkpoint storage, which is more convenient to extend and maintain.

Merge small channel state files (FLINK-26803)

When taking an unaligned checkpoint, Flink generates a channel state file for each subtask, persisting the in-flight data. This can result in a large number of channel state files for a highly parallel job. FLINK-26803 offers a solution that merges all channel state files from a single checkpoint and TM into one file, resulting in a significant reduction in the number of files, depending on the number of slots per TM.

Limitation: The effect is limited when taskmanager.numberOfTaskSlots is small.

3. High-level Overview

This FLIP proposes a unified file merging framework to solve the file flood problem during Flink checkpoints, which can be applied to all types of checkpoint files, including keyed state, non-keyed state, channel state and changelog state. The primary objective is to minimize file open and close operations to reduce access and modification of metadata in the DFS. The proposed approach enables the TM to write several data segments within one opened file, with the written files owned by the TM. Ownership of individual checkpoint files is transferred to the TM, while the JM manages the parent directory of these files. The TM leverages checkpoint notifications (aborted, subsumed) from the JM to manage (delete) specific files. The implementation is straightforward and does not require modifications on the JM side. It introduces a new code path in the TM, which can be safely enabled and disabled as needed.


Fig.1 Overview of file ownership


The main features of this approach include:

  • Universality, as it is applicable for all types of checkpoint files.
  • Clean design and implementation, where state files are owned by TMs and directories are owned by the JM. This simplifies the implementation and avoids complicated synchronization between the JM and TM, ensuring good maintainability.
  • Flexibility, as it supports file merging across different checkpoints or within a checkpoint, and within a subtask or task manager.
  • Full backward compatibility.

For more information on the proposed solution, please refer to the 'Proposed Design' section for a comprehensive overview of key issues.

4. Proposed Design

4.1 Merge files

The goal is to reduce file creation and deletion. To achieve this, a reused output stream is introduced (sketched after the list below) that can:

  • remain open after a data segment is written, allowing it to be reused for sequentially written segments.
  • generate a state handle containing the meta information for each written segment such as the underlying file path, offset and written length.
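
A minimal, hypothetical sketch of this idea in Java follows. The names (ReusedCheckpointStream, SegmentMeta) are illustrative, not the proposed API; the point is that each written segment records the shared file's path plus its own offset and length instead of producing a new file.

import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch of a reused checkpoint output stream; not the actual implementation.
class ReusedCheckpointStream {

    private final OutputStream out;   // underlying physical file stream, kept open
    private final String filePath;    // path of the shared physical file
    private long position;            // current write position within the physical file

    ReusedCheckpointStream(OutputStream out, String filePath) {
        this.out = out;
        this.filePath = filePath;
    }

    /** Writes one data segment and returns its meta information instead of closing the file. */
    SegmentMeta writeSegment(byte[] data) throws IOException {
        long offset = position;
        out.write(data);
        position += data.length;
        // The stream stays open so the next segment can reuse the same physical file.
        return new SegmentMeta(filePath, offset, data.length);
    }

    /** The meta information a segment state handle would carry: file path, offset and length. */
    static class SegmentMeta {
        final String filePath;
        final long offset;
        final long length;

        SegmentMeta(String filePath, long offset, long length) {
            this.filePath = filePath;
            this.offset = offset;
            this.length = length;
        }
    }
}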

Flink has two checkpoint scopes, which define whether a state file is owned by one checkpoint (EXCLUSIVE scope) or shared by multiple checkpoints (SHARED scope). The state files in the two scopes are as follows:

  • SHARED scope (shared states): keyed state files of incremental checkpoints, and changelog files (currently not in any scope, but could be classified into shared scope, because the changelog files are shared by multiple checkpoints).
  • EXCLUSIVE scope (private states): keyed state files of full checkpoint, non-keyed state (operator state) files, and channel state files.

As Flink stores shared and private states in different paths, file merging is performed separately for each scope. The table below lists the behavior of shared and private states in different aspects. For further information, please consult the following sections.


|  | Granularity of merging | Support merging across checkpoints | Need file copy after failover | Need file copy after restoring from retained checkpoint (a new job) |
| --- | --- | --- | --- | --- |
| Shared state | per subtask | yes | no | yes |
| Private state | per taskmanager | yes | no | no |

4.1.1 Merge files within a checkpoint or across checkpoints

In this approach, one checkpoint can reuse the upload stream of the previous checkpoint, further reducing the number of files. But it does not work in all circumstances, since Flink has special requirements for checkpoint files: file visibility is needed when a Flink job recovers from a materialized checkpoint. In some DFSs, such as most object storages, a file is only visible after it is closed; closing files at each checkpoint contradicts sharing the upload stream across checkpoints, making it impossible to merge files across checkpoints there. In other DFSs, such as HDFS, a file does not need to be closed before it can be read (a sync() method call is enough), so merging files across different checkpoints can be applied without affecting the file visibility requirement. Thus there are two options for merging files in the time dimension (sketched after the list):

  • SEGMENTED_WITHIN_CP_BOUNDARY: Only merge checkpoint files from the same checkpoint.
  • SEGMENTED_ACROSS_CP_BOUNDARY: Able to merge checkpoint files across different checkpoints. This option is only supported in some file systems.
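
A minimal sketch of this choice as a Java enum. The mode names follow the options above; the choose() helper and its arguments are illustrative assumptions, not part of the proposal.

// Hypothetical sketch; the real implementation would query the checkpoint file system's capabilities.
enum MergeTimeMode {
    SEGMENTED_WITHIN_CP_BOUNDARY,
    SEGMENTED_ACROSS_CP_BOUNDARY;

    static MergeTimeMode choose(boolean fileVisibleBeforeClose, boolean acrossCpRequested) {
        // Merging across checkpoints requires that written data is readable before the file
        // is closed (e.g. HDFS after sync()); most object stores do not provide this.
        return (acrossCpRequested && fileVisibleBeforeClose)
                ? SEGMENTED_ACROSS_CP_BOUNDARY
                : SEGMENTED_WITHIN_CP_BOUNDARY;
    }
}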

Fig.2 File visibility when checkpoint finished and two merging modes in time dimension

4.1.2 Merge files within a subtask or a TM

When merging files, there are two levels of granularity to consider: subtask or TM. Generally, merging at the TM level is more effective since there are typically more files to merge. However, when restarting a job, the limitations of merging at the TM level become apparent. During restoration, checkpoint files may still be managed by the new job (see 'Claiming the snapshot' in FLIP-193). Given that TMs are reassigned after restoration, it is difficult to manage physical files that contain data from multiple subtasks scattered across different TMs (as depicted in Fig.3). There is no synchronization mechanism between TMs, making file management in this scenario challenging. One possibility is to make multiple copies of the file and have each TM manage a unique copy, but this would result in significant data transfer over the network.


Fig.3 The shared state file management problem when merging files per TM level after job restoring.

In order to transfer file management from old TMs to new TMs without incurring additional network and space overhead, file merging at the subtask level is adopted for shared states. Since the subtask is the smallest scheduling unit, a physical file that contains data from a single subtask will only be managed by one TM after restoration (without rescaling), so it is easy for the new subtask to take over the management of old files. For private states, however, the old state files are not kept once the first checkpoint of the new job completes; they are subsumed and deleted by the JM. For these state files there is no need for the TM to manage old files after restoration, so merging at the TM level is chosen.

In summary, different policies are applied to different types of state files:

  • For shared states, files are merged within each subtask.
  • For private states, files are merged within each TM.


Merging DSTL files (changelog state files)

DSTL files are a special case of shared state: since these files are no longer shared after a job restore, merging at the TM level does NOT introduce additional network or space overhead. Currently the batch uploading of GIC (FLIP-158) combines small changes within a TM (across subtasks) into a single file. To prevent a performance regression of this merging, DSTL files are still merged within each TM.

  • For DSTL files (changelog state files), files are merged within each TM.


4.2 Rescaling and Physical File Lifecycle

Restarting a job with a new configuration is a common operation for Flink users. As previously discussed, for shared states in CLAIM recovery mode, the TMs of the new job must take responsibility for managing the old checkpoint files. As files are merged within each subtask for shared states, a physical file belongs to one subtask. If the user does not change the job's parallelism, the old and new subtasks correspond one-to-one, making it a simple take-over scenario. In this case, the new subtask only manages files from the corresponding old subtask. If the user does change the parallelism, an old physical file will be retained by several new subtasks. To avoid communication between subtasks in file management, multiple copies of old files are created, with each new subtask managing a unique copy. The original files are deleted by the JM when the old checkpoint is subsumed, since no TM (or subtask) will claim ownership of these files. Fig.4 and 5 show the checkpoint file management after restoring the job without and with a change in parallelism, respectively.


Fig.4 Shared state file management after restoring with same parallelism

Fig.5 Shared state file management after restoring with a different parallelism

For files in an exclusive scope, since new checkpoints do not require old checkpoint files (as they are subsumed in the new files), the new TMs do not claim ownership of these files, and they are deleted by the JM if necessary.

4.3 Physical File Layout and State Handles

In section 4.2, it was noted that sometimes the JM deletes old checkpoint files entirely when a new checkpoint is finished after restoring. To simplify file deletion and file ownership transfer, this FLIP proposes a new physical file layout for checkpoint storage. The directories for exclusive scope, shared scope, and task-owned states remain the same. For shared states, a sub-directory is created in the shared directory for each subtask. And for private states, a taskmanager-owned directory is introduced under the checkpoint storage, and a sub-directory is created in the taskmanager-owned directory for each TM. These sub-directories are not under the chk-x directory because the chk-x directories do not survive across checkpoints, but physical files may contain states from different checkpoints. The difference between shared and private states is due to the different file merging granularities, as discussed in Section 4.1.2. Fig.6 illustrates the newly proposed checkpoint storage file layout.



Fig.6 Proposed checkpoint storage layout.


This proposal suggests that the TM report two types of state handle to the JM:

  • SegmentFileStateHandle: Represents one data segment within a physical file.
  • DirectoryStateHandle: Represents a subtask- or TM-owned sub-directory.

SegmentFileStateHandles are included in the checkpoint meta and used during restoration. However, the JM does not employ them for file deletion (SegmentFileStateHandle#discardState() performs no action). Instead, the reported DirectoryStateHandles are used by the JM to track the sub-directories of subtasks or TMs and delete unused ones. The file copying and deletion during rescaling, as discussed in 4.2, are accomplished by sub-directory migration and DirectoryStateHandle changes.

In the case of shared states, when a job failover occurs, the subtask-owned sub-directory remains the same. The JM receives the same DirectoryStateHandles and does not delete any directories or files. However, when a user restarts a job manually, the sub-directory for each subtask changes due to a change in job parallelism or checkpoint storage base path. The JM receives different DirectoryStateHandles and deletes the original directories later. Fig.7 and 8 describe these two cases.


Fig.7 Subtask-owned folder consistent across job restarts (failover).


Fig.8 Subtask-owned folder migration after restoring from a retained checkpoint.


For private states, each TM claims a new sub-directory and DirectoryStateHandle under the taskmanager-owned path whenever a job restoration occurs. This is because the old private state files will not be included in the new checkpoint, rendering the closed files for private state useless for further file merging.

The sub-directory path contains information about the current subtask/TM, which is enough for TMs to determine whether to reuse the old directory or migrate to a new one (the path construction is sketched after the list below).

  • Path for shared states of each subtask: ${checkpointBaseDir}/shared/subtask-{index}-{parallelism}
  • Path for private states of each task manager: ${checkpointBaseDir}/taskmanager-owned/${tmResourceId}
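
As an illustration, a TM could derive these sub-directories as follows. The helper class and method names are hypothetical; only the path patterns come from the layout above.

import org.apache.flink.core.fs.Path;

// Illustrative helpers only; not part of the proposed public API.
final class MergedCheckpointPaths {

    /** ${checkpointBaseDir}/shared/subtask-{index}-{parallelism} */
    static Path sharedStateDir(Path checkpointBaseDir, int subtaskIndex, int parallelism) {
        return new Path(checkpointBaseDir, "shared/subtask-" + subtaskIndex + "-" + parallelism);
    }

    /** ${checkpointBaseDir}/taskmanager-owned/${tmResourceId} */
    static Path privateStateDir(Path checkpointBaseDir, String tmResourceId) {
        return new Path(checkpointBaseDir, "taskmanager-owned/" + tmResourceId);
    }

    private MergedCheckpointPaths() {}
}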

Creating a sub-directory for each granularity unit provides a quick way for the JM to delete a batch of unclaimed files. Compared to deleting specific physical files, deleting directories does not require the JM to know the complicated relationship between checkpoints and physical files, which is maintained by the TMs. Additionally, deleting a directory as a whole helps remove leftover files (half-written files in the directory that have not been reported to the JM), improving the stability of the system.

4.4 Manage files and segments in TM

The TM records the references between checkpoint data and underlying files. On checkpoint subsumption or abortion, the TM deletes the files that are no longer needed. The overall process is shown in Fig.9:


Fig.9 Overview of the file management


To determine which checkpoint data is no longer needed, the TM uses a watermark of the subsumed checkpoint ID. This approach effectively avoids issues resulting from the loss of checkpoint notifications. When a checkpoint is removed, the related data segments and state handles are also deleted, and their references to the underlying files are released. As a result, the TM can safely delete the files that are no longer in use.
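
A simplified sketch of this bookkeeping, assuming hypothetical data structures on the TM side (logical file IDs, a last-used checkpoint ID per logical file, and reference counts per physical file); the real implementation keeps richer metadata.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of watermark-based cleanup in the TM; not the actual implementation.
class LogicalFileTracker {

    /** ID of the last checkpoint that uses each logical file. */
    private final Map<String, Long> lastUsedCheckpointId = new HashMap<>();
    /** The physical file backing each logical file. */
    private final Map<String, String> logicalToPhysical = new HashMap<>();
    /** Reference counts of physical files, keyed by file path. */
    private final Map<String, Integer> physicalFileRefCount = new HashMap<>();

    /** Called when a segment is written into (or still referenced by) the given checkpoint. */
    void registerSegment(String logicalFileId, String physicalFilePath, long checkpointId) {
        lastUsedCheckpointId.merge(logicalFileId, checkpointId, Math::max);
        if (logicalToPhysical.putIfAbsent(logicalFileId, physicalFilePath) == null) {
            // First time we see this logical file: add one reference on the physical file.
            physicalFileRefCount.merge(physicalFilePath, 1, Integer::sum);
        }
    }

    /** Called when the JM notifies that all checkpoints up to the watermark are subsumed. */
    void onCheckpointSubsumed(long subsumedWatermark) {
        Iterator<Map.Entry<String, Long>> it = lastUsedCheckpointId.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= subsumedWatermark) {
                String physical = logicalToPhysical.remove(entry.getKey());
                it.remove();
                int refs = physicalFileRefCount.merge(physical, -1, Integer::sum);
                if (refs <= 0) {
                    physicalFileRefCount.remove(physical);
                    deletePhysicalFile(physical); // safe: no remaining segment references this file
                }
            }
        }
    }

    private void deletePhysicalFile(String path) {
        // Delete the file from checkpoint storage (omitted in this sketch).
    }
}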


Fig.10 Reference counting between logical and physical files.

4.5 Recover the file meta info of restored checkpoint

The relationship between data segments and physical files is crucial for TM to manage file storage, and this relationship needs to remain consistent across multiple job running attempts. Fortunately, the set of newly introduced state handles contains all the necessary information, including the offset, length, and underlying file of data segments, to recover the internal state of TM.

These state handles are persisted by the state handle registry in the JM and returned to the related TM after restoring, which gives the TM the persistence and recovery it needs. By leveraging these state handles, the TM can reliably manage file storage and keep the relationship between data segments and physical files in sync across job attempts.
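
As a rough illustration, restoration could simply replay the restored segment handles into a TM-side tracker like the one sketched in section 4.4; the getLogicalFileId() accessor and the tracker method are assumptions for this sketch, while getFilePath() exists on FileStateHandle.

import java.util.Collection;

// Hypothetical sketch: rebuild the TM-side file mapping from the restored handles.
void restoreFileMapping(
        LogicalFileTracker tracker,
        long restoredCheckpointId,
        Collection<SegmentFileStateHandle> restoredHandles) {
    for (SegmentFileStateHandle handle : restoredHandles) {
        // Each handle carries the physical file path plus the segment's offset and length,
        // which is enough to re-register the segment and its reference on the physical file.
        tracker.registerSegment(
                handle.getLogicalFileId().toString(),   // assumed accessor on the handle
                handle.getFilePath().toString(),
                restoredCheckpointId);
    }
}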

4.6 Support concurrent checkpoint

The reused output stream only supports single-threaded writing, which poses a problem when multiple checkpoints are executed concurrently and several stream writers are open at the same time. To address this, a pool of reusable streams is introduced to handle the multiple-writer scenario. The pool provides an open stream, or creates one, when requested, and each writer gets an exclusive stream from the pool. Once an output stream is released by a writer, it is returned to the pool for future use. This ensures that concurrently executed checkpoints write to different streams, which can then be reused by other checkpoint writers after being returned to the pool.
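
A sketch of the pool idea, reusing the hypothetical ReusedCheckpointStream type from the sketch in section 4.1; the class below is illustrative, and the size limits configured via the options in section 5 are omitted.

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: each concurrent checkpoint writer borrows an exclusive stream
// and returns it afterwards, so no stream is ever written by two writers at once.
class ReusableStreamPool {

    private final Queue<ReusedCheckpointStream> idleStreams = new ConcurrentLinkedQueue<>();

    /** Borrow an open stream, or open a new physical file if none is idle. */
    ReusedCheckpointStream acquire() throws IOException {
        ReusedCheckpointStream stream = idleStreams.poll();
        return stream != null ? stream : openNewPhysicalFileStream();
    }

    /** Return the stream so another checkpoint writer can reuse the same physical file. */
    void release(ReusedCheckpointStream stream) {
        idleStreams.offer(stream);
    }

    private ReusedCheckpointStream openNewPhysicalFileStream() throws IOException {
        // Create a new physical file in checkpoint storage and wrap it (omitted in this sketch).
        throw new UnsupportedOperationException("sketch only");
    }
}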

4.7 Space amplification

The lifecycle of each piece of shared state varies depending on the number of consecutive checkpoints that include it. It is possible that some states within a physical file are still valid while others are no longer needed. However, a physical file cannot be deleted until every data segment within it is invalid, resulting in wasted storage space on the DFS. This waste is referred to as space amplification: the ratio of the occupied space to the amount of valid data. To limit this waste, an asynchronous compaction of the underlying files is scheduled after a checkpoint is aborted or subsumed. This process involves the following steps (step 1 is sketched after the list):

  1. Checking whether the space amplification of each file is greater than a preset threshold and collecting files that exceed the threshold for compaction.
  2. Compacting the files, creating new state handles for rewritten segments, and recording the mapping from old handles to corresponding new handles.
  3. In the following checkpoint, when reporting StateHandles on files that have been compacted, replace the old handle with the new handle based on the mapping above.
  4. The new checkpoint will add a reference to the new state handle in the TM, and the old handle and underlying files will be deleted when there are no more references from checkpoints.
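
A minimal sketch of the threshold check in step 1 above, using the definition of space amplification given earlier; PhysicalFileInfo and its getters are hypothetical bookkeeping types, not part of the proposal.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch of step 1: pick physical files whose space amplification
// (total file size / size of still-valid segments) exceeds a configured threshold.
List<PhysicalFileInfo> collectFilesToCompact(
        Collection<PhysicalFileInfo> files, float maxSpaceAmplification) {
    List<PhysicalFileInfo> toCompact = new ArrayList<>();
    for (PhysicalFileInfo file : files) {
        long validBytes = file.getValidSegmentBytes();
        if (validBytes == 0) {
            continue; // a fully invalid file is simply deleted, no compaction needed
        }
        float amplification = (float) file.getTotalBytes() / validBytes;
        if (amplification > maxSpaceAmplification) {
            toCompact.add(file);
        }
    }
    return toCompact;
}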

4.8 Error handling

The JM and TM may fail during a checkpoint, and the file merging framework should properly handle these edge cases.

  • Failover: The new subtask takes over the previously managed sub-directory and deletes the un-referenced files on a best-effort basis.
  • Checkpoint notification message lost: A notification message from the JM to a TM may be lost for some reason. To address this, the TM uses a watermark to keep track of the latest checkpoint that uses a file or segment, and releases the file or segment when a later checkpoint is subsumed. This ensures that even if a notification is lost, the arrival of later messages will still trigger the file cleaning procedure.
  • Job stops before the first checkpoint finishes: If the TM has not reported the DirectoryStateHandle of the sub-directory, the TM deletes the sub-directory on exit. Otherwise, the JM takes care of the sub-directory.

4.9. State ownership comparison with other designs

FLINK-23342 and its design doc[1] provide several designs for state ownership to overcome the disadvantages of having state files managed by the JM. The state ownership design of this FLIP is very similar to option 3 in that doc. The main difference is that this FLIP embeds the 'epoch' concept from that doc into the folder path of each granularity unit (as described in 4.3). The table below compares the JM-TM mixed ownership designs of FLINK-23342 with this FLIP:



|  | Current Flink | Option 5 from the doc: Split counters | Option 3 from the doc: Epochs | Design of this FLIP |
| --- | --- | --- | --- | --- |
| Ownership | JM | Mixed | Mixed | Mixed |
| Granularity of tracking shared state | File | File | Folder | Folder |
| Fragility | Bad | Good | Good | Good |
| Reupload | On abort | - | On epoch change | On job restart by user |
| Cleanup efficiency | Bad | Medium (files but distributed) | Good (folder at once) | Good (folder at once) |
| Cleanup reliability | Bad | Bad | Good | Good |
| RPC overhead | Bad | Good | Good | Good |
| Dev. time | - | Medium / Bad | Medium | Medium |
| Invasive change of shared state registry | - | Bad | Good | Very Good |


5. Public interfaces and User Cases

Several new configuration options are added (a usage sketch follows the list):

  • state.checkpoints.file-merging: A configuration option that allows the user to enable or disable the file merging feature.
  • state.checkpoints.file-merging.across-checkpoint-boundary: A configuration option that controls whether files may be merged across checkpoint boundaries or only within a single checkpoint (as discussed in section 4.1.1).
  • state.checkpoints.file-merging.max-file-size: A configuration option that sets a maximum size limit for physical files.
  • state.checkpoints.file-merging.max-file-pool-size: A configuration option that specifies the upper limit of the file pool size for concurrent writing.
  • state.checkpoints.file-merging.max-subtasks-per-file: A configuration option that specifies the lower limit of the file pool size based on the number of subtasks within each TM (only effective when merging at the TM level).
  • state.checkpoints.file-merging.max-space-amplification: A threshold that triggers a compaction (re-uploading) of files (as discussed in section 4.7).
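
For illustration, a sketch of setting these options programmatically by their string keys; the values are arbitrary examples, not recommended defaults.

import org.apache.flink.configuration.Configuration;

// Example only: enabling file merging and tuning the proposed options.
Configuration conf = new Configuration();
conf.setString("state.checkpoints.file-merging", "true");
conf.setString("state.checkpoints.file-merging.across-checkpoint-boundary", "true");
conf.setString("state.checkpoints.file-merging.max-file-size", "32mb");
conf.setString("state.checkpoints.file-merging.max-file-pool-size", "16");
conf.setString("state.checkpoints.file-merging.max-subtasks-per-file", "4");
conf.setString("state.checkpoints.file-merging.max-space-amplification", "2");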

Both forward and backward compatibility are supported, and these options only affect new files, not old ones. If the user enables the merging feature and restores a job from an old checkpoint, the new files will be merged while the old checkpoint files remain separate. Conversely, if the user disables the feature and restores a job from a checkpoint with merged files, the new files will be separate while the old files remain merged. Old files are kept untouched to avoid significant data transfer over the network, since no file copying is needed.


6. POC and Test results

We have implemented a POC (in branch https://github.com/Zakelly/flink/tree/flip306_poc) which only supports merging state in the EXCLUSIVE scope (e.g. operator state and channel state), and tested it on a simple word count job with a tumbling window, at parallelism 4, running in a single TM. We collected and compared the file creations and deletions from our logs, with and without file merging enabled. Only files in the EXCLUSIVE scope are counted. The merging target size is set to 32MB, and the job ran for 4 hours. Here are the results:


|  |  | No merging | Merge within each checkpoint | Merge across checkpoints |
| --- | --- | --- | --- | --- |
| Aligned checkpoint | create | 5756 | 3295 | 1920 |
|  | delete | 5752 | 3292 | 1916 |
| Unaligned checkpoint | create | 10222 | 6081 | 1940 |
|  | delete | 10214 | 6077 | 1932 |

As shown above, this sample job demonstrates that by merging only the operator state and channel state, over 40% of file creation and deletion operations can be eliminated within each checkpoint. Moreover, when merging across multiple checkpoints, this percentage increases to 88%. Additionally, the number of small files is higher in unaligned checkpoints compared to aligned checkpoints, so our solution performs better when unaligned checkpoints are enabled.

7. Affected interfaces

This FLIP will NOT change any existing interfaces; it only adds serialization/deserialization of the newly introduced state handles and some initialization/restoring logic of the TM in existing classes. Some changes to internal classes will be made if the batch uploading of changelog state migrates to this file merging framework.

Some important interfaces/classes are as follows:

SegmentSnapshotManager

/** Manages meta information in TM side when taking segmented checkpoints. */
public interface SegmentSnapshotManager extends InternalCheckpointListener {

    /** Restore the file mapping information from the state handles of operator states. */
    void addOperatorStateHandles(
            long checkpointId, List<StateObjectCollection<OperatorStateHandle>> stateHandles);

    /** Restore the file mapping information from the state handles of keyed states. */
    void addKeyedStateHandles(
            long checkpointId, List<StateObjectCollection<KeyedStateHandle>> stateHandles);

    /**
     * Create a new {@link FsSegmentCheckpointStateOutputStream}. Multiple streams may write to the
     * same physical files. When a stream is closed, it returns a {@link SegmentFileStateHandle}
     * that can retrieve the state back.
     *
     * @param subtaskKey Key of the subtask that owns the stream.
     * @param checkpointId ID of the checkpoint.
     * @param scope The state's scope, whether it is exclusive or shared.
     * @return An output stream that writes state for the given checkpoint.
     */
    FsSegmentCheckpointStateOutputStream createCheckpointStateOutputStream(
            SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope);

    /** Return the working directory of the segment snapshot manager. */
    Path getSsmManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope);
}
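
A hypothetical usage sketch of the interface above, with error handling omitted and all parameters assumed to be provided by the surrounding snapshot code path:

// Sketch only: write one segment of state through the segment snapshot manager.
SegmentFileStateHandle writeOneSegment(
        SegmentSnapshotManager ssm,
        SubtaskKey subtaskKey,
        long checkpointId,
        byte[] stateBytes) throws IOException {
    FsSegmentCheckpointStateOutputStream out =
            ssm.createCheckpointStateOutputStream(subtaskKey, checkpointId, CheckpointedStateScope.SHARED);
    out.write(stateBytes, 0, stateBytes.length);
    // Closing the logical stream yields a handle describing the written segment
    // (file path, offset, length); the underlying physical file may stay open for reuse.
    return out.closeAndGetHandle();
}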

FsSegmentCheckpointStateOutputStream

public class FsSegmentCheckpointStateOutputStream
        extends FsCheckpointStreamFactory.FsCheckpointStateOutputStream {

    /**
     * A manager which stands between a {@link SegmentSnapshotManager} and a {@link
     * FsSegmentCheckpointStateOutputStream}. It works as a delegation of the {@link
     * SegmentSnapshotManager} to deal with the physical file of the {@link
     * FsSegmentCheckpointStateOutputStream}.
     */
    public abstract static class SegmentedCheckpointStateOutputStreamHandler {
        /**
         * Provide a physical file.
         *
         * @return Output stream and path of the physical file.
         * @throws IOException if the physical file cannot be created or opened.
         */
        public abstract Tuple2<FSDataOutputStream, Path> providePhysicalFile() throws IOException;

        /**
         * Close the stream and create a {@link SegmentFileStateHandle} for a file segment.
         *
         * @param filePath Path of the physical file.
         * @param startPos Start position of the segment in the physical file.
         * @param stateSize Size of the segment.
         * @return The state handle of the segment.
         * @throws IOException Thrown, if any exception happens when closing/deleting file.
         */
        public abstract SegmentFileStateHandle closeStreamAndCreateStateHandle(
                Path filePath, long startPos, long stateSize) throws IOException;

        /**
         * Notify the {@link SegmentSnapshotManager} that the stream is closed exceptionally.
         *
         * @throws IOException Thrown, if any exception happens when closing/deleting file.
         */
        public abstract void closeStream() throws IOException;
    }
            
    private @Nullable FSDataOutputStream outputStream;

    private final SegmentedCheckpointStateOutputStreamHandler ssmDelegation;

    private final byte[] writeBuffer;

    /** pos in the writeBuffer. */
    int bufferPos;

    /** pos in the physical file (relative to the starting position). */
    long filePosRelative = 0;

    private long startPos;

    private Path filePath;

    private volatile boolean closed;

    public FsSegmentCheckpointStateOutputStream(
            long checkpointID,
            int bufferSize,
            SegmentedCheckpointStateOutputStreamHandler ssmDelegation) {
        super(null, null, bufferSize, -1);
        this.ssmDelegation = ssmDelegation;
        this.writeBuffer = new byte[bufferSize];
    }

    @Override
    public long getPos() throws IOException {
        // The starting position is not determined until a physical file has been assigned, so
        // we return the relative value to the starting position in this method
        return bufferPos + filePosRelative;
    }

    @Override
    public void write(int b) throws IOException {
       // Write based on relative offset
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Write based on relative offset
    }

    @Override
    public void flush() throws IOException {
        // Omit
    }

    @Override
    public void sync() throws IOException {
        // Omit
    }

    @Nullable
    @Override
    public SegmentFileStateHandle closeAndGetHandle() throws IOException {
        // delegate to ssmDelegation.closeStreamAndCreateStateHandle; the physical stream is not actually closed
    }

    @Override
    public void close() {
        // Omit
    }
}

SegmentFileStateHandle

public class SegmentFileStateHandle extends FileStateHandle {

    private static final long serialVersionUID = 1L;

    private final long startPos;
    private final long stateSize;

    private final LogicalFile.LogicalFileId logicalFileId;

    private final CheckpointedStateScope scope;

    /** Creates a new file state for the given file path. */
    public SegmentFileStateHandle(
            Path filePath,
            long startPos,
            long stateSize,
            LogicalFile.LogicalFileId logicalFileId,
            CheckpointedStateScope scope) {
        super(filePath, stateSize);
        this.startPos = startPos;
        this.scope = scope;
        this.stateSize = stateSize;
        this.logicalFileId = logicalFileId;
    }

    /**
     * This method should be empty, so that the JM is not in charge of the lifecycle of files in a
     * segmented checkpoint.
     */
    @Override
    public void discardState() {}

    @Override
    public FSDataInputStream openInputStream() throws IOException {
        FSDataInputStream inputStream = super.openInputStream();
        return new FsSegmentDataInputStream(inputStream, startPos, stateSize);
    }

    // omit getters and setters
}

8. Implementation Plan

The FLIP will be implemented by the following steps:

  1. Introduce the new interfaces and classes and the basic logic for file merging and file management.
  2. User-defined options for configuring and initialization logic.
  3. File reference counting restoring.
  4. File merging for operator state.
  5. File merging for keyed state.
  6. File merging for channel state.
  7. If possible, migrate current batch upload implementation for changelog states to this proposal.
  8. Control space amplification.

9. Compatibility, Deprecation, and Migration Plan

All proposed changes take effect only when the user enables them, and full backward compatibility with old checkpoints is provided. No modification of existing interfaces and no deprecation is required.

10. Test Plan

UTs for each newly introduced class and interface, ITs of checkpointing with the proposed file merging enabled, and ITs for backward compatibility and error handling. Performance tests will also be done before the feature release.

11. Rejected Alternatives

None for now.



\[1\] The design doc of FLINK-23342: https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#