Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/l65mst4prx09rmloydn64z1w1zp8coyz

JIRA: FLINK-36032

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, the job submission process in Flink mainly consists of two stages:

  1. On the client side, Flink first creates a StreamGraph and then converts it into a JobGraph. During the conversion from StreamGraph to JobGraph, chain optimization is performed on the StreamNodes in the StreamGraph, and the specific operator information contained in each JobVertex is serialized and stored in its Configuration.

  2. After the client generates a JobGraph, it submits this JobGraph to the JobManager (JM), which further converts the JobGraph into an ExecutionGraph for execution.

However, this submission mode has some limitations: the JobGraph and ExecutionGraph cannot directly reflect the actual logical execution plan of the job, which means Flink cannot display or adjust the logical execution plan at runtime:

  1. For example, as mentioned in FLINK-33230, the job information currently exposed to users through the Web UI and REST API is only related to the ExecutionGraph. As a result, it is difficult for users to understand the specific operators and execution logic within each JobVertex.

  2. In addition, in batch processing scenarios, it can be helpful if Flink can dynamically adjust the job's logical execution plan based on runtime information. For example, when the actual data output from the upstream of a Join operator fits the characteristics of a large table and a small table, it can be helpful if Flink can dynamically switch from a hash join or sort merge join to a more efficient broadcast join.

To solve these issues, this FLIP plans to introduce a job submission mode based on StreamGraph. This mode will enable Flink to directly access and adjust the actual logical execution plan of the job at runtime, enhancing the job's execution performance and observability.

Note that this FLIP only submits the StreamGraph to the JM, and then converts it into a JobGraph within the JM; it will not directly schedule tasks based on the StreamGraph.

Public Interfaces

Add the following configuration option

We intend to add the following configuration parameter. To ensure the configuration parameter accurately represents its functionality, the old configuration item will be deprecated.


|     | Key                                              | Type   | Default Value    | Description                                     |
| --- | ------------------------------------------------ | ------ | ---------------- | ----------------------------------------------- |
| Old | high-availability.zookeeper.path.jobgraphs       | string | /jobgraphs       | ZooKeeper root path (ZNode) for job graphs      |
| New | high-availability.zookeeper.path.execution-plans | string | /execution-plans | ZooKeeper root path (ZNode) for execution plans |
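For example, a high-availability setup would configure the new option as follows (a sketch; the ZooKeeper quorum value is illustrative):

```yaml
# Flink configuration (e.g. flink-conf.yaml)
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
# New option, replacing high-availability.zookeeper.path.jobgraphs
high-availability.zookeeper.path.execution-plans: /execution-plans
```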

Change the following Rest API

We plan to update the Flink Submit Job REST API by introducing a new request parameter, executionPlanFileName, which specifies the file name of the stream graph to be submitted. This parameter is mutually exclusive with the existing jobGraphFileName parameter; only one of the two may be specified in a single request.

Note that although this API is documented in the official Flink documentation, it is not directly usable by users because it relies on the JobGraph object file, which is not publicly accessible.
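Under this change, the JSON request part of the submit-job endpoint would reference the uploaded execution plan file instead of the job graph file. A sketch of the request (the multipart layout follows the existing endpoint; the file name is illustrative):

```
POST /v1/jobs
Content-Type: multipart/form-data

-- "request" part (JSON), sketch only:
{
  "executionPlanFileName": "<name of the uploaded serialized StreamGraph file>"
}
```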

Proposed Changes

Since flink-runtime is a foundational framework module that does not depend on flink-streaming-java, it cannot directly use the classes contained in flink-streaming-java, such as StreamOperator and StreamPartitioner. To allow the runtime layer to access StreamGraph, we need to migrate StreamGraph and its related classes from the higher-level flink-streaming-java module to the foundational flink-runtime module. During the migration process, we will retain the original package paths to avoid introducing incompatible changes.

Introduce a job submission mode based on StreamGraph

To introduce a new job submission mode based on StreamGraph in the Session, Application, and MiniCluster runtime environments, we need to:

  1. Ensure that the StreamGraph and its properties are serializable so that the StreamGraph can be transferred from the Client to the JM. Therefore, we need to make these classes implement the Serializable interface.

  2. For scenarios with high-availability mode enabled, we need to save the StreamGraph to external storage so that the job can be recovered from the StreamGraph if necessary. To facilitate this, we plan to introduce ExecutionPlanStore to support storing the StreamGraph.

  3. Compile the JobGraph from the StreamGraph within the JM and then proceed with the subsequent execution process based on the JobGraph.
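Step 1 above amounts to standard Java serialization of the graph classes. A minimal sketch of the client-to-JM round trip, where PlanNode is an illustrative stand-in for a StreamGraph node (not an actual Flink class):

```java
import java.io.*;

// Illustrative stand-in for a graph node; the real StreamGraph and its
// member classes would implement Serializable in the same way.
class PlanNode implements Serializable {
    private static final long serialVersionUID = 1L;
    final int id;
    final String operatorName;
    PlanNode(int id, String operatorName) {
        this.id = id;
        this.operatorName = operatorName;
    }
}

public class SerializationSketch {
    // Serialize the node to bytes on the client side ...
    static byte[] serialize(PlanNode node) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(node);
        }
        return bos.toByteArray();
    }

    // ... and restore it on the receiving side (the JobManager).
    static PlanNode deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (PlanNode) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        PlanNode restored = deserialize(serialize(new PlanNode(1, "Map")));
        System.out.println(restored.id + " " + restored.operatorName);
    }
}
```

The same serialized form can be handed to the ExecutionPlanStore for high-availability recovery (step 2).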

Future work

Future enhancements will focus on:

  1. Increasing the observability of Flink jobs by supporting operator-level topology displays. This enables finer-grained monitoring and insight into job execution.

  2. Supporting dynamic adjustments to the StreamGraph for smarter scheduling and optimizations. These will include adaptive broadcast joins, which dynamically switch from a hash join or sort merge join to a more efficient broadcast join. Moreover, this can help mitigate skewed joins resulting from uneven data distribution.
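The adaptive broadcast join described above could, in principle, be driven by a simple size-based rule once the upstream output sizes are known at runtime. A hypothetical sketch (the class, threshold, and strategy names are illustrative, not Flink APIs):

```java
public class JoinStrategyChooser {
    enum JoinStrategy { BROADCAST_JOIN, HASH_JOIN }

    // Hypothetical threshold: broadcast the smaller input if its
    // actual output size stays below this limit.
    static final long BROADCAST_THRESHOLD_BYTES = 10L * 1024 * 1024; // 10 MiB

    // Pick a join strategy from the output sizes observed at runtime.
    static JoinStrategy choose(long leftBytes, long rightBytes) {
        long smaller = Math.min(leftBytes, rightBytes);
        return smaller <= BROADCAST_THRESHOLD_BYTES
                ? JoinStrategy.BROADCAST_JOIN
                : JoinStrategy.HASH_JOIN;
    }

    public static void main(String[] args) {
        // A 1 GiB table joined with a 1 MiB table: broadcast the small side.
        System.out.println(choose(1L << 30, 1L << 20));
    }
}
```

With StreamGraph available at runtime, such a decision could be applied by rewriting the join node before the affected part of the graph is compiled into a JobGraph.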

Compatibility, Deprecation, and Migration Plan

This feature will be fully backward and forward compatible, and there is no need for any migration plan.

Test Plan

The proposed changes will be tested for correctness and stability in a real cluster. Additionally, all unit and integration tests (UT/IT) will be run with this feature enabled.
