Discussion thread
Vote thread

FLINK-22910


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


The pluggable shuffle framework was introduced to Flink by FLIP-31, and we implemented our own remote shuffle service for Flink on top of it. During the implementation, we found that the current interface has some limitations that hurt the usability of customized shuffle plugins. More specifically, the following limitations need to be addressed:

  1. Lack of graceful initialization and finalization. The shuffle master of an external/remote shuffle service may hold external resources which need to be initialized and released gracefully;
  2. Lost partitions cannot be removed from tracking, which may lead to repeated failovers before the output data of the upstream tasks is reproduced. Flink can already handle a lost TM and stop tracking the partitions on it, but a shuffle plugin has no way to report partitions that were lost externally;
  3. Unrecoverable errors cannot be handled gracefully. The customized shuffle plugin may encounter fatal errors which need to be propagated to the Flink framework properly;
  4. The relationship between ShuffleMaster and job is unclear and depends on the implementation. It is also inconsistent with the ShuffleEnvironment on the TM side, which can be misleading.

There are also some other points which are out of the scope of this FLIP and can be optimized later. Though this FLIP does not solve them, it moves in the right direction. Note that this FLIP contains only the minimal changes needed; the following points can also be added to it if needed.

  1. Cluster partitions cannot be supported properly. There is no interface method to promote or stop tracking the cluster partitions stored externally;
  2. Multiple shuffle plugins cannot be used in one Flink cluster. As discussed on the mailing list, different jobs may need different shuffle implementations;
  3. The boundary between the interface and the implementation is unclear. Both are in the runtime module, which may lead to the misuse of classes which are not public.

Public Interfaces

ShuffleMaster: the lifecycle methods start, close, registerJob and unregisterJob are newly added, and the signature of registerPartitionWithProducer is changed (a JobID parameter is added). Other methods are kept unchanged.

public interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable {

    /**
     * Starts this shuffle master as a service. One can do some initialization here, for example
     * getting access and connecting to the external system.
     */
    default void start() throws Exception {}

    /**
     * Closes this shuffle master service which should release all resources. A shuffle master will
     * only be closed when the cluster is shut down.
     */
    default void close() throws Exception {}

    /**
     * Registers the target job together with the corresponding {@link JobShuffleContext} to this
     * shuffle master. Through the shuffle context, one can obtain some basic information like job
     * ID, job configuration and all result partitions produced. Besides, by stopping tracking the
     * lost result partition, one can remove and reproduce them.
     *
     * @param context the corresponding shuffle context of the target job.
     */
    default void registerJob(JobShuffleContext context) {}

    /**
     * Unregisters the target job from this shuffle master, which means the corresponding job has
     * reached a global termination state and all the allocated resources except for the cluster
     * partitions can be cleared.
     *
     * @param jobID ID of the target job to be unregistered.
     */
    default void unregisterJob(JobID jobID) {}

    CompletableFuture<T> registerPartitionWithProducer(
            JobID jobID,
            PartitionDescriptor partitionDescriptor,
            ProducerDescriptor producerDescriptor);

    void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor);

    default MemorySize computeShuffleMemorySizeForTask(
            TaskInputsOutputsDescriptor taskInputsOutputsDescriptor) {
        return MemorySize.ZERO;
    }
}
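To illustrate how a plugin might use the new lifecycle methods, here is a minimal sketch. It does not depend on Flink: the interfaces are simplified stand-ins (JobID is modelled as a String, and the partition and producer descriptors are collapsed into a partition name), and the class RemoteShuffleMasterSketch with its helper methods is hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Simplified stand-ins for the Flink interfaces (assumption: JobID is a String here).
interface JobShuffleContextStub {
    String getJobID();
}

interface ShuffleMasterStub extends AutoCloseable {
    default void start() throws Exception {}
    default void close() throws Exception {}
    default void registerJob(JobShuffleContextStub context) {}
    default void unregisterJob(String jobID) {}
    CompletableFuture<String> registerPartitionWithProducer(String jobID, String partition);
}

// Hypothetical plugin: remembers which partitions belong to which job so that
// unregisterJob can release everything the job allocated.
class RemoteShuffleMasterSketch implements ShuffleMasterStub {
    private final Map<String, List<String>> partitionsByJob = new ConcurrentHashMap<>();
    private volatile boolean connected;

    @Override
    public void start() {
        connected = true; // placeholder for connecting to the external shuffle cluster
    }

    @Override
    public void registerJob(JobShuffleContextStub context) {
        partitionsByJob.putIfAbsent(context.getJobID(), new CopyOnWriteArrayList<>());
    }

    @Override
    public CompletableFuture<String> registerPartitionWithProducer(String jobID, String partition) {
        List<String> partitions = partitionsByJob.get(jobID);
        if (!connected || partitions == null) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new IllegalStateException("master not started or job not registered: " + jobID));
            return failed;
        }
        partitions.add(partition);
        // The returned String plays the role of a ShuffleDescriptor in this sketch.
        return CompletableFuture.completedFuture(jobID + "/" + partition);
    }

    @Override
    public void unregisterJob(String jobID) {
        partitionsByJob.remove(jobID); // release all resources held for this job
    }

    @Override
    public void close() {
        partitionsByJob.clear();
        connected = false; // placeholder for disconnecting from the external system
    }

    int trackedPartitions(String jobID) {
        List<String> partitions = partitionsByJob.get(jobID);
        return partitions == null ? 0 : partitions.size();
    }

    boolean isConnected() {
        return connected;
    }
}
```

Because the JobID is passed on every registration, the sketch can group partitions per job and drop them all at once in unregisterJob.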

JobShuffleContext: a newly added interface which carries the job-level context. It provides some basic capabilities and acts as a proxy to other components of the Flink cluster, like the JobMaster.

public interface JobShuffleContext {

    /** Returns the corresponding {@link JobID}. */
    JobID getJobID();

    /**
     * Stops tracking the target result partitions, which means these partitions will be removed and
     * will be reproduced if used afterwards.
     */
    CompletableFuture<?> stopTrackingPartitions(Collection<ResultPartitionID> partitionIDS);
}
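As an illustration of how stopTrackingPartitions could be used, the following sketch reports the partitions stored on a lost external shuffle worker. The interface is a simplified stand-in with String IDs instead of ResultPartitionID, and LostWorkerHandler, RecordingJobContext and the worker bookkeeping are hypothetical names that are not part of this proposal.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Stand-in for JobShuffleContext (assumption: IDs are plain Strings here).
interface JobShuffleContextSketch {
    String getJobID();
    CompletableFuture<?> stopTrackingPartitions(Collection<String> partitionIDs);
}

// Test double that only records which partitions were reported as lost.
class RecordingJobContext implements JobShuffleContextSketch {
    final List<String> stopped = new ArrayList<>();
    private final String jobID;

    RecordingJobContext(String jobID) {
        this.jobID = jobID;
    }

    @Override
    public String getJobID() {
        return jobID;
    }

    @Override
    public CompletableFuture<?> stopTrackingPartitions(Collection<String> partitionIDs) {
        stopped.addAll(partitionIDs);
        return CompletableFuture.completedFuture(null);
    }
}

// Hypothetical helper inside a shuffle master: remembers which external worker
// stores which partition and reports lost partitions through the job context.
class LostWorkerHandler {
    private final JobShuffleContextSketch context;
    private final Map<String, List<String>> partitionsByWorker = new HashMap<>();

    LostWorkerHandler(JobShuffleContextSketch context) {
        this.context = context;
    }

    void onPartitionStored(String workerId, String partitionId) {
        partitionsByWorker.computeIfAbsent(workerId, k -> new ArrayList<>()).add(partitionId);
    }

    // Called when the plugin detects that an external worker is gone.
    CompletableFuture<?> onWorkerLost(String workerId) {
        List<String> lost = partitionsByWorker.remove(workerId);
        if (lost == null || lost.isEmpty()) {
            return CompletableFuture.completedFuture(null); // nothing to report
        }
        return context.stopTrackingPartitions(lost);
    }
}
```

Reporting the lost partitions in one call lets the framework reproduce them before consumers fail on missing data, rather than discovering each loss through a separate failover.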

ShuffleMasterContext: a newly added interface which carries the cluster-level context. It provides some basic capabilities and acts as a proxy to other components of the Flink cluster, like the FatalErrorHandler.

public interface ShuffleMasterContext {

    /** Returns the cluster configuration. */
    Configuration getConfiguration();

    /** Handles the fatal error if any. */
    void onFatalError(Throwable throwable);
}
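One possible use of onFatalError is sketched below: a plugin escalates to the framework once the external shuffle service has been unreachable for several consecutive heartbeats. The stand-in interface omits getConfiguration, and HeartbeatMonitor, its threshold and its callbacks are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for ShuffleMasterContext, reduced to the error-reporting method.
interface ShuffleMasterContextSketch {
    void onFatalError(Throwable throwable);
}

// Hypothetical monitor: reports a fatal error exactly once, when the number of
// consecutive heartbeat failures reaches the configured threshold.
class HeartbeatMonitor {
    private final ShuffleMasterContextSketch context;
    private final int maxFailures;
    private final AtomicInteger consecutiveFailures = new AtomicInteger();

    HeartbeatMonitor(ShuffleMasterContextSketch context, int maxFailures) {
        this.context = context;
        this.maxFailures = maxFailures;
    }

    void onHeartbeatSuccess() {
        consecutiveFailures.set(0); // a success resets the failure streak
    }

    void onHeartbeatFailure(Exception cause) {
        if (consecutiveFailures.incrementAndGet() == maxFailures) {
            context.onFatalError(
                    new IllegalStateException("external shuffle service unreachable", cause));
        }
    }
}
```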

ShuffleServiceFactory: the signature of createShuffleMaster is changed. Instead of passing only the configuration, a ShuffleMasterContext instance is passed, which offers more functionality.

public interface ShuffleServiceFactory<
        SD extends ShuffleDescriptor, P extends ResultPartitionWriter, G extends IndexedInputGate> {

    ShuffleMaster<SD> createShuffleMaster(ShuffleMasterContext shuffleMasterContext);

    ShuffleEnvironment<P, G> createShuffleEnvironment(
            ShuffleEnvironmentContext shuffleEnvironmentContext);
}
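A factory written against the new signature might look roughly like the sketch below, which reads its settings from the context instead of receiving a bare configuration. All types are simplified stand-ins (Configuration is modelled as a Map<String, String>), and the option key is made up for illustration and is not a real Flink option.

```java
import java.util.Map;

// Stand-ins for the Flink types (assumption: Configuration is a Map<String, String>).
interface MasterContextSketch {
    Map<String, String> getConfiguration();
    void onFatalError(Throwable throwable);
}

interface MasterSketch {
    String address();
}

interface FactorySketch {
    MasterSketch createShuffleMaster(MasterContextSketch context);
}

// Hypothetical master that remembers where the external shuffle service lives.
class RemoteMasterSketch implements MasterSketch {
    private final String address;

    RemoteMasterSketch(String address) {
        this.address = address;
    }

    @Override
    public String address() {
        return address;
    }
}

class RemoteFactorySketch implements FactorySketch {
    // Hypothetical option key, not a real Flink option.
    static final String ADDRESS_KEY = "example.remote-shuffle.address";

    @Override
    public MasterSketch createShuffleMaster(MasterContextSketch context) {
        // The configuration is now obtained through the context.
        String address = context.getConfiguration().get(ADDRESS_KEY);
        if (address == null) {
            throw new IllegalArgumentException("missing option " + ADDRESS_KEY);
        }
        return new RemoteMasterSketch(address);
    }
}
```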

Proposed Changes

There are four main parts of the proposed changes:

  1. Add the ShuffleMaster to the JobMasterSharedService. This makes the ShuffleMaster a cluster-level service, so multiple jobs can share one ShuffleMaster, similar to the ShuffleEnvironment;
  2. Provide the JobID when applying for resources through ShuffleMaster#registerPartitionWithProducer, which lets the ShuffleMaster know which job a result partition belongs to so that it can release data by job;
  3. Introduce ShuffleMasterContext to ShuffleMaster so that the ShuffleMaster can propagate unrecoverable errors to the Flink framework, leaving room to extend its capabilities later;
  4. Add the lifecycle methods start, close, registerJob and unregisterJob to the ShuffleMaster interface to manage the lifecycle of the ShuffleMaster.
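The call order implied by these four changes can be sketched from the framework side as follows. This is an illustration under simplifying assumptions, not the actual Flink wiring: ClusterSketch and RecordingMaster are hypothetical, JobID is a String, and jobs run one after another for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the ShuffleMaster lifecycle surface (assumption: JobID is a String).
interface LifecycleSketch {
    void start();
    void registerJob(String jobID);
    void registerPartitionWithProducer(String jobID, String partition);
    void unregisterJob(String jobID);
    void close();
}

// Records every call so that the order can be inspected.
class RecordingMaster implements LifecycleSketch {
    final List<String> calls = new ArrayList<>();

    public void start() { calls.add("start"); }
    public void registerJob(String jobID) { calls.add("registerJob(" + jobID + ")"); }
    public void registerPartitionWithProducer(String jobID, String partition) {
        calls.add("registerPartition(" + jobID + "," + partition + ")");
    }
    public void unregisterJob(String jobID) { calls.add("unregisterJob(" + jobID + ")"); }
    public void close() { calls.add("close"); }
}

// Hypothetical cluster-level owner of the single shared master.
class ClusterSketch {
    private final LifecycleSketch master;

    ClusterSketch(LifecycleSketch master) {
        this.master = master;
        master.start(); // started once per cluster, not once per job
    }

    void runJob(String jobID, String... partitions) {
        master.registerJob(jobID);
        try {
            for (String partition : partitions) {
                master.registerPartitionWithProducer(jobID, partition);
            }
        } finally {
            master.unregisterJob(jobID); // the job reached a globally terminal state
        }
    }

    void shutDown() {
        master.close(); // closed only when the cluster shuts down
    }
}
```

Running two jobs through one ClusterSketch yields a single start and a single close around the per-job register and unregister calls, which is the sharing model described in item 1.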

Compatibility, Deprecation, and Migration Plan

The changes to ShuffleMaster#registerPartitionWithProducer and ShuffleServiceFactory#createShuffleMaster can break compatibility. Customized shuffle plugins need some minor changes to use the new interface.

Test Plan

The proposed changes will be tested with a real Flink shuffle plugin built on them, using test jobs such as the TPC-DS benchmark suite.

Rejected Alternatives

No rejected alternatives.