Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

This is a sub-FLIP of the disaggregated state management effort and its related work. Please read FLIP-423 first to understand the whole story.


Motivation

As described in FLIP-423, embedded state backends on the local file system face some tough issues, especially when dealing with extremely large state:

  1. Constraints of local disk space complicate the prediction of storage requirements, potentially leading to job failures: Especially in cloud native deployment mode, pre-allocated local disks typically face strict capacity constraints, making it challenging to forecast the size requirements of job states. Over-provisioning disk space results in unnecessary resource overhead, while under-provisioning risks job failure due to insufficient space.
  2. The tight coupling of compute and storage resources leads to underutilization and increased waste: Jobs can generally be categorized as either CPU-intensive or IO-intensive. In a coupled architecture, CPU-intensive jobs leave a significant portion of storage resources underutilized, whereas IO-intensive jobs result in idle computing resources.

By using remote storage as the primary storage, all working state is maintained on the remote file system, which brings several advantages:

  1. Remote storage systems such as S3 and HDFS typically offer elastic scalability, theoretically providing unlimited space.
  2. The allocation of remote storage resources can be optimized by reducing them for CPU-intensive jobs and augmenting them for IO-intensive jobs, thus enhancing overall resource utilization.
  3. This architecture facilitates a highly efficient and lightweight process for checkpointing, recovery, and rescaling through fast copy or simple move.

This FLIP aims to realize disaggregated state for our new key-value store named ForSt, which evolves from RocksDB and supports remote file systems. This frees Flink from the disadvantages of the coupled state architecture and lets it embrace scalable and flexible cloud-native storage.

Proposed Changes

As discussed in FLIP-423, RocksDB (FRocksDB) is selected to support the disaggregated storage architecture because it has been validated by extensive production use with Flink jobs and offers significant scalability.

We plan to extend RocksDB to run on all file systems supported by Flink, thereby allowing state to be stored on various distributed file systems. The proposed new store is named ForSt, which stands for 'For-Streaming Storage'. However, RocksDB currently only supports local file systems, so additional effort is required:

  1. Remote file system integration on the ForSt side. We will leverage a JNI bridge to proxy all I/O operations from ForSt to the Flink FileSystem interface, for which many file systems are already supported.
  2. A new state backend on the Flink side. We will introduce a new state backend called ForStStateBackend to support ForSt.

ForSt Side: Remote File Systems Integration

[Figure: the two proposals for integrating remote file systems into ForSt]

ForSt is built on top of RocksDB. RocksDB supports a pluggable FileSystem interface [1], which makes it convenient to run RocksDB on different file systems, so ForSt needs to integrate remote file systems through this interface. As shown in the figure above, there are two proposals for implementing this:

  • Proposal 1: Implement every file system based on ForSt's filesystem interface.
  • Proposal 2: Leverage JNI bridge to proxy any I/O operations from ForSt to Flink FileSystem interface, where many file systems have already been supported.

Comparison of Proposal 1 and Proposal 2

| Aspect | Better proposal | Reason |
|---|---|---|
| Performance | Proposal 1 | Proposal 2 adds JNI overhead, but this overhead is negligible compared with the latency of remote I/O. |
| Achievability | Proposal 2 | Since there is no unified abstraction over different file systems, Proposal 1 requires a separate C++ implementation for each file system, which is a considerable amount of work. For Proposal 2, the JNI bridge between ForSt's interfaces and Flink's only needs to be established once, and it then works for every file system that Flink already supports. |
| Maintainability | Proposal 2 | For Proposal 1, the versions and dependencies of all the C++ file system libraries must be maintained independently and kept consistent with Flink. For Proposal 2, the critical logic of the JNI bridge stays the same, and the detailed implementations are maintained in Flink, which makes further development much easier. |
| Scalability | Proposal 2 | For Proposal 2, the implementation in ForSt is universal: any file system supported by Flink can easily be supported in Java. |

Based on this comparison, we choose Proposal 2 first, using JNI to bridge the gap between the ForSt FileSystem and the Flink FileSystem, so that commonly used file systems that Flink already supports, e.g. HDFS, S3 and OSS, can be used.

Proposal 1 could still be considered later as a performance optimization. It would be implemented on the ForSt side, and the Flink side would then only need to change the Env [6] it uses.

The following subsections introduce some key points of the integration for Proposal 2.

FS Interface bridge

As outlined above, this FLIP employs a JNI bridge to redirect all I/O operations from ForSt to the Flink FileSystem interface. The table below enumerates the interfaces from ForSt and specifies the corresponding Flink FileSystem interfaces that the implementation should use.

Interface Reference

| Interface of ForSt | Flink FileSystem interface to use | Comment |
|---|---|---|
| Constructor | get | 1. Flink FileSystem#initialize must be called beforehand so that all file systems are usable on the Flink side. 2. ForSt maintains the life cycle of the FileSystem object. |
| FileExists | exists | Can be implemented directly with the corresponding Flink method. |
| CreateDir | mkdirs | |
| CreateDirIfMissing | mkdirs | |
| DeleteDir | delete | |
| DeleteFile | delete | |
| GetChildren | listStatus | |
| RenameFile | rename | |
| GetFileSize | getFileStatus | |
| GetFileModificationTime | getFileStatus | |
| IsDirectory | getFileStatus | |
| NewDirectory -> FSDirectory | getFileStatus | |
| LockFile/UnlockFile | - | Fine if unsupported: the lock guards a DB against concurrent use by different processes, and Flink does not rely on it. |
| NewSequentialFile -> FSSequentialFile | open -> FSDataInputStream | 1. ForSt maintains the life cycle of FSDataOutputStream/FSDataInputStream. 2. Flink's FSDataOutputStream/FSDataInputStream should support reading and writing with ByteBuffer to avoid frequent data copies. |
| NewRandomAccessFile -> FSRandomAccessFile | open -> FSDataInputStream | Same as above. |
| NewWritableFile -> FSWritableFile | create -> FSDataOutputStream | Same as above. |
| NewLogger -> Logger | - | The FileSystem LOG; can be implemented on top of NewWritableFile. |
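To make the mapping concrete, here is a minimal Java sketch of a few rows of the table: each method mirrors one ForSt FileSystem call and delegates to a single file system operation. Flink's FileSystem is not on a plain JDK classpath, so java.nio.file.Files is used as a stand-in for exists / mkdirs / listStatus / rename / getFileStatus; the class name ForStFsBridgeSketch and its method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

/**
 * Hypothetical sketch: one method per ForSt FileSystem call, each delegating to a
 * single file system operation. java.nio.file stands in for Flink's FileSystem.
 */
class ForStFsBridgeSketch {

    // FileExists -> exists
    static boolean fileExists(Path p) {
        return Files.exists(p);
    }

    // CreateDirIfMissing -> mkdirs (no error if the directory already exists)
    static void createDirIfMissing(Path dir) throws IOException {
        Files.createDirectories(dir);
    }

    // GetChildren -> listStatus (ForSt only needs the child names)
    static List<String> getChildren(Path dir) throws IOException {
        List<String> names = new ArrayList<>();
        try (Stream<Path> children = Files.list(dir)) {
            children.forEach(c -> names.add(c.getFileName().toString()));
        }
        return names;
    }

    // RenameFile -> rename
    static void renameFile(Path src, Path dst) throws IOException {
        Files.move(src, dst);
    }

    // GetFileSize -> getFileStatus(...).getLen()
    static long getFileSize(Path file) throws IOException {
        return Files.size(file);
    }

    // IsDirectory -> getFileStatus(...).isDir()
    static boolean isDirectory(Path p) {
        return Files.isDirectory(p);
    }
}
```

In the real bridge these calls would arrive from C++ through JNI, and each one would stay a thin delegation like the methods above.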

JNI Related

JNI is part of the Java platform and allows Java code to interact with code written in other languages, typically C or C++. JNIEnv is the key component of JNI through which native methods interact with the JVM; it provides a rich set of functions, e.g. for creating Java objects, calling methods and handling exceptions. To interact with the JVM from the C++ side, three steps are required:

| Step | Core issue to be resolved | Solution |
|---|---|---|
| Step 1: Init and maintain JNIEnv | The JNIEnv used by each call must belong to the JVM of the TaskManager | Global JNIEnv |
| Step 2: Use JNIEnv to create Java objects and call the corresponding methods | The current read/write methods of Flink FileSystem are insufficient, because they force data copies between C++ memory and the Java heap (see the table above) | Reading/Writing files with ByteBuffer |
| Step 3: Maintain the creation and release of these objects | ForSt is the caller of these interfaces, so the objects' lifecycle should be managed on the DB side | Maintain the lifecycle of these objects |

The details of the three solutions are as follows:

  1. Global JNIEnv. The JavaVM pointer can be cached globally when the native library is loaded by implementing JNI_OnLoad [2]. Since a JNIEnv is only valid on the thread it belongs to, each native thread obtains its own JNIEnv by calling AttachCurrentThread [3]. All JNI calls then go to the same JVM as the TaskManager.
  2. Reading/Writing files with ByteBuffer. We need to introduce a ForStFileSystem in Flink that supports reading and writing data with ByteBuffer (see the pseudocode in "Implement new FileSystem"). ForSt can then allocate ByteBuffers and pass them to the new methods of this class. The lifecycle of these ByteBuffers is maintained by ForSt, as described below.
  3. Maintain the lifecycle of these objects. The lifecycle of all objects from Flink FileSystem, e.g. FileSystem, FSDataOutputStream and FSDataInputStream, is maintained in ForSt. As shown in the figure below, the lifecycle of these objects is:
    1. When ForSt accesses its FileSystem, it creates the required objects through the FileSystem methods on the Flink side and saves them as member variables on the ForSt side.
    2. ForSt uses these objects to execute FileSystem operations, including reading and writing files, at runtime.
    3. These objects are released when the ForSt classes that own them are destroyed.

[Figure: lifecycle of the Flink FileSystem objects managed by ForSt]
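The ownership rule in step 3 (create the object, keep it as a member, reuse it at runtime, release it with the owner) can be illustrated in Java with AutoCloseable. This is only an analogy of the C++-side ownership: the class OwnedStreamHandle is hypothetical, and in ForSt the owner would hold JNI global references rather than Java fields.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Callable;

/**
 * Hypothetical analogy of the ForSt-side ownership rule: the handle creates the
 * underlying stream once, keeps it as a member, and releases it exactly once.
 */
class OwnedStreamHandle implements AutoCloseable {

    private final Callable<InputStream> opener; // e.g. () -> fileSystem.open(path)
    private InputStream stream;                 // created lazily, owned by this handle
    private boolean closed;

    OwnedStreamHandle(Callable<InputStream> opener) {
        this.opener = opener;
    }

    // 1. Create the object on first access and keep it as a member variable.
    private InputStream stream() throws Exception {
        if (closed) {
            throw new IllegalStateException("handle already released");
        }
        if (stream == null) {
            stream = opener.call();
        }
        return stream;
    }

    // 2. Reuse the same object for every runtime operation.
    int read(byte[] buf) throws Exception {
        return stream().read(buf);
    }

    // 3. Release the object when the owner itself is released.
    @Override
    public void close() throws IOException {
        if (!closed) {
            closed = true;
            if (stream != null) {
                stream.close();
            }
        }
    }
}
```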

Other behaviors in ForSt

Other parts of ForSt that are built on the file system, such as reading, writing and compaction, work the same way as on a local file system. Some strategies to improve them are described in "Other Related Improvement".

Flink Side: Implementation Details

As discussed in FLIP-423, we will introduce a new state backend called ForStStateBackend to support the ForSt.

Many classes can be extracted and reused (see "Extract Common Classes"). The remaining classes, which contain the logic of initialization, configuration, checkpointing and restoring, should be re-implemented, e.g. ForStStateBackendFactory / ForStStateBackend / ForStKeyedStateBackend:

  1. Initialization: The directory should be initialized in the remote storage
  2. Configuration: some extra configurations should be parsed and some unsupported configurations should be checked
  3. Checkpoint & Restoring & Rescaling: The strategy of checkpointing, restoring and rescaling differs from RocksDBStateBackend because the files already exist in remote storage; it is described in "Checkpointing & Recovery".

Besides, as discussed in "JNI Related", we also need to implement a ForStFileSystem that supports reading and writing with ByteBuffer, which is introduced in "Implement new FileSystem".

Extract Common Classes

Many classes are common to RocksDBStateBackend and ForStStateBackend, for example:

  1. Related to basic configuration, e.g. RocksDBOptions, PredefinedOptions
  2. Related to resource control, e.g. RocksDBResourceContainer, RocksDBSharedResources
  3. Related to state interfaces, e.g. RocksDBValueState, RocksDBMapState

So we'd like to extract a common module called "flink-statebackend-rocksdb-common" and move these common classes into it.

Implement new FileSystem

A new class called ForStFileSystem, which wraps FileSystem, will be introduced. It is responsible for:

  1. Supporting reading/writing data from/to remote storage with ByteBuffer
  2. Introducing some metrics in filesystem level to monitor reading and writing
  3. Introducing cache strategies in filesystem level

Its pseudocode is shown below.


ForStFileSystem.java
/**
 * ForStFileSystem wraps the current Flink FileSystem
 * and exposes ByteBuffer-based interfaces to ForSt.
 */
public class ForStFileSystem extends FileSystem {

    // The original Flink FileSystem that all calls are delegated to
    private final FileSystem flinkFS;

    public ForStFileSystem(FileSystem flinkFS) {
        this.flinkFS = flinkFS;
    }

    public static FileSystem get(URI uri) throws IOException {
        // Wrap the original FileSystem#get(URI uri)
        return new ForStFileSystem(FileSystem.get(uri));
    }

    // Wrap the other methods of FileSystem by delegating to flinkFS

    @Override
    public ByteBufferReadableFSDataInputStream open(Path f, int bufferSize) throws IOException {
        // Call the original FileSystem#open
        FSDataInputStream original = flinkFS.open(f, bufferSize);
        long fileSize = flinkFS.getFileStatus(f).getLen();

        // Wrap it with ByteBufferReadableFSDataInputStream; the Callable can be used
        // to re-open the same file
        return new ByteBufferReadableFSDataInputStream(f, original,
                () -> flinkFS.open(f, bufferSize),
                fileSize);
    }

    @Override
    public ByteBufferWritableFSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException {
        // Call the original FileSystem#create
        FSDataOutputStream original = flinkFS.create(f, overwriteMode);

        // Wrap it with ByteBufferWritableFSDataOutputStream
        return new ByteBufferWritableFSDataOutputStream(f, original);
    }
}
ByteBufferReadableFSDataInputStream.java
public class ByteBufferReadableFSDataInputStream extends FSDataInputStream {

    public ByteBufferReadableFSDataInputStream(
            Path path,
            FSDataInputStream fsdis,
            Callable<FSDataInputStream> inputStreamBuilder,
            long totalFileSize) {
        // init
    }

    public int readFully(long position, ByteBuffer bb) throws IOException {
        // Read data at the given position from remote storage into the ByteBuffer; called by ForSt
    }

    // Wrap the other methods of FSDataInputStream
}
ByteBufferWritableFSDataOutputStream.java
public class ByteBufferWritableFSDataOutputStream extends FSDataOutputStream {

    public ByteBufferWritableFSDataOutputStream(Path path,
                                                FSDataOutputStream fsdos) {
        // init
    }

    public void write(ByteBuffer bb) throws IOException {
        // Write the data in the ByteBuffer to remote storage; called by ForSt
    }

    // Wrap the other methods of FSDataOutputStream
}

ForSt will initialize ForStFileSystem and call its open and create methods via JNI, then use int readFully(long position, ByteBuffer bb) and void write(ByteBuffer bb) on the returned streams to read and write data.
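The intended semantics of the two ByteBuffer methods can be sketched with plain JDK types. This is a minimal sketch under assumptions: a FileChannel stands in for the remote FSDataInputStream, readFully is assumed to fill the buffer's remaining space starting at the given position (failing at end of file), and write is assumed to drain the buffer; the class ByteBufferIoSketch is hypothetical.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/** Hypothetical sketch of positional ByteBuffer reads and ByteBuffer writes. */
final class ByteBufferIoSketch {

    private ByteBufferIoSketch() {}

    /**
     * Fills bb.remaining() bytes starting at 'position' without moving the channel's
     * own position, so concurrent positional reads do not interfere. Returns bytes read.
     */
    static int readFully(FileChannel channel, long position, ByteBuffer bb) throws IOException {
        int total = 0;
        while (bb.hasRemaining()) {
            int n = channel.read(bb, position + total);
            if (n < 0) {
                throw new EOFException("EOF after " + total + " bytes");
            }
            total += n;
        }
        return total;
    }

    /** Writes all remaining bytes of bb to the stream and advances bb to its limit. */
    static void write(OutputStream out, ByteBuffer bb) throws IOException {
        if (bb.hasArray()) {
            // Heap buffer: hand the backing array to the stream without an intermediate copy.
            out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
            bb.position(bb.limit());
        } else {
            // Direct buffer (e.g. memory handed over from native code): copy in chunks.
            byte[] chunk = new byte[Math.min(bb.remaining(), 8192)];
            while (bb.hasRemaining()) {
                int len = Math.min(chunk.length, bb.remaining());
                bb.get(chunk, 0, len);
                out.write(chunk, 0, len);
            }
        }
    }
}
```

The positional read is what lets ForSt issue reads at arbitrary offsets of an SST file without seeking a shared stream.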

Local File Cache

Reading and writing every file from remote storage can significantly increase latency. Therefore, we introduce a local file cache to speed up file I/O operations.

We can leverage the existing LocalFileSystem and introduce a FIFO file cache mechanism to make full use of local disk space, serving files from the LocalFileSystem first. ForStFileSystem is accessed only for files that cannot be kept locally because local disk space is insufficient.
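A FIFO file cache of this kind can be sketched as follows. This is a minimal sketch assuming a byte budget for local disk space and whole-file caching; the class FifoFileCache and its methods are hypothetical, and the actual cache design is left to FLIP-429.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical FIFO file cache: tracks which files are kept on local disk and
 * evicts the earliest-admitted files once the local capacity (in bytes) is exceeded.
 */
class FifoFileCache {

    private final long capacityBytes;
    private long usedBytes;
    // Insertion order is the FIFO order; lookups do not change it.
    private final LinkedHashMap<String, Long> files = new LinkedHashMap<>();

    FifoFileCache(long capacityBytes) {
        this.capacityBytes = capacityBytes;
    }

    /** True if reads of this file can be served from the local file system. */
    boolean isCachedLocally(String path) {
        return files.containsKey(path);
    }

    /**
     * Admits a file into the local cache and returns the paths evicted to make room.
     * Files larger than the whole capacity are not cached and stay remote-only.
     */
    List<String> admit(String path, long sizeBytes) {
        List<String> evicted = new ArrayList<>();
        if (sizeBytes > capacityBytes || files.containsKey(path)) {
            return evicted;
        }
        Iterator<Map.Entry<String, Long>> it = files.entrySet().iterator();
        while (usedBytes + sizeBytes > capacityBytes && it.hasNext()) {
            Map.Entry<String, Long> oldest = it.next();
            usedBytes -= oldest.getValue();
            evicted.add(oldest.getKey());
            it.remove();
        }
        files.put(path, sizeBytes);
        usedBytes += sizeBytes;
        return evicted;
    }

    long usedBytes() {
        return usedBytes;
    }
}
```

On a read, ForStFileSystem would check isCachedLocally first and fall back to the remote file only for paths that are not, or are no longer, cached.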

Directory layout

Working files are the live files that ForSt actively uses at runtime; they are located in the remote file system.

The directory layout of the working files is shown below. By default they are kept under "Job-checkpointing-dir/taskowned"; this location can be changed with the new option described in "Public Interfaces".

The life cycle of the working directory (including creation and cleanup) is managed in the same way as the local working directory today.

There may be orphaned files in some corner cases, since the StateBackend only makes a best effort to clean up its working files in StateBackend#dispose. We leave this optimization to a future FLIP such as FLIP-432.

The directory layout of working set
// Default layout
|--- Job-checkpointing-dir
      |--- chk-xxx
      |--- shared
      |--- taskowned
            |--- working file dir
                  |--- subTask-sub-dir
                        |--- db
                              |--- working files

// User-defined directory
|--- User-defined-RemoteState-working-dir
      |--- subTask-sub-dir
            |--- db
                  |--- working files
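The two layouts can be resolved with a small helper like the one below. It is only a sketch: the class WorkingDirLayout, its method, and the way the "working file dir" and subtask directory names are passed in are hypothetical; the FLIP only fixes the "taskowned" default and the user-defined override through state.backend.forSt.working-dir.

```java
/**
 * Hypothetical helper that resolves the remote "db" directory of one subtask
 * according to the default and user-defined layouts above.
 */
final class WorkingDirLayout {

    private WorkingDirLayout() {}

    /**
     * @param checkpointDir      the job's checkpoint directory
     * @param userWorkingDir     value of the working-dir option, or null if not configured
     * @param workingFileDirName the "working file dir" under taskowned (default layout only)
     * @param subtaskDirName     the per-subtask directory name
     */
    static String resolveDbDir(
            String checkpointDir,
            String userWorkingDir,
            String workingFileDirName,
            String subtaskDirName) {
        String base =
                (userWorkingDir == null || userWorkingDir.isEmpty())
                        // Default: Job-checkpointing-dir/taskowned/<working file dir>
                        ? join(join(checkpointDir, "taskowned"), workingFileDirName)
                        // User-defined: the configured directory is the base
                        : userWorkingDir;
        return join(join(base, subtaskDirName), "db");
    }

    private static String join(String parent, String child) {
        return parent.endsWith("/") ? parent + child : parent + "/" + child;
    }
}
```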

Checkpointing & Recovery

Checkpointing and recovery involve copying files from one location to another. In the very basic version of ForStStateBackend, files are copied within the remote file system by downloading and re-uploading them. This is largely the same as the behavior of RocksDBStateBackend, except that hard links are unavailable on a DFS.

FLIP-428 proposes a more lightweight checkpointing and recovery mechanism that leverages file reuse and fast copy. Please refer to FLIP-428 for more details.
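The basic copy step amounts to streaming each file out of the remote file system and writing it back under the target path. A minimal sketch, assuming plain java.io streams stand in for the FSDataInputStream returned by FileSystem#open and the FSDataOutputStream returned by FileSystem#create; the class DownloadReuploadCopy is hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Hypothetical sketch of copying a file by "download and re-upload". */
final class DownloadReuploadCopy {

    private DownloadReuploadCopy() {}

    /**
     * Streams all bytes from the source file (download) into the target file
     * (re-upload) through a fixed-size buffer and returns the number of bytes copied.
     */
    static long copy(InputStream source, OutputStream target, int bufferSize) throws IOException {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        byte[] buffer = new byte[bufferSize];
        long copied = 0;
        int n;
        while ((n = source.read(buffer)) != -1) {
            target.write(buffer, 0, n);
            copied += n;
        }
        target.flush();
        return copied;
    }
}
```

Every byte passes through the TaskManager in this approach, which is the cost that the file reuse and fast-copy mechanism of FLIP-428 aims to avoid.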

Other Related Improvement

Many further improvements are possible and will be described in other FLIPs:

  1. Async IO: Async IO could help a lot especially for IO-intensive jobs, see FLIP-424 & FLIP-425 for more details.
  2. Batch MultiGet: RocksDB multiGet[4] could reduce IO requests greatly, which will be introduced in the future.
  3. Cache: If all accesses hit the cache (after supporting Async IO and MultiGet), the performance could be similar to or even better than RocksDBStateBackend, see FLIP-429 for more details.
  4. Remote Compaction: Local compaction may waste resources. We could leverage remote compaction [5] to improve this, see FLIP-430 for more details.

Public Interfaces

ForStStateBackendFactory

ForStStateBackendFactory.java
/** A factory that creates a {@link ForStStateBackend} from a configuration. */
@PublicEvolving
public class ForStStateBackendFactory
        implements StateBackendFactory<ForStStateBackend> {
            
    @Override
    public ForStStateBackend createFromConfig(ReadableConfig config, ClassLoader classLoader)
            throws IllegalConfigurationException {
        return new ForStStateBackend().configure(config, classLoader);
    }
}

ForStStateBackend

ForStStateBackend.java
/** A StateBackend that uses ForSt to store remote state. */
@PublicEvolving
public class ForStStateBackend extends AbstractManagedMemoryStateBackend implements ConfigurableStateBackend {

    /**
     * Creates a copy of this state backend that uses the values defined in the configuration for
     * fields that were not yet specified in this state backend.
     *
     * @param config The configuration.
     * @param classLoader The class loader.
     * @return The re-configured variant of the state backend
     */
    @Override
    public ForStStateBackend configure(ReadableConfig config, ClassLoader classLoader) {
        return new ForStStateBackend(this, config, classLoader);
    }

    // Implement other methods of AbstractManagedMemoryStateBackend
}

ForStOptions

ForStOptions.java
/** Configuration options for the ForSt backend. */
@PublicEvolving
public class ForStOptions {

    public static final ConfigOption<String> FORST_WORKING_DIR =
            ConfigOptions.key("state.backend.forSt.working-dir")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The base working directory of the ForSt state backend. All file system schemes supported by Flink can be used, e.g. hdfs, s3, oss; the libraries of the related file systems must be included. If not configured, it is generated automatically under 'Job-checkpointing-dir/taskowned'.");

}

Example

After configuring state.backend.type and state.backend.forSt.working-dir, the job will use ForStStateBackend as its state backend, and all working files will be stored on the remote file system.

flink-conf.yaml
# Configure the new state backend
state.backend.type: org.apache.flink.state.forSt.ForStStateBackendFactory

# Configure the working directory of remote state, e.g. {hdfs/s3/oss}://remote-state-dir
state.backend.forSt.working-dir: hdfs://remote-state-dir

Limitations

  1. Only file systems supported by Flink are supported.
  2. The working directory uses the same file system as the checkpoint directory.

Rejected Alternatives

Rejected Proposal 1: Implement all filesystems using their C++ library

Rejected Reason: See the comparison of the two proposals in "ForSt Side: Remote File Systems Integration".

Compatibility, Deprecation, and Migration Plan

In theory, ForStStateBackend can be compatible with RocksDBStateBackend, even though it uses different checkpointing and restoring strategies.

Test Plan

Tests include:

  1. Existing UTs/ITs of ForStStateBackend
  2. New UTs for FlinkFileSystem in the new ForSt
  3. New UTs for ForStFileSystem in Flink

Reference

[1] https://github.com/facebook/rocksdb/blob/main/include/rocksdb/file_system.h

[2] https://docs.oracle.com/javase/8/docs/technotes/guides/jni/spec/invocation.html#JNJI_OnLoad

[3] https://docs.oracle.com/javase/8/docs/technotes/guides/jni/spec/invocation.html#AttachCurrentThread

[4] https://rocksdb.org/blog/2022/10/07/asynchronous-io-in-rocksdb.html

[5] https://github.com/facebook/rocksdb/wiki/Remote-Compaction-(Experimental)

[6] https://github.com/facebook/rocksdb/blob/main/include/rocksdb/env.h

