by Yuan Mei Zakelly Lan Jinzhong Li Hangxiang Yu Yanfei Lei  Feng Wang

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

One major advantage of Flink is its efficient and easy-to-use state management mechanism. However, this mechanism has not evolved much since it was born and hasn't kept pace with the demands of the cloud-native era. In this FLIP, we revisit the current architecture of Flink state management model and propose an augmented alternative in a disaggregated fashion.

Motivation

The past decade has witnessed a dramatic shift in Flink's deployment mode, workload patterns, and hardware improvements. We've moved from the map-reduce era where workers are computation-storage tightly coupled nodes to a cloud-native world where containerized deployments on Kubernetes become standard. At the same time, hardware improvements, like the tenfold increase in network bandwidth (from mere hundreds of Mbps to 25Gbps, comparable to local SSD bps), unlock compelling options for state storage. Fast intranet transmission and low-cost object storage make cloud storage a feasible and cost-effective choice. Flink's workload scale has also dramatically transformed. A state size of several hundred megabytes, once considered large, pales in comparison to the multi-terabyte states commonly encountered in today's applications.

Given these significant changes in deployment, hardware, and workloads, the current state access model heavily reliant on local disks no longer aligns with Flink's future in the cloud-native era.

Challenges

We discuss key challenges faced in Flink state management today in this session, including those newly introduced by cloud-native demands, and those longstanding due to the limitations of the local storage model. We aim to resolve these challenges with the new design.

Local Disk Constraints

While containerization technology provides resource isolation and secure job operations, it comes at a cost: resource is pre-allocated. As a result, local disk space is fixed and limited and becomes a bottleneck for data-intensive jobs. In the current model, we do not have good solutions except scaling up the job if running out of disk space, which is very costly.

Spiky Resource Usage

The current state model triggers periodic CPU and network I/O bursts during checkpointing as shown in Figure 1. This is because checkpointing triggers rocksdb compaction (CPU spike) and state file uploads to DFS (network I/O spike). For large-state jobs, those spikes happen almost the same time across all tasks, leading to the instability of the entire cluster. This also results in data processing TPS hampered by performance-crippling spikes during checkpoints.


Figure 1: CPU usage of Flink Job, periodic spikes incur during checkpointing

Elasticity and Fast Rescaling

Achieving zero-downtime rescaling remains a challenge for Flink, particularly for those with large state sizes. The current model involves state redistribution, download and rebuild during rescaling, hindering even near-zero downtime goals. While FLINK-31238 reduces rebuild time by merging and clipping SST files directly, downloading large state files to local disks is still a bottleneck. This significantly extends rescaling durations and can even block downscaling attempts if the aggregate state size exceeds local disk capacity.

Light and Fast Checkpoints

Checkpointing is a routine operation for Flink Engine to ensure fault tolerance and enable rescaling. Besides that, end-to-end exactly-once hinges on committing data in a granularity of each checkpoint, a fundamental requirement for streaming DB. Hence making light and fast checkpoints is crucial. While generic log-based checkpointing[1] tackles the problem, it introduces an additional log layer to double-write state changes and incurs extra CPU and network overheads.

High-Level Overview and Design

Figure 2: The Disaggregated Model of State Management

In the current model, Flink operators read and write states through a component named State Backend as illustrated on the left-hand side in Figure 2. States are put on files on local disks if not fit in memory. As aforementioned, to recover after a failure or enable rescaling, states are flushed to disk and state files are dumped to a durable storage periodically to make checkpoints, and read back after recovery/rescaling. This model greatly relies on the capability of local disks, which leads to the challenges listed above.

The right-hand side of Figure 2 illustrates the proposed new model. In the new model, local disks are optional as a secondary cache, and the remote DFS (e.g., S3/OSS/HDFS/GFS...) serves as a primary storage for states. Therefore, the remote DFS becomes the source of truth. This is a fundamental change from relying on local disks to a complete computation and storage disaggregated model. States are streamed to DFS continuously; memory and local disks act as a cache. This disaggregated architecture embraces a cloud-native approach and unlocks several advantages:

  1. Local disk is no longer a constraint as DFS used as the primary storage.
  2. The checkpointing procedure is fast, completing within 10s regardless of state size. Continuous streaming of states to DFS significantly reduces the amount of data needed to write during the checkpointing procedure, leading to rapid completion.
  3. DB compaction is decoupled from the checkpointing procedure and not bound to a specific Task Manager anymore. This allows for independent scheduling of compaction tasks, enabling better load-balancing and eliminating disruptive resource bursts cluster-wide.
  4. As for recovery and rescaling, state files are instantly accessible after the State Backend is up, eliminating the need for time-consuming local disk downloads.

The disaggregated model also facilitates a clean architecture for state sharing and querying. Checkpoint files are feasible to be shared between operators to eliminate redundant storage. For state querying, file-level APIs are provided to enable straightforward querying that resembles normal tables without relying on backend details or state specifics. In this FLIP, we focus on the disaggregated model and how it integrates with the current Flink Engine. Sharing and querying will be explored in future FLIPs.

Hurdles for the Disaggregated State Model

Our initial Proof of Concept (PoC) results showed a significant performance drawback for the disaggregated model (state store using DFS as primary storage) compared to the current model (local disk storage). Without local disk caches, the disaggregated model achieved only 2-5% of the maximum TPS observed with local disks across various setups. Analysis excluding page cache effects reveals that the main bottleneck lies in the blocking of the main task thread during state access as shown in Figure 3, making Flink engine performance highly sensitive to state access latency.

Figure 3: Current state access model in task thread

State access operates at a per-record granularity, executed by the Flink task thread. These accesses are queued and processed sequentially within the mailbox executor. State writing is non-blocking, as writes are appended to a write buffer and asynchronously flushed to disk when full. However, state reading involves fetching data from the disk on cache misses, which causes the task thread to block and halt further processing until the data is retrieved. As illustrated below, HDFS has 20+ times higher latency than local disk (1.5ms Vs. 68us), which explains why TPS drops 95% when switching from local disk to HDFS.

Table 1: Access Latency across different storage medium

Proposed Changes

Unlocking the disaggregated model requires tackling three key engine challenges:

  1. Implementing a non-blocking execution model: This unleashes full CPU potential while waiting for I/O or network operations.
  2. Developing a disaggregated data store: This store must seamlessly handle both writing and reading from DFSs.
  3. Integrating with Flink Engine: The data store needs to be integrated with the Flink Engine and existing state management mechanism (checkpoint/recover/rescale).

Non-blocking Asynchronous Execution Model: Parallel I/O (FLIP-425)

In FLIP-425, we propose a non-blocking execution model enabling asynchronous state access. In this new model, asynchronous State APIs(FLIP-424) are introduced. These APIs register callbacks with the task thread mailbox executor and allow for efficient chained execution. However, asynchronous access presents several new challenges:

  1. Preserving Processing Order for Identical Keys: Asynchronous execution does not inherently guarantee processing order for input data. This isn't an issue for input with different keys, as Flink's distributed nature already allows for some order variation between them. However, ensuring records with identical keys are processed in their arrival order is crucial for data integrity.
  2. Checkpointing Synchronization: Asynchronous execution adds more complexities to the checkpoint synchronization phase. Before triggering state snapshots, both ongoing callback executions and buffered input meant to maintain processing order must be drained to ensure a consistent state capture.
  3. Watermark and Timer: Watermark calculation and Timer triggering implicitly rely on order among in-flight data, necessitating special handling to guarantee the intended semantics within the asynchronous context.

FLIP-425 introduces a new component named "Asynchronous Execution Controller(AEC)" to address these challenges and orchestrate the asynchronous execution. A new set of asynchronous state APIs is also introduced accordingly within this asynchronous paradigm. Please refer to FLIP-425 for a deeper dive.

Optimized buffer checkpointing for Asynchronous Execution Model (FLIP-455)

The asynchronous execution model introduced by FLIP-425 parallelize the record processing but also enlarge the size of elements to drain during sync phase of checkpointing. Thus we consider a set of declarative APIs and a `declareProcess()` function that users should implement in some newly introduced AbstractStreamOperator, we could get the declaration of record processing in runtime, broken down to requests and callbacks (lambdas) like FLIP-424 introduced. Thus we avoid the problem of lambda (de)serialization and instead we retrieve callbacks every time before a task runs. The next step is to provide an API allowing users to assign an unique id to each state request and callback, or automatically assign one by declaration order. Thus we can find the corresponding callback in runtime for each restored state request based on the id, then the whole pipeline can be resumed.

More details will be added in following future FLIPs.

Batching for Network I/O: Beyond Parallel I/O (FLIP-426)

The asynchronous model introduced in FLIP-425 effectively helps boost overall throughput with parallel I/O operations. However, simply expanding I/O parallelism is not enough for remote storage. For network I/O, the substantial round-trip time of RPC calls far outweighs the individual I/O size's impact. Hence, how to efficiently merge I/O requests into single network I/O operations and fetching multiple keys within a single call plays an important role in reducing the overhead of individual interactions and leverages available bandwidth more effectively. This becomes critically important for large-state jobs, where frequent state data fetching can lead to bottlenecks.

FLIP-426 solves this problem by:

  1. Categorizing state operations: This allows for fine-grained management and optimization of different types of data access.
  2. Grouping I/Os together: Batching adjacent requests reduces individual RPC overhead and leverages network efficiency.
  3. Integrating with the "Asynchronous Execution Controller": Integrating with FLIP-425's orchestration mechanism for asynchronous execution.

Disaggregated State Store: ForSt (FLIP-427)

Figure 2 illustrates our adopted approach of using an embedded key-value (k-v) store to complement remote storage. We introduce ForSt, a disaggregated version of frocksdb (the default local disk state store for Flink) to achieve this purpose.

One alternative is using an independent k-v service like HBase. However, this approach introduces new dependencies on external services, which significantly complicates deployment, maintenance and management. Currently, Flink solely depends on Kubernetes (K8S) and Distributed File System (DFS) for high availability and checkpointing respectively. Ease of use and deployment is critical for open-source projects and we want to align with this philosophy.

While several research projects explore disaggregated, embedded key-value stores (like those referenced in disaggregated RocksDB[2] and RocksDB-Cloud[3]), no widely adopted, open-source solutions exist yet. By carefully weighing usability, extensibility, complexity, and performance as well as the efforts to integrate with the Flink engine, we decided to build a disaggregated state store named ForSt on top of frocksdb. Additionally, we created a unified file system JNI proxy that leverages existing file system implementations in Flink ensuring compatibility with various file system options.

Detailed design can be found in FLIP-427 and PoC implementation can be found at https://github.com/ververica/ForSt/tree/disagg-poc.

Faster Checkpoint/Restore/Rescale: Leverage Shared DFS (FLIP-428)

By moving primary state storage to the remote filesystem (DFS) and using local disks optionally for caching, ForSt offers a tiered storage system for Flink. This approach provides significant improvements for checkpointing, restoring, and rescaling by leveraging the fact that working state and checkpoint files share the same underlying file system.

  • Fast Checkpoint: Since most state files already reside in DFS, only small incremental updates need to be uploaded during checkpointing, drastically reducing network transfer time. In addition, both working state and checkpoints can reference the same underlying physical files, eliminating duplication. This saves storage space and further accelerates checkpoints.
  • Restore: With DFS plays as the primary storage, downloading large state files to local disks is avoided, significantly reducing restore time. Local disks (cache) can be gradually warmed up after the job starts, further optimizing performance.
  • Rescale: Rescaling leverages existing solutions like ClipDB/IngestDB to accelerate rebuilding the state store on DFS directly. Notice that since file downloads are eliminated, local disk constraints for downscaling are no longer an issue.

Figure 4: Checkpoint/Restore/Rescale Mechanism (Current Model on the left; Disaggregated Model on the right)

For more detailed information on sharing the underlying file system between checkpoints and working states, along with changes to checkpointing, restoring, and rescaling procedures, please refer to FLIP-428.

Tiered Storage: Local Disk as a Secondary Cache (FLIP-429)

While the asynchronous execution model (FLIP-425) and network I/O grouping (FLIP-426) have significantly enhanced performance, access latency in Table 1 reveal that direct access to remote storage (DFS) remains 95% slower than local disk on average. Therefore, efficiently utilizing available local disks is crucial to maximize overall performance. In this context, local disks act as an optional secondary cache.

We propose two distinct disk-based caching solutions for disaggregated state management:

File-based Cache: This approach is straightforward and structures the cache at the granular level of individual files. It replicates remote SST files onto local disks, enabling direct reading from local storage when the corresponding file resides therein, circumventing costly DFS round trip. While this method is commendable for its ease of implementation, its limited granularity may not be optimal for access patterns requiring finer differentiation.

Hybrid Block Cache: This solution organizes the cache on disk at the block level, mirroring the granularity of the in-memory block cache. This facilitates unified management for both memory (Tier 1 Cache) and local disk (Tier 2 Cache) as a single entity. By ensuring that each block resides exclusively on either disk or memory (unlike the file-based cache), this approach minimizes duplication and likely experiences higher cache hit rates.

Initial PoC tests indicate that the hybrid block cache can provide an 80% increase in performance compared to the file-based cache for I/O-intensive jobs, both utilizing an LRU eviction policy. However, the inherent simplicity of the file-based cache necessitates less CPU overhead for management, particularly when I/O does not constitute the bottleneck. Therefore, it is better suited for scenarios where the state size is significantly smaller than the local storage capacity.

To optimize CPU efficiency across diverse scenarios, we are actively exploring an "Adaptive Local Cache" capable of intelligently transitioning between the aforementioned caching solutions based on workload characteristics. As depicted in Figure 5, this solution aims to achieve optimal performance regardless of the prevailing conditions. Initial testing shows that with an adaptive local cache, we can achieve at least the same performance while states can fit into the local disk. More details will be revealed in future FLIP(s).

Figure 5: Adaptive Local Cache

Remote Compaction (FLIP-430)

Compaction is at the heart of LSM-based storage to enable competitive read performance and reduce space amplification, but it introduces bursts of high I/O and CPU usage. This poses a challenge in pre-allocated resource scenarios, potentially leading to underutilization or performance hiccups.

Remote Compaction addresses this issue by delegating compaction tasks to a separate work node from the LSM engine itself, as seen in RocksDB-cloud/Rockset [2]. This decouples the sporadic resource demands of compaction from the normal data process, leading to:

  • More efficient resource utilization: dedicated nodes with pre-allocated resources specifically handle compaction across tasks, eliminating underutilization and ensuring smooth data processing.
  • Improved performance: By isolating compaction, the main LSM engine experiences minimal performance disruptions.

The specific design details of the Remote Compaction architecture are still under development and will be finalized in future design documents.

Public Interface Changes

This section details the steps involved in enabling disaggregated state management from a user perspective and related public interface changes.

StateBackend: ForSt

We introduce a new state backend "ForSt" (named "For Stream") to manage disaggregated states. Existing state backends, "rocksdb" utilizing local disk and "heap" residing in memory, remain unaltered. Users can use them as before. To leverage the "ForSt" state backend, users need to configure the backend and additionally specify a working directory for states.

Configure to use ForSt
state.backend.type: forst
# specify another storage other than the checkpoint storage (optional).
state.backend.forst.working-dir: hdfs:///path-to-your-storage


Asynchronous State APIs (FLIP-424) 

As aforementioned, a new suite of asynchronous state APIs (FLIP-424) has been introduced to seamlessly integrate with the Asynchronous Execution Model (FLIP-425). Existing synchronous state APIs remain unaltered, ensuring a smooth transition for users. Importantly, as demonstrated in the PoC results within FLIP-425, mimicking synchronous behavior using the asynchronous model incurs negligible overhead (~3%). 

How to use these asynchronous state APIs?

Table API/SQL Users: Don't need to make changes to their scripts since the SQL operators hide the details for them. A new set of SQL operators based on the asynchronous APIs will be provided later.

For DataStream users: Need to write async operators based on async APIs. Here is an example of writing streaming word count (both in synchronous and asynchronous APIs) :

Such usage in the following example code in processElement/flatMap is an intermediate step. And as such it will be used for development and PoC purposes only. The long term solution will be discussed later.


public class WordCount {

    // Job definition
    public static void main(String[] args) throws Exception {
        // Some initialization code...

        // Source
        DataStream<String> text = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");

        // Aggregate
        DataStream<Tuple2<String, Integer>> counts =
                text.flatMap(new Tokenizer())
                        .name("tokenizer")
                        .keyBy(value -> value.f0)
                        // User codes
                        .flatMap(new Counter())
                        .name("counter");

        // Sink
        counts.print().name("print-sink");
        
        env.execute("WordCount");
    }

    // Core processing user function
    public static final class Counter extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

        // Initialization omitted.
        private transient ValueState<Integer> wordCounter;
        
        // Synchronous state API usage
        @Override
        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
            Integer val = wordCounter.value();
            int updated = (val == null ? 1 : val + 1);
            wordCounter.update(updated);
            out.collect(Tuple2.of(value.f0, updated)); 
        }

        // Asynchronous state API usage
        @Override
        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
            AtomicInteger updated = new AtomicInteger(0);
            wordCounter.value().thenCompose(
                    val -> {
                        if (val == null) {
                            updated.set(1);
                        } else {
                            updated.set(val + 1);
                        }
                        return wordCounter.update(updated.get());
                    }
            ).thenAccept(
                    empty -> {
                        out.collect(Tuple2.of(value.f0, updated.get()));
                    }
            );
        }
        
    }
}


DataStream API users transitioning to asynchronous state APIs will find the code structure largely unchanged, with the primary differences lying in state initialization and the use of future-based asynchronous APIs for processing. For comprehensive details please refer to FLIP-424.

PoC Results


Environment Setup:

  • Version: based on Flink 1.19
  • Deployment mode: yarn per-Job
  • Flink yarn cluster:
    • 1 master 2 workers
    • specifications:
      • master: ecs.g7.2xlarge 8 vCPU 32 GiB (Alibaba Cloud)
      • worker: ecs.i2g.2xlarge 8 vCPU 32 GiB  (Alibaba Cloud)
  • HDFS cluster:
    • 1 master 2 workers
    • specifications:
      • master: ecs.g7.2xlarge 8 vCPU 32 GiB (Alibaba Cloud)
      • worker: ecs.i2g.2xlarge 8 vCPU 32 GiB  (Alibaba Cloud)
    • yarn cluster and HDFS cluster are on the same LAN
  • State backend: ForSt (Based on RocksDB 8.5.3)
  • Job config:
    • Memory: Task Manager 3GB, Job Manager 1600MB
    • checkpoint: Disabled (Not fully supported in PoC yet)
      • Stage 1: populate different data to make the state size reach a certain size.
      • Stage 2: read/write the data from the key space of Stage 1, make the state size stable within a certain range.
      • Max TPS results are measured in Stage 2.
      • Job parameter && configuration
    • Read/Write ratio: 1
    • RocksDB/ForSt block cache: 512 MB


Max TPS Results(With Asynchronous Execution FLIP-425; Without IO Grouping FLP-426)



Local File Cache

TPS

State Size

1

Synchronous Execution + Local disk

-

17.1 K

5.52 G

2

Synchronous Execution + HDFS *

-

0.85 K

1 G *

3

Asynchronous Execution + HDFS

0 GB (0)

7 K

5.8 G

4

♾️ GB (100%)

42.7 K

5.8 G

*: Performance of Synchronous API + HDFS is extremely poor, and it is hard to accumulate enough state size. The results are measured at smaller data size (1G instead of 5G)

Based on the PoC results, we can draw several key conclusions regarding the performance implications:

  • DFS as primary storage exhibits more than 95% TPS decrease: Utilizing DFS as the primary storage solution significantly hinders performance. Compared to the baseline (Synchronous API + Local Disk using RocksDB), disaggregated state storage without optimization exhibits a performance degradation exceeding 95% in terms of TPS.

  • Asynchronous Execution Model exhibits 2.5x TPS improvements: The introduction of asynchronous state APIs offers substantial performance improvements. In an extreme case with all data cached locally (line 4), this model achieves a 2.5x increase in TPS compared to the baseline.

  • HDFS with Asynchronous Execution achieves 40% of the baseline performance: As an initial proof-of-concept (PoC) result (line 3), asynchronous execution with HDFS without caching achieves 40% of the baseline performance. We anticipate further improvements through optimizations like I/O grouping (FLIP-426) and leveraging the local disk as a secondary cache (FLIP-429). These advancements aim to bring HDFS performance closer to that of the local disk setup.

The PoC running instructions are provided in the Appendix for those interested in exploring the setup further.

Road Map + Launching Plan

As discussed, the disaggregated state evolution requires adjustments across the entire Flink Engine stack. To ensure a smooth and successful transition, we've divided the work into several stages with well-defined milestones:

Milestone 1: Core Functionalities (Jun. 30th, 2024 - MVP Version):

This phase focuses on building the foundational functionalities for disaggregated state management, aiming to achieve an end-to-end baseline version by the end of the milestone. This includes the following changes:

  • Asynchronous State APIs (FLIP-424): Introduce new APIs for asynchronous state access.
  • Asynchronous Execution Model (FLIP-425): Implement a non-blocking execution model leveraging the asynchronous APIs introduced in FLIP-424.
  • Grouping Remote State Access (FLIP-426): Enable retrieval of remote state data in batches.
  • Disaggregated State Store (FLIP-427): Introduce the initial version of the ForSt disaggregated state store.
  • Fault Tolerance/Rescale Integration (FLIP-428): Integrate checkpointing mechanisms with the disaggregated state store to ensure fault tolerance and rescaling capabilities.
  • Optimized buffer checkpointing for Asynchronous Execution (FLIP-TBD): A more fine-grained buffer checkpointing strategy to optimize duration of sync phase. A set of declarative APIs will be introduced.

Milestone 2: Performance and Usability Improvements (Sept. 30th, 2024 – Flink 2.0 release, Beta Version):

This phase prioritizes performance optimizations and user experience enhancements, aiming to deliver a complete disaggregated state management system integrated with asynchronous SQL operators by the target date. This milestone introduces several key functionalities:

  • Local Disk Caching (FLIP-429): Leverage local disks as a secondary cache to improve performance.
  • TM State Ownership and Faster Checkpointing (FLIP-432): Enhance state management for Task Managers
  • Disaggregated State Metrics (FLIP-431): Introduce disaggregated-state-related metrics.
  • SQL Operator Compatibility: Enable SQL async operator integration.

Milestone 3: Production Readiness and Stability (Dec. 31st, 2024 – Flink 2.1 release, Prod-Ready):

Milestone 3 prioritizes stability and production readiness, aiming to deliver a reliable disaggregated state management system by the end of the year, coinciding with the Flink 2.1 release. Key areas of focus during this phase include:

  • Stability Improvements
  • DataStream User Migration/Integration Tooling
  • Remote Compaction (FLIP-430)


Notably, the disaggregated state backend can work with synchronous state access APIs (existing operators). However, this approach might lead to suboptimal performance due to blocking operations. To fully utilize the capability of disaggregated states, we strongly encourage users to use async state APIs.

SQL stateful operators will be reworked to provide async state access in Flink 2.0 to provide a complete end-to-end user experience.

DataStream users need to rewrite the stateful operators in a call-back way as demonstrated in the example. We will try to provide tools later to facilitate migration.

Rejected Alternatives

Tiered Storage: DFS as complimentary when the local disk is out of space. While initially appealing due to its intuitive nature and potential performance benefits within local capacity, the tiered storage solution with local disk overflow to remote storage ultimately proves unsuitable for disaggregated state management in Flink.

Here's why:

  1. Heavy Checkpointing Procedure: A considerable amount of files need to be uploaded during checkpointing.
  2. Limited Data Structure Flexibility: Confining local disk data to the SST format restricts potential performance gains from alternative caching structures.
  3. Inaccurate Warm/Cold Distinction: File-level classification of data as warm or cold inaccurately reflects actual access patterns, leading to suboptimal resource allocation.
  4. More Complicated File Management: This architecture indicates that both local disk and DFS play part of the primary storage, hence needs to unify the file management of the local disk and DFS, which is complicated in extreme cases of error handling e.t.c. 

References

[1] Generic log-based checkpointing https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/

[2] Disaggregated RocksDB: https://dl.acm.org/doi/pdf/10.1145/3589772

[3] RocksDB-cloud/Rockset https://github.com/rockset/rocksdb-cloud

Appendix: How to run the PoC

ForSt PoC Branch: https://github.com/ververica/ForSt/tree/disagg-poc

Flink PoC Branch: https://github.com/ververica/flink-poc/tree/disagg-poc-2

Run PoC by following below three steps:

Step 1: Compile, Package and Install ForSt (Linux support ONLY)

  • Download ForSt PoC branch.
  • Compile and Package ForSt in the root directory, then a jar will be generated in "java/target"

DEBUG_LEVEL=0 ROCKSDB_DISABLE_JEMALLOC=true PORTABLE=1 CXXFLAGS="-Wno-error=shadow -Wno-error-defaulted-function-deleted -Wno-unknown-warning-option -Wno-error=unused-parameter -Wno-error=unused-variable" make -j8 rocksdbjavastatic
  • Install ForSt jar to local
mvn install:install-file -Dfile=/path/to/your/library.jar -DgroupId=com.ververica -DartifactId=forstjni -Dversion=8.5.3-poc-SNAPSHOT -Dpackaging=jar

Step 2: Package Flink and WordCount Jar

  • Download Flink PoC branch
  • Package Flink and streaming job jar.
mvn clean package -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip=true -Denforcer.skip=true -Drat.skip=true -Djapicmp.skip=true
  • The Flink binary can be found at "flink-dist/target/flink-1.19-SNAPSHOT-bin/flink-1.19-SNAPSHOT" and the streaming test jar exists in "flink-examples/flink-examples-streaming/flink-examples-streaming-1.19-SNAPSHOT-AsyncWordCount.jar".

Step 3: Run WordCount

  • Use Flink CLI command to submit a job by a jar.   Note: Place the right dependency jar in plugin/ directory for remote file system.


  • No labels