Motivation

Following the implementation of FLIP-301, the introduction of Hybrid Shuffle with Remote Storage capabilities has represented a major milestone in cloud-native environments. This development allows for storing the shuffle data to remote storage systems, significantly reducing the risk of insufficient space failures for the large-scale jobs.

The hybrid shuffle architecture provides a flexible and adaptive framework that seamlessly transitions between memory, disk and remote storage. This is achieved through the introduced tiered storage model, which dynamically allocates the most suitable storage tier for shuffle data. When resources are abundant, the system leverages high-performance storage to maximize processing efficiency. Conversely, in resource-constrained scenarios, it efficiently flushes data to disk to ensure stability.

Furthermore, each tier within this architecture is designed to be independent, allowing for exceptional scalability and ease of integration when additional storage tiers are required.

In parallel, Apache Celeborn, a shuffle data service aimed at enhancing the performance, stability, and adaptability of Big Data computation engines, has achieved significant advancements. These include enhancements in performance optimization, in-memory storage capabilities, robust security features, and overall system stability. However, the current integrating approach with Flink is cumbersome and does not fully capitalize on the benefits offered by Hybrid Shuffle.

To fully harness the potential of both systems, we propose integrating Flink's Hybrid Shuffle with Apache Celeborn through a pluggable remote tier interface. By integrating them together, we aim to make shuffle process more efficient, more stable, higher performance. By incorporating Celeborn as an additional storage tier within the tiered storage framework, we can create a shuffle system that is not only optimized for handling large-scale jobs but also more resilient, adaptable, and performant.

The local tier, which includes memory and disk storage, is not currently designed as a pluggable component. This approach was taken to keep the interface minimal, as there was no identified need for customization. Moreover, releasing a public interface too early can lead to issues with stability, security, and maintainability, as it might change before they've been fully verified. However, we are open to the possibility of future requirements that might call for a more flexible local tier. If such needs arise, we are also ready to implement a pluggable architecture for both the memory and disk tiers.

The main change on Flink and Celeborn in the FLIP is as follows.

Flink

Celeborn

  • Provide an option to specify the class name for the remote storage tier.

  • Allow the framework to initialize the pluggable remote storage tier.


  • Implement a new tier factory and tier for Celeborn.

  • Support granular data management at the Segment level for both client and server sides.


Public Interfaces

There are no changes on public interfaces.

Proposed Changes

In this document, our primary focus is on the change in Celeborn side. While the proposed change on Flink side is in FLIP-459: Support Flink hybrid shuffle integration with Apache Celeborn.

Currently, we are proposing to adopt the Map Partition mode for integration, as the Reduce Partition mode is still in the design phase. This integration approach will not interfere with the future implementation of the Reduce Partition mode.

Furthermore, to ensure compatibility, while integrating Celeborn with the hybrid shuffle is a more mature method, the existing shuffle service integration will remain unaffected at this stage. Once the integration is complete and the tier interface has stabilized, the existing shuffle service can then seamlessly transition to the new mode.

The key proposed changes are outlined below:

  1. Celeborn Remote Tier Factory: We will introduce a remote tier factory and its relative components( CelebornProducerAgent , CelebornConsumerAgent , CelebornMasterAgent , etc) within the Celeborn project.
  2. Writing/Reading Data With Segment GranularityIn FLIP 301, the tiered storage introduced Segment to write/read data for switching between tiersCeleborn will also adopt segment granularity for data operations.

    1. Client side
      1. The Celeborn producer agent will be capable of writing data with segment granularity, and the Celeborn consumer agent will be able to read data with segment granularity.

    2. Server side

      1. On the worker server side, data will be stored using Segment granularity as well.

  3. Compatibility with Existing Celeborn Features: The newly implemented tier must be compatible with the existing Celeborn features, such as partition splitting, identity authentication, etc.

The Celeborn tier will be integrated into the existing tiered storage architecture as below:

This is the overview of the Celeborn tier implementation. The details is in the following figure.

The Celeborn tier seamlessly integrates into the current tiered storage architecture without necessitating any modifications to the existing tiers. Note that only the green sections are the new implementations, while the rest represents the existing tiered storage infrastructure.

On the worker server side, the file format and data management are similar to the current implementation.  The key difference is that the server side must also store Segment information for each subpartition. This Segment information is crucial, as it ensures that the correct Segment data is sent only when the specific Segment is requested by the consumer agent, maintaining the data order across different tiers. This section in FLIP 301 has more details about how to switch between different tiers. 


The proposed integration introduces a new approach for Celeborn to operate with Flink, without impacting the existing Shuffle service mode. The new integration is designed to be compatible with the features of the traditional mode. 

  • Partition split. he new mode supports partition splitting by initiating a stream from the subsequent partition once the current one is fully consumed. This mechanism mirrors the logic employed in the previous shuffle service mode.
  • Region start or finish. The integration also facilitates operations within Regions. Given that SubPartition IDs within a Region are sequential, the new mode can detect and initiate a new Region when a lower SubPartition ID is encountered.
  • Compression support. The new mode also support data compression, enhancing the efficiency of data transportation and the processes of data writing and reading.
  • Credit based flow control. To ensure stability during the read process, the integration also supports credit-based flow control. This feature regulates data flow to maintain system balance and prevent overload.

While the list highlights some of the integration's prominent features, it is important to note that other functionalities, e.g., the worker blacklist mechanism, disk status detection, worker graceful shutdown, master HA, etc, continue to function as intended, unaffected by the new integration, but we didn't list all of them to keep the doc simple and easy to understand.

Implementation Plan

  1. New TierFactory Implementation. Develop a new TierFactory class within the Celeborn project.

  2. Write and Read Classes from both Client side and Server side. Implement the necessary classes for handling the data operations within the Celeborn tier. 

    1. Client side: 

      1. Create a CelebornProducerAgent to write data in the producer side. 

      2. Create a CelebornConsumerAgent to read data in the consumer side. 

      3. Create a CelebornMasterAgent to manage the partitions in the master side.

      4. The shuffle descriptor should implement the TierShuffleDescriptor for passing partition information between the writing/reading sides.

    2. Server side: 

      1. Implement a new SegmentMapPartitionFileWriter by extending MapPartitionFileWriter for writing file data and recording the Semgent info from each subpartitions. 

      2. Implement a new SegmentMapDataPartitionReader by extending MapPartitionReader for loading data from files and sending Segment data only when requested by the client.

      3. Introduce a new SegmentMapDataPartitionto manage these partition readers.


Compatibility, Deprecation, and Migration Plan

 This is a new feature, no compatibility, deprecation, and migration plan.


Test Plan

The changes will be covered by unit tests, e2e tests and manual tests.












  • No labels