...

Discussion thread: https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
Vote thread: https://lists.apache.org/thread/gt7kv6v562y2smc9gjt4kh39kp666894
JIRA: FLINK-34984

Release: 2.1

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Checkpointing is a routine operation for the Flink engine to ensure fault tolerance and enable rescaling. Beyond that, end-to-end exactly-once hinges on committing data at the granularity of each checkpoint, a fundamental requirement for a streaming DB. Hence making checkpoints light and fast is crucial. While generic log-based checkpointing[1] tackles the problem, it introduces an additional log layer that double-writes state changes and incurs extra CPU and network overhead.

...

FLIP-425 introduces a new component named the "Asynchronous Execution Controller" (AEC) to address these challenges and orchestrate the asynchronous execution. A new set of asynchronous state APIs is also introduced within this asynchronous paradigm. Please refer to FLIP-425 for a deeper dive.
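To give a flavor of this programming model, the following is a minimal, hedged sketch of the asynchronous-state style that FLIP-424/425 enable. The class and method names here (`AsyncValueState`, the callback chaining) are illustrative stand-ins modeled with `CompletableFuture`, not the actual Flink API:

```java
import java.util.concurrent.CompletableFuture;

public class AsyncStateSketch {
    // A toy "state" whose reads and writes complete asynchronously,
    // as if served from remote (disaggregated) storage.
    static class AsyncValueState {
        private int value = 0;

        CompletableFuture<Integer> value() {
            return CompletableFuture.supplyAsync(() -> value);
        }

        CompletableFuture<Void> update(int v) {
            return CompletableFuture.runAsync(() -> value = v);
        }
    }

    public static void main(String[] args) {
        AsyncValueState count = new AsyncValueState();
        // Processing one record: issue a state read request, then register a
        // callback that writes the updated value back. In FLIP-425, the AEC
        // orchestrates such request/callback pairs so that record processing
        // does not block on state access.
        CompletableFuture<Void> done =
            count.value().thenCompose(v -> count.update(v + 1));
        done.join();
        System.out.println(count.value().join()); // prints 1
    }
}
```

The key design point is that a state access returns a future instead of a value, so the task thread can continue with other records while the (potentially remote) state request is in flight.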

Optimized buffer checkpointing for Asynchronous Execution Model (FLIP-455)

The asynchronous execution model introduced by FLIP-425 parallelizes record processing, but it also enlarges the set of in-flight elements that must be drained during the synchronous phase of checkpointing. We therefore introduce a set of declarative APIs and a `declareProcess()` function that users implement in a newly introduced AbstractStreamOperator. This lets the runtime obtain the declaration of record processing, broken down into requests and callbacks (lambdas) as introduced in FLIP-424. In this way we avoid the problem of lambda (de)serialization: instead of persisting the lambdas, we retrieve the callbacks each time before a task runs. The next step is to provide an API that allows users to assign a unique id to each state request and callback, or to assign one automatically by declaration order. With these ids, the runtime can find the corresponding callback for each restored state request, and the whole pipeline can be resumed.
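The id-matching idea above can be sketched as follows. This is a hedged illustration, not the FLIP-455 API: `declare()` and `resume()` are hypothetical names, and the sketch only shows why declaration-order ids let a restored request find its callback without ever serializing the lambda:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DeclarationSketch {
    // Callbacks registered during declaration, in declaration order.
    private final List<Consumer<Integer>> callbacks = new ArrayList<>();

    // Called while the user-defined declaration runs (the declareProcess()
    // phase): registers a callback and returns its id, assigned by
    // declaration order.
    int declare(Consumer<Integer> callback) {
        callbacks.add(callback);
        return callbacks.size() - 1;
    }

    // At recovery time, a buffered state request carries only the callback id
    // and its payload; the lambda itself was never serialized, because the
    // declaration phase re-created it before the task started running.
    void resume(int callbackId, int payload) {
        callbacks.get(callbackId).accept(payload);
    }
}
```

Since declarations are re-executed on every task start, the ids stay stable across failover as long as the declaration order is unchanged, which is exactly what makes checkpointed in-flight requests resumable.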

...

While several research projects explore disaggregated, embedded key-value stores (such as disaggregated RocksDB[2] and RocksDB-Cloud[3]), no widely adopted, open-source solution exists yet. After carefully weighing usability, extensibility, complexity, and performance, as well as the effort to integrate with the Flink engine, we decided to build a disaggregated state store named ForSt on top of frocksdb. Additionally, we created a unified file system JNI proxy that leverages the existing file system implementations in Flink, ensuring compatibility with various file system options.
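The proxy's core idea, delegating by URI scheme to existing file system implementations, can be sketched as below. This is a simplified illustration with invented names (`SimpleFileSystem`, `FileSystemProxySketch`), not the actual ForSt JNI layer or Flink's `FileSystem` API:

```java
import java.util.HashMap;
import java.util.Map;

public class FileSystemProxySketch {
    // A stand-in for a pluggable file system implementation; in Flink this
    // role is played by the existing FileSystem abstraction.
    interface SimpleFileSystem {
        byte[] read(String path);
    }

    private final Map<String, SimpleFileSystem> byScheme = new HashMap<>();

    void register(String scheme, SimpleFileSystem fs) {
        byScheme.put(scheme, fs);
    }

    // The JNI layer would funnel native file operations into a single
    // Java-side entry point like this; dispatching on the URI scheme
    // (hdfs://, s3://, file://, ...) reuses whichever file system
    // implementation is already available.
    byte[] read(String uri) {
        String scheme = uri.substring(0, uri.indexOf("://"));
        SimpleFileSystem fs = byScheme.get(scheme);
        if (fs == null) {
            throw new IllegalArgumentException("No file system registered for scheme: " + scheme);
        }
        return fs.read(uri);
    }
}
```

The benefit of this shape is that ForSt itself stays agnostic of storage backends: supporting a new DFS only requires a file system plugin on the Java side, with no change to the native store.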

...

  • Version: based on Flink 1.19
  • Deployment mode: YARN per-job
  • Flink yarn cluster:
    • 1 master 2 workers
    • specifications:
      • master: ecs.g7.2xlarge 8 vCPU 32 GiB (Alibaba Cloud)
      • worker: ecs.i2g.2xlarge 8 vCPU 32 GiB  (Alibaba Cloud)
  • HDFS cluster:
    • 1 master 2 workers
    • specifications:
      • master: ecs.g7.2xlarge 8 vCPU 32 GiB (Alibaba Cloud)
      • worker: ecs.i2g.2xlarge 8 vCPU 32 GiB  (Alibaba Cloud)
    • The YARN cluster and the HDFS cluster are on the same LAN
  • State backend: ForSt (Based on RocksDB 8.5.3)
  • Job config:
    • Memory: Task Manager 3 GB, Job Manager 1600 MB
    • Checkpoint: disabled (not fully supported in PoC yet)

...

  1. Heavy Checkpointing Procedure: A considerable amount of files need to be uploaded during checkpointing.
  2. Limited Data Structure Flexibility: Confining local disk data to the SST format restricts potential performance gains from alternative caching structures.
  3. Inaccurate Warm/Cold Distinction: File-level classification of data as warm or cold inaccurately reflects actual access patterns, leading to suboptimal resource allocation.
  4. More Complicated File Management: In this architecture, both the local disk and DFS serve as parts of the primary storage, so their file management must be unified, which becomes complicated in corner cases such as error handling.

References

[1] Generic log-based checkpointing: https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/

[2] Disaggregated RocksDB: https://dl.acm.org/doi/pdf/10.1145/3589772

[3] RocksDB-Cloud/Rockset: https://github.com/rockset/rocksdb-cloud

...