1. Motivation

The ALTER MATERIALIZED TABLE... AS <select_statement> command provides a powerful mechanism to evolve a table's definition. However, its current behavior is rigid: it always stops the current job and starts a new one from the beginning of the source data, discarding all previous state.

While a full reprocess is sometimes unavoidable, specifically when the query optimizer generates a completely new physical plan, forcing this behavior for every evolution is inefficient and costly. For many common evolutions, such as adding a nullable column or making compatible logic changes, re-ingesting historical data is unnecessary.

Users require granular control over this process to optimize for cost, recovery speed, and data correctness. This FLIP proposes enhancing CREATE [OR ALTER] and ALTER MATERIALIZED TABLE by introducing two distinct clauses: START_MODE and STATE_RETENTION. This decouples the concern of what data to process from how to manage existing state, making table evolution explicit, configurable, and efficient.

2. Proposed Changes

We propose introducing two optional clauses to the CREATE [OR ALTER] MATERIALIZED TABLE... AS <select_statement> and the ALTER MATERIALIZED TABLE... AS <select_statement> commands: START_MODE and STATE_RETENTION.

2.1. Core Design Principle: Decoupling Data Scope from State Management

The central principle of this proposal is the separation of two distinct user intents:

  1. Data Processing Window (START_MODE): This clause defines the point in the source stream(s) from which the new job should begin processing data. It answers the question, "What data should the new logic be applied to?" and avoids unnecessary costs for historical data that is not needed anymore.

  2. State Management (STATE_RETENTION): This clause defines how the Flink job's existing state (e.g., window aggregates, join state) should be handled during the evolution. It answers the question, "What memory from the old job should the new job have?" State retention initially works on a best-effort basis, which we intend to improve over time.

By separating these concerns, we provide users with unambiguous control and prevent logically inconsistent operations.

2.2. New SQL Syntax

The CREATE [OR ALTER] MATERIALIZED TABLE command will be enhanced as follows:

CREATE [OR ALTER] MATERIALIZED TABLE [catalog_name.][db_name.]table_name
[( { <schema_definition> | <column_list> } )]
[WITH (...)]
[START_MODE = FROM_BEGINNING 
             | FROM_NOW[(<interval_expression>)] 
             | FROM_TIMESTAMP(<timestamp_literal>)
             | RESUME_OR_FROM_BEGINNING 
             | RESUME_OR_FROM_NOW[(<interval_expression>)] 
             | RESUME_OR_FROM_TIMESTAMP(<timestamp_literal>)]
[STATE_RETENTION = ALL | NONE | PTF_ONLY] -- A no-op on the CREATE path
AS <select_statement>

For consistency between the CREATE and the CREATE OR ALTER syntax, the STATE_RETENTION clause is accepted but is a no-op on the CREATE path of a CREATE OR ALTER statement, as no prior state exists to retain or drop. It is only evaluated on the ALTER path, where state exists.

The ALTER MATERIALIZED TABLE command will be enhanced as follows:

-- Explicit alteration of the definition query with reprocessing control
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name
[WITH (...)]
[SET START_MODE = FROM_BEGINNING 
             | FROM_NOW[(<interval_expression>)] 
             | FROM_TIMESTAMP(<timestamp_literal>)
             | RESUME_OR_FROM_BEGINNING 
             | RESUME_OR_FROM_NOW[(<interval_expression>)] 
             | RESUME_OR_FROM_TIMESTAMP(<timestamp_literal>)]
[SET STATE_RETENTION = ALL | NONE | PTF_ONLY]
AS <select_statement>

2.3. START_MODE Clause

This clause controls the data processing window for the evolution.

| START_MODE | Description & Fallback Logic |
| --- | --- |
| FROM_BEGINNING | Reprocesses all available data from the source(s), starting from the beginning of the history. (Default when not set.) |
| FROM_NOW[(interval)] | Starts processing data relative to the current execution time. Without argument: FROM_NOW starts consuming from the latest record (the tip of the stream). With argument: FROM_NOW(INTERVAL '7' DAY) starts processing from a point in the past calculated by subtracting the standard SQL interval from the current time. |
| FROM_TIMESTAMP(<timestamp_literal>) | Starts processing data from the specified absolute instant in time. Syntax: FROM_TIMESTAMP(TIMESTAMP '2025-10-28 12:00:00') |
| RESUME_OR_FROM_BEGINNING | Special idempotent case for CREATE OR ALTER. Attempts to resume processing from the exact source offsets where the previous job instance stopped. Fallback: If resume information is unavailable, the job falls back to FROM_BEGINNING. |
| RESUME_OR_FROM_NOW[(interval)] | Special idempotent case for CREATE OR ALTER. Attempts to resume processing from the exact source offsets where the previous job instance stopped. Fallback: If resume information is unavailable, the job falls back to the specified FROM_NOW logic (including the optional interval offset if provided). |
| RESUME_OR_FROM_TIMESTAMP(<timestamp_literal>) | Special idempotent case for CREATE OR ALTER. Attempts to resume processing from the exact source offsets where the previous job instance stopped. Fallback: If resume information is unavailable, the job falls back to the specified FROM_TIMESTAMP expression. |

Implementation Note on Time Expressions and RESUME_OR_...: This capability relies on the Catalog and Connector implementation. Timestamp and Interval expressions are resolved by the planner into a standard long timestamp (milliseconds since 1970-01-01 00:00:00 UTC) before being passed to the Catalog/Connector layer, aligning with the existing Catalog.getTable(ObjectPath, long) API for temporal operations.
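The resolution and fallback steps described above can be sketched in plain Java. This is only an illustration under stated assumptions: StartTimeResolver and its methods are hypothetical names, not Flink APIs; the timestamp literal is interpreted in UTC (the real planner might use the session time zone); and only day-time intervals are covered, since year-month intervals would need calendar arithmetic.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical helper for illustration only; not part of Flink. */
final class StartTimeResolver {

    private StartTimeResolver() {}

    /**
     * FROM_NOW(interval): subtracts the interval from the execution time and
     * returns milliseconds since 1970-01-01 00:00:00 UTC.
     * FROM_NOW without an argument needs no timestamp (it starts at the latest record).
     */
    static long fromNow(Instant executionTime, Duration interval) {
        return executionTime.minus(interval).toEpochMilli();
    }

    /**
     * FROM_TIMESTAMP(literal): converts the literal to epoch milliseconds.
     * ASSUMPTION: the literal is interpreted in UTC.
     */
    static long fromTimestamp(LocalDateTime literal) {
        return literal.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /**
     * RESUME_OR_...: uses the resume position when the catalog/connector can
     * provide one; otherwise evaluates the fallback. The fallback is a supplier
     * so that a relative expression is only resolved when it is actually needed.
     */
    static <P> P resumeOrElse(Optional<P> resumePosition, Supplier<P> fallback) {
        return resumePosition.orElseGet(fallback);
    }
}
```

For example, StartTimeResolver.fromNow(Instant.parse("2025-11-04T10:00:00Z"), Duration.ofDays(7)) yields the epoch milliseconds of 2025-10-28 10:00:00 UTC, which is the value that would be handed to the Catalog/Connector layer.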

2.4. STATE_RETENTION Clause

This clause controls how the state of a stateful pipeline is managed across the evolution.

  • ALL: All state from the previous job's savepoint is retained and loaded into the new job. This is essential for stateful operations to continue functioning correctly with the new logic on new data. It is semantically equal to bin/flink run -s <savepoint>. This option will have limited functionality initially: because UIDs in the compiled plan will change, restoring state for stateful operations will fail with an error. A stateless pipeline should succeed.


  • NONE: All state from the previous job is discarded, which is equivalent to full reprocessing. The new job starts with a clean state. This is necessary for any operation that reprocesses historical data to ensure correctness. It is semantically equal to bin/flink run.


  • PTF_ONLY: This advanced mode would allow users to specify the names or IDs of specific Process Table Functions (PTFs) or operators whose state should be retained, while the state for all other operators is discarded. This would enable highly surgical state migrations, similar to what the DataStream API allows. It is semantically equal to bin/flink run -s <savepoint> --allowNonRestoredState.
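The CLI equivalences listed above can be summarized in a small Java sketch. The class StateRetentionCli and its nested enum are hypothetical and exist only to make the mapping concrete; the sketch assumes a savepoint path is already known and says nothing about how the actual implementation would submit the job.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the CLI equivalents; not part of Flink. */
final class StateRetentionCli {

    /** Mirrors the three STATE_RETENTION options of this proposal. */
    enum StateRetentionType { ALL, NONE, PTF_ONLY }

    private StateRetentionCli() {}

    /** Returns the "bin/flink run" arguments that each option is semantically equal to. */
    static List<String> runArgs(StateRetentionType type, String savepointPath) {
        List<String> args = new ArrayList<>(List.of("bin/flink", "run"));
        switch (type) {
            case ALL:
                // Restore everything from the savepoint.
                args.add("-s");
                args.add(savepointPath);
                break;
            case PTF_ONLY:
                // Restore from the savepoint, tolerating state that no longer maps to an operator.
                args.add("-s");
                args.add(savepointPath);
                args.add("--allowNonRestoredState");
                break;
            case NONE:
                // Start with clean state: no savepoint argument at all.
                break;
        }
        return args;
    }
}
```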

2.5. Defaults and Interaction

To maintain backward compatibility and ensure data correctness by default, the clauses have simple, explicit defaults.

Defaults:

  • START_MODE Default: If START_MODE is not specified, it defaults to RESUME_OR_FROM_BEGINNING.

  • STATE_RETENTION Default: If STATE_RETENTION is not specified, it defaults to NONE.

This is crucial for backward compatibility. It ensures that any ALTER command run without these new clauses will, by default, drop all existing state and reprocess the entire history from the source. This exactly mimics the original, fixed behavior of the ALTER command.

2.6. Idempotency and SHOW CREATE

To address the non-idempotent nature of relative intervals, the SHOW CREATE [OR ALTER] MATERIALIZED TABLE command will render the user's original expression but include a comment with the timestamp resolved at execution time.

Example SHOW CREATE at submission:

CREATE OR ALTER MATERIALIZED TABLE filtered_high_value_orders
...
START_MODE = FROM_NOW(INTERVAL '7' DAY) /* Evaluated to FROM_TIMESTAMP(TIMESTAMP '2025-10-28 10:00:00') at execution */
AS SELECT...
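A minimal Java sketch of how such a rendered clause could be produced is shown here. ShowCreateRenderer is a hypothetical name, and formatting the resolved instant in UTC is an assumption; the proposal does not state which time zone the comment should use.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical illustration of the SHOW CREATE rendering; not part of Flink. */
final class ShowCreateRenderer {

    // ASSUMPTION: the resolved timestamp is printed in UTC.
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    private ShowCreateRenderer() {}

    /** Keeps the user's original expression and appends the resolved timestamp as a comment. */
    static String renderStartMode(String originalExpression, Instant resolvedAtExecution) {
        return "START_MODE = " + originalExpression
                + " /* Evaluated to FROM_TIMESTAMP(TIMESTAMP '"
                + TIMESTAMP_FORMAT.format(resolvedAtExecution)
                + "') at execution */";
    }
}
```

Calling renderStartMode("FROM_NOW(INTERVAL '7' DAY)", Instant.parse("2025-10-28T10:00:00Z")) reproduces the START_MODE line of the example above.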

3. Public Interfaces

To support the persistence of the new evolution clauses, the primary public interface to be updated is org.apache.flink.table.catalog.CatalogMaterializedTable.

The new API will rely on two new public, immutable classes, StartMode and StateRetention, and their associated enums (i.e., StartModeType, StateRetentionType), which will be added to the org.apache.flink.table.catalog package.

The new methods will be added as default methods returning Optional.empty() to ensure full backward compatibility with any existing custom catalog implementations.


public interface CatalogMaterializedTable extends CatalogBaseTable {
    //...
  
    /**
     * Returns the structured object for the {@code START_MODE} clause.
     *
     * <p>This defines the point in the source stream(s) from which a new job should begin
     * processing data upon creation or evolution.
     *
     * <p>If {@link Optional#empty()}, the system default ({@code FROM_BEGINNING}) will be
     * used by the planner.
     *
     * @return The structured StartMode object.
     */
    default Optional<StartMode> getStartMode() {
        return Optional.empty();
    }

    /**
     * Returns the structured object for the {@code STATE_RETENTION} clause.
     *
     * <p>This defines how the Flink job's existing state should be handled during an evolution.
     *
     * <p>If {@link Optional#empty()}, the system default ({@code NONE}) will be used by the
     * planner.
     *
     * @return The structured StateRetention object.
     */
    default Optional<StateRetention> getStateRetention() {
        return Optional.empty();
    }

    //...
}
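The proposal names StartMode and StartModeType but does not spell out their shape. One possible shape is sketched here purely as an assumption: the factory methods, the resolvedEpochMillis field, and the validation rules are all hypothetical and only illustrate what an immutable value class for this clause might look like.

```java
import java.util.Objects;
import java.util.OptionalLong;

/** Hypothetical enum; the constants mirror the START_MODE options of this proposal. */
enum StartModeType {
    FROM_BEGINNING,
    FROM_NOW,
    FROM_TIMESTAMP,
    RESUME_OR_FROM_BEGINNING,
    RESUME_OR_FROM_NOW,
    RESUME_OR_FROM_TIMESTAMP
}

/** Hypothetical immutable value class; the final API may look different. */
final class StartMode {
    private final StartModeType type;
    private final Long resolvedEpochMillis; // null when the mode carries no point in time

    private StartMode(StartModeType type, Long resolvedEpochMillis) {
        this.type = Objects.requireNonNull(type, "type");
        this.resolvedEpochMillis = resolvedEpochMillis;
    }

    /** For modes without a point in time, e.g. FROM_BEGINNING or FROM_NOW without interval. */
    static StartMode of(StartModeType type) {
        if (type == StartModeType.FROM_TIMESTAMP || type == StartModeType.RESUME_OR_FROM_TIMESTAMP) {
            throw new IllegalArgumentException(type + " requires a timestamp");
        }
        return new StartMode(type, null);
    }

    /** For modes resolved to milliseconds since 1970-01-01 00:00:00 UTC. */
    static StartMode of(StartModeType type, long resolvedEpochMillis) {
        if (type == StartModeType.FROM_BEGINNING || type == StartModeType.RESUME_OR_FROM_BEGINNING) {
            throw new IllegalArgumentException(type + " does not accept a timestamp");
        }
        return new StartMode(type, resolvedEpochMillis);
    }

    StartModeType getType() {
        return type;
    }

    OptionalLong getResolvedEpochMillis() {
        return resolvedEpochMillis == null ? OptionalLong.empty() : OptionalLong.of(resolvedEpochMillis);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof StartMode)) {
            return false;
        }
        StartMode other = (StartMode) o;
        return type == other.type && Objects.equals(resolvedEpochMillis, other.resolvedEpochMillis);
    }

    @Override
    public int hashCode() {
        return Objects.hash(type, resolvedEpochMillis);
    }
}
```

Keeping the class immutable and validating in the factory methods means a catalog can persist and compare instances safely; a StateRetention class could follow the same pattern.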

4. User Journeys

The following table outlines the most common user journeys and how the START_MODE and STATE_RETENTION clauses are combined to achieve them.

Note on STATE_RETENTION = ALL: This option is excluded from standard user journeys because Flink SQL does not currently guarantee stable UIDs across query evolutions. As a result, using ALL will likely lead to state restoration failures for stateful pipelines. It is currently suitable only for stateless pipelines or strictly compatible topology changes.

| User Journey (Intent) | START_MODE | STATE_RETENTION | Outcome & Use Case |
| --- | --- | --- | --- |
| Full Reprocessing (Default) | [Not Set] (Defaults to RESUME_OR_FROM_BEGINNING) | [Not Set] (Defaults to NONE) | Outcome: All state is discarded. A new job tries to resume from the last processed data or starts from the beginning. Use Case: Fits all use cases where historical data is available and reprocessing is cheap. |
| Reprocess from scratch and discard state | FROM_BEGINNING | NONE | Outcome: All state is discarded. A new job starts from the earliest available source data. Use Case: Fixing a bug in core logic that must be applied to all historical data. This matches the original, backward-compatible ALTER behavior. |
| Reprocessing from "Now" and discard state | FROM_NOW | NONE | Outcome: All state is discarded. The job starts processing from the tip of the stream. Use Case: Creating a new materialized table that should only contain new data, ignoring all history. |
| Partial Reprocessing (Time-Bound Bug) | FROM_TIMESTAMP(...) or FROM_NOW(...) | NONE | Outcome: All state is discarded. A new job starts processing from the specified timestamp. Use Case: A bug was introduced on a specific date. This is far more efficient than reprocessing the entire history. |
| Efficient Schema Change | RESUME_OR... | NONE | Outcome: Resumes from offsets, discards operator state. Use Case: Adding columns or logic where history is not needed. |
| Advanced State Migration | RESUME_OR... | PTF_ONLY | Outcome: Resumes from offsets and loads only the state for the specified operators. Use Case: A complex evolution where one part of the state (e.g., a join PTF) is valid, but another (e.g., a window) must be rebuilt. |

5. Compatibility, Deprecation, and Migration Plan

This proposal is fully backward compatible. The START_MODE and STATE_RETENTION clauses are optional.

If omitted, they default to START_MODE = FROM_BEGINNING and STATE_RETENTION = NONE. This exactly matches the existing, unmodified behavior of the ALTER command.

No migration is needed for existing materialized tables or user scripts. Existing ALTER scripts will continue to function as they always have.

6. Rejected Alternatives

  • Single REPROCESS Clause: This was rejected because it conflated the separate concerns of data scope and state management, leading to ambiguity (e.g., REPROCESS FROM_TIMESTAMP did not have a clear state handling policy). The two-clause design is more explicit and powerful.

  • KEEP_STATE/DROP_STATE Keywords: The initial idea of simple KEEP STATE / DROP STATE keywords evolved into the more comprehensive STATE_RETENTION clause, which provides a clear namespace and allows for future extension (e.g., PTF_ONLY).

7. Test Plan

The implementation will be validated with unit and integration tests covering:

  • Parser: Ensure that queries are parsed correctly both with and without the new clauses.

  • Planner/Execution:

    • Verify that a created or altered materialized table follows the behavior described above.

  • End-to-End Tests: Add tests to validate the user journeys described above.