| Discussion thread | https://lists.apache.org/thread/bl4shgcxyysqgyk7gq3h6pyd93l4rqc0 |
|---|---|
| Vote thread | here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org) |
| JIRA | here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX) |
| Release | <Flink Version> |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
This is a sub-FLIP of the disaggregated state management effort and its related work; please read FLIP-423 first for the full picture.
Motivation
Flink state backends have long adopted LSM-Trees for state storage. A critical component of LSM-Trees is compaction, which merges immutable files to remove obsolete data, thereby reducing storage overhead and improving read performance.
Currently in Flink, compaction executes locally on TaskManagers within background threads (Local Compaction). This co-location inevitably leads to recurring contention for CPU and I/O resources between foreground tasks and background compaction operations. Periodic checkpointing further intensifies this contention, resulting in latency spikes, unstable resource usage, and the need to overprovision resources.
Flink 2.0 introduced disaggregated state management via the ForSt StateBackend (detailed in FLIP-423), which makes it possible to decouple the CPU-intensive compaction process from Flink compute nodes. Since a shared DFS now serves as the primary state storage, the state files to be compacted are directly accessible without disturbing normal processing. This enables ForSt to offer compaction-as-a-service (Remote Compaction), where dedicated stateless compactor workers execute file compactions triggered by Flink tasks. ForSt already supports remote compaction, but an interaction protocol between Flink subtasks and the compaction service still needs to be established to put this capability into operation.
This design significantly increases deployment flexibility: the compactor and compute nodes can scale independently, decoupling the two resource types so that each handles its own workload. Given the I/O-intensive nature of compaction, the compactor can be deployed within the same intranet as the DFS cluster, or even locally on the nodes hosting the file data, regardless of the network configuration of the Flink TMs. Because the compaction service is shared, resource spikes from compactions across different jobs can be staggered, leading to more stable CPU usage and higher overall resource utilization. This aligns with the resource-pooling concept prevalent in the cloud-native era.
Therefore, this FLIP proposes to introduce Remote Compaction for the ForSt StateBackend. This feature can further complement Flink's disaggregated design by cleanly separating computation and storage management responsibilities.
Status Quo of ForSt's Compaction
ForSt is the key-value store for Flink's disaggregated state management. It is derived from RocksDB (FRocksDB) and uses remote file systems as its primary storage. Although ForSt already supports offloading compactions to remote hosts [1], a defined interaction protocol with the compaction service is required to put this feature into operation.
The figure below illustrates both compaction modes (local and remote) in ForSt. Typically, to perform a compaction, ForSt picks some SST files from the LSM tree and then:
- For Local Compaction: ForSt retrieves the input files from remote storage, executes the compaction in local background threads, and then updates the LSM-tree metadata with the new files.
- For Remote Compaction:
  - ForSt sends a compaction request containing the input file metadata to the service.
  - The service accesses the files from remote storage and performs the compaction.
  - The service returns the new file metadata to ForSt.
  - ForSt updates the LSM tree by adding the new files and removing the obsolete ones.
ForSt also supports mode switching between local and remote compaction, as detailed in the workflow diagram below.
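Both modes end with the same metadata step: the input files are swapped for the output files in the LSM tree. The minimal in-memory Java sketch below models that step together with the fall-back from remote to local compaction. LsmFileSet and CompactionDriver are hypothetical names, and an empty Optional stands in for ForSt's "use local" status; the real logic lives inside ForSt's C++ code.

```java
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

/** Hypothetical, simplified LSM-tree metadata: only the set of live SST file names. */
final class LsmFileSet {
    private final Set<String> liveFiles = new TreeSet<>();

    LsmFileSet(Collection<String> initialFiles) {
        liveFiles.addAll(initialFiles);
    }

    /** Swaps the compaction inputs for its outputs (the final step of both modes). */
    synchronized void install(Collection<String> inputs, Collection<String> outputs) {
        if (!liveFiles.containsAll(inputs)) {
            throw new IllegalStateException("Some compaction inputs are no longer live: " + inputs);
        }
        liveFiles.removeAll(inputs);
        liveFiles.addAll(outputs);
    }

    synchronized Set<String> snapshot() {
        return new TreeSet<>(liveFiles);
    }
}

final class CompactionDriver {
    /**
     * Tries the remote path first. An empty result models the "use local" status,
     * in which case the same inputs are compacted by the local function instead.
     */
    static List<String> compact(
            LsmFileSet files,
            List<String> inputs,
            Function<List<String>, Optional<List<String>>> remote,
            Function<List<String>, List<String>> local) {
        List<String> outputs = remote.apply(inputs).orElseGet(() -> local.apply(inputs));
        files.install(inputs, outputs);
        return outputs;
    }
}
```

Keeping the install step identical for both paths is what makes switching between modes transparent to the rest of the LSM tree.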
Implementation Requirements for Flink Integration
- ForSt Client Implementation: As shown in the figure above, ForSt exposes two abstract functions inherited from RocksDB, which we need to implement (their signatures are detailed below):
  - 'Make a Compaction Request': Serializes the metadata of the files scheduled for compaction and sends it to a compaction service.
  - 'Wait for Response': The callback for receiving a response from the compaction service, which parses the resulting file metadata from the response.
- Compaction Service Implementation: We also need to implement the compaction service itself. For the core compaction logic, ForSt already provides a method called OpenAndCompact, which loads the input files and executes the compaction. The service will invoke this method to do the actual compaction work.
Public Interfaces
By default, Flink jobs run with local compaction. To enable remote compaction, you only need to specify the address of the compaction service endpoint in the job configuration as follows:
- state.backend.forst.remote-compaction.address: Comma-separated list of ip:port entries for the service endpoints. If omitted, remote compaction is disabled.
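A minimal sketch of how the address option could be parsed and validated, assuming plain IPv4 or hostname entries (IPv6 literals are not handled). RemoteCompactionAddresses is a hypothetical helper, not an existing Flink class; an empty result corresponds to remote compaction being disabled.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parser for state.backend.forst.remote-compaction.address. */
final class RemoteCompactionAddresses {
    /**
     * Parses a value such as "10.0.0.5:9090,10.0.0.6:9090" into unresolved socket addresses.
     * An absent or blank value yields an empty list, meaning remote compaction is disabled.
     */
    static List<InetSocketAddress> parse(String value) {
        List<InetSocketAddress> endpoints = new ArrayList<>();
        if (value == null || value.isBlank()) {
            return endpoints;
        }
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected ip:port but got '" + trimmed + "'");
            }
            int port;
            try {
                port = Integer.parseInt(trimmed.substring(colon + 1));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid port in '" + trimmed + "'", e);
            }
            // createUnresolved throws IllegalArgumentException for ports outside 0..65535.
            endpoints.add(InetSocketAddress.createUnresolved(trimmed.substring(0, colon), port));
        }
        return endpoints;
    }

    static boolean remoteCompactionEnabled(String value) {
        return !parse(value).isEmpty();
    }
}
```

Addresses are kept unresolved so that DNS lookups happen when a connection is made rather than at configuration time.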
Such a job directs each of its compaction requests to one of the specified compaction service endpoints. Endpoints can also be bound dynamically: service endpoints can be added to or removed from a running Flink job through the REST API:
- URI: /jobs/:jobid/forst/remote-compaction-services
- Method: POST
- Query parameters:
  - add: Comma-separated list of service endpoints to be added
  - remove: Comma-separated list of service endpoints to be removed
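For illustration, such a request can be assembled with the JDK's java.net.http client. The REST base address (for example http://localhost:8081) and the job ID are caller-supplied assumptions, RemoteCompactionServicesRequest is a hypothetical helper, and the endpoint lists are URL-encoded because they contain ':' and ','.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical builder for POST /jobs/:jobid/forst/remote-compaction-services. */
final class RemoteCompactionServicesRequest {
    /** Either endpoint list may be null when only adding or only removing endpoints. */
    static HttpRequest build(String restBase, String jobId, String add, String remove) {
        List<String> params = new ArrayList<>();
        if (add != null && !add.isBlank()) {
            params.add("add=" + URLEncoder.encode(add, StandardCharsets.UTF_8));
        }
        if (remove != null && !remove.isBlank()) {
            params.add("remove=" + URLEncoder.encode(remove, StandardCharsets.UTF_8));
        }
        if (params.isEmpty()) {
            throw new IllegalArgumentException("At least one of 'add' or 'remove' is required");
        }
        URI uri = URI.create(restBase + "/jobs/" + jobId + "/forst/remote-compaction-services?"
                + String.join("&", params));
        // The proposal passes everything as query parameters, so the body is empty.
        return HttpRequest.newBuilder(uri).POST(HttpRequest.BodyPublishers.noBody()).build();
    }
}
```

The resulting request could then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); the response format of this proposed endpoint is not specified here.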
Some extra configuration options will also be added to the ForSt StateBackend:
- state.backend.forst.remote-compaction.request.timeout: Timeout for a compaction response. This refers to the end-to-end duration for compaction requests, spanning from request submission through remote compaction execution to final response delivery.
- state.backend.forst.remote-compaction.request.retries: Maximum number of retry attempts when a request fails.
- state.backend.forst.remote-compaction.request.failure-strategy: Action to take when a compaction request fails and the retries are exhausted. Valid options are:
  - "Fallback" (fall back to local compaction)
  - "Fail" (fail the subtask)
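One possible way to tie the failure strategy to ForSt's own status codes is sketched below: once retries are exhausted, "Fallback" could be reported to ForSt as kUseLocal (so ForSt compacts locally) and "Fail" as kFailure (so the error surfaces and the subtask fails). This mapping, the Java mirror enum ForStJobStatus, and the case-insensitive parsing are assumptions for illustration only.

```java
import java.util.Locale;

/** Hypothetical Java mirror of ForSt's CompactionServiceJobStatus (kSuccess, kFailure, kUseLocal). */
enum ForStJobStatus { SUCCESS, FAILURE, USE_LOCAL }

/** Hypothetical parsed form of state.backend.forst.remote-compaction.request.failure-strategy. */
enum FailureStrategy {
    FALLBACK,
    FAIL;

    static FailureStrategy parse(String value) {
        switch (value.trim().toLowerCase(Locale.ROOT)) {
            case "fallback":
                return FALLBACK;
            case "fail":
                return FAIL;
            default:
                throw new IllegalArgumentException("Unknown failure strategy: " + value);
        }
    }

    /** Status reported to ForSt once all retries of a request are exhausted (assumed mapping). */
    ForStJobStatus onRetriesExhausted() {
        return this == FALLBACK ? ForStJobStatus.USE_LOCAL : ForStJobStatus.FAILURE;
    }
}
```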
Compaction Service
ForSt's design decouples its internals from the service implementation, allowing users to deploy custom compaction services that meet the "Implementation Requirements for Flink Integration" described above.
In this FLIP, we propose a brief design for a remote compaction service that is scalable and reliable and offers an out-of-the-box deployment option via Flink's CLI. The figure below illustrates the service components and their interactions with Flink:
- Gateway: RPC endpoint for Flink communication
- Scheduler: Distributes requests from gateway to workers for load balancing
- Compaction Worker Pool: Executes the actual compaction operations
For this process to function properly, the compaction service must be reachable from the Flink subtasks and must itself be able to access the DFS, so that it can receive and handle compaction requests. Additionally, it must be elastic and reliable, which requires the scheduler to dynamically track the liveness and scaling events of the worker pool. Because compaction requests are processed independently, with no dependency on prior operations, this stateless architecture inherently supports lightweight scaling.
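The liveness tracking mentioned above could be as simple as a heartbeat table kept by the scheduler. The sketch below is hypothetical (WorkerLivenessTracker is not an existing class); timestamps are passed in explicitly to keep it deterministic, and a real implementation might instead reuse Flink's existing heartbeat services. Requests assigned to an expired worker would then have to be re-dispatched or reported as failed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical heartbeat-based view of the compaction worker pool. Times are in milliseconds. */
final class WorkerLivenessTracker {
    private final long heartbeatTimeoutMs;
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

    WorkerLivenessTracker(long heartbeatTimeoutMs) {
        this.heartbeatTimeoutMs = heartbeatTimeoutMs;
    }

    /** A heartbeat from an unknown worker registers it (scale-out). */
    synchronized void onHeartbeat(String workerId, long nowMs) {
        lastHeartbeatMs.put(workerId, nowMs);
    }

    /** Graceful removal of a worker (scale-in). */
    synchronized void onDeregister(String workerId) {
        lastHeartbeatMs.remove(workerId);
    }

    /** Removes workers whose last heartbeat is older than the timeout and returns them, sorted. */
    synchronized List<String> expire(long nowMs) {
        List<String> lost = new ArrayList<>();
        lastHeartbeatMs.entrySet().removeIf(e -> {
            boolean timedOut = nowMs - e.getValue() > heartbeatTimeoutMs;
            if (timedOut) {
                lost.add(e.getKey());
            }
            return timedOut;
        });
        Collections.sort(lost);
        return lost;
    }

    synchronized Set<String> liveWorkers() {
        return new TreeSet<>(lastHeartbeatMs.keySet());
    }
}
```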
To deploy a Compaction Service, one can execute the following Flink command:
flink-console.sh forst-compaction-service
Upon successful startup, the command output will include the address of the Gateway, which serves as the endpoint for the compaction service.
Proposed Changes
RPC Components
The following figure illustrates the compaction service as a client/server model, focusing on the communication between its components.
All components of the compaction service communicate via RPCs. This can be implemented using Flink's existing Pekko-based RPC framework. Specifically, we need to implement an RpcEndpoint for each of the following components:
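import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
// CompactionJobID is the new job ID type introduced by this FLIP. In Flink's RPC framework,
// remotely invoked methods are also declared in an RpcGateway interface implemented by the endpoint.
abstract class CompactionServiceClient extends RpcEndpoint {
    protected CompactionServiceClient(RpcService rpcService) {
        super(rpcService);
    }
    /**
     * Receive the response for a compaction request.
     * @param compactionJobId ID of the compaction job.
     * @param compactionOutputs Serialized bytes of the compaction outputs, such as the names of newly created files.
     * @return A future that completes with an acknowledgement if the communication succeeded.
     */
    public abstract CompletableFuture<Acknowledge> receiveCompactionResponse(
            CompactionJobID compactionJobId, byte[] compactionOutputs);
}
abstract class CompactionServiceGateway extends RpcEndpoint {
    protected CompactionServiceGateway(RpcService rpcService) {
        super(rpcService);
    }
    /**
     * Submit a compaction request to the gateway of the compaction service.
     * @param compactionInputs Serialized bytes of the compaction inputs, such as the names of the files to be compacted.
     * @return A future containing the ID of the compaction job.
     */
    public abstract CompletableFuture<CompactionJobID> submitCompactionRequest(byte[] compactionInputs);
    /**
     * Abort an ongoing compaction request.
     * @param compactionJobId ID of the compaction job.
     * @return A future that completes with an acknowledgement if the communication succeeded.
     */
    public abstract CompletableFuture<Acknowledge> abortCompactionRequest(CompactionJobID compactionJobId);
}
abstract class CompactionServiceWorker extends RpcEndpoint {
    protected CompactionServiceWorker(RpcService rpcService) {
        super(rpcService);
    }
    /**
     * Invoke compaction on a worker.
     * @param compactionJobId ID of the compaction job.
     * @param compactionInputs Serialized bytes of the compaction inputs, such as the names of the files to be compacted.
     * @return A future containing the serialized bytes of the compaction outputs.
     */
    public abstract CompletableFuture<byte[]> performCompaction(
            CompactionJobID compactionJobId, byte[] compactionInputs);
}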
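To make the message flow between these endpoints concrete, the sketch below wires a client, a gateway, and a worker together in one process, using plain method calls instead of Pekko RPC and String IDs in place of CompactionJobID. All names are hypothetical, scheduling is omitted (a single worker), and reporting worker errors by completing the client's future exceptionally is an assumption, since the proposed client RPC only carries outputs. The response may reach the client before it has learned the job ID from submitCompactionRequest, so the client keys its pending futures by job ID in an order-independent way.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Worker side of the protocol (mirrors CompactionServiceWorker#performCompaction). */
interface CompactionWorker {
    CompletableFuture<byte[]> performCompaction(String jobId, byte[] compactionInputs);
}

/** Client side: collects responses keyed by job ID (mirrors CompactionServiceClient). */
final class ResponseCollector {
    private final Map<String, CompletableFuture<byte[]>> responses = new ConcurrentHashMap<>();

    /** Called by the gateway; may run before the client has seen the job ID. */
    void receiveCompactionResponse(String jobId, byte[] compactionOutputs) {
        responseFor(jobId).complete(compactionOutputs);
    }

    CompletableFuture<byte[]> responseFor(String jobId) {
        return responses.computeIfAbsent(jobId, id -> new CompletableFuture<>());
    }
}

/** Gateway forwarding every request to one worker (mirrors CompactionServiceGateway). */
final class InProcessGateway {
    private final CompactionWorker worker;
    private final ResponseCollector client;
    private final Map<String, CompletableFuture<byte[]>> inFlight = new ConcurrentHashMap<>();

    InProcessGateway(CompactionWorker worker, ResponseCollector client) {
        this.worker = worker;
        this.client = client;
    }

    CompletableFuture<String> submitCompactionRequest(byte[] compactionInputs) {
        String jobId = UUID.randomUUID().toString();
        CompletableFuture<byte[]> result = worker.performCompaction(jobId, compactionInputs);
        inFlight.put(jobId, result);
        // Push the outcome back to the client once the worker finishes.
        result.whenComplete((outputs, error) -> {
            inFlight.remove(jobId);
            if (error == null) {
                client.receiveCompactionResponse(jobId, outputs);
            } else if (!(error instanceof CancellationException)) {
                client.responseFor(jobId).completeExceptionally(error); // assumed error path
            }
        });
        return CompletableFuture.completedFuture(jobId);
    }

    /** Returns true if an in-flight request was found and cancelled. */
    CompletableFuture<Boolean> abortCompactionRequest(String jobId) {
        CompletableFuture<byte[]> result = inFlight.remove(jobId);
        return CompletableFuture.completedFuture(result != null && result.cancel(true));
    }
}
```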
Gateway and Scheduler
On the server side, the gateway receives compaction requests and asks the scheduler to distribute the compaction jobs to the compaction workers. Load balancing among the workers is crucial for good performance and high resource utilization. As a baseline, a round-robin scheduling policy can assign tasks to the compactors. Beyond that, the scheduler can monitor the workload of each worker by counting the input files remaining in its assigned compaction requests, and always assign new compaction tasks to the least occupied worker.
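A minimal sketch of both policies, assuming the scheduler is told how many input files each request carries when it is assigned and when it completes (a coarse approximation of "input files remaining", since progress within a request is not tracked). CompactionScheduler and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical scheduler offering round-robin and least-loaded worker selection. */
final class CompactionScheduler {
    private final List<String> workers;
    private final Map<String, Integer> remainingInputFiles = new HashMap<>();
    private int nextIndex = 0;

    CompactionScheduler(List<String> workers) {
        if (workers.isEmpty()) {
            throw new IllegalArgumentException("At least one worker is required");
        }
        this.workers = new ArrayList<>(workers);
        for (String w : workers) {
            remainingInputFiles.put(w, 0);
        }
    }

    /** Baseline policy: cycle through the workers regardless of their load. */
    synchronized String pickRoundRobin() {
        String worker = workers.get(nextIndex);
        nextIndex = (nextIndex + 1) % workers.size();
        return worker;
    }

    /** Load-aware policy: fewest input files still to compact; ties go to the first listed worker. */
    synchronized String pickLeastLoaded() {
        String best = workers.get(0);
        for (String w : workers) {
            if (remainingInputFiles.get(w) < remainingInputFiles.get(best)) {
                best = w;
            }
        }
        return best;
    }

    synchronized void onAssigned(String worker, int inputFiles) {
        remainingInputFiles.merge(worker, inputFiles, Integer::sum);
    }

    synchronized void onCompleted(String worker, int inputFiles) {
        remainingInputFiles.merge(worker, -inputFiles, Integer::sum);
    }
}
```

Counting input files rather than requests reflects that one large compaction can occupy a worker far longer than several small ones.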
Client and Worker
ForSt already provides the necessary interfaces for the compaction service on the client side. By implementing these interfaces, we can define how a ForSt instance submits a compaction request and how it waits for the response. Additionally, we need to implement the logic for the compaction workers to specify how they perform compaction on the input files.
#include <jni.h>  // jobject
#include <string>
// CompactionService, CompactionServiceJobInfo and CompactionServiceJobStatus come from
// ForSt's public headers (inherited from RocksDB's remote compaction API).
/* Status of a remote compaction request (as defined by ForSt, shown here for reference). */
enum class CompactionServiceJobStatus : char {
  kSuccess,   // The compaction finishes successfully.
  kFailure,   // The compaction finishes with failures.
  kUseLocal,  // The compaction fails and should fall back to a local compaction.
};
class ForStCompactionService : public CompactionService {
 public:
  // Name() is pure virtual in the CompactionService base class.
  const char* Name() const override { return "ForStCompactionService"; }
  /**
   * [Client-side] Send a remote compaction request to the compaction service.
   * @param info Information about the compaction, such as the ID of this compaction job.
   * @param compaction_input Serialized compaction input (including the names of the files to be
   *        compacted), which the worker passes on to OpenAndCompact.
   * @return Status of the submission.
   */
  CompactionServiceJobStatus StartV2(const CompactionServiceJobInfo& info,
                                     const std::string& compaction_input) override;
  /**
   * [Client-side] Wait for the remote compaction to finish.
   * @param info Information about the compaction.
   * @param compaction_service_result Serialized results of the compaction, including the file names of the compaction output.
   * @return Status of the compaction.
   */
  CompactionServiceJobStatus WaitForCompleteV2(
      const CompactionServiceJobInfo& info,
      std::string* compaction_service_result) override;
  /**
   * [Compaction Worker] Perform the actual compaction job.
   * @param info Information about the compaction.
   * @param input Names of the input files.
   * @param base_path Base path of the ForSt DB that requests this compaction.
   * @param forst_file_system Java object of the ForStFlinkFileSystem. This is the proxy for accessing files on the DFS.
   * @return Serialized results of the compaction.
   */
  static std::string triggerCompaction(const CompactionServiceJobInfo& info,
                                       const std::string& input,
                                       const std::string& base_path,
                                       jobject forst_file_system) {
    std::string result;
    // 1. Open a DB solely composed of the input files (via OpenAndCompact, accessing
    //    the DFS through forst_file_system).
    // 2. Trigger a full compaction in the DB.
    // 3. Serialize the compaction outputs into `result`.
    return result;
  }
};
Failure Handling and Fallback Policies
After triggering a remote compaction request, a Flink subtask awaits a successful response. In unsuccessful cases, it may encounter timeouts or error responses instead:
- Timeout: Occurs when the request cannot be processed due to (1) gateway unavailability or (2) failure or overload of the assigned compaction worker. In these scenarios, the behavior of the Flink subtask is configurable. Typically, it retries the request several times. If the retries are exhausted and the request still fails, the subtask can either:
  - Fall Back: Perform local compaction instead.
  - Fail: Trigger the standard Flink failure recovery mechanism.
- Error Response: Returned if the worker determines the request cannot be processed (e.g., inability to access input files from the DFS or encountering unexpected file content). Such cases typically indicate an unexpected state of the ForSt StateBackend. Therefore, an exception must be thrown, mirroring the behavior of a failed local compaction.
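The distinction between timeouts and error responses can be expressed as a small retry loop. In this sketch (all names hypothetical), each attempt waits up to the configured timeout, timeouts are retried up to the configured number of retries, and once retries are exhausted the caller either falls back to local compaction (signalled by an empty result) or fails. Error responses are never retried and always surface as an exception, like a failed local compaction.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical unchecked failure surfaced to the subtask. */
final class RemoteCompactionException extends RuntimeException {
    RemoteCompactionException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class RemoteCompactionCaller {
    /** Returns the outputs, or Optional.empty() if the caller should compact locally instead. */
    static <T> Optional<T> call(
            Supplier<CompletableFuture<T>> submit, long timeoutMs, int maxRetries, boolean fallbackToLocal) {
        TimeoutException lastTimeout = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            CompletableFuture<T> response = submit.get();
            try {
                return Optional.of(response.get(timeoutMs, TimeUnit.MILLISECONDS));
            } catch (TimeoutException e) {
                // Stop waiting; a real client would also ask the gateway to abort the request.
                response.cancel(true);
                lastTimeout = e;
            } catch (ExecutionException e) {
                throw new RemoteCompactionException("Remote compaction returned an error", e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RemoteCompactionException("Interrupted while waiting for remote compaction", e);
            }
        }
        if (fallbackToLocal) {
            return Optional.empty();
        }
        throw new RemoteCompactionException(
                "Remote compaction timed out after " + (maxRetries + 1) + " attempts", lastTimeout);
    }
}
```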
To further improve efficiency, the compaction service can allow clients to proactively poll the status of a request while awaiting its response. This allows a waiting subtask to:
- Quickly detect worker failure and immediately cease waiting, triggering a new request to the gateway.
- Determine if the request is queued or in-progress, informing the decision about whether to continue waiting for a successful response.
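A hedged sketch of such a polling loop follows. The status RPC (modelled as a function), its RequestState values, and the separate waiting budgets for queued and running requests are assumptions made for illustration; the FLIP does not define them. GIVE_UP hands control back to the retry and failure-strategy logic.

```java
import java.util.function.Function;

final class StatusPolling {
    /** States a hypothetical status RPC could report for a submitted request. */
    enum RequestState { QUEUED, RUNNING, WORKER_LOST, SUCCEEDED }

    /** What the waiting subtask should do next. */
    enum Decision { COLLECT_RESULT, RESUBMIT, GIVE_UP }

    /**
     * Polls until the request succeeds, its worker is lost, or a waiting budget runs out.
     * A request still queued after maxQueuedMs is abandoned early (the service looks overloaded),
     * while a running request may keep waiting until maxTotalMs.
     */
    static Decision await(Function<String, RequestState> pollStatus, String jobId,
                          long pollIntervalMs, long maxQueuedMs, long maxTotalMs) {
        long start = System.nanoTime();
        while (true) {
            RequestState state = pollStatus.apply(jobId);
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            switch (state) {
                case SUCCEEDED:
                    return Decision.COLLECT_RESULT;
                case WORKER_LOST:
                    return Decision.RESUBMIT; // stop waiting and send a new request to the gateway
                case QUEUED:
                    if (elapsedMs > maxQueuedMs) {
                        return Decision.GIVE_UP;
                    }
                    break;
                case RUNNING:
                    if (elapsedMs > maxTotalMs) {
                        return Decision.GIVE_UP;
                    }
                    break;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Decision.GIVE_UP;
            }
        }
    }
}
```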
POC and Test results
We have implemented a POC of the compaction service, which can be found in the following branches:
- Flink: https://github.com/AlexYinHan/flink/tree/remote_compaction_feature
- ForSt: https://github.com/AlexYinHan/ForSt/tree/remote_compaction_feature
The README.md file within the Flink branch provides comprehensive instructions for enabling this feature, including a runnable example, and offers access to a pre-built Docker image for convenient setup and evaluation.
To evaluate the performance impact of Remote Compaction, we conducted tests using a real-world job with a total state size of approximately 290 GB. The checkpoint interval was set to 1 minute, with incremental checkpoints enabled. We collected CPU usage metrics over a one-hour job execution for the following configurations:
- Flink 1.20
- Flink 2.0 (ForSt StateBackend, Disaggregated State Management)
  - Without Remote Compaction
  - With Remote Compaction (compaction service deployed in an isolated Kubernetes pod)
The figure below illustrates the CPU usage profiles of all Flink TMs. For Flink 2.0 with Remote Compaction, the chart includes two distinct lines:
- Total (including Compaction Service): Represents the total CPU usage (Flink TMs plus the dedicated compaction service).
- TMs of User Job: Represents the CPU usage only of the TaskManagers running the subtasks of the real-world job.
As the figure shows, Flink 1.20 exhibits significant, periodic CPU peaks aligned with the checkpoint intervals. For the Flink 2.0 baseline (without Remote Compaction), TM CPU peaks are lower than in Flink 1.20, primarily because disaggregated state management eliminates file copies during checkpointing. However, noticeable CPU fluctuations persist, caused by background compactions that are constantly triggered and run locally on the TMs. In contrast, with remote compaction, CPU usage shows significantly reduced variance and is unaffected by checkpoint operations, as compaction tasks are offloaded to dedicated compaction nodes. The CPU usage of the compaction nodes themselves is upper-bounded by the resource limits configured for them. This effectively stabilizes overall cluster CPU usage, as long as the compaction service can keep pace with the compaction demand.
These results demonstrate the effectiveness of remote compaction in stabilizing overall resource consumption and completing Flink's disaggregated architecture through further decoupling of computation from storage management.
Compatibility, Deprecation, and Migration Plan
The proposed changes take effect only when the feature is explicitly enabled. Since the feature does not alter the data structures stored in the state backend, it does not break checkpoint compatibility. No modifications to existing interfaces are required, and no existing functionality is deprecated.
Implementation Plan
The FLIP will be implemented by the following steps:
- (In ForSt) Implement the client-side abstract functions of compaction requests and worker-side execution logic within ForSt.
- Implement the RPC components for the compaction service.
- Implement the basic logic for the gateway and scheduler on the Flink side.
- Introduce new command-line interfaces and configuration options for starting and configuring remote compaction.
- Enhance the scheduler with optimized, load-aware task distribution strategies.
- Implement automatic detection and handling of compaction worker additions/removals (scale events and failures).
Test Plan
- UTs for each newly introduced class and interface
- ITs of stateful tasks with remote compaction
- Performance tests will also be done before the feature release.
Rejected Alternatives
None for now.
Reference
[1] https://github.com/facebook/rocksdb/wiki/Remote-Compaction-(Experimental)




