Discussion thread | https://lists.apache.org/thread/55mwmfsxwprzf5l80so9t2cpny82l4nx |
---|---|
Vote thread | https://lists.apache.org/thread/xjh8z2kszq0kwj5bdz2bh3b1sotv593p |
JIRA | |
Release | - |
Motivation
Following the implementation of FLIP-301, the introduction of Hybrid Shuffle with Remote Storage capabilities has represented a major milestone in cloud-native environments. This development allows for storing the shuffle data to remote storage systems, significantly reducing the risk of insufficient space failures for the large-scale jobs.
The hybrid shuffle architecture provides a flexible and adaptive framework that seamlessly transitions between memory, disk and remote storage. This is achieved through the introduced tiered storage model, which dynamically allocates the most suitable storage tier for shuffle data. When resources are abundant, the system leverages high-performance storage to maximize processing efficiency. Conversely, in resource-constrained scenarios, it efficiently flushes data to disk to ensure stability.
Furthermore, each tier within this architecture is designed to be independent, allowing for exceptional scalability and ease of integration when additional storage tiers are required.
In parallel, Apache Celeborn, a shuffle data service aimed at enhancing the performance, stability, and adaptability of Big Data computation engines, has achieved significant advancements. These include enhancements in performance optimization, in-memory storage capabilities, robust security features, and overall system stability. However, the current integrating approach with Flink is cumbersome and does not fully capitalize on the benefits offered by Hybrid Shuffle.
To fully harness the potential of both systems, we propose integrating Flink's Hybrid Shuffle with Apache Celeborn through a pluggable remote tier interface. By integrating them together, we aim to make shuffle process more efficient, more stable, higher performance. By incorporating Celeborn as an additional storage tier within the tiered storage framework, we can create a shuffle system that is not only optimized for handling large-scale jobs but also more resilient, adaptable, and performant.
The local tier, which includes memory and disk storage, is not currently designed as a pluggable component. This approach was taken to keep the interface minimal, as there was no identified need for customization. Moreover, releasing a public interface too early can lead to issues with stability, security, and maintainability, as it might change before they've been fully verified. However, we are open to the possibility of future requirements that might call for a more flexible local tier. If such needs arise, we are also ready to implement a pluggable architecture for both the memory and disk tiers.
The main change on Flink and Celeborn in the FLIP is as follows.
Flink | Celeborn |
|
|
Public Interfaces
There are no changes on public interfaces.
Proposed Changes
In this document, our primary focus is on the change in Celeborn side. While the proposed change on Flink side is in FLIP-459: Support Flink hybrid shuffle integration with Apache Celeborn.
Currently, we are proposing to adopt the Map Partition mode for integration, as the Reduce Partition mode is still in the design phase. This integration approach will not interfere with the future implementation of the Reduce Partition mode.
Furthermore, to ensure compatibility, while integrating Celeborn with the hybrid shuffle is a more mature method, the existing shuffle service integration will remain unaffected at this stage. Once the integration is complete and the tier interface has stabilized, the existing shuffle service can then seamlessly transition to the new mode.
The key proposed changes are outlined below:
- Celeborn Remote Tier Factory: We will introduce a remote tier factory and its relative components(
CelebornProducerAgent
,CelebornConsumerAgent
,CelebornMasterAgent
, etc) within the Celeborn project. Writing/Reading Data With Segment Granularity: In FLIP 301, the tiered storage introduced Segment to write/read data for switching between tiers. Celeborn will also adopt segment granularity for data operations.
- Client side
The Celeborn producer agent will be capable of writing data with segment granularity, and the Celeborn consumer agent will be able to read data with segment granularity.
Server side
On the worker server side, data will be stored using Segment granularity as well.
- Client side
Compatibility with Existing Celeborn Features: The newly implemented tier must be compatible with the existing Celeborn features, such as partition splitting, identity authentication, etc.
The Celeborn tier will be integrated into the existing tiered storage architecture as below:
This is the overview of the Celeborn tier implementation. The details is in the following figure.
The Celeborn tier seamlessly integrates into the current tiered storage architecture without necessitating any modifications to the existing tiers. Note that only the green sections are the new implementations, while the rest represents the existing tiered storage infrastructure.
On the worker server side, the file format and data management are similar to the current implementation. The key difference is that the server side must also store Segment information for each subpartition. This Segment information is crucial, as it ensures that the correct Segment data is sent only when the specific Segment is requested by the consumer agent, maintaining the data order across different tiers. This section in FLIP 301 has more details about how to switch between different tiers.
The proposed integration introduces a new approach for Celeborn to operate with Flink, without impacting the existing Shuffle service mode. The new integration is designed to be compatible with the features of the traditional mode.
- Partition split. he new mode supports partition splitting by initiating a stream from the subsequent partition once the current one is fully consumed. This mechanism mirrors the logic employed in the previous shuffle service mode.
- Region start or finish. The integration also facilitates operations within Regions. Given that SubPartition IDs within a Region are sequential, the new mode can detect and initiate a new Region when a lower SubPartition ID is encountered.
- Compression support. The new mode also support data compression, enhancing the efficiency of data transportation and the processes of data writing and reading.
- Credit based flow control. To ensure stability during the read process, the integration also supports credit-based flow control. This feature regulates data flow to maintain system balance and prevent overload.
While the list highlights some of the integration's prominent features, it is important to note that other functionalities, e.g., the worker blacklist mechanism, disk status detection, worker graceful shutdown, master HA, etc, continue to function as intended, unaffected by the new integration, but we didn't list all of them to keep the doc simple and easy to understand.
Implementation Plan
New TierFactory Implementation. Develop a new TierFactory class within the Celeborn project.
Write and Read Classes from both Client side and Server side. Implement the necessary classes for handling the data operations within the Celeborn tier.
Client side:
Create a
CelebornProducerAgent
to write data in the producer side.Create a
CelebornConsumerAgent
to read data in the consumer side.Create a
CelebornMasterAgent
to manage the partitions in the master side.The shuffle descriptor should implement the
TierShuffleDescriptor
for passing partition information between the writing/reading sides.
Server side:
Implement a new
SegmentMapPartitionFileWriter
by extendingMapPartitionFileWriter
for writing file data and recording the Semgent info from each subpartitions.Implement a new
SegmentMapDataPartitionReader
by extendingMapPartitionReader
for loading data from files and sending Segment data only when requested by the client.Introduce a new
SegmentMapDataPartition
to manage these partition readers.
Compatibility, Deprecation, and Migration Plan
This is a new feature, no compatibility, deprecation, and migration plan.
Test Plan
The changes will be covered by unit tests, e2e tests and manual tests.