Discussion thread: https://lists.apache.org/thread/n8v32j6o3d50mpblxydbz82q1q436ob4
Vote thread: https://lists.apache.org/thread/rxxxq0q9v9pmzd2ht9nybpg5vrzyhwx7
JIRA

Release


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Nowadays, the Table & SQL API is as important to Flink as the DataStream API. It is one of the main abstractions for expressing pipelines that perform stateful stream processing. Users expect the same backwards compatibility guarantees when upgrading to a newer Flink version as with the DataStream API:

https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/upgrading/

Example:

A user would like to use SQL which has some lifetime aggregates and needs to upgrade their Flink cluster from one minor version (1.15) to the next minor version (1.16).

In particular, this means:

However, since a planner with rule-based and cost-based optimization is involved in finding the final execution plan, every rule, function, connector and format change could potentially introduce a completely different topology.

For example: for more efficient execution, we introduce a new rule that pushes a filter through an operator O. Since the filter columns are not required by O anymore, the schema would change. However, a completely different plan might be chosen in the end due to improvements in the cost estimation in a new Flink version. In short: It is difficult to ensure savepoint compatibility in such a dynamic topology creation process.

Terminology

For clarification, we distinguish between the following terms:

Term | Definition
table program

List of statements that configure the session, connect to catalogs, register (temporary) catalog objects, define and submit one or more pipelines.

A table program can be expressed using Table API in Java or could be a multi-statement SQL script for SQL Client.

pipeline

A pipeline is a DAG that consists of one or more (potentially disconnected) source-to-sink dataflows.

Statement sets allow for n:n source-to-sink dataflows. A pipeline is compiled into a single JobGraph.

See also here.

state

Any value, list, or map member of an operator that is managed by Flink. Each state is snapshotted using the `operator uid + state name` into a savepoint.

During restoration, state is mapped back to any kind of operator (even if the topology changed) using those two components.

upgrade

The change from one Flink minor version to another. For example, from 1.15 to 1.17, or 1.17 to 1.18.

A patch version change (e.g. 1.13.2 to 1.13.3) is not considered an upgrade and has already been supported before this FLIP.

Upgrading usually involves work, which is why many users perform this task rarely (e.g. only once per year). Skipping versions is also common until a new feature has been introduced for which it is worth upgrading. We will support the upgrade to the most recent Flink version from a set of previous versions. We aim to support upgrades from the last 2-3 releases on a best-effort basis; maybe even more depending on the maintenance overhead. However, in order to not grow the testing matrix infinitely and to perform important refactoring if necessary, we only guarantee upgrades with a step size of a single minor version (i.e. a cascade of upgrades).

As is common when skipping versions, we still recommend that users check the release notes and perform a migration if instructed to do so.

In other words: A user can upgrade between minors and all following minors. The goal is: The same query can remain up and running. E.g. a user upgrades from 1.15 to 1.16, and then from 1.16 to 1.17 and can expect the original query to work without recomputing the data or the plan from the original SQL. This necessarily means that at some point in future releases we'll need some basic "migration tool" to keep the queries up and running, ending up modifying the compiled plan (see also COMPILE PLAN ... FROM ...) or savepoint.

An upgrade assumes that only the Flink version has changed. All pipeline defining parameters remain constant. In other words: table program, catalog objects, configuration options, and external JAR files have not changed.

migration

Actively transforms entities from one format to the other. A migration can happen on different layers. After migration, the format can be read by the next generation. Thus, the old generation is not necessary anymore.

There are four kinds of migration:

State serializer migration: In case both operator and state name have not changed, it is possible to upgrade the state format using the migration capabilities of TypeSerializer with TypeSerializerSnapshot.

Operator migration: An operator declares both the new state and old state with different state names. The initialization method of the operator accesses the old state and actively moves data into the new state format. Afterwards, the old state is empty and thus could be removed from the new savepoint metadata (given such a functionality exists, which is future work). The operator implementation could look completely different, but as long as `operator uid + state name` match, the migration is possible. 

Plan migration: We transform the JSON plan (introduced later in this document) into a new plan. For example, to remove legacy attributes that are not necessary in a new JSON plan layout. This has no impact on the savepoint itself but on older generations of the JSON parsing and validation logic that can drop legacy attributes. Also, if two operators have been fused into a single one in later versions, the plan can be migrated such that it doesn't reference the two legacy operators anymore.

Savepoint migration: We provide a tool that transforms the savepoint data into a new format. This tool could be a Flink job itself (e.g. using the State Processor API). This provides the highest flexibility as the topology can change completely (at least in theory).

There should be some good logging in place when the upgrade/migration is taking place to be able to track every restoration action, and help debug any potential issues arising from that.

backwards compatibility

A table program that has been written in a previous version behaves the same in the new version. No action of the users or any other modification of the table program is required.

savepoint compatibility

The state stored in a savepoint can still be used to initialize (still existing) operators. 

schema evolution

A column has been renamed, added to, or removed from a table or view. The same applies to a (nested) data type of a column or function that transitively affects the overall dynamic table layout and pipeline.

query evolution

A fundamental change to the query. E.g. adding a filter condition, a different aggregation, an additional join or subquery.
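The "state" and "operator migration" definitions above can be illustrated with a toy model. The following is a minimal sketch in plain Java, not Flink code: `ToySavepoint`, `StateKey`, and the state names used with it are hypothetical. It only shows that state is addressed by `operator uid + state name` and that an operator migration moves data from an old state name to a new one under the same uid.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Toy model (not Flink API): a savepoint addresses state by operator uid + state name. */
final class ToySavepoint {

    /** Composite key; both components must match for state to be found again. */
    static final class StateKey {
        final String operatorUid;
        final String stateName;

        StateKey(String operatorUid, String stateName) {
            this.operatorUid = operatorUid;
            this.stateName = stateName;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof StateKey)) {
                return false;
            }
            StateKey other = (StateKey) o;
            return operatorUid.equals(other.operatorUid) && stateName.equals(other.stateName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(operatorUid, stateName);
        }
    }

    private final Map<StateKey, Long> entries = new HashMap<>();

    void put(String uid, String name, long value) {
        entries.put(new StateKey(uid, name), value);
    }

    /** Returns the stored value, or null if no state exists for this uid + name. */
    Long get(String uid, String name) {
        return entries.get(new StateKey(uid, name));
    }

    /**
     * Operator migration: the new operator version knows both the old and the new
     * state name, moves the data over, and leaves the old state empty.
     */
    static void migrate(ToySavepoint savepoint, String uid, String oldName, String newName) {
        Long old = savepoint.entries.remove(new StateKey(uid, oldName));
        if (old != null) {
            savepoint.entries.put(new StateKey(uid, newName), old);
        }
    }
}
```

In this model, a topology change is harmless as long as the uid and the state name of an operator stay the same, which is why well-defined uids matter for the rest of this proposal.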

Scoping

The scope of this FLIP can be summarized as follows using the terminology above:

Important: We will probably mark the first version of this FLIP, in 1.15, as a BETA feature. The community needs to learn how to evolve the code base with these hard constraints. 1.16 will be the first version with a restore scenario. If this turns out to be successful with no or minor issues, we can mark this feature stable in 1.17.

Full Example SQL

A pure SQL example in SQL Client could look like this:

-- use temporary objects
CREATE TEMPORARY TABLE clicks (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = 'clicks',
  'properties.bootstrap.servers' = '...',
  'format' = 'avro'
);

-- set configuration
SET 'parallelism.default' = '10';
SET 'pipeline.name' = 'my_flink_job';

-- compile the pipeline into a plan that can be executed
COMPILE AND EXECUTE PLAN '/my/path/my_flink_job.json' FOR STATEMENT SET
BEGIN

  INSERT INTO pageview_pv_sink
  SELECT page_id, count(1) FROM clicks GROUP BY page_id;

  INSERT INTO pageview_uv_sink
  SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;

END;

Full Example Table API

A complex Table API program with the DataStream API as virtual source and sink could look like this:

// Setup environments
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create a custom DataStream and assign a unique uid() to the source
final DataStream<Row> inputStream = env
  .fromElements(
    Row.ofKind(RowKind.UPDATE_AFTER, "Alice"),
    Row.ofKind(RowKind.UPDATE_AFTER, "Bob"))
  .uid("my-stream");

// Read JSON plan file or compile + write it first
// We use a Supplier here but of course this can also be completely custom logic
CompiledPlan plan = CompiledPlan.fromFile(
  "/my/path/my_flink_job.json",
  () ->
    tableEnv
      .fromDataStream(inputStream)
      .select($("f0").count())
      // the caller will write out this plan into a file
      .compilePlan());

DataStream<Row> outputStream = tableEnv.asDataStream(plan, Row.class);

outputStream.executeAndCollect();

Basic Design / Current Internal Design

The basic design has already been introduced in Flink 1.14. It is based on a JSON plan that serializes the optimized execution plan and is able to restore a pipeline from that. The JSON plan must be stored explicitly next to the table program.

Internal prototypes and products have shown that the JSON plan is already useful and only needs minor improvements before making it publicly available and a long-term feature for Flink users.

A JSON plan example for the following program can be found here:

json_plan_example.json

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
System.out.println(
        tableEnv.explainSql(
                "SELECT word, SUM(frequency) AS `count`\n"
                        + "FROM (\n"
                        + "  VALUES ('Hello', 1), ('Ciao', 1), ('Hello', 2)\n"
                        + ")\n"
                        + "AS WordTable(word, frequency)\n"
                        + "GROUP BY word"));

== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], count=[SUM($1)])
+- LogicalValues(tuples=[[{ _UTF-16LE'Hello', 1 }, { _UTF-16LE'Ciao', 1 }, { _UTF-16LE'Hello', 2 }]])

== Optimized Physical Plan ==
GroupAggregate(groupBy=[word], select=[word, SUM(frequency) AS count])
+- Exchange(distribution=[hash[word]])
   +- Values(type=[RecordType(VARCHAR(5) word, INTEGER frequency)], tuples=[[{ _UTF-16LE'Hello', 1 }, { _UTF-16LE'Ciao', 1 }, { _UTF-16LE'Hello', 2 }]])

== Optimized Execution Plan ==
GroupAggregate(groupBy=[word], select=[word, SUM(frequency) AS count])
+- Exchange(distribution=[hash[word]])
   +- Values(tuples=[[{ _UTF-16LE'Hello', 1 }, { _UTF-16LE'Ciao', 1 }, { _UTF-16LE'Hello', 2 }]])


The optimized execution plan is a graph of ExecNode. An ExecNode is a static template for creating Transformations.

An ExecNode might translate into multiple Transformations with various operators.

For example, the ExecNode for changelog sinks (StreamExecSink) might consist of a NotNullEnforcer, KeyBy, and SinkUpsertMaterializer transformation.

Sources and sinks will be restored using factories.


See also:

https://issues.apache.org/jira/browse/FLINK-20435 (Introduction of the ExecNode layer)

https://issues.apache.org/jira/browse/FLINK-21091 (Initial version of a JSON plan for all nodes)

ExecNodes

Currently, we list the following ExecNodes:

Supported:

StreamExecCalc
StreamExecChangelogNormalize
StreamExecCorrelate
StreamExecDeduplicate
StreamExecDropUpdateBefore
StreamExecExchange
StreamExecExpand
StreamExecGlobalGroupAggregate
StreamExecGlobalWindowAggregate
StreamExecGroupAggregate
StreamExecGroupWindowAggregate
StreamExecIncrementalGroupAggregate
StreamExecIntervalJoin
StreamExecJoin
StreamExecLimit
StreamExecLocalGroupAggregate
StreamExecLocalWindowAggregate
StreamExecLookupJoin
StreamExecMatch
StreamExecMiniBatchAssigner
StreamExecOverAggregate
StreamExecPythonCalc
StreamExecPythonCorrelate
StreamExecPythonGroupAggregate
StreamExecPythonGroupWindowAggregate
StreamExecPythonOverAggregate
StreamExecRank
StreamExecSink
StreamExecSortLimit
StreamExecTableSourceScan
StreamExecTemporalJoin
StreamExecTemporalSort
StreamExecUnion
StreamExecValues
StreamExecWatermarkAssigner
StreamExecWindowAggregate
StreamExecWindowJoin
StreamExecWindowRank
StreamExecWindowTableFunction

Not yet supported:

StreamExecGroupTableAggregate
StreamExecDataStreamScan
StreamExecPythonGroupTableAggregate
StreamExecSort
StreamExecMultipleInput

Limitations:

Some additional limitations of the current design:

  1. Batch ExecNodes cannot be serialized into a JSON plan.
  2. Only SQL INSERT INTO queries and STATEMENT SET are supported.
  3. The Table API is not supported, including `TableResult#collect()`.
  4. Bridging from and to DataStream API is not supported.
  5. Catalog tables are serialized into the plan. The catalog is not read again after an upgrade for tables.
  6. User functions are serialized into the plan using Java serialization.
  7. Built-in functions are not versioned and will be looked up in the current built-in catalog again after an upgrade.
  8. JSON plan is quite verbose. It can grow quickly even for smaller queries.

Proposed Changes

We propose the following changes to the current design in order to reduce friction and provide a consistent API experience:

  1. Expose the JSON plan concept to users.
  2. Support generating and providing a JSON plan in every part of the API where the optimizer is triggered.
     - sqlQuery().execute().collect(), Table.executeInsert(), toChangelogStream(), etc.
  3. Consider multi-statement and mixed DataStream API/Table API pipelines.
  4. Expose the JSON plan with SQL syntax.
  5. Make the JSON plan independent of any class/package name.
     - no Java serialization anywhere, no "o.a.f.t.planner.plan.nodes.exec.stream.*" or other implementation details, no MapView/ListView stuff
  6. Come up with a simple rule of thumb for the user when a JSON plan cannot be generated.
  7. Let users configure strategies for how to deal with catalog objects.
     - serialize them into the plan, serialize them partially into the plan, ignore plan info during restore
  8. Version functions as well.
  9. Have a well-defined uid() for all operators to use a Table API program with DataStream API.

Batch nodes are not considered in this design.

Remark to 6: A plan compilation will fail hard with a helpful exception. For example, this is the case for invalid ExecNodes, inline objects, or DataStream API sources without a proper uid(). Once the plan compilation has been successful, we support the plan in the next Flink version.

Remark to 9: We also need to make sure that all sources/sinks define a uid(), especially when using DataStreamScanProvider and DataStreamSinkProvider.

General JSON Plan Assumptions

The following assumptions should help in documenting the properties of a JSON plan:

  1. Modules are not part of the JSON plan. They must have been loaded as before.
  2. Most of the table/session configuration is not part of the JSON plan. It must be the same as before. This is important for table factories and UDFs where we don't know what they have consumed.
  3. Configuration consumed by an ExecNode should be persisted if it influences the topology or has runtime implications, i.e. the value is not just forwarded but used for calculation (e.g. `table.exec.sink.not-null-enforcer`). In other words: it is not necessary to persist code gen options but we should persist `table.exec.` options.
  4. Temporary DDL statements (to init the table environment and then make sure the temporary UDFs and connectors can be found) are not part of the JSON plan.
  5. We do not version types. We assume that the type system is stable or will only be extended by more types but not modified.
  6. The origin of every catalog object is stored with the object identifier in the JSON plan. However, we cannot assume that all catalog metadata stays constant as a catalog might be shared across teams. We can only assume that the catalog instance and type (e.g. Hive catalog) stays constant for a session to provide a table factory.
  7. The JSON plan contains no implementation details (i.e. no classes) and is fully declarative.
  8. The JSON plan is versioned by Flink version. 
  9. The JSON plan is internal to Flink. We don't support external changes to the plan officially. However, the JSON plan format must not change between patch releases.
  10. Flink will not perform any kind of validation or consistency checks during restore except for the supported Flink plan version.
  11. We do not version connectors and formats. If a connector adds a new ability, we can simply not apply this ability for older plans. Removing an ability is a problem and would require a correcting operator afterwards.

Remark to 3: We should use hints to change the semantic behavior of sources and sinks instead of global config options. We should avoid having semantic meaning in global config options instead of in the query (and thus in the JSON plan). See also FLINK-24254.

Remark to 3: We add a new JSON `Map<String, String>` property to ExecNode to keep track of the node configuration.
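As a rough illustration of the remark above, the per-node configuration could be derived by selecting the relevant keys from the session configuration. The sketch below is an assumption, not planner code: `NodeConfigSketch` and `persistedNodeConfig` are hypothetical names, and selecting by the `table.exec.` prefix is a simplification of "configuration consumed by the ExecNode".

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: picks the options that would go into an ExecNode's config map. */
final class NodeConfigSketch {

    /** Simplifying assumption: every option with this prefix is persisted. */
    static final String PERSISTED_PREFIX = "table.exec.";

    static Map<String, String> persistedNodeConfig(Map<String, String> sessionConfig) {
        // TreeMap keeps the keys sorted so that the serialized plan is deterministic
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, String> entry : sessionConfig.entrySet()) {
            if (entry.getKey().startsWith(PERSISTED_PREFIX)) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }
}
```

With a session configuration that contains `table.exec.sink.not-null-enforcer`, `pipeline.name`, and a code generation option, only the first key would end up in the node's `Map<String, String>` property.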

Remark to 7: We use ExecNode name and version instead of class names.

Remark to 8 & 9: We will not change the JSON plan format across patch releases. Since every JSON plan is versioned by the compiling Flink version, external tooling can be built around it but without any guarantees from the Flink community across Flink minor/major versions yet. This is future work and can be offered once the plan design has settled.

Remark to 10: In other words: We verify whether the Flink version contained in the plan is supported. This does not necessarily have to be the last Flink version. But after that the only consistency check will be the restore deserialization itself.
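The version check from the remark to 10 could be as small as a set membership test. This is a sketch under stated assumptions: `PlanVersionCheck` is a hypothetical class, versions are modeled as plain strings, and the set of supported versions is supplied by the caller rather than taken from Flink.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical restore-time check: only the plan's Flink version is validated. */
final class PlanVersionCheck {

    private final Set<String> supportedPlanVersions;

    PlanVersionCheck(String... supportedPlanVersions) {
        this.supportedPlanVersions = new HashSet<>(Arrays.asList(supportedPlanVersions));
    }

    /** Throws if the version recorded in the plan cannot be restored by this release. */
    void validate(String planFlinkVersion) {
        if (!supportedPlanVersions.contains(planFlinkVersion)) {
            throw new IllegalStateException(
                    "Plan was compiled with unsupported Flink version " + planFlinkVersion
                            + ", supported versions: " + supportedPlanVersions);
        }
        // No further consistency checks: everything else surfaces during deserialization.
    }
}
```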

Remark to 11: A plan migration could add such an operator. Alternatively, the connector properties could enable/disable this ability for backward compatibility. We can use catalog table enrichment or plan migration to add a new connector property.

Catalog Objects

Temporary objects will not be persisted in the plan. Many of them have no serializable representation. E.g. a temporary table that is backed by a DataStream API pipeline or a table source instance.

For users, it is easier to understand the rule of thumb to just say "temporary objects are not part of the plan" instead of saying "some can, some cannot".

For inline objects (used without registering) within Table API we do best-effort as shown below.

Views

Views are always inlined. View information will be lost and not part of the plan in any way.

Tables

Temporary tables will not be persisted in the plan. They will be represented by an object identifier.

Inline tables (tables with a generated temporary object identifier) are handled differently:


For persisted catalog tables, depending on the use case, some users would like to

Options

Note: We separate compile and restore options for flexibility. The default values should aim to make most beginner users happy.

Option | Type | Description

table.plan.compile.catalog-objects

Enum

ALL (Default): All catalog metadata about a table is persisted: Schema + Identifier + Options
The catalog's "metadata" does not need to be available anymore. It makes the initial experience better with in-memory catalogs (e.g. building demos using SQL Client). However, the catalog "instance" itself must still be available in case a catalog provides a custom table factory. For commercial versions, it might make sense to set this value to `IDENTIFIER`, because in most cases, enterprises have a persistent catalog and lock changes there.

SCHEMA: Only basic catalog metadata for validation is persisted: Schema + Identifier. This allows validation in case the external catalog table has changed. Options will later require a read from the catalog again.

IDENTIFIER: Only the object identifier is persisted. Reduces the size of the plan. A job may contain dozens of connectors and a table may contain hundreds of fields. The user needs to make sure that catalog tables don't change. Changes of the schema could lead to hard to debug mismatch exceptions during restore. Also options that influence the support of the connector abilities could lead to exceptions.

table.plan.restore.catalog-objects

Enum

ALL (Default): All catalog info is read from the JSON plan. If information is missing, a lookup will take place. Also enrichment of forwarded options still takes place (see below). If the metadata is not available anymore, the plan can still be executed.

ALL_ENFORCED: All catalog info must be present in the JSON plan. No lookup.

IDENTIFIER: Everything is read from the catalog again. Changes of the schema or options could lead to hard to debug mismatch exceptions. 

table.plan.restore.enrich-table-options

Boolean

By default (true), we always perform a catalog lookup and enrich the plan options with catalog options. In this case, catalog options have precedence. If false, we only use the JSON plan options. 


For the latter option, the factory is the only entity able to judge whether a change causes a modification of semantics or topology. For example, the factory knows how a `topic` option for Kafka is forwarded and can allow a name change that doesn't have any side effects for the source abilities or schema.

DynamicTableFactory {

    /* Declares options that can be modified without runtime implications. */
    Set<ConfigOption<?>> mutableOptions();
    
}
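The interplay of `table.plan.restore.enrich-table-options` and `mutableOptions()` can be sketched with plain maps. Everything here is hypothetical and simplified: `TableOptionEnrichment` is not a Flink class, options are modeled as strings, and reporting changed non-mutable keys is an assumption about how a caller might use the factory's declaration, not behavior described in this document.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of enriching plan options with catalog options during restore. */
final class TableOptionEnrichment {

    /** If enrichment is enabled, catalog options take precedence over plan options. */
    static Map<String, String> enrich(
            Map<String, String> planOptions,
            Map<String, String> catalogOptions,
            boolean enrichTableOptions) {
        Map<String, String> result = new HashMap<>(planOptions);
        if (enrichTableOptions) {
            result.putAll(catalogOptions);
        }
        return result;
    }

    /** Assumption: keys that differ but are not declared mutable deserve a warning or error. */
    static Set<String> changedImmutableKeys(
            Map<String, String> planOptions,
            Map<String, String> catalogOptions,
            Set<String> mutableKeys) {
        Set<String> changed = new TreeSet<>();
        for (Map.Entry<String, String> entry : catalogOptions.entrySet()) {
            String key = entry.getKey();
            boolean differs = !Objects.equals(planOptions.get(key), entry.getValue());
            if (differs && !mutableKeys.contains(key)) {
                changed.add(key);
            }
        }
        return changed;
    }
}
```

For a Kafka table whose factory declares `topic` as mutable, a renamed topic in the catalog would be picked up on restore without being reported, whereas a changed `connector` value would show up in the result of `changedImmutableKeys`.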

Functions

For UDFs, we will only support the new function stack in the JSON plan.

System functions are temporary and will only be persisted by name (and version) in the plan. Therefore, we can also support function instances there (not only classes). Also, temporary catalog functions will only be persisted by object identifier in the plan and can support instances. No Java serialization required.

Parameterized inline functions will not be supported. Users will get an error message to register the UDF as a temporary function:

MyUdf udf = new MyUdf("a");
table.select(call(udf, $("myField")));

However, we will support inline functions that have no member variables for which we can serialize the class name. So the following kind of calls are supported:

MyUdf udf = new MyUdf();
table.select(call(udf, $("myField")));

tableEnv.createTemporarySystemFunction("myUdf", new MyUdf("a"));
table.select(call("myUdf", $("myField")));

If this use case is a hard requirement, we can offer `UserDefinedFunction.toConfiguration()/fromConfiguration()` methods in UDFs in the future to properly de-/serialize its configuration. But I don't think that many users use parameterized inline functions, so this error will not happen often. And there is a workaround by registering it as a temporary function.
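One conceivable way to tell the two inline cases apart is a reflective check for instance fields. The sketch below is only an illustration of that idea and not how the planner is specified to work: `InlineFunctionCheck`, `PlainUdf`, and `ParameterizedUdf` are hypothetical stand-ins that do not extend Flink's function classes.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

/** Hypothetical check: an inline function is only referenced by class name if it has no state. */
final class InlineFunctionCheck {

    /** Stand-in for a UDF without member variables. */
    static class PlainUdf {}

    /** Stand-in for a parameterized UDF. */
    static class ParameterizedUdf {
        private final String prefix;

        ParameterizedUdf(String prefix) {
            this.prefix = prefix;
        }
    }

    /** Walks the class hierarchy and looks for non-static fields. */
    static boolean hasNoInstanceFields(Class<?> clazz) {
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                if (!Modifier.isStatic(field.getModifiers()) && !field.isSynthetic()) {
                    return false;
                }
            }
        }
        return true;
    }

    /** Returns the class name to store in the plan or fails with a hint for the user. */
    static String planReference(Object inlineFunction) {
        Class<?> clazz = inlineFunction.getClass();
        if (!hasNoInstanceFields(clazz)) {
            throw new IllegalArgumentException(
                    "Parameterized inline function " + clazz.getName()
                            + " is not supported. Register it as a temporary function instead.");
        }
        return clazz.getName();
    }
}
```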

It should be possible for the user to change the internal UDF implementation (e.g. upgrade a library etc) as long as input/output parameters stay the same. 

Options

Persisted catalog functions are stored by object identifier and class. Options similar to those for catalog tables apply:

Option | Type | Description

table.plan.compile.catalog-objects

Enum

ALL (Default): All catalog metadata about a function is persisted: Identifier + Class.
The catalog's "metadata" does not need to be available anymore.

SCHEMA: Same as IDENTIFIER because we store the schema anyway.

IDENTIFIER: Only the object identifier is persisted. The user needs to make sure that catalog function signatures don't change. The class can change.

table.plan.restore.catalog-objects

Enum

ALL (Default): All catalog info is read from the JSON plan. If information is missing, a lookup will take place.

ALL_ENFORCED: All catalog info must be present in the JSON plan. No lookup.

IDENTIFIER: Everything is read from the catalog again.
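The three values of `table.plan.restore.catalog-objects` differ only in where the metadata comes from. The following sketch models that decision for a single catalog object. It is an assumption-laden simplification: `CatalogRestoreSketch` is hypothetical, metadata is reduced to a string, and a `null` argument stands for "not contained in the JSON plan".

```java
import java.util.function.Supplier;

/** Hypothetical sketch of the restore strategies for catalog objects. */
final class CatalogRestoreSketch {

    enum RestoreStrategy {
        ALL,
        ALL_ENFORCED,
        IDENTIFIER
    }

    /**
     * @param metadataInPlan metadata found in the JSON plan, or null if it is missing
     * @param catalogLookup lookup of the same object by identifier in the catalog
     */
    static String resolve(
            RestoreStrategy strategy, String metadataInPlan, Supplier<String> catalogLookup) {
        switch (strategy) {
            case ALL:
                // prefer the plan, fall back to the catalog if information is missing
                return metadataInPlan != null ? metadataInPlan : catalogLookup.get();
            case ALL_ENFORCED:
                // the plan must be self-contained, no lookup
                if (metadataInPlan == null) {
                    throw new IllegalStateException("Catalog metadata is missing in the plan.");
                }
                return metadataInPlan;
            case IDENTIFIER:
                // everything is read from the catalog again
                return catalogLookup.get();
            default:
                throw new AssertionError("Unknown strategy: " + strategy);
        }
    }
}
```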


We only version built-in system functions, not user-defined/catalog ones.

Types

We don't fully support user-defined types yet. But the mechanism would be similar to the one of functions.

Inline structured types will be written into the JSON plan.

We can further simplify the current JSON plan by using `LogicalType#asSerializableString` wherever possible. Most basic types support `LogicalType#asSerializableString`. However, time attribute meta information doesn't belong in `LogicalType#asSerializableString`. So in the end, it might be safer to separate LogicalType and JSON plans completely and treat them as orthogonal concepts.

Special cases will be represented with special JSON. Currently, everything is represented as special JSON which makes the JSON plans large.

For example:

"type" : {
 "typeName" : "TIMESTAMP",
 "nullable" : false,
 "precision" : 3
}

Can be:

TIMESTAMP(3) NOT NULL

Side note: LegacyTypeInformationType will not be supported. RawType has a stable serializable string representation using serializer snapshots.
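The size reduction in the `TIMESTAMP` example above comes from collapsing the three JSON attributes into one string. A minimal sketch of that mapping, assuming a hypothetical helper `compactTypeString` that is unrelated to the real `LogicalType#asSerializableString` implementation and only covers a type name, an optional precision, and nullability:

```java
/** Hypothetical helper that renders the verbose JSON attributes as a compact type string. */
final class TypeStringSketch {

    /**
     * @param typeName SQL type name, e.g. TIMESTAMP
     * @param precision optional precision, null if the type has none
     * @param nullable whether the type accepts NULL
     */
    static String compactTypeString(String typeName, Integer precision, boolean nullable) {
        StringBuilder builder = new StringBuilder(typeName);
        if (precision != null) {
            builder.append('(').append(precision).append(')');
        }
        if (!nullable) {
            builder.append(" NOT NULL");
        }
        return builder.toString();
    }
}
```

Calling `compactTypeString("TIMESTAMP", 3, false)` yields `TIMESTAMP(3) NOT NULL`, the compact form shown above.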

Public Interfaces

SQL

EXECUTE

We suggest introducing a consistent block syntax. This is also useful for future stored procedures and other block style definitions in SQL scripts.

The current STATEMENT SET syntax is confusing as a `BEGIN` should always be at the beginning of the block. Similar to SQL Server stored procedures:

https://www.sqlservertutorial.net/sql-server-stored-procedures/sql-server-begin-end/

Statement sets receive a more consistent syntax using the EXECUTE keyword. This also indicates that an "execution" is happening "for this block".

EXECUTE STATEMENT SET
BEGIN
  INSERT INTO pageview_pv_sink
  SELECT page_id, count(1) FROM clicks GROUP BY page_id;

  INSERT INTO pageview_uv_sink
  SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;

-- The following statement is supported for legacy reasons.
-- This syntax has various issues (BEGIN at the wrong location, semicolon at the beginning of a block).

BEGIN STATEMENT SET;
  INSERT INTO pageview_pv_sink
  SELECT page_id, count(1) FROM clicks GROUP BY page_id;

  INSERT INTO pageview_uv_sink
  SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;


Any DML or set of DML can be executed with EXECUTE:

-- the following statements are equivalent

INSERT INTO pageview_pv_sink SELECT page_id FROM clicks;

EXECUTE INSERT INTO pageview_pv_sink SELECT page_id FROM clicks;


Having EXECUTE as a DDL clause allows for parameterizing the execution. This solves the immediate problem of referencing a plan in a table program and leaves room for additional parameters in the future.

EXECUTE PLAN will always (as the name suggests) take a JSON plan path instead of a statement. The meaning of the path depends on the deployment. It should work similarly to ADD JAR and should be absolute.

-- execute a plan from a JSON file,
-- the plan must be present and self-contained,
-- otherwise temporary DDL is necessary before this statement
EXECUTE PLAN '/mydir/plan.json';

COMPILE

Similarly we offer a COMPILE PLAN statement for creating a JSON plan file for both INSERT INTO and STATEMENT SETs.

If a plan file exists already, an exception will be thrown. The IF NOT EXISTS clause allows multiple runs of the same SQL script without raising an exception.

-- create a plan without execution
COMPILE PLAN '/mydir/plan.json' FOR INSERT INTO pageview_pv_sink SELECT page_id FROM clicks;

COMPILE PLAN '/mydir/plan.json' IF NOT EXISTS FOR INSERT INTO pageview_pv_sink SELECT page_id FROM clicks;

COMPILE PLAN '/mydir/plan.json' [IF NOT EXISTS] FOR STATEMENT SET
BEGIN
  INSERT INTO pageview_pv_sink
  SELECT page_id, count(1) FROM clicks GROUP BY page_id;

  INSERT INTO pageview_uv_sink
  SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;

Usually, bigger SQL scripts are written to be idempotent across multiple runs, which means that DDL is ignored if a table/function has already been created. Both COMPILE PLAN IF NOT EXISTS and EXECUTE satisfy this property. However, the path needs to be defined twice if both steps should be part of the same SQL script, which is likely to be the most common case.

We support the following combination of both statements using COMPILE AND EXECUTE PLAN. If the plan file doesn't exist, it will be compiled from the statement. Once the plan file exists, it will be executed immediately.

-- plan will be executed and generated from a given DML if necessary,
-- statement itself is idempotent for multiple runs of the same script,
-- overwriting a file is not allowed, it will only be created once
COMPILE AND EXECUTE PLAN '/mydir/plan.json' FOR INSERT INTO pageview_pv_sink SELECT page_id FROM clicks;

-- similar syntax for statement sets,
-- plan will be generated if necessary,
-- statement is idempotent for multiple runs
COMPILE AND EXECUTE PLAN '/mydir/plan.json' FOR STATEMENT SET
BEGIN
  INSERT INTO pageview_pv_sink
  SELECT page_id, count(1) FROM clicks GROUP BY page_id;

  INSERT INTO pageview_uv_sink
  SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;


Other kinds of statements that might be useful when dealing with plans:


-- during debugging, it should be possible to keep COMPILE AND EXECUTE PLAN … FOR
-- with a proper path but always recompile the plan
SET 'table.plan.force-recompile'='true';

-- -------------------------
-- not in the first version
-- -------------------------

-- drop a plan
DROP PLAN '/mydir/plan.json';

-- Perform plan migration in the future.
-- This statement has been added for completeness. Users will need to execute it
-- when we instruct them to do so in the release notes. Plan migration will be one
-- way of dropping legacy code in Flink before we have savepoint migration.
COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';
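The control flow behind COMPILE AND EXECUTE PLAN, including the effect of `table.plan.force-recompile`, can be summarized in a few lines. This is a sketch under assumptions, not the SQL Client implementation: `PlanFileSketch` and `loadOrCompile` are hypothetical, the plan is treated as an opaque string, and the compilation step is passed in as a `Supplier`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Supplier;

/** Hypothetical sketch: compile the plan only if necessary, then hand it to execution. */
final class PlanFileSketch {

    /**
     * Returns the plan to execute. The plan is compiled and written if the file is
     * missing or if recompilation is forced, otherwise the existing file is read.
     */
    static String loadOrCompile(Path planFile, Supplier<String> compiler, boolean forceRecompile) {
        try {
            if (forceRecompile || Files.notExists(planFile)) {
                String json = compiler.get();
                Files.write(planFile, json.getBytes(StandardCharsets.UTF_8));
                return json;
            }
            return new String(Files.readAllBytes(planFile), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Running the same script twice therefore compiles once and executes the identical plan both times, which is the idempotence property described above. Only the forced mode replaces an existing file.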

EXPLAIN

The current EXPLAIN command is inconsistent. Currently, we support both EXPLAIN and EXPLAIN PLAN FOR:

EXPLAIN [([ExplainDetail[, ExplainDetail]*]) | PLAN FOR] <query_statement_or_insert_statement>

However, the PLAN FOR provides no additional benefit and disables specifying the explain details.

Note: Since "PLAN" should be reserved for the JSON plan, we suggest deprecating the PLAN FOR syntax.

For consistent EXPLAIN statements we suggest the following syntax:

-- regular query
EXPLAIN SELECT * FROM MyTable;

-- regular statement set
EXPLAIN STATEMENT SET
BEGIN
  INSERT INTO pageview_pv_sink
  SELECT page_id, count(1) FROM clicks GROUP BY page_id;

  INSERT INTO pageview_uv_sink
  SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;

-- parameters for explain
EXPLAIN ESTIMATED_COST, CHANGELOG_MODE STATEMENT SET
BEGIN
  INSERT INTO pageview_pv_sink
  SELECT page_id, count(1) FROM clicks GROUP BY page_id;

  INSERT INTO pageview_uv_sink
  SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;

-- provides insights (which insights still needs to be discussed)
-- and basic validation of the plan (e.g. resolution of catalog objects)
EXPLAIN PLAN '/mydir/plan.json';

Table API

In contrast to SQL, the Table API resolves tables, functions, and expressions eagerly for every operation. This means we cannot declare the pipeline fluently without interpreting it already. It makes idempotent statements such as `tableEnv.from("T1").select(...).compileAndExecute()` impossible as both `from()` and `select()` are interpreted eagerly. In programming languages, it is easy to first compile a plan and then pass the instance to an execute method. We offer a supplier method for this.

We have various locations where the optimizer is triggered:

In the first version, we might not implement all these methods but focus on the most important ones.

CompiledPlan

We provide the concept of a `CompiledPlan`. It is an immutable instance that represents a plan string.

// Read string from local file system
CompiledPlan.fromFile(String path)
CompiledPlan.fromFile(Path path)

// We provide a supplier pattern for convenience
CompiledPlan.fromFile(Path path, Supplier<CompiledPlan> ifNotExist)

// Or create with a string
CompiledPlan.fromJsonString(String json)

// Access the raw content
// (the CompiledPlan.toString might be used for a summary representation)
CompiledPlan.asJsonString(): String

// Use convenience method to write it to a file
CompiledPlan.writeToFile(String path)
CompiledPlan.writeToFile(String path, boolean ignoreIfExists)
CompiledPlan.writeToFile(Path path)
CompiledPlan.writeToFile(Path path, boolean ignoreIfExists)
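
The supplier variant of `fromFile` can be read as a "restore if present, otherwise compile" pattern. The following is only a minimal sketch of that idea in plain Java. `PlanFileSketch` is a hypothetical stand-in for `CompiledPlan` that wraps just the JSON string, and writing the freshly compiled plan back to the file is an assumption that the listing above does not spell out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Supplier;

// Hypothetical stand-in for the proposed CompiledPlan; it only wraps the JSON string.
final class PlanFileSketch {

    private final String json;

    private PlanFileSketch(String json) {
        this.json = json;
    }

    static PlanFileSketch fromJsonString(String json) {
        return new PlanFileSketch(json);
    }

    String asJsonString() {
        return json;
    }

    // Reads the plan if the file exists. Otherwise the supplier compiles a plan,
    // which is persisted (assumed behavior) so that the next run restores it.
    static PlanFileSketch fromFile(Path path, Supplier<PlanFileSketch> ifNotExist) {
        try {
            if (Files.exists(path)) {
                byte[] content = Files.readAllBytes(path);
                return new PlanFileSketch(new String(content, StandardCharsets.UTF_8));
            }
            PlanFileSketch compiled = ifNotExist.get();
            Files.write(path, compiled.asJsonString().getBytes(StandardCharsets.UTF_8));
            return compiled;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this shape, the same program text can be used for the first submission and for every later restore, since the supplier is only invoked when no plan file exists yet.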

Single SQL Statements

// --------------------
// COMPILE
// --------------------

// Interface
TableEnvironment.compilePlanSql(String): CompiledPlan

// Example
tableEnv.compilePlanSql("SELECT * FROM T1");
tableEnv.compilePlanSql("INSERT INTO T2 SELECT * FROM T1");

// --------------------
// RESTORE
// --------------------

// Interface
TableEnvironment.executePlan(CompiledPlan): TableResult

// Example
tableEnv.executePlan(CompiledPlan.fromFile("/my/path")).print();

SQL Query and Table Query

// --------------------
// COMPILE
// --------------------

// Interface
Table.compilePlan(): CompiledPlan

// Example
tableEnv.sqlQuery("SELECT * FROM T1").compilePlan();
tableEnv.from("T1").select($("name")).compilePlan();

// --------------------
// RESTORE
// --------------------

// Interface
TableEnvironment.fromPlan(CompiledPlan): Table

// Example
tableEnv
  .fromPlan(CompiledPlan.fromFile("/my/path"))
  .select($("name"))
  .execute()
  .print();

Statement Sets

// --------------------
// COMPILE
// --------------------

// Interface
StatementSet.compilePlan(): CompiledPlan

// Example
tableEnv
  .createStatementSet()
  .addInsertSql("INSERT INTO T2 SELECT * FROM T1")
  .addInsertSql("INSERT INTO T3 SELECT * FROM T1")
  .compilePlan();

// --------------------
// RESTORE
// --------------------

// Same API as single SQL queries.

// Interface
TableEnvironment.executePlan(CompiledPlan): TableResult

// Example
tableEnv.executePlan(CompiledPlan.fromFile("/my/path")).print();

Table with Sink

// Refactor the Table.executeInsert() into declarative insertInto() methods
// because otherwise we would need to add a lot of overloading

// This makes the API way easier and solves other shortcomings of the current design.
Table.insertInto(String targetPath): TablePipeline
Table.insertInto(String targetPath, boolean overwrite): TablePipeline
Table.insertInto(TableDescriptor targetDescriptor): TablePipeline
Table.insertInto(TableDescriptor targetDescriptor, boolean overwrite): TablePipeline

TablePipeline.compilePlan(): CompiledPlan
TablePipeline.execute(): TableResult

// Nice improvement on the way,
// this makes the statement set API more concise.
// No need for several overloaded methods any more.
// This method fits perfectly next to `addInsertSql`.
StatementSet.addInsert(TablePipeline)

// --------------------
// COMPILE
// --------------------

// Interface
TablePipeline.compilePlan(): CompiledPlan

// Example
tableEnv
  .from("T1")
  .insertInto("T2")
  .compilePlan();

// --------------------
// RESTORE
// --------------------

// Same as single SQL queries.

// Interface
TableEnvironment.executePlan(CompiledPlan): TableResult

// Example
tableEnv.executePlan(CompiledPlan.fromFile("/my/path")).print();

Statement Sets added to DataStream API

// --------------------
// RESTORE
// --------------------

// Interface
StatementSet.addPlan(CompiledPlan)

// Example
tableEnv.createStatementSet()
  .addPlan(CompiledPlan.fromFile("/my/path"))
  .attachAsDataStream();

Table with DataStream API Sink

// Refactor also the insertInto DataStream methods,
// because otherwise we would need to add a lot of overloading.

StreamTableEnvironment.insertIntoDataStream(Table): TableStreamPipeline<Row>
StreamTableEnvironment.insertIntoDataStream(Table, AbstractDataType): TableStreamPipeline<T>
StreamTableEnvironment.insertIntoChangelogStream(Table): TableStreamPipeline<Row>
StreamTableEnvironment.insertIntoChangelogStream(Table, Schema): TableStreamPipeline<Row>
StreamTableEnvironment.insertIntoChangelogStream(Table, Schema, ChangelogMode): TableStreamPipeline<Row>

TableStreamPipeline.compilePlan(): CompiledPlan
// the naming `asDataStream` is consistent with StreamStatementSet.attachAsDataStream
TableStreamPipeline.asDataStream(): DataStream<T>

// We can also leverage this new API to get a DataStream side output
// within a StreamStatementSet.
// This is future work but has been requested multiple times by users.

// --------------------
// COMPILE
// --------------------

// Interface
TableStreamPipeline.compilePlan(): CompiledPlan

// Example
tableEnv
  .insertIntoDataStream(tableEnv.from("T1"))
  .compilePlan();

// --------------------
// RESTORE
// --------------------

// Interface
StreamTableEnvironment.asDataStream(CompiledPlan, Class): DataStream<T>

// Example
tableEnv.asDataStream(CompiledPlan.fromFile("/my/path"), Row.class).map(...).executeAndCollect();


Note: We need to store the expected target data type in the JSON plan. This means a JSON representation is not only required for logical type but also conversion classes of all fields. We use a `Class` argument in `asDataStream` to additionally verify the output of the CompiledPlan.

DataStream API Sources

We use a DataStream's uid() to map nodes of CompiledPlan back to a source DataStream.

An exception is thrown if a plan contains input data streams without uid().

DataStream references are always temporary and can be treated similar to temporary views.

CatalogManager can store a mapping from uid -> Transformation in case of a restore. This mapping is cleared with `unregisterDataStreams`.

// --------------------
// COMPILE
// --------------------

// Example

// fromElements(1, 2, 3) yields Integer records; DataStreamSource also exposes uid()
DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3);

stream1.uid("stream-1"); // compilation would fail if uid() is not set

tableEnv
  .fromDataStream(stream1)
  .compilePlan();

// --------------------
// RESTORE
// --------------------

// Interface

// We use non-SQL terminology to differentiate from catalog operations.
// Similar to TableEnvironment.registerCatalog().
StreamTableEnvironment.registerDataStream(DataStream<?> dataStreamWithUid)
StreamTableEnvironment.unregisterDataStreams()

// Example

// fromElements(1, 2, 3) yields Integer records; DataStreamSource also exposes uid()
DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3);

stream1.uid("stream-1");

tableEnv.registerDataStream(stream1);

tableEnv.fromPlan(CompiledPlan.fromFile("/my/path")).execute().print();


Note: We need to store the expected source data type in the JSON plan. This means a JSON representation is not only required for logical type but also conversion classes of all fields.
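
The uid-based lookup described in this section might be sketched as follows. This is an illustration only: `DataStreamRegistrySketch` is a hypothetical name, the type parameter `T` stands in for a Transformation, and the exception types are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry; T stands in for a Transformation.
final class DataStreamRegistrySketch<T> {

    private final Map<String, T> byUid = new HashMap<>();

    // Mirrors registerDataStream(): a stream without uid() cannot be mapped back.
    void register(String uid, T transformation) {
        if (uid == null || uid.isEmpty()) {
            throw new IllegalArgumentException("DataStream must define a uid().");
        }
        byUid.put(uid, transformation);
    }

    // Called during restore for every DataStream source node found in the plan.
    T resolve(String uid) {
        T transformation = byUid.get(uid);
        if (transformation == null) {
            throw new IllegalStateException("No DataStream registered for uid '" + uid + "'.");
        }
        return transformation;
    }

    // Mirrors unregisterDataStreams(): the references are temporary.
    void clear() {
        byUid.clear();
    }
}
```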

Function Versioning

One question was whether we should serialize Expression instead of RexNode and AggregateCall. At this location of the plan, we need to serialize RexNode, but that doesn't mean that we need to expose Calcite names, classes, enums, etc. We should define our JSON format without concepts such as Expression or RexNode.

However, the past has shown that functions change frequently both for type inference and runtime behavior.

Also, the current implementation of `RexNodeJsonDeserializer` looks very fragile and does not support modules.

We should store a version with every serialized RexNode. We extend ModuleManager to lookup FunctionDefinitions by name and version. If no version is defined, we take the most recent version.

ModuleManager.getFunctionDefinition(String name, @Nullable Integer version)

`FunctionCatalogOperatorTable` should then be able to convert to versioned functions.

We will introduce a declarative concept to `BuiltInFunctionDefinitions` and `FlinkSqlOperatorTable` that maintain a function name + version to instance mapping.

For functions that are not using `BridgingSqlFunction`, we might introduce a wrapper SqlOperator that returns the original SqlKind but encodes a version as well that can be accessed by code generator.

Note that the internal digest of an expression therefore might change between Flink versions after restoring from a JSON plan. However, the Web UI should be stable since we encode the operator description in the JSON plan.

Example

Let's assume we have the following (possibly overloaded) function definition:

TO_TIMESTAMP_LTZ(BIGINT | [STRING ',' STRING])

The JSON plan should look similar to:

{
  "kind" : "CALL",
  "function" : {
    "name" : "TO_TIMESTAMP_LTZ",
    "version" : 1
  }
}

If the latest version is specified, the EXPLAIN plan will look similar to:

== Optimized Physical Plan ==
Calc(select=[TO_TIMESTAMP_LTZ(f0)])


Let's assume we change the runtime behavior of TO_TIMESTAMP_LTZ. The signature might remain unchanged in this case. We create a new function definition with the same name for the new behavior.

We keep the old function definition around under an internal name. The physical plan after restore might look similar to:

== Optimized Physical Plan ==
Calc(select=[$TO_TIMESTAMP_LTZ$1(f0)])

$TO_TIMESTAMP_LTZ$1 has been returned by the ModuleManager.
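
A lookup by name and optional version, as proposed for the ModuleManager, might be sketched like this. The class `VersionedFunctionsSketch` and the use of a plain `String` in place of a FunctionDefinition are assumptions made for illustration; only the "no version means most recent version" rule is taken from the text above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical registry; a String stands in for a FunctionDefinition.
final class VersionedFunctionsSketch {

    // function name -> (version -> definition), versions kept in ascending order
    private final Map<String, TreeMap<Integer, String>> definitions = new HashMap<>();

    void register(String name, int version, String definition) {
        definitions.computeIfAbsent(name, k -> new TreeMap<>()).put(version, definition);
    }

    // A null version selects the most recent definition.
    String getFunctionDefinition(String name, Integer version) {
        TreeMap<Integer, String> versions = definitions.get(name);
        if (versions == null) {
            throw new IllegalArgumentException("Unknown function: " + name);
        }
        if (version == null) {
            return versions.lastEntry().getValue();
        }
        String definition = versions.get(version);
        if (definition == null) {
            throw new IllegalArgumentException(
                    "Unknown version " + version + " of function " + name);
        }
        return definition;
    }
}
```

In the TO_TIMESTAMP_LTZ example, version 1 would resolve to the definition kept under the internal name, while an unversioned lookup would resolve to the new behavior.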

Topology Versioning

ExecNode

We consider the ExecNode to Operator 1:n relationship in naming of Transformations. 

We define the following naming convention for the uid() of operators for state restore:

<ExecNodeID>_ExecNodeKind-ExecNodeVersion_OperatorKind
13_stream-exec-sink-1_upsert-materializer

ExecNodeKind and ExecNodeVersion uniquely identify the topology structure for a group of operators. An OperatorKind is responsible for ensuring uniqueness among operators. The ExecNodeID is responsible for uniqueness among multiple usages of the same ExecNode.
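
As a small illustration of the naming convention, the uid can be assembled from its four parts. The helper `OperatorUidSketch.uid` is hypothetical and not part of the proposed API.

```java
// Hypothetical helper that assembles a uid according to the convention above.
final class OperatorUidSketch {

    static String uid(int execNodeId, String execNodeKind, int execNodeVersion, String operatorKind) {
        // <ExecNodeID>_ExecNodeKind-ExecNodeVersion_OperatorKind
        return execNodeId + "_" + execNodeKind + "-" + execNodeVersion + "_" + operatorKind;
    }
}
```

For example, `OperatorUidSketch.uid(13, "stream-exec-sink", 1, "upsert-materializer")` produces the uid shown above.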

We use annotations for code maintenance. See next section.

We will not have a version for operators but only ExecNodes.

In future Flink versions, we might support that a new operator can subscribe to multiple uid()'s of previous operators during restore. This would make operator migration easier.

Connectors

For sources and sinks that might be composed of multiple operators, we need to propagate the uid suffix for providers. Currently, this is only necessary for DataStreamScanProvider and DataStreamSinkProvider.

// introduce new context
DataStreamSinkProvider.consumeDataStream(DataStream<RowData> dataStream, ProviderContext): DataStreamSink<?>

// introduce new context
DataStreamScanProvider.produceDataStream(StreamExecutionEnvironment execEnv, ProviderContext): DataStream<RowData>

// Creates one or more ExecNode-aware UIDs
ProviderContext.generateUid(String operatorId): String

// for example:
// ProviderContext.generateUid("preprocessor-1")
// leads to:
// "13_stream-exec-sink-1_provider_preprocessor-1"

It is the responsibility of the connector implementer to ensure that the operatorId is unique within a provider.
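
A provider context that derives uids from the enclosing ExecNode might look roughly like the sketch below. `ProviderContextSketch` is a hypothetical class. The duplicate check is an extra safeguard added for illustration only, since the proposal leaves uniqueness to the connector implementer.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical implementation of the proposed ProviderContext.
final class ProviderContextSketch {

    private final String execNodeUidPrefix; // e.g. "13_stream-exec-sink-1"
    private final Set<String> usedOperatorIds = new HashSet<>();

    ProviderContextSketch(String execNodeUidPrefix) {
        this.execNodeUidPrefix = execNodeUidPrefix;
    }

    String generateUid(String operatorId) {
        // Assumption: fail fast on duplicates instead of silently reusing a uid.
        if (!usedOperatorIds.add(operatorId)) {
            throw new IllegalArgumentException("Operator id already used: " + operatorId);
        }
        return execNodeUidPrefix + "_provider_" + operatorId;
    }
}
```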

Testing Infrastructure

ExecNode Tests

The testing infrastructure is crucial for version upgrades. We define three kinds of tests: restore tests, change detection tests, and completeness tests.

We introduce annotations for ExecNodes that can be read by the JSON serializer and tests:

@ExecNodeMetadata(
  // main information
  name = "stream-exec-sink",
  version = 1,
  // maintenance information for us/the community and our tests
  consumedOptions = {"table.exec.sink.not-null-enforcer", "table.exec.sink.upsert-materialize"},
  producedOperators = {"not-null-enforcer", "upsert-materializer"},
  minPlanVersion = FlinkVersion.1_15,
  minStateVersion = FlinkVersion.1_15)

Note: Both persisted entities plan and state can evolve independently from each other which is why both reference a Flink version. A max/end version is not required as we would drop the annotation in this case.

An ExecNode might declare multiple annotations with different versions. See example below.

Property | Description
name

For JSON serialization/deserialization and uid() generation. Clearly identifies the ExecNode together with version.

version

For JSON serialization/deserialization and uid() generation. Clearly identifies the ExecNode together with name.

consumedOptions

Hard-coded list of keys in the Flink version when the ExecNode was added. Does not reference instances in the ExecutionConfigOptions class in case those get refactored.

Completeness tests can verify that every option is set once in restore and change detection tests.

Completeness tests can verify that the ExecutionConfigOptions class still contains an option (via key or fallback key) for the given key.

Restore can verify whether the restored ExecNode config map contains only options of the given keys.

producedOperators

Set of operator names that can be part of the resulting Transformations.

Restore and completeness tests can verify there exists at least one test that adds each operator and that the created Transformations contain only operators with `uid`s containing the given operator names.

The concrete combinations or existence of these operators in the final pipeline depends on various parameters (both configuration and ExecNode-specific arguments such as interval size etc.).

minPlanVersion

Used for plan validation and potentially plan migration.

Updates when the JSON for the ExecNode changes: e.g. after adding an attribute to the JSON spec of the ExecNode.

The annotation does not need to be updated for every Flink version. As the name suggests it is about the "minimum" version for a restore. If the minimum version is higher than the current Flink version, plan migration is necessary.

Changing this version will always result in a new ExecNode version.

Plan migration tests can use this information.

Completeness tests can verify that restore tests exist for all JSON plan variations.

minStateVersion

Used for operator and potentially savepoint migration.

Updates whenever the state layout of an ExecNode changes. In some cases, the operator can perform state migration. If the minimum version is higher than the current Flink version, savepoint migration is necessary.

Changing this version will always result in a new ExecNode version.

Restore tests can verify that operator migration works for all Flink state versions.

Completeness tests can verify that restore tests exist for all state variations.
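
The restore-time check mentioned for `consumedOptions` (the restored config map contains only declared keys) might be sketched as follows. `ConsumedOptionsSketch.unexpectedKeys` is a hypothetical helper, and representing the restored config as a plain `Map<String, String>` is an assumption.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical check used during restore.
final class ConsumedOptionsSketch {

    // Returns all keys of the restored config that the ExecNode did not declare.
    static Set<String> unexpectedKeys(Map<String, String> restoredConfig, Set<String> consumedOptions) {
        Set<String> unexpected = new TreeSet<>(restoredConfig.keySet());
        unexpected.removeAll(consumedOptions);
        return unexpected;
    }
}
```

An empty result means the persisted configuration matches the annotation. Any other result points to an option that was persisted without being declared.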

Annotation Example

Operator State Change

1. Introduction of a new ExecNode A in Flink 1.15

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}

2. The state layout of A changed slightly in Flink 1.16. The topology is the same but an operator will use a new state layout in the future:

class X extends ProcessFunction {
  ValueState<A> oldStateLayout;
  ValueState<B> newStateLayout;

  open() {
    if (oldStateLayout.get() != null) {
      performOperatorMigration();
    }
    useNewStateLayout();
  }
}

Every state change will increase the ExecNode version. Since the operator supports both state layouts, adding an additional annotation is enough.

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.15, minStateVersion=1.16)
class MyExecNode extends ExecNode {...}

New plans will use the new ExecNode version 2. Operators will perform operator migration under the hood in the meantime.


3. We would like to drop the support of the old state layout and assume that most operator states have been migrated by now. In this example, we assume that the legacy support is dropped in Flink 1.17.

@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.15, minStateVersion=1.16)
class MyExecNode extends ExecNode {...}

ExecNode version 1 is not supported anymore, even though the state is actually compatible. The plan restore will fail with a helpful exception that forces users to perform plan migration.

COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';

The plan migration will safely replace the old version 1 with 2. The JSON plan flinkVersion changes to 1.17.
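
The annotation handling in the three steps above might be sketched with a repeatable Java annotation. This is a simplified illustration: the versions are plain strings instead of `FlinkVersion` constants, and the container annotation `ExecNodeMetadataList`, the example classes, and the helper methods are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;

final class ExecNodeVersionsSketch {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Repeatable(ExecNodeMetadataList.class)
    @interface ExecNodeMetadata {
        String name();
        int version();
        String minPlanVersion();
        String minStateVersion();
    }

    // Container annotation required by @Repeatable (hypothetical name).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface ExecNodeMetadataList {
        ExecNodeMetadata[] value();
    }

    // Step 2: the node supports both state layouts.
    @ExecNodeMetadata(name = "A", version = 1, minPlanVersion = "1.15", minStateVersion = "1.15")
    @ExecNodeMetadata(name = "A", version = 2, minPlanVersion = "1.15", minStateVersion = "1.16")
    static class NodeInFlink116 {}

    // Step 3: support for version 1 has been dropped.
    @ExecNodeMetadata(name = "A", version = 2, minPlanVersion = "1.15", minStateVersion = "1.16")
    static class NodeInFlink117 {}

    // New plans use the highest declared version.
    static int latestVersion(Class<?> execNode) {
        return Arrays.stream(execNode.getAnnotationsByType(ExecNodeMetadata.class))
                .mapToInt(ExecNodeMetadata::version)
                .max()
                .orElseThrow(() -> new IllegalStateException("No ExecNodeMetadata on " + execNode));
    }

    // Restore fails with a hint towards plan migration if the version is gone.
    static void checkRestorable(Class<?> execNode, int versionInPlan) {
        boolean supported = Arrays.stream(execNode.getAnnotationsByType(ExecNodeMetadata.class))
                .anyMatch(metadata -> metadata.version() == versionInPlan);
        if (!supported) {
            throw new IllegalStateException(
                    "ExecNode version " + versionInPlan + " is not supported anymore. "
                            + "Please migrate the plan using COMPILE PLAN ... FROM ...");
        }
    }
}
```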

Plan Change

1. Introduction of a new ExecNode A in Flink 1.15

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}


2. The JSON node of ExecNode A gets an additional property in Flink 1.16:

Before:

{
  "some-prop": 42
}

After:

{
  "some-prop": 42,
  "some-flag": false
}

Every plan change will increase the ExecNode version. Since the ExecNode supports both plan layouts, adding an additional annotation is enough.

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.16, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}

New plans will use the new ExecNode version 2.


3. We would like to drop the support of the old plan layout. In this example, we assume that the legacy support is dropped in Flink 1.17.

@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.16, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}

ExecNode version 1 is not supported anymore, even though the state is actually compatible. The plan restore will fail with a helpful exception that forces users to perform plan migration.

COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';

The plan migration will safely insert the new default value for `some-flag`. The JSON plan flinkVersion changes to 1.17.
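
The migration step for this example might be sketched as a pure function over the JSON node. Modeling the node as a `Map<String, Object>` and the name `PlanMigrationSketch.migrateToVersion2` are assumptions for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical migration of the JSON node of ExecNode A from version 1 to 2.
final class PlanMigrationSketch {

    static Map<String, Object> migrateToVersion2(Map<String, Object> nodeVersion1) {
        // Copy first so that the old plan stays untouched.
        Map<String, Object> migrated = new LinkedHashMap<>(nodeVersion1);
        // Insert the default of the new property unless it is already present.
        migrated.putIfAbsent("some-flag", false);
        return migrated;
    }
}
```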

Topology Change

Note that this is likely the most common case when performing a change.

1. Introduction of a new ExecNode A in Flink 1.15

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}


2. The state layout and topology of A changed fundamentally in Flink 1.16.

We will maintain two ExecNode classes with different versions.

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
class MyExecNodeVersion1 extends ExecNode {...}

@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.15, minStateVersion=1.16)
class MyExecNodeVersion2 extends ExecNode {...}

If the JSON format has not changed between version 1 and 2, we could keep minPlanVersion=1.15 in version 2. This might be useful for users who want to replace a faulty ExecNode implementation and accept losing state. Otherwise, we can set minPlanVersion=1.16.

Restore Tests

We will come up with a new testing infrastructure that has

for every ExecNode version.

We will not maintain savepoints for every Flink version but only for the Flink version where a new ExecNode version was introduced. We might extend the testing matrix if necessary but initially we try to keep the release overhead low.

We will add one complex end-to-end test that exercises savepoints and JSON plans of different versions. When cutting a feature branch, we can regenerate a JSON plan and savepoint, similar to our serializer upgrade tests.

Once we have more experience with ExecNodes that have been changed, we might reorganize the ExecNode structure in dedicated packages or modules. Since the JSON plan and savepoint should be independent of concrete classes, this can be future work.

Change Detection Tests

We will generate and check in the JSON plan for the current ExecNode versions in master. This already happens today and should help us detect plan changes for which we need to find alternatives.

The change detection tests are derived from the restore tests by compiling the given pipeline definition in the most recent ExecNode version.

Completeness Tests

Completeness tests should help contributors not to forget to add tests when performing a code change.

We will have completeness tests that verify that every ExecNode version has corresponding tests via classpath scanning and annotation checking.

Connector / Format Tests

We can also use the testing infrastructure for connectors and formats. In particular, connectors that use DataStreamScanProvider and DataStreamSinkProvider need tests that verify the uid() is set correctly. This can be done via completeness tests.

Function Tests

For functions, we don't need annotations. All built-in functions are declared either in BuiltInFunctionDefinitions or in FlinkSqlOperatorTable, and we can use reflection for completeness tests, change detection tests, and restore tests.

We can extend BuiltInFunctionTestBase to test different versions.

Currently, not all functions are ported to this test base. We can at least port one parameter combination of every function to it.

We can additionally compute a hash from the explain string of input and output type inference to detect changes in the function definition. Runtime changes should be visible via the tests of BuiltInFunctionTestBase.
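
Such a change detection hash might be computed as in the following sketch. The choice of SHA-256, the class `DefinitionHashSketch`, and the idea of comparing against a checked-in hex string are assumptions. The proposal only states that a hash of the explain string is computed.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper for change detection of function definitions.
final class DefinitionHashSketch {

    // Hex-encoded SHA-256 of the explain string of input and output type inference.
    static String sha256Hex(String explainString) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(explainString.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // True if the current definition no longer matches the checked-in hash.
    static boolean hasChanged(String checkedInHash, String currentExplainString) {
        return !checkedInHash.equals(sha256Hex(currentExplainString));
    }
}
```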

Code Change Guidelines

We will formulate guidelines to help contributors when a change requires a new ExecNode or function version.

However, this is not part of this FLIP and will happen once the completeness tests and other checks are in place. E.g. "if test XYZ fails, a new ExecNode version needs to be added".

Examples for Design Verification

Runtime utility has been moved to a different package
https://github.com/apache/flink/pull/17454
→ No migration necessary. Refactored utility will be used by all versions of ExecNodes.

Source ability interface has been extended by an additional parameter
https://github.com/apache/flink/pull/17662
→ Plan migration necessary. We would have needed to remove metadata columns from the projection type.
→ Alternatively, this could also have been done by the `ProjectPushDownSpec`.

Coalesce has been reworked and introduces rules
https://github.com/apache/flink/pull/17256
→ No migration necessary because the old plan is still runnable even though it might be incorrect.

Changing the semantics of CAST
https://issues.apache.org/jira/browse/FLINK-24403
→ Either we version this function or (more likely) we introduce a hard config option to restore the old behavior. Since the old behavior was invalid, we could also argue that the change is incompatible with previous versions.

Additional information is passed to ExecNode for incremental aggregation with distinct and count star
https://github.com/apache/flink/pull/16759
→ ExecNode version upgrade is not necessary. But old JSON plans will produce an invalid result due to missing information.

Typo change in LogicalType and thus change in distinct views
https://github.com/apache/flink/pull/16020 
→ Should not be a problem if distinct views don't work with Java serialization anymore.
→ Bug in StructuredType was forwarded to JSON and could have been avoided.

Preserve function identifier during filter pushdown
https://github.com/apache/flink/pull/16396 
→ The change in the JSON plan could have been avoided with a better JSON representation of functions.

Additional parameter for window table functions
https://github.com/apache/flink/pull/16215
→ Optional parameter for ExecNode. No new ExecNode version necessary.

Additional parameter for sink nodes
https://github.com/apache/flink/pull/17699

Compatibility, Deprecation, and Migration Plan

Compatibility is not affected.

We deprecate:

In the long-term, we might deprecate the methods that can be expressed with:

Rejected Alternatives

Configure per CompiledPlan

Con: My experience is that users often fail to figure out where they should put the config; they get lost among TableConfig, ExecutionConfig#GlobalJobParameters, and StreamExecutionEnvironment#config. So I think it is better that we can only set them in a single place.

// configure

// define options such as `table.plan.compile.catalog-function`
// per plan/per compilation
CompiledPlan.withOptions(ReadableConfig): CompiledPlan

CompiledPlan.getOptions(): ReadableConfig

COMPILE PLAN OVERWRITE

Pro: We should declare that the file of plan json should not be overwritten, and provide `COMPILE PLAN OVERWRITE '/mydir/plan.json' INSERT ...` to let users compile plan multiple times. Just like "INSERT INTO" and "INSERT OVERWRITE", "INSERT INTO" works in most scenarios.

Con: We decided against OVERWRITE and for a global option `table.plan.restore.force-recompile`. OVERWRITE is not useful in production, only for development/debugging purposes, right? It would be dangerous to accidentally leave this keyword in a SQL script.

Having an option could disable this for production code and removing it disables it for all compilation steps. No need to remove OVERWRITE from every COMPILE statement. It could actually simplify the development process. If OVERWRITE is really needed in the future, we can extend the syntax then. So I'm fine with the current design.

COMPILE PLAN OVERWRITE '/mydir/plan.json' INSERT ...