Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The CompiledPlan feature, introduced in FLIP-190: Support Version Upgrades for Table API & SQL Programs, supports only streaming execution mode. Batch ExecNodes were explicitly excluded from the JSON plan (aka CompiledPlan).

This proposal aims to give Batch and Streaming execution modes a uniform representation, and to allow Batch ExecNodes to be serialized to and deserialized from a CompiledPlan.

Why should Flink support this?

  • A uniform approach to all exec nodes keeps the “middle-tier” components on the query-serving path simpler.
    • Engineers, tools and components would be able to follow the same processes for both streaming and batch queries.
    • If a component needs to distinguish batch vs. streaming in the CompiledPlan, a separate flag could be passed around, or the mode could be derived from the CompiledPlan itself.
  • In production deployments, the query-processing front end (parser, planner) could live in dedicated services. CompiledPlan removes the tight version coupling between these services and the Flink cluster that executes a previously prepared job. This simplifies deployment management.
  • Enhanced maintainability and support. Future improvements to the infrastructure and components that deal with CompiledPlan would benefit both execution modes.

Public Interfaces

No changes to the public interfaces.

Proposed Changes

The proposal builds on the general compiled plan infrastructure introduced in FLIP-190 and enhanced since then.

The changes are:

  1. Allow compiled plan operations in the BatchPlanner:
    1. compilePlan
    2. translatePlan
    3. loadPlan
  2. The annotations @ExecNodeMetadata, @JsonCreator, and @JsonProperty are added to BatchExec* classes.
  3. Once all BatchExec* classes are supported, a pre-submit validation is put in place to enforce CompiledPlan support for all newly added BatchExec* classes.
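
The validation in step 3 could take roughly the following shape. This is a minimal, self-contained sketch and not the actual Flink implementation: the nested ExecNodeMetadata annotation is a simplified, hypothetical stand-in for Flink's real annotation, and the class BatchExecNodeCoverageCheck, the method findUnannotated, and both example node classes are invented for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

public class BatchExecNodeCoverageCheck {

    /** Hypothetical, simplified stand-in for Flink's @ExecNodeMetadata (assumption). */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface ExecNodeMetadata {
        String name();
        int version();
    }

    /** Illustrative node that already declares CompiledPlan metadata. */
    @ExecNodeMetadata(name = "batch-exec-calc", version = 1)
    static class BatchExecCalc {}

    /** Illustrative newly added node that forgot the annotation. */
    static class BatchExecNewOperator {}

    /** Returns the simple names of all classes that lack the metadata annotation. */
    static List<String> findUnannotated(List<Class<?>> nodeClasses) {
        List<String> missing = new ArrayList<>();
        for (Class<?> nodeClass : nodeClasses) {
            if (!nodeClass.isAnnotationPresent(ExecNodeMetadata.class)) {
                missing.add(nodeClass.getSimpleName());
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> missing =
                findUnannotated(List.of(BatchExecCalc.class, BatchExecNewOperator.class));
        // A real pre-submit check would fail the build when this list is non-empty.
        System.out.println("BatchExec classes without CompiledPlan metadata: " + missing);
    }
}
```

A real check would presumably discover BatchExec* classes by scanning the planner module rather than taking a hand-written list, and would also verify the Jackson annotations on constructors and fields.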

Before this FLIP, an attempt to create a CompiledPlan in batch execution mode results in the error "The compiled plan feature is not supported in batch mode."

Compatibility, Deprecation, and Migration Plan

No plans to deprecate the feature overall.

We don't want to burden the Flink community with supporting every earlier serialized version of BatchExecNodes. BatchExecNodes can evolve more quickly than StreamExecNodes because they have no state to keep compatible. Backwards compatibility across 2-3 Flink versions, covering at least 1 year, should give batch infrastructure enough time to update. Of course, we should avoid breaking changes whenever possible.

Test Plan

The CompiledPlan testing infrastructure will be extended to include corresponding tests for all BatchExec* operators.

These tests will be similar to the *RestoreTest.java tests here:

https://github.com/apache/flink/tree/master/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream

Because of the nature of batch queries, we do not expect batch jobs to run forever (days, months, years), unlike streaming jobs, which can run for a long time. Hence there is no need to test state restore for operators. We expect all queries to start and finish between Flink version upgrades.

The key functionality to cover with tests: any CompiledPlan generated for batch mode can be executed.

Rejected Alternatives

No comparable alternatives.