Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/zs7sqpzvcvdb9y42ym6ndtn1fn7m2592

JIRA: FLINK-36033

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In Flink, the construction of the execution plan is a multi-layered process that involves three main concepts: StreamGraph, JobGraph, and ExecutionGraph. The StreamGraph contains all the operators of a job and their data distribution patterns; the JobGraph further optimizes the StreamGraph through operator chaining and serializes the chained operators into JobVertices; the ExecutionGraph is derived from the JobGraph, expanding JobVertices into the actual parallel subtasks that are executed.

Currently, Flink can adjust the ExecutionGraph in batch processing mode, for example by dynamically deciding the parallelism of a JobVertex based on its input data. However, in some scenarios, adjustments to the ExecutionGraph are not sufficient to solve the problem, and we need to adjust the StreamGraph itself, including the logic of operators and the data distribution patterns. Specifically:

  1. When the data volume of one input to a Join operator is small and the other input is large, a Broadcast Join offers better performance than a Hash Join or a Sort Merge Join. Therefore, we want Flink to dynamically switch from Hash Join or Sort Merge Join to Broadcast Join based on the actual scale of the input data at runtime.

  2. When a job encounters a data hotspot due to a skewed distribution of a single key during execution, it may be beneficial to re-partition or adjust the computational logic to resolve the issue.

Hence, this FLIP proposes a mechanism for adaptive optimization of the StreamGraph, giving Flink batch processing stronger adaptability.

Unlike the traditional execution of jobs based on a static StreamGraph, this mechanism progressively determines the StreamGraph at runtime. The determined parts of the StreamGraph are transformed into a concrete JobGraph, while the undetermined parts remain open for Flink to adjust flexibly according to real-time job status and actual input conditions.

Note that this FLIP is dependent on FLIP-468: Introducing StreamGraph-Based Job Submission, which enables the scheduler to recognize and access the StreamGraph.

Proposed Changes

Overview

The adaptive StreamGraph optimization mechanism consists of two main parts:

Incremental JobGraph Generation and Scheduling Based on It

Currently, Flink converts the StreamGraph into a JobGraph before job execution, creating a fixed execution plan that limits the job's ability to adjust the StreamGraph dynamically at runtime. Therefore, we plan to introduce a new mechanism that incrementally generates the JobGraph and schedules tasks based on it. In this new mechanism, the timing of JobVertex generation in the JobGraph will be flexibly determined by Flink at runtime.

We consider supporting the creation of JobVertices at the following times:

  1. Source vertices will be created before job scheduling, providing a starting point for the job.

  2. Remaining vertices will be generated after all of their upstream job vertices have finished; at that point, accurate information about the input data is available, enabling better optimization decisions.

Once a JobVertex is created, Flink will update the JobGraph and ExecutionGraph accordingly. Flink will then schedule based on the latest ExecutionGraph.
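
To make the timing concrete, the snippet below is a minimal, purely illustrative sketch expressed against the AdaptiveGraphGenerator interface introduced later in this FLIP; the variables are placeholders and this is not actual scheduler code.

// Illustrative only: shows *when* job vertices are created, using the
// AdaptiveGraphGenerator interface described later in this FLIP.
AdaptiveGraphGenerator generator = ...; // e.g. the AdaptiveGraphManager

// (1) Before scheduling starts, the JobGraph contains only the source
//     vertices (plus the operators chained with them).
JobGraph initialJobGraph = generator.getJobGraph();

// (2) Whenever all upstream job vertices of some stream nodes have finished,
//     those stream nodes are translated into new JobVertices and appended.
List<JobVertex> newVertices = generator.onJobVertexFinished(finishedVertexId);
// The ExecutionGraph is then extended with the new vertices and scheduling continues.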

Adaptively Optimize StreamGraph, and Update the Corresponding JobGraph and ExecutionGraph

With support for incremental JobGraph generation, the StreamGraph can be modified at runtime, for example by adjusting operator parameters or changing partitioners. Flink can then execute subsequent tasks based on the modified StreamGraph, providing jobs with enhanced flexibility and adaptability.

Hence, we plan to enable Flink's batch processing mode to adaptively optimize the StreamGraph at runtime based on runtime information, such as actual input data and cluster load. This optimization is only allowed for StreamNodes and StreamEdges that have not yet been converted to JobVertices and JobEdges.

Key Components

To support this mechanism, we plan to introduce four key components:

  1. AdaptiveGraphManager: Responsible for incrementally generating the JobGraph from the StreamGraph, handling upstream-vertex-finished events to generate new JobVertices and update the JobGraph, as well as adding, deleting, and modifying StreamNodes and StreamEdges.

  2. StreamGraphOptimizer: Responsible for loading StreamGraphOptimizationStrategy and deciding based on runtime information whether to optimize the StreamGraph.
  3. AdaptiveExecutionHandler: Responsible for receiving JobEvents and processing JobVertexFinishedEvent by converting it into an OperatorsFinished object, which is then sent to the StreamGraphOptimizer. All JobEvents, including JobVertexFinishedEvent, are forwarded to the AdaptiveGraphManager. Additionally, it notifies the JobGraphUpdateListener of JobGraph updates through callbacks.
  4. JobGraphUpdateListener: Responsible for responding to JobGraph update events. The AdaptiveBatchScheduler will implement it to support updating the ExecutionGraph after JobGraph updates.

Interaction Process

  • Before the job starts scheduling, a JobGraph containing the source nodes and the nodes chainable with them will be created based on the current StreamGraph. StreamNodes that have already been converted to JobVertices will be marked as Frozen, and these nodes and their input edges will no longer be allowed to be modified.

  • During the execution of the job, once relevant events occur (such as a job vertex finishing or data skew being detected), the AdaptiveExecutionHandler will be notified of these events. The StreamGraphOptimizer within the handler will decide, based on these events and runtime information, whether the StreamGraph needs to be optimized, and will delegate the specific updates to the AdaptiveGraphManager to perform.

  • During the execution of the job, when the AdaptiveExecutionHandler receives an event indicating that a JobVertex has finished, it will notify the AdaptiveGraphManager to attempt to update the JobGraph. After the update is completed, the AdaptiveExecutionHandler will call back the JobGraphUpdateListener to signal that the JobGraph has been updated, triggering other components (such as the ExecutionGraph) to update as well.
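
Putting these steps together, the following is a non-normative sketch of how an AdaptiveExecutionHandler implementation might wire the components (the interfaces themselves are introduced in the sections below). The StreamGraphOptimizer method, the event accessors, and the conversion helper are assumptions made for readability, not APIs defined by this FLIP.

// Non-normative sketch of a possible AdaptiveExecutionHandler implementation.
// Members marked "assumed" are illustrative and not defined by this FLIP.
public class DefaultAdaptiveExecutionHandler implements AdaptiveExecutionHandler {

    private final JobID jobId;
    private final AdaptiveGraphManager graphManager; // implements AdaptiveGraphGenerator
    private final StreamGraphOptimizer optimizer;
    private final List<JobGraphUpdateListener> listeners = new ArrayList<>();

    // constructor omitted for brevity

    @Override
    public JobGraph getJobGraph() {
        return graphManager.getJobGraph();
    }

    @Override
    public void handleJobEvent(JobEvent jobEvent) {
        if (jobEvent instanceof JobVertexFinishedEvent) {
            JobVertexFinishedEvent finishedEvent = (JobVertexFinishedEvent) jobEvent;

            // 1. Give the optimizer a chance to adjust the not-yet-frozen part of the StreamGraph.
            OperatorsFinished operatorsFinished = toOperatorsFinished(finishedEvent); // assumed helper
            optimizer.onOperatorsFinished(operatorsFinished, graphManager.getStreamGraphContext()); // assumed method

            // 2. Let the graph manager translate newly determined StreamNodes into JobVertices.
            List<JobVertex> newVertices =
                    graphManager.onJobVertexFinished(finishedEvent.getVertexId()); // assumed accessor

            // 3. Notify listeners (e.g. the AdaptiveBatchScheduler) so the ExecutionGraph can be extended.
            if (!newVertices.isEmpty()) {
                listeners.forEach(listener -> listener.onNewJobVerticesAdded(jobId, newVertices));
            }
        }
    }

    @Override
    public void registerJobGraphUpdateListener(JobGraphUpdateListener listener) {
        listeners.add(listener);
    }
}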

AdaptiveGraphManager

To support the incremental generation of JobGraph and update the StreamGraph based on runtime information, we plan to introduce the AdaptiveGraphManager component, which will implement the AdaptiveGraphGenerator interface. The AdaptiveGraphGenerator interface is responsible for:

  1. Generating the JobGraph

  2. Responding to upstream job vertex finished events, generating new JobVertices, and adding them to the JobGraph

  3. Providing a StreamGraphContext to allow the StreamGraphOptimizer to add, delete, and modify StreamNodes and StreamEdges

AdaptiveGraphGenerator
/**
 * Defines the mechanism for dynamically adapting the graph topology of a Flink job at runtime.
 * The AdaptiveGraphGenerator is responsible for managing and updating the job's execution plan based on runtime events
 * such as the completion of a job vertex. It provides functionalities to generate new job vertices, retrieve the current
 * JobGraph, update the StreamGraph, and track the status of pending stream nodes.
 */
public interface AdaptiveGraphGenerator {

    /**
     * Retrieves the JobGraph representation of the current state of the Flink job.
     *
     * @return The {@link JobGraph} instance.
     */
    JobGraph getJobGraph();

    /**
     * Retrieves the StreamGraphContext which provides a read-only view of the StreamGraph and
     * methods to modify its StreamEdges and StreamNodes.
     *
     * @return an instance of StreamGraphContext.
     */
    StreamGraphContext getStreamGraphContext();

    /**
     * Responds to notifications that a JobVertex has completed execution. This method generates new job vertices,
     * incorporates them into the JobGraph, and returns a list of the newly created JobVertex instances.
     *
     * @param finishedJobVertexId The ID of the completed JobVertex.
     * @return A list of the newly added {@link JobVertex} instances to the JobGraph.
     */
    List<JobVertex> onJobVertexFinished(JobVertexID finishedJobVertexId);
}

The StreamGraphContext interface provides a read-only view of the StreamGraph, as well as methods to modify specific components such as StreamNodes and StreamEdges. By passing StreamGraph information to the StreamGraphOptimizer through this interface, the AdaptiveGraphManager is able to perceive changes in the structure of the StreamGraph and update its own state accordingly.

StreamGraphContext
/**
 * Defines a context for optimizing and working with a read-only view of a StreamGraph.
 * It provides methods to modify StreamEdges and StreamNodes within the StreamGraph.
 */
public interface StreamGraphContext {

    /**
     * Returns a read-only view of the StreamGraph.
     *
     * @return a read-only view of the StreamGraph.
     */
    ReadableStreamGraph getStreamGraph();

    /**
     * Modifies a StreamEdge within the StreamGraph.
     *
     * @param streamEdge the StreamEdge to be modified.
     */
    void modifyStreamEdge(StreamEdge streamEdge);

    /**
     * Modifies a StreamNode within the StreamGraph.
     *
     * @param streamNode the StreamNode to be modified.
     */
    void modifyStreamNode(StreamNode streamNode);
}

StreamGraphOptimizer

To support optimizing the StreamGraph based on runtime information, we plan to introduce the StreamGraphOptimizer component. Upon initialization, this component will obtain the StreamGraphContext from the AdaptiveGraphManager and load a set of StreamGraphOptimizationStrategies. At runtime, it will pass the received OperatorsFinished and the StreamGraphContext sequentially to all StreamGraphOptimizationStrategies, attempting to optimize the StreamGraph.

StreamGraphOptimizationStrategy
/**
 * Defines an optimization strategy for StreamGraph. Implementors of this interface provide methods
 * to modify and optimize a StreamGraph based on contexts provided at runtime.
 */
@FunctionalInterface
public interface StreamGraphOptimizationStrategy {

    /**
     * Tries to optimize the StreamGraph using the provided OperatorsFinished and the
     * StreamGraphContext. The method returns a boolean indicating whether the StreamGraph was
     * successfully optimized.
     *
     * @param operatorsFinished the OperatorsFinished object containing information about completed
     *     operators and their produced data size and distribution information.
     * @param context the StreamGraphContext with a read-only view of a StreamGraph, providing
     *     methods to modify StreamEdges and StreamNodes within the StreamGraph.
     * @return {@code true} if the StreamGraph was successfully optimized; {@code false} otherwise.
     */
    boolean maybeOptimizeStreamGraph(OperatorsFinished operatorsFinished, StreamGraphContext context);
}
OperatorsFinished
/**
 * This class represents the information about the finished operators.
 * It includes a list of StreamNode IDs representing the finished operators,
 * and a map associating each finished StreamNode ID with their corresponding produced data size and distribution information.
 */
public class OperatorsFinished {
    /**
     * A list that holds the IDs of the completed StreamNodes.
     */
    private List<String> finishedStreamNodeIds;
    
    /**
     * A map that associates each finished StreamNode ID with a list of IntermediateResultInfo objects.
     * The key is the StreamNode ID, and the value is a list of IntermediateResultInfo.
     */
    private Map<String, List<IntermediateResultInfo>> resultInfoMap;
}
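
As an illustration of how a strategy could plug into these interfaces, below is a hypothetical sketch that switches the output partitioner of a small finished input to broadcast, a simplified stand-in for the broadcast join case from the motivation section. The accessors on OperatorsFinished, ReadableStreamGraph, and IntermediateResultInfo, as well as the size threshold, are assumptions for readability and are not part of this FLIP; StreamEdge#setPartitioner and BroadcastPartitioner are existing Flink classes.

// Hypothetical strategy sketch; members marked "assumed" are illustrative and not defined by this FLIP.
public class ExampleBroadcastJoinStrategy implements StreamGraphOptimizationStrategy {

    // Purely illustrative threshold below which a finished input is considered "small".
    private static final long BROADCAST_THRESHOLD_BYTES = 10L << 20;

    @Override
    public boolean maybeOptimizeStreamGraph(OperatorsFinished operatorsFinished, StreamGraphContext context) {
        boolean optimized = false;
        for (String finishedNodeId : operatorsFinished.getFinishedStreamNodeIds()) { // assumed getter
            long producedBytes =
                    operatorsFinished.getResultInfoMap() // assumed getter
                            .getOrDefault(finishedNodeId, Collections.emptyList())
                            .stream()
                            .mapToLong(IntermediateResultInfo::getNumBytesProduced) // assumed accessor
                            .sum();
            if (producedBytes > BROADCAST_THRESHOLD_BYTES) {
                continue;
            }
            // Only edges whose downstream StreamNodes have not been frozen into JobVertices may change.
            for (StreamEdge edge : context.getStreamGraph().getOutputEdges(finishedNodeId)) { // assumed method
                edge.setPartitioner(new BroadcastPartitioner<>());
                context.modifyStreamEdge(edge);
                optimized = true;
            }
        }
        return optimized;
    }
}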

In the current design, the specific optimization strategies to be used are specified by the internal configuration item "execution.batch.adaptive.stream-graph-optimization.strategies", and the Runtime layer does not need to concern itself with the specifics of the tuning implementation. Currently, this configuration item is only set at the SQL layer and is not exposed to users as a public interface. It can be exposed to users later if needed.

Key: execution.batch.adaptive.stream-graph-optimization.strategies
Type: List<String>
Default Value: none
Description: Defines a comma-separated list of fully qualified class names implementing the StreamGraphOptimizationStrategy interface.
Annotation: Internal
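
For reference, the following is a hedged sketch of how this internal option could be declared and set with Flink's standard ConfigOptions API; the holder class and the strategy class name below are illustrative and not specified by this FLIP.

// Sketch only: declaring the internal option and setting it from the SQL planner.
public static final ConfigOption<List<String>> STREAM_GRAPH_OPTIMIZATION_STRATEGIES =
        ConfigOptions.key("execution.batch.adaptive.stream-graph-optimization.strategies")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        "Fully qualified class names of the StreamGraphOptimizationStrategy implementations to load.");

// e.g. somewhere in the SQL planner, before the StreamGraph is submitted:
configuration.set(
        STREAM_GRAPH_OPTIMIZATION_STRATEGIES,
        Collections.singletonList("org.example.MyAdaptiveJoinStrategy")); // illustrative class name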

AdaptiveExecutionHandler

To adapt to the incremental generation of the JobGraph, we plan to introduce a new AdaptiveExecutionHandler component, which is primarily responsible for responding to scheduler requests and interacting with the AdaptiveGraphManager and StreamGraphOptimizer. Specifically, it is responsible for:

  1. Forwarding runtime events to the StreamGraphOptimizer to trigger StreamGraph updates.
  2. Forwarding JobVertex Finished events to the AdaptiveGraphManager to trigger the creation of new JobVertex and JobGraph updates.
  3. Notifying the JobGraphUpdateListener when the JobGraph changes.
AdaptiveExecutionHandler
/**
 * The {@code AdaptiveExecutionHandler} interface defines the operations for handling the adaptive
 * execution of jobs. This includes acquiring the current job graph and dynamically
 * adjusting the job topology in response to job events.
 */
public interface AdaptiveExecutionHandler {

    /**
     * Retrieves the current {@link JobGraph}.
     *
     * @return the current job graph.
     */
    JobGraph getJobGraph();

    /**
     * Handles the provided {@code JobEvent}, which may trigger job vertex creation and dynamic modifications to the 
     * {@code StreamGraph} based on the specifics of the job event.
     *
     * @param jobEvent The job event to handle, containing the necessary information that might trigger job vertex creation 
     *                  and adjustments to the StreamGraph.
     */
    void handleJobEvent(JobEvent jobEvent);


    /**
     * Registers a listener that will be notified of updates to the job graph. This allows for
     * external components to observe and respond to changes in the job's execution plan.
     *
     * @param listener the listener to register for job graph update notifications.
     */
    void registerJobGraphUpdateListener(JobGraphUpdateListener listener);
}

JobGraphUpdateListener

The AdaptiveBatchScheduler will implement this interface, allowing it to receive notifications of JobGraph updates and subsequently apply the corresponding updates to the ExecutionGraph.

JobGraphUpdateListener
/**
 * The {@code JobGraphUpdateListener} interface defines operations for components that are
 * interested in being notified when new job vertices are added to the job graph.
 */
public interface JobGraphUpdateListener {
    /**
     * Invoked when new {@link JobVertex} instances are added to the JobGraph of a specific job.
     * This allows interested components to react to the addition of new vertices to the job topology.
     *
     * @param jobId The ID of the job to which the new vertices have been added.
     * @param newVertices A list of newly added JobVertex instances.
     */
    void onNewJobVerticesAdded(JobID jobId, List<JobVertex> newVertices);
}
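
For illustration, the scheduler-side reaction could look roughly like the sketch below. How the AdaptiveBatchScheduler actually attaches the new vertices to the ExecutionGraph and its scheduling topology is an implementation detail, so the helper methods here are hypothetical placeholders.

// Hypothetical sketch of the scheduler-side listener; the helper methods are placeholders,
// not existing scheduler APIs.
public class ExecutionGraphUpdatingListener implements JobGraphUpdateListener {

    @Override
    public void onNewJobVerticesAdded(JobID jobId, List<JobVertex> newVertices) {
        // 1. Create ExecutionJobVertices for the new JobVertices and attach them to the ExecutionGraph.
        attachToExecutionGraph(newVertices);

        // 2. Update the scheduling topology so the newly added execution vertices become schedulable.
        updateSchedulingTopologyAndStrategies(newVertices);
    }

    private void attachToExecutionGraph(List<JobVertex> newVertices) {
        // Implementation detail of the AdaptiveBatchScheduler / ExecutionGraph; omitted here.
    }

    private void updateSchedulingTopologyAndStrategies(List<JobVertex> newVertices) {
        // Implementation detail of the AdaptiveBatchScheduler; omitted here.
    }
}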

Public Interfaces

Modifying the Web UI and REST API to adapt to incremental JobGraph generation

When using incremental generation of the JobGraph, the current Web UI and REST API, which are based on the ExecutionGraph, only display the already-determined job topology, so users cannot perceive the job's transformation progress. To reduce user confusion, we need to add new information to the Web UI and REST API:

Displaying the number of StreamNodes waiting for transformation on the Web UI

  • We need to display the number of StreamNodes currently waiting to be transformed into JobVertices on the Web UI, with a corresponding hover tip showing "pending operators", as shown in the figure below.

  • The job topology will display a hybrid of the current JobGraph and the downstream parts of the StreamGraph that have not yet been converted to JobVertices. On the topology graph display page, there will be a "Show Pending Operators" button in the upper right corner that lets users switch back to a job topology that only includes JobVertices. If the job does not have any pending operators, this button will not be displayed. Pending StreamNodes will be displayed with dashed borders and a green background color, and the basic information of the StreamNode, such as id, parallelism, max-parallelism, and operator name, will be included in the box. Other detailed information, such as the operator description, will be shown on hover. The connections between StreamNodes, and between StreamNodes and JobVertices, will be represented by dotted lines.

Extending the job detail REST API

  1. Extending the response body of the job detail REST API by adding a new key-value pair "pending-operators": Number.
  2. Adding a new field stream-graph-plan, which represents the runtime StreamGraph. The field "jobvertex-id" is valid only when the StreamNode has been converted to a JobVertex, in which case it holds the ID of the corresponding JobVertex for that StreamNode.
{
  ...
  
  "status-counts": {
    ...
    "pending-operators": 29,
    ...
  },

  "stream-graph-plan": {
    "jid": "3cdff684f890355be178fe1f0ae67baf",
    "name": "q4",
    "type": "BATCH",
    "nodes": [
      {
        "id": 0,
        "slotSharingGroup-name": "default",
        "maxParallelism": 1000,
        "parallelism": 1,
        "operator-name": "CsvTableSource(read fields: field)",
        "operator-description": "CsvTableSource(read fields: field)",
        "jobvertex-id": Optional String
        "input-edges": [
          {
            "type-num": "0",
            "partitioner": "REBALANCE",
            "exchange": "blocking",
            "source-id": 43,
            "target-id": 44
          }
          ...
        ]
      }
      ...
    ]
  }
  ...
}

Future work

With support for adaptive optimization of the StreamGraph in place, our next steps will focus on expanding the adaptive capabilities of Flink batch jobs, specifically support for adaptive join type selection and skewed join handling.

Limitation

Only works with the adaptive batch scheduler:

Since FLIP-283, the adaptive batch scheduler has been the default scheduler for Flink batch jobs, so we intend to support this mechanism only with the adaptive batch scheduler.

Compatibility, Deprecation, and Migration Plan

Adaptive optimization of the StreamGraph will be enabled when a job is submitted as a StreamGraph (see FLIP-468). As a result, the Web UI will only display the currently determined topology, and users will need to combine it with the pending StreamNode indicators to understand the adaptation process.

Test Plan

The proposed changes will be tested for correctness and stability in a real cluster.
