Motivation

With the implementation of FLIP-301, Hybrid Shuffle gained remote storage support, a major milestone for cloud-native environments. Shuffle data can now be stored in remote storage systems, which significantly reduces the risk of insufficient-space failures in large-scale jobs.

The hybrid shuffle architecture provides a flexible and adaptive framework that seamlessly transitions between memory, disk, and remote storage. This is achieved through the newly introduced tiered storage model, which dynamically selects the most suitable storage tier for shuffle data. When resources are abundant, the system uses high-performance storage to maximize processing efficiency; in resource-constrained scenarios, it flushes data to disk to ensure stability.
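As a rough illustration of this adaptive behavior, the following Java sketch tries storage tiers in order of preference and falls back to the next tier when one cannot accept a segment. The `SimpleTier`, `BoundedTier`, and `TierSelector` types are hypothetical simplifications invented for this example; they are not Flink's tiered storage API, and Flink's actual selection logic is more involved.

```java
import java.util.List;

/** Hypothetical, simplified stand-in for a storage tier; not Flink's tier API. */
interface SimpleTier {
    String name();

    /** Returns true and reserves space if this tier can accept a segment of the given size. */
    boolean tryAccept(long segmentBytes);
}

/** A tier with a fixed byte capacity, used only for this illustration. */
final class BoundedTier implements SimpleTier {
    private final String name;
    private long remainingBytes;

    BoundedTier(String name, long capacityBytes) {
        this.name = name;
        this.remainingBytes = capacityBytes;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public boolean tryAccept(long segmentBytes) {
        if (segmentBytes > remainingBytes) {
            return false; // not enough space left in this tier
        }
        remainingBytes -= segmentBytes;
        return true;
    }
}

/** Picks the first tier, in preference order, that accepts a segment. */
final class TierSelector {
    private final List<SimpleTier> tiersInPreferenceOrder;

    TierSelector(List<SimpleTier> tiersInPreferenceOrder) {
        this.tiersInPreferenceOrder = List.copyOf(tiersInPreferenceOrder);
    }

    String selectTier(long segmentBytes) {
        for (SimpleTier tier : tiersInPreferenceOrder) {
            if (tier.tryAccept(segmentBytes)) {
                return tier.name();
            }
        }
        throw new IllegalStateException("No tier can accept a segment of " + segmentBytes + " bytes");
    }
}
```

With a 100-byte memory tier, a 300-byte disk tier, and an unbounded remote tier, two successive 80-byte segments land in memory and then on disk, and a 250-byte segment that fits in neither local tier goes to the remote tier.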

Furthermore, each tier within this architecture is designed to be independent, which makes the architecture highly extensible and makes it easy to integrate additional storage tiers when they are required.

In parallel, Apache Celeborn, a shuffle data service aimed at improving the performance, stability, and adaptability of big data compute engines, has made significant progress, including performance optimizations, in-memory storage, robust security features, and improved overall stability. However, the current approach to integrating Celeborn with Flink is cumbersome and does not fully capitalize on the benefits offered by Hybrid Shuffle.

To fully harness the potential of both systems, we propose integrating Flink's Hybrid Shuffle with Apache Celeborn through a pluggable remote tier interface. By incorporating Celeborn as an additional storage tier within the tiered storage framework, we aim to build a shuffle system that is not only optimized for large-scale jobs but also more efficient, resilient, adaptable, and performant.

The local tiers, memory and disk, are currently not designed as pluggable components. This keeps the interface minimal, since no need for customizing them has been identified. Moreover, releasing a public interface too early can cause stability, security, and maintainability issues, because the interface might change before it has been fully verified. However, we remain open to future requirements that call for a more flexible local tier; if such needs arise, we can make the memory and disk tiers pluggable as well.

The main changes to Flink and Celeborn in this FLIP are as follows.

Flink

  • Provide an option to specify the class name for the remote storage tier.

  • Allow the framework to initialize the pluggable remote storage tier.

Celeborn

  • Implement a new tier factory and tier for Celeborn.

  • Support granular data management at the Segment level for both client and server sides.


Public Interfaces

  1. A new config option to specify the class name of the remote tier factory.
    1. Key
      1. taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class
    2. Default
      1. No default value
    3. Description
      1. The class name of the external remote tier factory for hybrid shuffle. If configured, hybrid shuffle initializes only the remote tier specified by this class name. Because the tier interfaces are not yet public and are still actively evolving, users are advised not to implement their own external remote tier until these interfaces have stabilized.
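Because the option has no default value, leaving it unset simply means that no external remote tier is configured. The Java sketch below shows that lookup against a plain key-value map. The `ExternalRemoteTierOption` class and the use of a `Map` in place of Flink's `Configuration` are assumptions made to keep the example self-contained; only the key string comes from this proposal.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper; a plain Map stands in for Flink's Configuration. */
final class ExternalRemoteTierOption {
    /** The option key proposed in this FLIP. */
    static final String KEY = "taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class";

    private ExternalRemoteTierOption() {}

    /**
     * Returns the configured factory class name, or empty if the option is unset.
     * The option has no default, so a missing or blank value means "not configured".
     */
    static Optional<String> read(Map<String, String> config) {
        String value = config.get(KEY);
        if (value == null || value.isBlank()) {
            return Optional.empty();
        }
        return Optional.of(value.trim());
    }
}
```

In this sketch, a value such as `com.example.MyRemoteTierFactory` (a hypothetical class name) is returned with surrounding whitespace trimmed, while a missing or blank value yields an empty result.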

In the long term, it is more beneficial for Celeborn to rely on the Flink public API to achieve decoupling between the two projects. However, we propose not to expose the tier interface as a public API at this stage for two primary reasons:

  1. Celeborn currently also relies on certain internal Flink implementation classes (such as NetworkBufferPool, SingleInputGate, and ResultPartitionWriter), so the tier interfaces cannot yet be decoupled from these classes.
  2. The tier interface is not yet stable: it is at an early stage of development and still evolving. Exposing it prematurely could harm user experience and long-term maintainability.

Once the interface has stabilized, Celeborn will switch to this standardized API. This will streamline development, reduce the need for version-specific plugins, and promote a more integrated and sustainable development relationship between Flink and Celeborn.

Proposed Changes

In this FLIP, our primary focus is on pluggability on the Flink side, while the proposed changes on the Celeborn side are described in CIP-6 Support Flink hybrid shuffle integration with Apache Celeborn.

We are proposing several key enhancements to support a pluggable remote tier, detailed as follows:

  1. Pluggable Remote Tier: Make the remote tier pluggable so that it can be configured through a class name.
  2. Remote Tier Factory Creation: Create the remote tier factory based on the configured remote tier.

Pluggable Remote Tier

The configured remote tier class must implement the `org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory` interface. The external implementation must provide not only the TierFactory class but also the associated components, such as TierMasterAgent, TierProducerAgent, TierConsumerAgent, and TierShuffleDescriptor, as required. These classes are responsible for shuffle writing, shuffle reading, and data management.
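Loading a factory from a configured class name generally means resolving the class, checking that it implements the expected interface, and invoking a constructor. The Java sketch below shows this under the assumption that the factory has a public no-argument constructor. `SketchTierFactory`, `DemoRemoteTierFactory`, and `TierFactoryLoader` are hypothetical stand-ins; they are not the real `TierFactory` interface or Flink's loading code.

```java
/** Hypothetical, heavily simplified stand-in for Flink's TierFactory interface. */
interface SketchTierFactory {
    String describe();
}

/** Hypothetical external implementation, e.g. what a Celeborn tier factory might provide. */
final class DemoRemoteTierFactory implements SketchTierFactory {
    /** Assumption: factories expose a public no-argument constructor. */
    public DemoRemoteTierFactory() {}

    @Override
    public String describe() {
        return "demo remote tier";
    }
}

/** Resolves and instantiates a tier factory from its class name. */
final class TierFactoryLoader {
    private TierFactoryLoader() {}

    static SketchTierFactory load(String className, ClassLoader classLoader) {
        final Class<?> clazz;
        try {
            clazz = Class.forName(className, false, classLoader);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Remote tier factory class not found: " + className, e);
        }
        // Reject classes that do not implement the expected factory interface.
        if (!SketchTierFactory.class.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException(
                    className + " does not implement " + SketchTierFactory.class.getName());
        }
        try {
            return clazz.asSubclass(SketchTierFactory.class).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}
```

Failing with a descriptive error when the class is missing or implements the wrong interface surfaces a misconfiguration as soon as the factory is loaded, rather than later during shuffle writing or reading.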

Remote Tier Factory Creation

The creation strategy of the remote tier factory needs to be revisited. Since FLIP-301, the remote tier has been enabled by configuring a remote storage path. With this FLIP, the remote tier can alternatively be configured with a class name.

If both are configured, the remote tier class name takes precedence. We expect external shuffle implementations to offer optimizations for writing and reading shuffle data that are better tailored to user-specific requirements.
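The precedence rule can be expressed as a small decision function. This Java sketch assumes both settings have already been read into `Optional` values; the `RemoteTierChoice` enum and `RemoteTierResolver` class are hypothetical names used only for illustration.

```java
import java.util.Optional;

/** Hypothetical outcome of remote tier resolution, for illustration only. */
enum RemoteTierChoice {
    EXTERNAL_CLASS, // create the remote tier from the configured factory class
    REMOTE_PATH,    // create the built-in remote tier for the configured storage path
    NONE            // no remote tier is created
}

final class RemoteTierResolver {
    private RemoteTierResolver() {}

    static RemoteTierChoice resolve(Optional<String> externalFactoryClass, Optional<String> remoteStoragePath) {
        if (externalFactoryClass.isPresent()) {
            // The class name takes priority, even if a remote storage path is also set.
            return RemoteTierChoice.EXTERNAL_CLASS;
        }
        if (remoteStoragePath.isPresent()) {
            return RemoteTierChoice.REMOTE_PATH;
        }
        return RemoteTierChoice.NONE;
    }
}
```

In this sketch, the remote storage path is consulted only when no factory class is configured, and when neither setting is present no remote tier is created.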


Implementation Plan

  1. Pluggable Remote Tier. Make the remote tier pluggable and configurable.
  2. Implement Remote Tier Creation Process. Following the strategy described above, create the remote tier dynamically from either the configured class name or the configured remote storage path.


Compatibility, Deprecation, and Migration Plan

This is a new feature; no compatibility, deprecation, or migration plan is required.


Test Plan

The changes will be covered by unit tests, e2e tests, and manual tests.