Discussion thread | https://lists.apache.org/thread/lrxjfpjjwlq4sjzm1oolx58n1n8r48hw |
---|---|
Vote thread | https://lists.apache.org/thread/r92qoxkt1kwtkbx9p45cpx4jto7s3l0d |
JIRA | |
Release | 2.0 |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This FLIP proposes upgrading the version of FRocksDB in the Flink Project from 6.20.3 to 8.10.0.
RocksDBStateBackend is widely used by Flink users in large-state scenarios. The last upgrade of FRocksDB was in Flink 1.14, which mainly added features such as support for the ARM platform, the deleteRange API, and periodic compaction. A long time has passed since then, and RocksDB has since released 8.x versions. The main motivation for this upgrade is to leverage the features of newer RocksDB versions to make the Flink RocksDBStateBackend more powerful. Since RocksDB is also continuously being optimized and fixed, we hope to keep FRocksDB more or less in sync with RocksDB and upgrade it periodically.
(Although this upgrade changes almost nothing in the Flink code itself, we created this FLIP because the feature and performance changes it brings may be visible to users.)
Main benefits
This section introduces the main benefits of upgrading RocksDB: optimizing the performance of RocksDB rescaling with the IngestDB API, as well as other potential optimization points that have not yet been implemented, such as async I/O and tiered storage.
IngestDB
The problem of slow RocksDBStateBackend rescaling has been troubling users for a long time, for example when the parallelism of a job changes from 2 to 1. When RocksDB performs rescaling recovery, it needs to merge the data of two RocksDB instances into one DB instance.
In the current implementation, merging data from two RocksDB instances can only be done by traversing all records of one DB and inserting them into the other one. To address this issue, we extended the RocksDB API to directly merge data from multiple RocksDB instances and support an IngestDB recovery mode in RocksDBStateBackend (FLINK-31238). In this way, the time for merging DB data is reduced from minutes or even half an hour to a few seconds.
The APIs required by IngestDB have now been merged into the main branch of RocksDB (FLINK-33337), and the Flink-side implementation of IngestDB has also been completed in https://github.com/apache/flink/pull/24031 (special thanks to Stefan Richter).
However, the IngestDB feature still cannot be used because FRocksDB has not yet been upgraded to the corresponding version.
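To make the contrast concrete, the current recovery path boils down to a full scan and re-insert of one DB into the other, roughly like the RocksJava sketch below (class and method names are illustrative and simplified, not the actual Flink restore code, which additionally batches the writes):

```java
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteOptions;

/** Illustrative sketch of the current "traverse and re-insert" merge path. */
public final class TraverseAndInsertMerge {

    /** Copies every key/value pair of one column family from a source DB into the target DB. */
    static void copyColumnFamily(
            RocksDB sourceDb,
            ColumnFamilyHandle sourceColumnFamily,
            RocksDB targetDb,
            ColumnFamilyHandle targetColumnFamily) throws RocksDBException {
        try (WriteOptions writeOptions = new WriteOptions().setDisableWAL(true);
                RocksIterator iterator = sourceDb.newIterator(sourceColumnFamily)) {
            for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                // Every single record is read and written again; this full copy is what
                // makes scale-in recovery take minutes for large state.
                targetDb.put(targetColumnFamily, writeOptions, iterator.key(), iterator.value());
            }
        }
    }
}
```

The IngestDB-based recovery avoids this per-record copy entirely, which is where the speedup shown in the results below comes from.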
We tested the performance of IngestDB with a snapshot size of 1 GB. The results are as follows:
- In scenarios where parallelism increases (1->2, 2->8), the performance of IngestDB mode is about the same as before, because during the recovery phase only deleteRange is called, which returns very quickly (a minimal sketch of this call follows the results table below).
- In all other scenarios, IngestDB shows significant improvements over the previous recovery method.
Parallelism Change | CurrentMode Restore Details | CurrentMode Max Restore Time | IngestDBMode Restore Details | IngestDBMode Max Restore Time |
1->2 | task0: 809ms task1: 738ms | 809ms | task0: 789ms task1: 745ms | 789ms |
2->1 | task0: 42979ms | 42979ms | task0: 890ms | 890ms |
2->8 | task0 : 677ms task1 : 566ms task2 : 580ms task3 : 610ms task4 : 534ms task5 : 439ms task6 : 436ms task7 : 481ms | 677ms | task0 : 496ms task1 : 474ms task2 : 408ms task3 : 415ms task4 : 456ms task5 : 428ms task6 : 433ms task7 : 407ms | 496ms |
8->2 | task0 : 33603ms task1 : 42867ms | 42867ms | task0 : 680ms task1 : 650ms | 680ms |
2->3 | task0 : 421ms task1 : 15299ms task2 : 474ms | 15299ms | task0 : 499ms task1 : 952ms task2 : 428ms | 952ms |
3->2 | task0 : 14833ms task1 : 14810ms | 14833ms | task0 : 738ms task1 : 669ms | 738ms |
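For the scale-up rows above, the restored DB already contains a superset of the new key-group range, so recovery only needs to drop the key-groups that no longer belong to the subtask, which maps to a deleteRange call per column family. A minimal RocksJava sketch, with purely illustrative key-group boundary prefixes:

```java
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

/** Illustrative sketch: trimming key-groups outside the new range after scale-up. */
public final class DeleteRangeTrim {

    /** Deletes all keys in [beginKeyGroupPrefix, endKeyGroupPrefix) from the column family. */
    static void trimKeyGroups(
            RocksDB db,
            ColumnFamilyHandle columnFamily,
            byte[] beginKeyGroupPrefix,
            byte[] endKeyGroupPrefix) throws RocksDBException {
        // deleteRange writes a range tombstone instead of deleting keys one by one,
        // so it returns almost immediately regardless of how much data is in the range.
        db.deleteRange(columnFamily, beginKeyGroupPrefix, endKeyGroupPrefix);
    }
}
```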
Other Potential Optimization Points
In addition to IngestDB, upgrading RocksDB also brings some other features that would be very useful for Flink. These features have not yet been used or implemented in Flink, so they are listed here as potential optimization points.
AsyncIO & MultiGet
Higher versions of RocksDB support MultiGet and async I/O, which can improve performance by over 100% depending on the data pipeline, and which is also required by one of the FLIP-426 changes. Using async I/O can therefore significantly improve the performance of state batch access and traversal, both of which are widely used in Flink. Although further discussion is needed on how to adapt async I/O in the Flink RocksDBStateBackend, all of this requires upgrading FRocksDB first as a prerequisite.
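How async I/O would be wired into the state backend is still open (upstream RocksDB exposes it through ReadOptions), but the batched lookup side is already available in the Java bindings. A minimal sketch of a MultiGet-style batch read, with an illustrative path and keys:

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch of a batched read via multiGetAsList. */
public final class MultiGetSketch {

    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
                RocksDB db = RocksDB.open(options, "/tmp/multiget-sketch")) {
            db.put("k1".getBytes(StandardCharsets.UTF_8), "v1".getBytes(StandardCharsets.UTF_8));
            db.put("k2".getBytes(StandardCharsets.UTF_8), "v2".getBytes(StandardCharsets.UTF_8));

            // One batched lookup instead of N individual get() calls; missing keys yield null.
            List<byte[]> keys = Arrays.asList(
                    "k1".getBytes(StandardCharsets.UTF_8),
                    "k2".getBytes(StandardCharsets.UTF_8),
                    "missing".getBytes(StandardCharsets.UTF_8));
            List<byte[]> values = db.multiGetAsList(keys);
            for (int i = 0; i < keys.size(); i++) {
                byte[] value = values.get(i);
                System.out.println(new String(keys.get(i), StandardCharsets.UTF_8) + " -> "
                        + (value == null ? "null" : new String(value, StandardCharsets.UTF_8)));
            }
        }
    }
}
```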
Tiered Storage
In FLIP-423, some limitations of the current state management were mentioned, including Local Disk Constraints, Light and Fast Checkpoints, Spiky Resource Usage, and Elasticity and Fast Rescaling.
RocksDB now supports Tiered Storage, which can assign a data temperature when creating a new SST file. The temperature hints to the file system which storage media to put the data on, so the data in a single DB instance can be placed on different storage media.
This feature can address some of the aforementioned issues. RocksDB has also optimized the performance of Tiered Storage, for example with Per-Key Placement Compaction and the Secondary Cache.
Although FLIP-423 designs a complete solution, ForSt, for these scenarios, the RocksDBStateBackend could perhaps also directly reuse these existing RocksDB features for some enhancements at a relatively low cost, and there is no conflict between the two optimizations.
BlobDB
In many Flink usage scenarios, the state value may be very large, for example machine learning features or long-lived user-defined states. In an LSM tree, such large values cause a lot of read and write amplification, affecting the performance of RocksDB. Fortunately, RocksDB supports BlobDB to address this issue. The basic idea, which was proposed in the WiscKey paper, is key-value separation: by storing large values in dedicated blob files and keeping only small pointers to them in the LSM tree, the values are not copied over and over again during compaction. This reduces write amplification, which has several potential benefits such as improved SSD lifetime and better write and read performance. BlobDB could also be supported in the RocksDBStateBackend in a future FLIP.
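As a rough illustration of what such an integration could look like, the Java bindings already expose the BlobDB options; the thresholds below are illustrative assumptions for this sketch, not proposed defaults:

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

/** Illustrative sketch of enabling key-value separation (BlobDB) via options. */
public final class BlobDbSketch {

    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options()
                        .setCreateIfMissing(true)
                        // Values larger than 4 KB go to dedicated blob files; only a small
                        // pointer stays in the LSM tree (threshold chosen for illustration).
                        .setEnableBlobFiles(true)
                        .setMinBlobSize(4 * 1024L)
                        .setBlobFileSize(256L * 1024 * 1024)
                        // Reclaim space from blob files whose keys were overwritten or deleted.
                        .setEnableBlobGarbageCollection(true);
                RocksDB db = RocksDB.open(options, "/tmp/blobdb-sketch")) {
            byte[] key = "feature-vector-42".getBytes();
            byte[] largeValue = new byte[64 * 1024]; // e.g. a serialized ML feature blob
            db.put(key, largeValue);
            // Compaction now rewrites the small pointer, not the 64 KB value itself.
            System.out.println("read back " + db.get(key).length + " bytes");
        }
    }
}
```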
Public Interfaces
No changes
Proposed Changes
- Flink
  - Update the FRocksDB version to 8.10.0 after the official artifacts are created.
- FRocksDB
  - In FRocksDB 6, thread-local perf context is disabled by reverting a specific commit (FLINK-19710). However, this revert creates conflicts and makes upgrading more difficult. We found that disabling PERF_CONTEXT improves state benchmark performance by about 5% and does not create any conflicts, so we plan to drop support for PERF_CONTEXT in the new FRocksDB release.
Compatibility, Deprecation, and Migration Plan
This change does not break any existing state formats or jobs.
Test Plan
For version upgrades, performance testing is very important, so we upgraded RocksDB to version 8.10.0 on the Flink master branch and compared it against the current version 6.20.3 on the state benchmarks and Nexmark, respectively. The test results are as follows:
Nexmark
We ran the Flink Nexmark benchmark on 4 independent machines, each with an AMD EPYC 9Y24 CPU and an NVMe SSD. The Flink cluster consists of 1 JobManager and 8 TaskManagers (each with 1 slot and 8 GB of memory). We selected all Nexmark queries that use state. From the test results, the throughput of most queries is similar to before.
Nexmark Query | RocksDB 6.20.3 Throughput (r/s) | RocksDB 8.10 Throughput (r/s) | Compare |
q3 | 3.29M | 3.3M | 0.304% |
q4 | 445.24K | 423.51K | -4.881% |
q5 | 730.26K | 986.83K | 35.134% |
q7 | 292.4K | 302.88K | 3.584% |
q8 | 3.49M | 3.48M | -0.287% |
q9 | 252.69K | 228.57K | -9.545% |
q11 | 621.61K | 620.93K | -0.109% |
q12 | 3.17M | 3.15M | -0.631% |
q15 | 1.87M | 1.8M | -3.743% |
q16 | 544.57K | 603.11K | 10.750% |
q17 | 2.62M | 2.59M | -1.145% |
q18 | 828.87K | 841.21K | 1.489% |
q19 | 875.74K | 902.24K | 3.026% |
q20 | 333.23K | 318.04K | -4.558% |
Total | - | - | 2.099% |
StateBenchmark
We conducted state benchmark testing on RocksDB in the Flink Speed Center (special thanks to Zakelly Lan and Roman Khachatryan) and ran three rounds of testing to reduce jitter. From the test results, the read performance of RocksDB V8 is not much different from V6, but there is a slight regression in write performance.
Benchmark | V8-Round1 | V8-Round2 | V8-Round3 | V8-Average | V6-Round1 | V6-Round2 | V6-Round3 | V6-Average | V8 vs. V6 |
listAdd | 448.491818 | 453.333675 | 457.166387 | 452.997293 | 464.362323 | 473.023696 | 469.028608 | 468.804876 | -3.37% |
listAddAll | 263.26387 | 270.054733 | 262.261058 | 265.19322 | 284.309874 | 289.76554 | 288.108038 | 287.394484 | -7.73% |
listAppend | 442.991378 | 448.92388 | 442.740273 | 444.885177 | 456.645484 | 449.272529 | 472.059381 | 459.325798 | -3.14% |
listGet | 112.747402 | 114.132869 | 116.11602 | 114.332097 | 107.290669 | 110.100745 | 108.338023 | 108.576479 | 5.30% |
listGetAndIterate | 110.51841 | 114.477101 | 116.979456 | 113.991656 | 105.17187 | 108.271787 | 105.472538 | 106.305398 | 7.23% |
listUpdate | 446.459229 | 464.632036 | 451.062437 | 454.051234 | 459.690066 | 472.406343 | 471.146269 | 467.747559 | -2.93% |
mapAdd | 342.679319 | 328.865235 | 331.80544 | 334.449998 | 377.897566 | 395.306567 | 388.034114 | 387.079416 | -13.60% |
mapContains | 43.022308 | 41.730886 | 42.781915 | 42.511703 | 41.552857 | 42.433217 | 42.385542 | 42.123872 | 0.92% |
mapEntries | 307.232009 | 308.416811 | 312.635165 | 309.427995 | 296.845214 | 303.213684 | 301.87162 | 300.643506 | 2.92% |
mapGet | 43.60992 | 43.526943 | 44.934931 | 44.0239313 | 41.771907 | 41.955097 | 42.502606 | 42.0765367 | 4.63% |
mapIsEmpty | 38.459568 | 38.827621 | 38.863359 | 38.7168493 | 37.626888 | 38.69073 | 38.21728 | 38.1782993 | 1.41% |
mapIterator | 308.022383 | 313.170994 | 309.486335 | 310.226571 | 296.904146 | 304.785345 | 299.106992 | 300.265494 | 3.32% |
mapKeys | 312.194847 | 313.422331 | 312.004277 | 312.540485 | 298.532292 | 309.173538 | 299.409632 | 302.371821 | 3.36% |
mapPutAll | 117.615508 | 115.833148 | 116.925634 | 116.79143 | 111.550326 | 115.296096 | 113.288307 | 113.378243 | 3.01% |
mapRemove | 350.185313 | 356.262055 | 356.58919 | 354.345519 | 382.432421 | 398.891158 | 383.474288 | 388.265956 | -8.74% |
mapUpdate | 326.56323 | 343.909292 | 337.205441 | 335.892654 | 379.508134 | 377.136306 | 388.68193 | 381.775457 | -12.02% |
mapValues | 311.106842 | 311.309557 | 311.582689 | 311.333029 | 300.050187 | 295.121068 | 307.620506 | 300.930587 | 3.46% |
valueAdd | 336.48586 | 331.392677 | 343.310332 | 337.062956 | 382.798278 | 379.32724 | 392.866154 | 384.997224 | -12.45% |
valueGet | 554.671769 | 562.49161 | 546.083539 | 554.415639 | 563.677194 | 594.294875 | 596.363248 | 584.778439 | -5.19% |
valueUpdate | 325.541024 | 337.096711 | 353.488646 | 338.708794 | 383.990973 | 396.756718 | 396.228346 | 392.325346 | -13.67% |
Overall, we think that, compared to the benefits brought by the upgraded version, the slight regression in write performance is acceptable, since write overhead is only a small part of the overall cost.
Rejected Alternatives
- Upgrade FRocksDB to an 8.11.x or 9.x version
It has been some time since we started this upgrade. At that time, the latest release of RocksDB was 8.10.0, but RocksDB has since published several newer versions. We still plan to upgrade to 8.10.0, for the following reason:
Each upgrade requires a lot of adaptation, testing, and bug-fixing work, which takes significant time and effort. In the RocksDB versions released since 8.10.0, we did not see features attractive enough to be worth repeating this work. We have also run a quick benchmark validation on the latest 9.x version, and its performance is similar to 8.10.0, so we believe there is no need to upgrade to another version at this time.