Discussion thread: https://lists.apache.org/thread/wkogrk9tt2bznhnj6p0slltr09dhyho5
Vote thread

JIRA: FLINK-10653


Original Design Document: https://docs.google.com/document/d/1l7yIVNH3HATP4BnjEOZFkO2CaHf1sVn_DSxS2llmkd8/edit?usp=sharing


Shuffle is the process of transferring data between stages: the producer side writes outputs and the consumer side reads them as inputs. The shuffle architecture and behavior in Flink are unified for both streaming and batch jobs. It can be improved in three dimensions:

  • Lifecycle of TaskExecutor (TE)/Task: The TE starts an internal shuffle environment for transporting partition data to the consumer side. When a task enters the FINISHED state, its produced partition might not be fully consumed yet. Therefore the TE container should not be freed until all of its internal partitions are consumed. These lifecycles are implicitly coupled, but there is no specific mechanism for coordinating them.

  • Lifecycle of ResultPartition: Certain features, like fine-grained recovery and interactive programming, require flexible consumption of produced intermediate results: delayed consumption or consumption multiple times. In these cases, the shuffle service user (JM or RM) should decide when to release the produced partitions, and the shuffle API should support this. More details are in the design proposal to extend this FLIP.

  • Extension of writer/reader: A ResultPartition can only be written into local memory for streaming jobs and into a single persistent file per subpartition for batch jobs. It is difficult to extend the partition writer and reader sides together in the current architecture. E.g. a ResultPartition might be written in a sort-and-merge way or to external storage. Partitions might also be transported via an external shuffle service on YARN, Kubernetes etc. in order to release the TE early.

Proposed Changes

We propose a pluggable ShuffleService architecture for managing partitions on the JobMaster (JM) side and for extending adaptive writers/readers on the TE side.

(1) Shuffle Service Factory

public interface ShuffleServiceFactory<SD extends ShuffleDescriptor, P extends ResultPartitionWriter, G extends InputGate> {

      // JM side: global manager for partitions
      ShuffleMaster<SD> createShuffleMaster(Configuration configuration);

      // TE side: creates partition writers and input gates
      ShuffleEnvironment<P, G> createShuffleEnvironment(ShuffleEnvironmentContext shuffleEnvironmentContext);
}


  • The shuffle service factory creates the ShuffleMaster (JM side) and the ShuffleEnvironment (TE side). The Flink configuration could also contain shuffle-specific options such as the port.

  • We could support a cluster-level config for the factory class name in the first version. Later we could further support job- or edge-level config by introducing a predefined ShuffleType. The cluster config could contain the provided factory implementation for each supported ShuffleType, or fall back to the default for some types.
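The cluster-level lookup of the factory by class name could be sketched as follows. This is only an illustration under stated assumptions: the interfaces are reduced stand-ins for the ones proposed above, a plain Map replaces Flink's Configuration, and loadFactory as well as the nested NettyShuffleServiceFactory stand-in are hypothetical names that exist only in this sketch.

```java
import java.util.Map;

final class ShuffleFactoryLoaderSketch {

    // Reduced stand-ins for the proposed interfaces (assumption: generics dropped).
    interface ShuffleMaster {}

    interface ShuffleEnvironment {}

    interface ShuffleServiceFactory {
        ShuffleMaster createShuffleMaster(Map<String, String> configuration);

        ShuffleEnvironment createShuffleEnvironment(Map<String, String> configuration);
    }

    /** Stand-in for the default factory; it only returns empty placeholder objects. */
    static final class NettyShuffleServiceFactory implements ShuffleServiceFactory {
        public NettyShuffleServiceFactory() {}

        @Override
        public ShuffleMaster createShuffleMaster(Map<String, String> configuration) {
            return new ShuffleMaster() {};
        }

        @Override
        public ShuffleEnvironment createShuffleEnvironment(Map<String, String> configuration) {
            return new ShuffleEnvironment() {};
        }
    }

    // Option name taken from the "Public Interfaces" section of this proposal.
    static final String FACTORY_CLASS_KEY = "shuffle-service-factory.class";

    /** Instantiates the configured factory, falling back to the default stand-in. */
    static ShuffleServiceFactory loadFactory(Map<String, String> configuration) {
        String className = configuration.getOrDefault(
                FACTORY_CLASS_KEY, NettyShuffleServiceFactory.class.getName());
        try {
            return Class.forName(className)
                    .asSubclass(ShuffleServiceFactory.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot load shuffle factory " + className, e);
        }
    }

    public static void main(String[] args) {
        ShuffleServiceFactory factory = loadFactory(Map.of());
        // Both the JM and the TE would obtain their component from the same factory.
        ShuffleMaster master = factory.createShuffleMaster(Map.of());
        ShuffleEnvironment environment = factory.createShuffleEnvironment(Map.of());
        System.out.println(factory.getClass().getSimpleName());
    }
}
```

A job- or edge-level ShuffleType could later be supported by keeping one such lookup per type instead of a single cluster-wide key.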

(2) Shuffle Master (JM side)

The JM process creates the ShuffleMaster from the configured factory per cluster and is thus responsible for its lifecycle. The ShuffleMaster is a global manager for partitions, which decouples a partition’s lifecycle from its task. This naturally benefits the following improvements.

  • Task failover: If the consumer task fails or the TE crashes, the JM tries to reuse the producer’s partition. If the partition is still available for consumption, the producer task might not need to be restarted, which narrows the failover region and reduces the failover cost.

  • Partition cleanup: When all the consumer tasks are done (in FINISHED state), the producer’s partition can be manually deregistered with the ShuffleMaster to clean up resources. With external storage, partitions risk lingering after job/cluster failures. A TTL mechanism is one option for handling this issue.

In the first version, we focus only on migrating the existing process to the new ShuffleMaster architecture. So we define only the most basic necessary methods below; the improvements mentioned above can be addressed step by step, in priority order, by extending the ShuffleMaster with more features.

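public interface ShuffleMaster<T extends ShuffleDescriptor> {

      // called when the producer execution is scheduled to deploy
      CompletableFuture<T> registerPartitionWithProducer(PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor);

      // deregisters the partition so that its resources can be cleaned up
      void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor);
}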


  • PartitionDescriptor and ProducerDescriptor are introduced to wrap all the abstract information which the JM can provide from the job/execution graph, such as partition id, type, parallelism etc. and producer location, execution id, address, data port etc. These parameters are derived from the graph and the execution mode, so they are rather general and do not belong to a particular shuffle implementation.

  • When the producer execution is scheduled for deployment, a ProducerDescriptor and a PartitionDescriptor are created to register the producer’s partition with the ShuffleMaster. The ShuffleMaster transforms the abstract descriptors into a specific ShuffleDescriptor, which is also cached for the consumer vertex if the consumer is not deployed yet.

  • The ShuffleDescriptor is then put into the ResultPartitionDeploymentDescriptor for submitting the producer task, and as a known producer into the InputGateDeploymentDescriptor for submitting the consumer task. It can contain specific partition config for the ShuffleEnvironment on the TE side to serve the partition writer and reader.

  • A special UnknownShuffleDescriptor could be used in the InputGateDeploymentDescriptor if the producer is unknown during the deployment of the consumer. The JM can update it on the consumer side by sending the specific ShuffleDescriptor in the partition infos once the producer is deployed.
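The registration flow above can be illustrated with a small in-memory sketch. It rests on several assumptions: the descriptor classes carry only a few of the fields listed above, partition ids are plain strings, KnownShuffleDescriptor and InMemoryShuffleMaster are hypothetical names, and descriptorForConsumer is a helper invented for this sketch to show what the JM would place into the consumer's deployment descriptor.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class ShuffleMasterSketch {

    // Hypothetical, heavily reduced descriptors; the proposal lists more fields.
    static final class PartitionDescriptor {
        final String partitionId;

        PartitionDescriptor(String partitionId) {
            this.partitionId = partitionId;
        }
    }

    static final class ProducerDescriptor {
        final String address;
        final int dataPort;

        ProducerDescriptor(String address, int dataPort) {
            this.address = address;
            this.dataPort = dataPort;
        }
    }

    interface ShuffleDescriptor {
        String partitionId();
    }

    /** Descriptor of a partition whose producer location is known. */
    static final class KnownShuffleDescriptor implements ShuffleDescriptor {
        private final String partitionId;
        final String address;
        final int dataPort;

        KnownShuffleDescriptor(String partitionId, String address, int dataPort) {
            this.partitionId = partitionId;
            this.address = address;
            this.dataPort = dataPort;
        }

        @Override
        public String partitionId() {
            return partitionId;
        }
    }

    /** Placeholder used while the producer has not been deployed yet. */
    static final class UnknownShuffleDescriptor implements ShuffleDescriptor {
        private final String partitionId;

        UnknownShuffleDescriptor(String partitionId) {
            this.partitionId = partitionId;
        }

        @Override
        public String partitionId() {
            return partitionId;
        }
    }

    static final class InMemoryShuffleMaster {
        // Cache of registered descriptors, keyed by partition id.
        private final Map<String, ShuffleDescriptor> registered = new ConcurrentHashMap<>();

        CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(
                PartitionDescriptor partition, ProducerDescriptor producer) {
            ShuffleDescriptor descriptor = new KnownShuffleDescriptor(
                    partition.partitionId, producer.address, producer.dataPort);
            registered.put(partition.partitionId, descriptor);
            return CompletableFuture.completedFuture(descriptor);
        }

        void releasePartitionExternally(ShuffleDescriptor descriptor) {
            registered.remove(descriptor.partitionId());
        }

        /** Hypothetical helper: cached descriptor, or an unknown placeholder. */
        ShuffleDescriptor descriptorForConsumer(String partitionId) {
            ShuffleDescriptor descriptor = registered.get(partitionId);
            return descriptor != null ? descriptor : new UnknownShuffleDescriptor(partitionId);
        }
    }

    public static void main(String[] args) {
        InMemoryShuffleMaster master = new InMemoryShuffleMaster();
        // Consumer deployed first: only a placeholder is available.
        System.out.println(master.descriptorForConsumer("p1").getClass().getSimpleName());
        master.registerPartitionWithProducer(
                new PartitionDescriptor("p1"), new ProducerDescriptor("host-a", 6121)).join();
        // Producer registered: the cached descriptor can be sent to the consumer.
        System.out.println(master.descriptorForConsumer("p1").getClass().getSimpleName());
    }
}
```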

(3) Shuffle Environment (TE side)

The TE creates the ShuffleEnvironment from the configured shuffle factory per cluster and is thus responsible for its lifecycle. Considering a future ShuffleType config on job/edge level, the TE could keep a registry of shuffle factories per ShuffleType.

If the TE is responsible for transporting partition data to the consumer side, the produced partitions occupy local TE resources. In this case the TE can be released only when all of its internal tasks are in FINISHED state and all produced partitions occupying local resources are consumed and released. The ShuffleEnvironment could provide the information about locally unreleased partitions to decide whether the producer TE can be released.

public interface ShuffleEnvironment<P extends ResultPartitionWriter, G extends InputGate> extends AutoCloseable {

      int start(); // returns data port for shuffle data exchange connection

      // producer side
      Collection<P> createResultPartitionWriters(...);

      void releasePartitionsLocally(Collection<ResultPartitionID> partitionIds);

      Collection<ResultPartitionID> getPartitionsOccupyingLocalResources();

      // consumer side
      Collection<G> createInputGates(...);

      boolean updatePartitionInfo(ExecutionAttemptID consumerID, PartitionInfo partitionInfo);
}
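The release condition for a TE that serves partition data itself could be checked roughly as follows. This is a minimal sketch, assuming string ids instead of ResultPartitionID; LocalPartitionView, InMemoryShuffleEnvironment, partitionProduced and canReleaseTaskExecutor are hypothetical names introduced only for illustration.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class TaskExecutorReleaseSketch {

    /** Reduced view of the proposed ShuffleEnvironment (assumption: ids are strings). */
    interface LocalPartitionView {
        Collection<String> getPartitionsOccupyingLocalResources();

        void releasePartitionsLocally(Collection<String> partitionIds);
    }

    static final class InMemoryShuffleEnvironment implements LocalPartitionView {
        private final Set<String> localPartitions = ConcurrentHashMap.newKeySet();

        // Hypothetical hook: called when a local task has produced a partition.
        void partitionProduced(String partitionId) {
            localPartitions.add(partitionId);
        }

        @Override
        public Collection<String> getPartitionsOccupyingLocalResources() {
            return Set.copyOf(localPartitions);
        }

        @Override
        public void releasePartitionsLocally(Collection<String> partitionIds) {
            localPartitions.removeAll(partitionIds);
        }
    }

    /** The TE may be freed only if no task is running and no local partition is left. */
    static boolean canReleaseTaskExecutor(boolean allTasksFinished, LocalPartitionView environment) {
        return allTasksFinished && environment.getPartitionsOccupyingLocalResources().isEmpty();
    }

    public static void main(String[] args) {
        InMemoryShuffleEnvironment environment = new InMemoryShuffleEnvironment();
        environment.partitionProduced("p1");
        // Tasks are finished, but "p1" still occupies local resources.
        System.out.println(canReleaseTaskExecutor(true, environment));
        environment.releasePartitionsLocally(List.of("p1"));
        System.out.println(canReleaseTaskExecutor(true, environment));
    }
}
```

With an external shuffle service the set of locally held partitions would stay empty, so the TE could be released as soon as its tasks finish.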


  • The ShuffleEnvironment is responsible for creating ResultPartitionWriters for the producer task and InputGates for the consumer task. This architecture can therefore support extending matching writer and reader sides together. It might be useful for the current ResultPartitionWriter/InputGate interfaces to extend AutoCloseable.

  • Similar to the current implementation, the scheduler/EG in the JM can decide whether and when to update partition info on the consumer side, e.g. always for pipelined partitions, and when the task is finished for blocking partitions. The producer task can also notify the JM when something has been produced in a pipelined partition, as it does now. The consumer’s ShuffleEnvironment provides the way to update the internal input gate with known partition infos.

  • The ShuffleEnvironment should also cover the way data is transported between producer and consumer, e.g. via the netty-based network stack, which is the current default. So the ShuffleEnvironment might substitute the NetworkEnvironment in TaskManagerServices.
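How a consumer-side environment might apply a partition info update to an input gate can be sketched like this. The sketch is an assumption-laden illustration: consumer ids and partition ids are strings, PartitionInfo carries only a producer address, InputGateSketch and ConsumerShuffleEnvironment are hypothetical names, and the meaning of the boolean result (whether a gate for that consumer was found) is a guess, since the proposal does not define it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class PartitionInfoUpdateSketch {

    /** Reduced partition info: which partition, and where its producer runs. */
    static final class PartitionInfo {
        final String partitionId;
        final String producerAddress;

        PartitionInfo(String partitionId, String producerAddress) {
            this.partitionId = partitionId;
            this.producerAddress = producerAddress;
        }
    }

    static final class InputGateSketch {
        // Partition id to producer address; a missing entry models an unknown producer.
        private final Map<String, String> knownProducers = new ConcurrentHashMap<>();

        void update(PartitionInfo info) {
            knownProducers.put(info.partitionId, info.producerAddress);
        }

        boolean isProducerKnown(String partitionId) {
            return knownProducers.containsKey(partitionId);
        }
    }

    static final class ConsumerShuffleEnvironment {
        private final Map<String, InputGateSketch> gatesByConsumer = new ConcurrentHashMap<>();

        InputGateSketch createInputGate(String consumerId) {
            return gatesByConsumer.computeIfAbsent(consumerId, id -> new InputGateSketch());
        }

        /** Assumption: returns false if no gate exists for the consumer. */
        boolean updatePartitionInfo(String consumerId, PartitionInfo info) {
            InputGateSketch gate = gatesByConsumer.get(consumerId);
            if (gate == null) {
                return false;
            }
            gate.update(info);
            return true;
        }
    }

    public static void main(String[] args) {
        ConsumerShuffleEnvironment environment = new ConsumerShuffleEnvironment();
        InputGateSketch gate = environment.createInputGate("consumer-1");
        // The consumer was deployed before its producer was known.
        System.out.println(gate.isProducerKnown("p1"));
        // The JM sends the partition info once the producer is deployed.
        environment.updatePartitionInfo("consumer-1", new PartitionInfo("p1", "host-a:6121"));
        System.out.println(gate.isProducerKnown("p1"));
    }
}
```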

Future Improvement

Currently, ResultPartitionWriter and InputGate both operate on buffers of serialized record data. Certain ShuffleEnvironment implementations might benefit from operating on serialized records or even raw records directly (e.g. partial sort-merge of partition data).

  • Abstract RecordWriter/RecordReader interfaces for handling serialized/raw record.

  • ShuffleEnvironment could be further refactored to return RecordWriter/RecordReader.

Public Interfaces

  • In the first version, the name of the class implementing the ShuffleServiceFactory interface is configured by the shuffle-service-factory.class parameter at the Flink cluster level.
  • In the second version, a job/edge level ShuffleType config might be supported to select a specific ShuffleServiceFactory implementation.

Compatibility, Deprecation, and Migration Plan

  • In the first version, the default Netty implementation of ShuffleServiceFactory is compatible with the current behavior.

  • In the second or later version, we can extend other implementations like YarnShuffleServiceFactory/KubernetesShuffleServiceFactory to be configured based on cluster environment and user requirements.

Implementation Plan

All the work mentioned above could be done in at least two versions. The first version implements the most basic architecture so that later versions can build strictly upon it.

First MVP: Refactoring to Shuffle API (Flink 1.9)

  • Implement Producer- and PartitionDescriptor for covering necessary abstract info.

  • Implement ShuffleDescriptor generated from Producer- and PartitionDescriptor.

  • Define ShuffleMaster interface and create a simple implementation on JM side which relies on currently implemented NetworkEnvironment on TE side.

  • Define ShuffleServiceFactory interface for creating ShuffleMaster.

  • Introduce a Flink configuration option for ShuffleServiceFactory implementation. Default value for it could be NettyShuffleServiceFactory which serves as a feature flag at the moment to use current code paths.

  • Define ShuffleEnvironment interface and give a default implementation on TE side.

  • Reuse shuffle related components from NetworkEnvironment to implement default NettyShuffleEnvironment.

  • Add ShuffleEnvironment factory method to ShuffleServiceFactory interface.

  • Respect the feature flag in the Flink configuration option for ShuffleServiceFactory.

  • Move some general concerns out of NetworkEnvironment so that they are not shuffle specific (e.g. notifyPipelinedConsumers from ResultPartition etc).

Next steps

  • Support job/edge level config for ShuffleType.

  • Abstract RecordWriter/Reader interface for handling raw records.

  • Refactor ShuffleEnvironment interface for returning RecordWriter/Reader.

  • Adjust the processes in StreamInputProcessor and RecordWriter based on Writer/Reader interfaces.

  • Extend to Yarn/KubernetesShuffleServiceFactory implementations based on new interfaces.

Rejected Alternatives

None so far.