Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
This is a sub-FLIP for disaggregated state management and its related work. Please read FLIP-423 first to get the whole story.
Motivation
As described in FLIP-423, an embedded state backend on the local file system has some tough issues, especially when dealing with extremely large state:
- Constraints of local disk space complicate the prediction of storage requirements, potentially leading to job failures: Especially in cloud native deployment mode, pre-allocated local disks typically face strict capacity constraints, making it challenging to forecast the size requirements of job states. Over-provisioning disk space results in unnecessary resource overhead, while under-provisioning risks job failure due to insufficient space.
- The tight coupling of compute and storage resources leads to underutilization and increased waste: Jobs can generally be categorized as either CPU-intensive or IO-intensive. In a coupled architecture, CPU-intensive jobs leave a significant portion of storage resources underutilized, whereas IO-intensive jobs result in idle computing resources.
By considering remote storage as the primary storage, all working states are maintained on the remote file system, which brings several advantages:
- Remote storage systems such as S3 and HDFS typically offer elastic scalability, theoretically providing unlimited space.
- The allocation of remote storage resources can be optimized by reducing them for CPU-intensive jobs and augmenting them for IO-intensive jobs, thus enhancing overall resource utilization.
- This architecture facilitates a highly efficient and lightweight process for checkpointing, recovery, and rescaling through fast copy or simple move.
This FLIP aims to realize disaggregated state for our new key-value store named ForSt, which evolves from RocksDB and supports remote file systems. This frees Flink from the disadvantages of the coupled state architecture and lets it embrace scalable and flexible cloud-native storage.
Proposed Changes
As discussed in FLIP-423, RocksDB (frocksdb) is selected to support the disaggregated storage architecture because it has been validated by extensive production practice with Flink jobs and it also offers significant scalability.
We plan to extend RocksDB to run on all file systems supported by Flink, thereby allowing state to be stored on various distributed file systems. The newly proposed store is named ForSt, which stands for 'For-Streaming Storage'. However, RocksDB currently supports only local file systems, so additional work is required:
- Remote file system integration on the ForSt side. We will leverage a JNI bridge to proxy all I/O operations from ForSt to the Flink FileSystem interface, which already supports many file systems.
- A new state backend on the Flink side. We will introduce a new state backend called ForStStateBackend to support ForSt.
ForSt Side: Remote File Systems Integration
ForSt is built on top of RocksDB. RocksDB supports a pluggable filesystem[1], which helps users build RocksDB on different file systems conveniently, and ForSt should use it to integrate remote file systems. There are two proposals for implementing this, as shown in the graph above:
- Proposal 1: Implement every file system based on ForSt's filesystem interface.
- Proposal 2: Leverage JNI bridge to proxy any I/O operations from ForSt to Flink FileSystem interface, where many file systems have already been supported.
Comparison of Proposal 1 and Proposal 2 | |
Performance (Proposal 1 is better) | Proposal 2 has additional JNI overhead, but the overhead is relatively negligible when weighed against the latency of remote I/O. |
Achievability (Proposal 2 is better) | Due to the lack of a unified abstraction for different file systems, Proposal 1 requires a C++ implementation for each file system, which is a considerable amount of work. For Proposal 2, the JNI bridge only has to be established once between ForSt's interfaces and Flink's. It is a one-time job that works for every file system that Flink already supports. |
Maintainability (Proposal 2 is better) | For Proposal 1, all the versions and dependencies of the C++ file system libraries must be maintained independently and kept consistent with Flink. For Proposal 2, the critical logic of the JNI bridge stays the same for all file systems. The detailed implementations are maintained in Flink, which makes further development much easier. |
Scalability (Proposal 2 is better) | For Proposal 2, the implementation in ForSt is universal: all file systems supported by Flink can be supported easily in Java. |
As the table shows, we choose Proposal 2 first to support the commonly used file systems that Flink supports, e.g. hdfs, s3 and oss, using JNI to bridge the gap between the ForSt FileSystem and the Flink FileSystem.
Proposal 1 could still be considered in the future as a performance optimization. It could be implemented on the ForSt side, and the Flink side would then only need to change the env[6] in use to support it.
The following subsections introduce some key points of the integration for Proposal 2.
FS Interface bridge
As outlined above, this FLIP employs a JNI bridge to redirect all I/O operations from ForSt to the Flink FileSystem interface. The table below enumerates the interfaces from ForSt and specifies the corresponding Flink FileSystem interfaces that the implementation should use.
Interface Reference | ||
Interface of ForSt | Which interface of Flink should be used | Comment |
Constructor | get | |
FileExists | exists | Can be implemented directly with the corresponding Flink method |
CreateDir | mkdirs | |
CreateDirIfMissing | mkdirs | |
DeleteDir | delete | |
DeleteFile | delete | |
GetChildren | listStatus | |
RenameFile | rename | |
GetFileSize | getFileStatus | |
GetFileModificationTime | getFileStatus | |
IsDirectory | getFileStatus | |
NewDirectory -> FSDirectory | getFileStatus | |
LockFile/UnlockFile | - | Fine if unsupported: the lock guards against concurrent DB processes, which Flink doesn't rely on. |
NewSequentialFile -> FSSequentialFile | open -> FSDataInputStream | |
NewRandomAccessFile -> FSRandomAccessFile | open -> FSDataInputStream | |
NewWritableFile -> FSWritableFile | create -> FSDataOutputStream | |
NewLogger -> Logger | - | The LOG file in the FileSystem; could be implemented with NewWritableFile |
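The mapping in the table can be sketched in Java as follows. This is only an illustration: `InMemoryFs` is a hypothetical in-memory stand-in that borrows a few method names from the Flink FileSystem (it is not Flink code, and `getLen`/`isDir` stand in for what `getFileStatus` would provide), and `ForStFsBridgeSketch` is an assumed name for the Java-side target of the JNI calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory stand-in mimicking a few Flink FileSystem method names (assumption, not Flink code). */
class InMemoryFs {
    // path -> file length; directories are stored with length -1
    private final Map<String, Long> entries = new TreeMap<>();

    boolean exists(String path) { return entries.containsKey(path); }
    boolean mkdirs(String path) { entries.putIfAbsent(path, -1L); return true; }
    void create(String path, long len) { entries.put(path, len); }
    boolean delete(String path) { return entries.remove(path) != null; }

    boolean rename(String src, String dst) {
        Long len = entries.remove(src);
        if (len == null) {
            return false;
        }
        entries.put(dst, len);
        return true;
    }

    // Stands in for getFileStatus(path).getLen(); the path must exist.
    long getLen(String path) { return entries.get(path); }

    // Stands in for getFileStatus(path).isDir().
    boolean isDir(String path) {
        Long len = entries.get(path);
        return len != null && len < 0;
    }

    // Returns the names of the direct children of dir.
    List<String> listStatus(String dir) {
        String prefix = dir.endsWith("/") ? dir : dir + "/";
        List<String> children = new ArrayList<>();
        for (String p : entries.keySet()) {
            if (p.startsWith(prefix) && p.indexOf('/', prefix.length()) < 0) {
                children.add(p.substring(prefix.length()));
            }
        }
        return children;
    }
}

/** Hypothetical bridge: one method per ForSt interface, delegating as listed in the table. */
class ForStFsBridgeSketch {
    private final InMemoryFs fs;

    ForStFsBridgeSketch(InMemoryFs fs) { this.fs = fs; }

    boolean fileExists(String p) { return fs.exists(p); }               // FileExists -> exists
    boolean createDirIfMissing(String p) { return fs.mkdirs(p); }       // CreateDirIfMissing -> mkdirs
    boolean deleteFile(String p) { return fs.delete(p); }               // DeleteFile -> delete
    List<String> getChildren(String dir) { return fs.listStatus(dir); } // GetChildren -> listStatus
    boolean renameFile(String s, String d) { return fs.rename(s, d); }  // RenameFile -> rename
    long getFileSize(String p) { return fs.getLen(p); }                 // GetFileSize -> getFileStatus
    boolean isDirectory(String p) { return fs.isDir(p); }               // IsDirectory -> getFileStatus

    /** Small end-to-end walk through the mapping. */
    static String demo() {
        InMemoryFs fs = new InMemoryFs();
        ForStFsBridgeSketch bridge = new ForStFsBridgeSketch(fs);
        bridge.createDirIfMissing("/db");
        fs.create("/db/000001.sst", 42L);
        bridge.renameFile("/db/000001.sst", "/db/000002.sst");
        return bridge.getChildren("/db") + ":" + bridge.getFileSize("/db/000002.sst");
    }
}
```

In the real design the delegate would be the Flink FileSystem obtained through `get`, and each bridge method would be invoked from C++ through JNI rather than from Java.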
JNI Related
JNI is a part of the Java platform that allows Java code to interact with code written in other languages, typically C or C++. JNIEnv is the key component of JNI through which native methods interact with the JVM; it provides methods for, e.g., creating new Java objects, calling methods and handling exceptions. To interact with the JVM from the C++ side, three steps should be followed:
Step | The core issue to be resolved | Solution |
Step 1: Init and maintain JNIEnv | The JNIEnv used by each call should be consistent with the one used by the JVM of the TaskManager | Global JNIEnv |
Step 2: Use JNIEnv to create Java Objects and call corresponding methods | The current file reading/writing methods of the Flink FileSystem are not sufficient because of the unavoidable data copy between C++ memory and the Java heap, as the table above shows. | Reading/Writing files with ByteBuffer |
Step 3: Maintain the creation and release of these objects | Since ForSt is the one that calls the interfaces, the objects' lifecycle should be managed on the DB side. | Maintain the lifecycle of these objects |
The details of the three solutions are as follows:
- Global JNIEnv. The JNIEnv could be maintained globally by implementing JNI_OnLoad[2] and AttachCurrentThread[3]. All JNI calls will then use the same JNIEnv as the JVM of the TaskManager.
- Reading/Writing files with ByteBuffer. We need to introduce a ForStFileSystem in Flink that supports reading and writing data with ByteBuffer; its pseudocode is given in "Implement new FileSystem". ForSt can then allocate a ByteBuffer and use it to call the new methods of this class. The lifecycle of the ByteBuffer is maintained by ForSt as described below.
- Maintain the lifecycle of these objects. The lifecycle of all objects from the Flink FileSystem, e.g. FileSystem, FSDataOutputStream and FSDataInputStream, should be maintained in ForSt. As the graph below shows, the lifecycle of these objects is as follows:
- ForSt creates these objects through methods of the Flink FileSystem when it accesses its own FileSystem. The objects are saved as member variables on the ForSt side.
- ForSt uses these objects to execute FileSystem operations, including reading and writing files, at runtime.
- These objects are destructed when the classes holding them are released.
Other behaviors in ForSt
The other parts of ForSt built on the file system, such as reading, writing and compaction, behave much as they do on a local file system. Some strategies could be introduced to improve them, which are described in "Other Related Improvement".
Flink Side: Implementation Details
As discussed in FLIP-423, we will introduce a new state backend called ForStStateBackend to support ForSt.
Many classes can be extracted and reused (see "Extract Common Classes"). The remaining classes, which contain the logic of initialization, configuration, checkpointing and restoring, should be re-implemented, e.g. ForStStateBackendFactory / ForStStateBackend / ForStKeyedStateBackend:
- Initialization: The directory should be initialized in the remote storage.
- Configuration: Some extra configurations should be parsed and some unsupported configurations should be checked.
- Checkpoint & Restoring & Rescaling: The strategy of checkpointing, restoring and rescaling differs from that of RocksDBStateBackend because the working files already exist in remote storage, as described in "Checkpointing & Recovery".
Besides, as discussed in "JNI Related", we also need to implement a ForStFileSystem to support reading and writing with ByteBuffer, which is introduced in "Implement new FileSystem".
Extract Common Classes
Many classes are common to RocksDBStateBackend and ForStStateBackend. For example:
- Related to basic configuration, e.g. RocksDBOptions, PredefinedOptions
- Related to resource control, e.g. RocksDBResourceContainer, RocksDBSharedResources
- Related to the state interface, e.g. RocksDBValueState, RocksDBMapState
So we'd like to extract a common module called "flink-statebackend-rocksdb-common" and move these common classes into it.
Implement new FileSystem
A new class called ForStFileSystem, which wraps FileSystem, will be introduced to be responsible for:
- Supporting reading/writing data from/to remote storage with ByteBuffer
- Introducing some metrics in filesystem level to monitor reading and writing
- Introducing cache strategies in filesystem level
Its pseudocode is shown below.
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

/**
 * ForStFileSystem is a class used to wrap the current Flink FileSystem
 * and expose some interfaces to ForSt.
 */
public class ForStFileSystem extends FileSystem {

    // The original (wrapped) FileSystem
    private final FileSystem flinkFS;

    public static FileSystem get(URI uri) throws IOException {
        // Wrap the original FileSystem#get(URI uri)
    }

    // Wrap other methods in FileSystem

    @Override
    public ByteBufferReadableFSDataInputStream open(Path f, int bufferSize) throws IOException {
        // Call the original FileSystem#open
        FSDataInputStream original = flinkFS.open(f, bufferSize);
        long fileSize = flinkFS.getFileStatus(f).getLen();
        // Wrap with ByteBufferReadableFSDataInputStream (arguments match the constructor below)
        return new ByteBufferReadableFSDataInputStream(f, original, fileSize);
    }

    @Override
    public ByteBufferWritableFSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException {
        // Call the original FileSystem#create
        FSDataOutputStream original = flinkFS.create(f, overwriteMode);
        // Wrap with ByteBufferWritableFSDataOutputStream
        return new ByteBufferWritableFSDataOutputStream(f, original);
    }
}

public class ByteBufferReadableFSDataInputStream extends FSDataInputStream {

    public ByteBufferReadableFSDataInputStream(Path path, FSDataInputStream fsdis, long totalFileSize) {
        // init
    }

    public int readFully(long position, ByteBuffer bb) throws IOException {
        // Read data from remote storage into the ByteBuffer; called from ForSt
    }

    // Wrap other methods in FSDataInputStream
}

public class ByteBufferWritableFSDataOutputStream extends FSDataOutputStream {

    public ByteBufferWritableFSDataOutputStream(Path path, FSDataOutputStream fsdos) {
        // init
    }

    public void write(ByteBuffer bb) throws IOException {
        // Write data from the ByteBuffer to remote storage; called from ForSt
    }

    // Wrap other methods in FSDataOutputStream
}
ForSt will init ForStFileSystem and call its open and create methods by JNI, then use int readFully(long position, ByteBuffer bb) and void write(ByteBuffer bb) to read or write data.
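To make the ByteBuffer-based read and write calls concrete, here is a minimal, self-contained Java sketch. It is an assumption-laden stand-in rather than the proposed implementation: a local `FileChannel` replaces the remote stream, the class name `ByteBufferIoSketch` is hypothetical, and the two methods take the channel as an extra parameter that the proposed `readFully(long, ByteBuffer)` and `write(ByteBuffer)` do not have. Direct buffers are used because they are the kind of buffer native code can fill without an extra copy through the Java heap.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Stand-in for ByteBuffer-based file I/O; a local FileChannel replaces the remote stream. */
class ByteBufferIoSketch {

    /** Positional read: fills bb starting at the given file offset, returns the bytes read. */
    static int readFully(FileChannel in, long position, ByteBuffer bb) throws IOException {
        int total = 0;
        while (bb.hasRemaining()) {
            int n = in.read(bb, position + total);
            if (n < 0) {
                break; // reached end of file before the buffer was full
            }
            total += n;
        }
        return total;
    }

    /** Sequential write: drains the remaining bytes of bb into the channel. */
    static void write(FileChannel out, ByteBuffer bb) throws IOException {
        while (bb.hasRemaining()) {
            out.write(bb);
        }
    }

    /** Writes a small payload, then reads five bytes back from offset 6. */
    static String demo() {
        try {
            Path file = Files.createTempFile("forst-sketch", ".bin");
            try {
                ByteBuffer src = ByteBuffer.allocateDirect(16);
                src.put("hello forst".getBytes(StandardCharsets.UTF_8));
                src.flip();
                try (FileChannel out = FileChannel.open(file, StandardOpenOption.WRITE)) {
                    write(out, src);
                }
                ByteBuffer dst = ByteBuffer.allocateDirect(5);
                try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
                    readFully(in, 6, dst);
                }
                dst.flip();
                byte[] bytes = new byte[dst.remaining()];
                dst.get(bytes);
                return new String(bytes, StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `ByteBufferIoSketch.demo()` exercises one write followed by one positional read, which mirrors the random-access pattern ForSt uses when reading blocks from a file.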
Local File Cache
Always reading and writing files on remote storage can significantly increase latency. Therefore, we introduce a local file cache to speed up file I/O operations.
We could leverage the existing LocalFileSystem and introduce a FIFO file cache mechanism to make full use of local disk space, prioritizing retrieval from the LocalFileSystem. ForStFileSystem will be accessed only when local disk space is insufficient.
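A FIFO file cache of this kind could look roughly like the sketch below. Everything here is an assumption made for illustration: the class name `FifoFileCacheSketch`, caching whole files as byte arrays in memory instead of on local disk, the byte-based capacity, and the `remoteReader` function that stands in for a read through ForStFileSystem.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical FIFO cache of whole files, bounded by a total number of bytes. */
class FifoFileCacheSketch {
    private final long capacityBytes;
    private long usedBytes = 0;
    private final Map<String, byte[]> local = new HashMap<>();
    private final ArrayDeque<String> insertionOrder = new ArrayDeque<>();
    private final Function<String, byte[]> remoteReader; // stand-in for the remote file system
    int remoteReads = 0;

    FifoFileCacheSketch(long capacityBytes, Function<String, byte[]> remoteReader) {
        this.capacityBytes = capacityBytes;
        this.remoteReader = remoteReader;
    }

    byte[] read(String file) {
        byte[] cached = local.get(file);
        if (cached != null) {
            return cached; // served locally; FIFO order is NOT refreshed on a hit
        }
        byte[] data = remoteReader.apply(file); // local miss: fall back to remote storage
        remoteReads++;
        if (data.length <= capacityBytes) {
            // Evict the oldest inserted files until the new one fits.
            while (usedBytes + data.length > capacityBytes) {
                String oldest = insertionOrder.removeFirst();
                usedBytes -= local.remove(oldest).length;
            }
            local.put(file, data);
            insertionOrder.addLast(file);
            usedBytes += data.length;
        }
        return data;
    }

    /** Returns the number of remote reads for a short access pattern. */
    static int demo() {
        FifoFileCacheSketch cache = new FifoFileCacheSketch(8, name -> new byte[4]);
        cache.read("a"); // remote read, cached
        cache.read("b"); // remote read, cache now full
        cache.read("a"); // local hit
        cache.read("c"); // remote read, evicts "a" (oldest insertion)
        cache.read("b"); // local hit
        cache.read("a"); // remote read again, evicts "b"
        return cache.remoteReads;
    }
}
```

Note that the hit on "a" does not protect it from eviction, which is the difference between FIFO and LRU; FIFO keeps the bookkeeping trivial at the cost of occasionally evicting a hot file.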
Directory layout
Working files are the live files that ForSt actively uses at runtime; they are located in the remote file system.
The directory layout of working files is shown below. By default it is maintained under "Job-checkpointing-dir/taskowned"; the location can be changed with the new option described in "Public Interface Changes".
The life cycle of the working directory (including creation and cleanup) is managed in the same way as the existing local strategy.
There may be orphaned files in some corner cases, since the StateBackend only makes a best effort to clean up its working files during StateBackend#dispose. We leave this optimization to a future FLIP such as FLIP-432.
// Default layout
|--- Job-checkpointing-dir
    |--- chk-xxx
    |--- shared
    |--- taskowned
        |--- working file dir
            |--- subTask-sub-dir
                |--- db
                    |--- working files

// User-defined directory
|--- User-defined-RemoteState-working-dir
    |--- subTask-sub-dir
        |--- db
            |--- working files
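The choice between the default and the user-defined layout can be illustrated with a small path-resolution helper. This is a sketch under assumptions: the class `WorkingDirSketch`, its parameters, and in particular the name passed for the generated directory under taskowned are hypothetical, since the layout above only uses placeholders.

```java
/** Hypothetical helper; class, method and parameter names are illustrative only. */
final class WorkingDirSketch {
    private WorkingDirSketch() {}

    /**
     * Resolves the db directory of one subtask.
     *
     * @param userDefinedDir configured working dir, or null/empty to use the default layout
     * @param checkpointDir the job checkpointing directory
     * @param generatedDirName assumed name of the generated working dir under taskowned
     * @param subtaskDir the per-subtask sub-directory
     */
    static String dbDir(
            String userDefinedDir, String checkpointDir, String generatedDirName, String subtaskDir) {
        String base =
                (userDefinedDir != null && !userDefinedDir.isEmpty())
                        ? trim(userDefinedDir)
                        : trim(checkpointDir) + "/taskowned/" + generatedDirName;
        return base + "/" + subtaskDir + "/db";
    }

    // Drops a single trailing slash so that joined paths do not contain "//".
    private static String trim(String dir) {
        return dir.endsWith("/") ? dir.substring(0, dir.length() - 1) : dir;
    }
}
```

For example, `WorkingDirSketch.dbDir(null, "hdfs://cp/job-1/", "working", "subtask-0")` follows the default layout, while passing a non-empty first argument such as `"s3://state"` follows the user-defined layout.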
Checkpointing & Recovery
Checkpointing and recovery involve copying files from one location to another. The very basic version of ForStStateBackend copies files within remote file systems by downloading and re-uploading them. This is much like the behavior of RocksDBStateBackend, except that hard links are unavailable on a DFS.
FLIP-428 proposes a more lightweight checkpointing and recovery mechanism that leverages file reuse and fast copy. Please refer to FLIP-428 for more details.
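For the basic version, copying by download and re-upload amounts to streaming the bytes of a working file through the client. The sketch below shows only that idea and is not the proposed implementation: `StreamCopySketch` is a hypothetical name, and in-memory streams stand in for the remote input and output streams.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

/** Copies a file by reading it through the client and writing it back out. */
class StreamCopySketch {

    /** Streams all bytes from in to out in fixed-size chunks; returns the bytes copied. */
    static long copy(InputStream in, OutputStream out, int bufferSize) throws IOException {
        byte[] buf = new byte[bufferSize];
        long total = 0;
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
            total += n;
        }
        return total;
    }

    /** Copies a 10,000-byte stand-in "working file" and returns the number of bytes copied. */
    static long demo() {
        byte[] workingFile = new byte[10_000];
        try (InputStream in = new ByteArrayInputStream(workingFile);
                ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            long copied = copy(in, out, 4096);
            return out.size() == workingFile.length ? copied : -1;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Every byte crosses the network twice in this scheme, which is the cost that the file reuse and fast-copy approach of FLIP-428 aims to avoid.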
Other Related Improvement
Many further improvements are described in other FLIPs:
- Async IO: Async IO could help a lot, especially for IO-intensive jobs; see FLIP-424 & FLIP-425 for more details.
- Batch MultiGet: RocksDB multiGet[4] could greatly reduce IO requests; it will be introduced in the future.
- Cache: The performance could be similar to or even better than RocksDBStateBackend if all accesses hit the cache (after supporting Async IO and MultiGet); see FLIP-429 for more details.
- Remote Compaction: Local compaction may waste resources. We could leverage remote compaction[5] to improve this; see FLIP-430 for more details.
Public Interfaces
ForStStateBackendFactory
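import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.state.StateBackendFactory;

/** A factory that creates a {@link ForStStateBackend} from a configuration. */
@PublicEvolving
public class ForStStateBackendFactory implements StateBackendFactory<ForStStateBackend> {

    @Override
    public ForStStateBackend createFromConfig(ReadableConfig config, ClassLoader classLoader)
            throws IllegalConfigurationException {
        return new ForStStateBackend().configure(config, classLoader);
    }
}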
ForStStateBackend
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.state.AbstractManagedMemoryStateBackend;
import org.apache.flink.runtime.state.ConfigurableStateBackend;

/** A StateBackend that uses ForSt to store remote state. */
@PublicEvolving
public class ForStStateBackend extends AbstractManagedMemoryStateBackend
        implements ConfigurableStateBackend {

    /**
     * Creates a copy of this state backend that uses the values defined in the configuration for
     * fields that were not yet specified in this state backend.
     *
     * @param config The configuration.
     * @param classLoader The class loader.
     * @return The re-configured variant of the state backend
     */
    @Override
    public ForStStateBackend configure(ReadableConfig config, ClassLoader classLoader) {
        return new ForStStateBackend(this, config, classLoader);
    }

    // Implement other methods of AbstractManagedMemoryStateBackend
}
ForStOptions
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Configuration options for the ForSt backend. */
@PublicEvolving
public class ForStOptions {

    public static final ConfigOption<String> FORST_WORKING_DIR =
            ConfigOptions.key("state.backend.forSt.working-dir")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "The base working directory of the ForSt state backend. "
                                    + "Supports all file system schemes which Flink uses, e.g. hdfs, s3, oss. "
                                    + "The libraries of the related file systems must be included. "
                                    + "It will be generated automatically in 'Job-checkpointing-dir/taskowned' if not configured.");
}
Example
After configuring state.backend and state.backend.forSt.working-dir, the job will use ForStStateBackend as the new State Backend. All working files will be stored on the remote file systems.
# Configure the new State Backend
state.backend.type: org.apache.flink.state.forSt.ForStStateBackendFactory
# Configure the working directory of remote state, e.g. {hdfs/s3/oss}://remote-state-dir
state.backend.forSt.working-dir: hdfs://remote-state-dir
Limitations
- Only the file systems that Flink supports are supported.
- The file system used for working files is the same as the one used for checkpoints.
Rejected Alternatives
Rejected Proposal 1: Implement all filesystems using their C++ library
Rejected Reason: See the comparison table between Proposal 1 and Proposal 2 above.
Compatibility, Deprecation, and Migration Plan
In theory, it could be compatible with RocksDBStateBackend even though it uses different restoring and checkpointing strategies.
Test Plan
Tests include:
- Existing UTs/ITs of ForStStateBackend
- New UT about FlinkFileSystem in new ForSt
- New UT about ForStFileSystem in Flink
Reference
[1] https://github.com/facebook/rocksdb/blob/main/include/rocksdb/file_system.h
[2] https://docs.oracle.com/javase/8/docs/technotes/guides/jni/spec/invocation.html#JNJI_OnLoad
[3] https://docs.oracle.com/javase/8/docs/technotes/guides/jni/spec/invocation.html#AttachCurrentThread
[4] https://rocksdb.org/blog/2022/10/07/asynchronous-io-in-rocksdb.html
[5] https://github.com/facebook/rocksdb/wiki/Remote-Compaction-(Experimental)
[6] https://github.com/facebook/rocksdb/blob/main/include/rocksdb/env.h