Discussion thread: https://lists.apache.org/thread/wkogrk9tt2bznhnj6p0slltr09dhyho5
Vote thread:
JIRA:

Release: 1.9


Original Design Document: https://docs.google.com/document/d/1l7yIVNH3HATP4BnjEOZFkO2CaHf1sVn_DSxS2llmkd8/edit?usp=sharing

Motivation

Shuffle is the process of data transfer between stages, which involves writing outputs on the producer side and reading inputs on the consumer side. The shuffle architecture and behavior in Flink are currently unified for both streaming and batch jobs. It can be improved in two dimensions: how partitions are managed on the JobMaster side, and how partition data is written and read on the TaskExecutor side.

Proposed Changes

We propose a pluggable ShuffleService architecture: a ShuffleMaster that manages partitions on the JobMaster (JM) side, and a ShuffleEnvironment that provides adaptive writers/readers on the TaskExecutor (TE) side.

(1) Shuffle Service Factory

public interface ShuffleServiceFactory<
        SD extends ShuffleDescriptor,
        P extends ResultPartitionWriter,
        G extends InputGate> {

    /** Creates the JM-side manager of result partitions (one instance per cluster). */
    ShuffleMaster<SD> createShuffleMaster(Configuration configuration);

    /** Creates the TE-side environment that provides partition writers and input gates. */
    ShuffleEnvironment<P, G> createShuffleEnvironment(ShuffleEnvironmentContext shuffleEnvironmentContext);
}
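For illustration, a custom shuffle implementation would plug in by providing both components through this factory. The sketch below is not part of the proposal; all "My*" class names are placeholders for hypothetical implementations of the proposed interfaces.

// Illustrative sketch of a custom factory; "My*" classes are hypothetical
// implementations of the proposed interfaces above.
public class MyShuffleServiceFactory implements
        ShuffleServiceFactory<MyShuffleDescriptor, MyResultPartitionWriter, MyInputGate> {

    @Override
    public ShuffleMaster<MyShuffleDescriptor> createShuffleMaster(Configuration configuration) {
        // JM side: one global manager for partitions registered by producers.
        return new MyShuffleMaster(configuration);
    }

    @Override
    public ShuffleEnvironment<MyResultPartitionWriter, MyInputGate> createShuffleEnvironment(
            ShuffleEnvironmentContext shuffleEnvironmentContext) {
        // TE side: creates partition writers for producers and input gates for consumers.
        return new MyShuffleEnvironment(shuffleEnvironmentContext);
    }
}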

(2) Shuffle Master (JM side)

The JM process creates the ShuffleMaster from the configured factory once per cluster and is thus responsible for its lifecycle. The ShuffleMaster is a global manager of partitions, which decouples a partition's lifecycle from that of the task producing it. This decoupling is the basis for further improvements to partition management.

In the first version, we only focus on migrating the currently existing process onto the new ShuffleMaster architecture. We therefore define only the most basic necessary methods below; further improvements can then be introduced step by step, in order of priority, by extending the ShuffleMaster with more features.

public interface ShuffleMaster<T extends ShuffleDescriptor> {

    /** Registers a partition and its producer; the returned descriptor is shipped to consumers. */
    CompletableFuture<T> registerPartitionWithProducer(
            PartitionDescriptor partitionDescriptor,
            ProducerDescriptor producerDescriptor);

    /** Releases any resources the partition occupies outside the producer TE. */
    void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor);
}
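As a usage sketch (not the actual JM scheduler code), the flow on the JM side could look roughly like this; partitionDescriptor, producerDescriptor and the synchronous join() are simplifications for illustration only.

// Illustrative JM-side flow, assuming a ShuffleMaster obtained from the configured factory.
ShuffleDescriptor shuffleDescriptor = shuffleMaster
        .registerPartitionWithProducer(partitionDescriptor, producerDescriptor)
        .join(); // real code would chain this asynchronously instead of blocking

// The descriptor is shipped to consumer tasks as part of their deployment,
// so their input gates know how to locate the partition.

// Once the partition is no longer needed, its external resources are released.
shuffleMaster.releasePartitionExternally(shuffleDescriptor);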

(3) Shuffle Environment (TE side)

The TE creates the ShuffleEnvironment from the configured shuffle factory once per cluster and is thus responsible for its lifecycle. Considering a future ShuffleType configuration on job/edge level, the TE could keep a registry of shuffle factories, one per ShuffleType.

If the TE is responsible for transporting partition data to the consumer side, the produced partitions occupy local TE resources. In this case the TE can be released only when all internal tasks are in FINISHED state and all produced partitions occupying local resources have been consumed and released. The ShuffleEnvironment could provide the information about locally unreleased partitions needed to decide whether the producer TE can be released (see the sketch after the interface below).

public interface ShuffleEnvironment<P extends ResultPartitionWriter, G extends InputGate> extends AutoCloseable {

    /** Starts the environment and returns the data port for shuffle data exchange connections. */
    int start();

    /** Creates writers for the partitions produced by a task. */
    Collection<P> createResultPartitionWriters(...);

    /** Releases the given partitions and the local resources they occupy. */
    void releasePartitionsLocally(Collection<ResultPartitionID> partitionIds);

    /** Partitions that still occupy local TE resources. */
    Collection<ResultPartitionID> getPartitionsOccupyingLocalResources();

    /** Creates input gates for the partitions consumed by a task. */
    Collection<G> createInputGates(...);

    /** Updates the input gate of the given consumer with information about an available partition. */
    boolean updatePartitionInfo(ExecutionAttemptID consumerID, PartitionInfo partitionInfo);
}
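As a sketch of the release decision described above (the helper method is hypothetical; only getPartitionsOccupyingLocalResources is part of the proposed interface), the TE could combine its own task state with the environment's view of locally occupied resources:

// Hypothetical TE-side helper: the TE can only be released once all of its tasks are
// FINISHED and no produced partition still occupies local resources.
static boolean canReleaseTaskExecutor(
        ShuffleEnvironment<?, ?> shuffleEnvironment,
        boolean allTasksFinished) {
    return allTasksFinished
            && shuffleEnvironment.getPartitionsOccupyingLocalResources().isEmpty();
}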

Future Improvement

Currently, ResultPartitionWriter and InputGate both operate on buffer units containing serialized record data. Certain ShuffleEnvironment implementations might benefit from operating on serialized records or even raw records directly (e.g. to partially sort-merge partition data).
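To make this direction concrete, a record-oriented writer could hypothetically look like the following sketch; this interface is purely illustrative and not part of the current proposal.

// Purely illustrative: a writer that sees individual records instead of serialized
// buffers, so an implementation could e.g. sort or merge records per target
// subpartition before serializing them.
public interface RecordOrientedPartitionWriter<T> extends AutoCloseable {

    /** Emits a single record to the given target subpartition. */
    void emitRecord(T record, int targetSubpartition) throws java.io.IOException;

    /** Makes all previously emitted records available to consumers. */
    void flushAll() throws java.io.IOException;
}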

Public Interfaces

Compatibility, Deprecation, and Migration Plan

Implementation Plan

All the related work mentioned above can be done over at least two releases. The first release implements the most basic architecture so that the following releases strictly build upon it.

First MVP: Refactoring to Shuffle API (Flink 1.9)

Introduce ShuffleMaster in JM (FLINK-11391)

Introduce ShuffleService in TE (FLINK-11392)

Next steps

Rejected Alternatives

None so far.