1. Motivation
The ALTER MATERIALIZED TABLE... AS <select_statement> command provides a powerful mechanism to evolve a table's definition. However, its current behavior is rigid: it always stops the current job and starts a new one from the beginning of the source data, discarding all previous state.
While full reprocessing is sometimes unavoidable, for example when the query optimizer generates a completely new physical plan, forcing it for every evolution is inefficient and costly. For many common evolutions, such as adding a nullable column or making compatible logic changes, re-ingesting historical data is unnecessary.
Users require granular control over this process to optimize for cost, recovery speed, and data correctness. This FLIP proposes enhancing CREATE [OR ALTER] and ALTER MATERIALIZED TABLE by introducing two distinct clauses: START_MODE and STATE_RETENTION. This decouples the concern of what data to process from how to manage existing state, making table evolution explicit, configurable, and efficient.
2. Proposed Changes
We propose introducing two optional clauses, START_MODE and STATE_RETENTION, to the CREATE [OR ALTER] MATERIALIZED TABLE... AS <select_statement> and ALTER MATERIALIZED TABLE... AS <select_statement> commands.
2.1. Core Design Principle: Decoupling Data Scope from State Management
The central principle of this proposal is the separation of two distinct user intents:
Data Processing Window (START_MODE): This clause defines the point in the source stream(s) from which the new job should begin processing data. It answers the question, "What data should the new logic be applied to?" and avoids the cost of reprocessing historical data that is no longer needed.
State Management (STATE_RETENTION): This clause defines how the Flink job's existing state (e.g., window aggregates, join state) should be handled during the evolution. It answers the question, "What memory from the old job should the new job have?" State retention works on a best-effort basis, which we will improve over time.
By separating these concerns, we provide users with unambiguous control and prevent logically inconsistent operations.
2.2. New SQL Syntax
The CREATE [OR ALTER] MATERIALIZED TABLE command will be enhanced as follows:
CREATE [OR ALTER] MATERIALIZED TABLE [catalog_name.][db_name.]table_name
[( { <schema_definition> | <column_list> } )]
[WITH (...)]
[START_MODE = FROM_BEGINNING
| FROM_NOW[(<interval_expression>)]
| FROM_TIMESTAMP(<timestamp_literal>)
| RESUME_OR_FROM_BEGINNING
| RESUME_OR_FROM_NOW[(<interval_expression>)]
| RESUME_OR_FROM_TIMESTAMP(<timestamp_literal>)]
[STATE_RETENTION = ALL | NONE | PTF_ONLY] -- A no-op on the CREATE path
AS <select_statement>
For consistency between the CREATE and CREATE OR ALTER syntax, STATE_RETENTION is a no-op on the CREATE path of a CREATE OR ALTER statement, as no prior state exists to retain or drop. It is only evaluated on the ALTER path, where state exists.
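The CREATE-versus-ALTER rule can be captured in a few lines. The following is a minimal sketch, assuming the planner knows whether the table already exists; `StateRetentionType` mirrors the enum named in Section 3, while `CreateOrAlterRetention` and `effectiveRetention` are hypothetical names, not Flink APIs.

```java
import java.util.Optional;

/** Hypothetical enum mirroring the STATE_RETENTION values in the syntax above. */
enum StateRetentionType { ALL, NONE, PTF_ONLY }

/** Hypothetical helper, not a Flink API: which STATE_RETENTION a CREATE OR ALTER evaluates. */
final class CreateOrAlterRetention {
    private CreateOrAlterRetention() {}

    /**
     * @param tableExists whether the materialized table already exists (ALTER path)
     * @param declared the STATE_RETENTION clause as written, if any
     * @return the retention to apply, or empty on the CREATE path where the clause is a no-op
     */
    static Optional<StateRetentionType> effectiveRetention(
            boolean tableExists, Optional<StateRetentionType> declared) {
        if (!tableExists) {
            // CREATE path: no prior state exists, so the clause is accepted but ignored.
            return Optional.empty();
        }
        // ALTER path: the clause is evaluated; NONE is the documented default when omitted.
        return Optional.of(declared.orElse(StateRetentionType.NONE));
    }
}
```

Accepting but ignoring the clause on the CREATE path lets the same CREATE OR ALTER script run unchanged whether or not the table exists.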
The ALTER MATERIALIZED TABLE command will be enhanced as follows:
-- Explicit alteration of the definition query with reprocessing control
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name
[WITH (...)]
[SET START_MODE = FROM_BEGINNING
| FROM_NOW[(<interval_expression>)]
| FROM_TIMESTAMP(<timestamp_literal>)
| RESUME_OR_FROM_BEGINNING
| RESUME_OR_FROM_NOW[(<interval_expression>)]
| RESUME_OR_FROM_TIMESTAMP(<timestamp_literal>)]
[SET STATE_RETENTION = ALL | NONE | PTF_ONLY]
AS <select_statement>
2.3. START_MODE Clause
This clause controls the data processing window for the evolution.
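| START_MODE | Description & Fallback Logic |
|---|---|
| FROM_BEGINNING | Reprocesses all available data from the source(s), starting from the beginning of the history. (Default when not set.) |
| FROM_NOW[(<interval_expression>)] | Starts processing data relative to the current execution time. Without an argument, processing starts at the current execution time. With an argument, it starts at the current execution time minus the given interval, e.g. FROM_NOW(INTERVAL '7' DAY). |
| FROM_TIMESTAMP(<timestamp_literal>) | Starts processing data from the specified absolute instant in time. Syntax: FROM_TIMESTAMP(<timestamp_literal>), e.g. FROM_TIMESTAMP(TIMESTAMP '2025-10-28 10:00:00'). |
| RESUME_OR_FROM_BEGINNING | Idempotent variant that resumes from the previous job's last processed position. Fallback: if resume information is unavailable, the job falls back to FROM_BEGINNING. |
| RESUME_OR_FROM_NOW[(<interval_expression>)] | Idempotent variant that resumes from the previous job's last processed position. Fallback: if resume information is unavailable, the job falls back to the specified FROM_NOW[(<interval_expression>)]. |
| RESUME_OR_FROM_TIMESTAMP(<timestamp_literal>) | Idempotent variant that resumes from the previous job's last processed position. Fallback: if resume information is unavailable, the job falls back to the specified FROM_TIMESTAMP(<timestamp_literal>). |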
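The fallback logic in the table can be sketched as a single decision function. This is a minimal illustration, assuming "resume information" is a simple yes/no signal (for example, whether the previous job's source positions are available); `StartModeKind`, `EffectiveStart`, and `StartModeFallback` are hypothetical names, not Flink APIs.

```java
/** Hypothetical enum mirroring the START_MODE values in the table above. */
enum StartModeKind {
    FROM_BEGINNING, FROM_NOW, FROM_TIMESTAMP,
    RESUME_OR_FROM_BEGINNING, RESUME_OR_FROM_NOW, RESUME_OR_FROM_TIMESTAMP
}

/** Hypothetical: where the new job actually begins reading. */
enum EffectiveStart { RESUME, BEGINNING, NOW, TIMESTAMP }

/** Hypothetical helper, not a Flink API: applies the RESUME_OR_... fallback rule. */
final class StartModeFallback {
    private StartModeFallback() {}

    static EffectiveStart resolve(StartModeKind mode, boolean resumeInfoAvailable) {
        switch (mode) {
            // Plain modes never resume, even if resume information exists.
            case FROM_BEGINNING:
                return EffectiveStart.BEGINNING;
            case FROM_NOW:
                return EffectiveStart.NOW;
            case FROM_TIMESTAMP:
                return EffectiveStart.TIMESTAMP;
            // RESUME_OR_... modes resume when possible, else use their fallback.
            case RESUME_OR_FROM_BEGINNING:
                return resumeInfoAvailable ? EffectiveStart.RESUME : EffectiveStart.BEGINNING;
            case RESUME_OR_FROM_NOW:
                return resumeInfoAvailable ? EffectiveStart.RESUME : EffectiveStart.NOW;
            case RESUME_OR_FROM_TIMESTAMP:
                return resumeInfoAvailable ? EffectiveStart.RESUME : EffectiveStart.TIMESTAMP;
            default:
                throw new IllegalArgumentException("Unknown START_MODE: " + mode);
        }
    }
}
```

This is what makes the RESUME_OR_... variants idempotent: re-running the same statement against a table whose job can be resumed does not trigger a second reprocessing.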
Implementation Note on Time Expressions and RESUME_OR_...: This capability relies on the Catalog and Connector implementation. Timestamp and Interval expressions are resolved by the planner into a standard long timestamp (milliseconds since 1970-01-01 00:00:00 UTC) before being passed to the Catalog/Connector layer, aligning with the existing Catalog.getTable(ObjectPath, long) API for temporal operations.
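The resolution to a long timestamp can be illustrated with `java.time`. This is a minimal sketch, assuming FROM_NOW(<interval>) means "execution time minus the interval" and that a TIMESTAMP literal is interpreted in a session time zone supplied by the caller; `StartTimeResolver` is a hypothetical name, not a Flink API.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

/** Hypothetical helper, not a Flink API: resolves START_MODE time arguments to epoch millis. */
final class StartTimeResolver {
    private StartTimeResolver() {}

    /** FROM_NOW without an argument: the execution time itself. */
    static long fromNow(Instant executionTime) {
        return executionTime.toEpochMilli();
    }

    /** FROM_NOW(<interval>): execution time minus a day-time interval (assumed semantics). */
    static long fromNow(Instant executionTime, Duration interval) {
        return executionTime.minus(interval).toEpochMilli();
    }

    /** FROM_TIMESTAMP(<literal>): a zone-less TIMESTAMP literal read in the given zone. */
    static long fromTimestamp(LocalDateTime literal, ZoneId sessionZone) {
        return literal.atZone(sessionZone).toInstant().toEpochMilli();
    }
}
```

For example, `fromNow(Instant.parse("2025-11-04T10:00:00Z"), Duration.ofDays(7))` yields the epoch millis of 2025-10-28T10:00:00Z, the kind of `long` value the existing `Catalog.getTable(ObjectPath, long)` accepts. `Duration` only covers day-time intervals; year-month intervals such as INTERVAL '1' MONTH would need calendar arithmetic instead.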
2.4. STATE_RETENTION Clause
This clause controls how the state of a stateful pipeline is managed across the evolution.
ALL: All state from the previous job's savepoint is retained and loaded into the new job. This is essential for stateful operations to continue functioning correctly with the new logic on new data. It is semantically equivalent to bin/flink run -s <savepoint>. This option will initially have limited functionality: because operator UIDs in the compiled plan change when the query changes, restoring state for stateful operations will fail. Stateless pipelines should succeed.
NONE: All state from the previous job is discarded, which is equivalent to full reprocessing. The new job starts with a clean state. This is necessary for any operation that reprocesses historical data to ensure correctness. It is semantically equivalent to bin/flink run.
PTF_ONLY: This advanced mode would allow users to specify the names or IDs of specific Process Table Functions (PTFs) or operators whose state should be retained, while the state of all other operators is discarded. This would enable highly surgical state migrations, similar to those possible with the DataStream API. It is semantically equivalent to bin/flink run -s <savepoint> --allowNonRestoredState.
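The CLI equivalences stated for each option can be summarized as a mapping from STATE_RETENTION to restore settings. This is a minimal sketch, assuming a savepoint path is available from the previous job; `RestorePlan` and `StateRetentionMapping` are hypothetical names, and the sketch does not model how PTF_ONLY would select which operators keep their state.

```java
import java.util.Optional;

/** Hypothetical enum mirroring the STATE_RETENTION values. */
enum StateRetentionType { ALL, NONE, PTF_ONLY }

/** Hypothetical restore instruction modeled on the bin/flink run flags named above. */
final class RestorePlan {
    final Optional<String> savepointPath;
    final boolean allowNonRestoredState;

    RestorePlan(Optional<String> savepointPath, boolean allowNonRestoredState) {
        this.savepointPath = savepointPath;
        this.allowNonRestoredState = allowNonRestoredState;
    }

    /** Renders the equivalent CLI flags, for illustration only. */
    String toCliFlags() {
        if (!savepointPath.isPresent()) {
            return ""; // NONE: plain bin/flink run without a savepoint
        }
        String flags = "-s " + savepointPath.get();
        return allowNonRestoredState ? flags + " --allowNonRestoredState" : flags;
    }
}

/** Hypothetical helper, not a Flink API. */
final class StateRetentionMapping {
    private StateRetentionMapping() {}

    static RestorePlan plan(StateRetentionType retention, String savepointPath) {
        switch (retention) {
            case ALL:
                // Restore everything; fails if any state cannot be mapped to the new plan.
                return new RestorePlan(Optional.of(savepointPath), false);
            case PTF_ONLY:
                // Restore, but tolerate state that no longer maps to an operator.
                return new RestorePlan(Optional.of(savepointPath), true);
            case NONE:
                return new RestorePlan(Optional.empty(), false);
            default:
                throw new IllegalArgumentException("Unknown STATE_RETENTION: " + retention);
        }
    }
}
```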
2.5. Defaults and Interaction
To maintain backward compatibility and ensure data correctness by default, the clauses have simple, explicit defaults.
Defaults:
START_MODE default: If START_MODE is not specified, it defaults to RESUME_OR_FROM_BEGINNING.
STATE_RETENTION default: If STATE_RETENTION is not specified, it defaults to NONE.
This is crucial for backward compatibility. It ensures that any ALTER command run without these new clauses will, by default, drop all existing state and reprocess the entire history from the source. This exactly mimics the original, fixed behavior of the ALTER command.
2.6. Idempotency and SHOW CREATE
To address the non-idempotent nature of relative intervals, the SHOW CREATE [OR ALTER] MATERIALIZED TABLE command will render the user's original expression but include a comment with the timestamp resolved at execution time.
Example SHOW CREATE at submission:
CREATE OR ALTER MATERIALIZED TABLE filtered_high_value_orders ... START_MODE = FROM_NOW(INTERVAL '7' DAY) /* Evaluated to FROM_TIMESTAMP(TIMESTAMP '2025-10-28 10:00:00') at execution */ AS SELECT...
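The comment in the example above could be produced by a small renderer. This is a minimal sketch, assuming the resolved start time is stored as epoch millis and formatted in a caller-supplied time zone; `ShowCreateRenderer` and `renderStartMode` are hypothetical names, and the caller is assumed to invoke it only for relative expressions.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper, not a Flink API: renders START_MODE for SHOW CREATE. */
final class ShowCreateRenderer {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private ShowCreateRenderer() {}

    /** Keeps the user's original expression and appends the timestamp resolved at execution. */
    static String renderStartMode(String originalExpression, long resolvedEpochMillis, ZoneId zone) {
        String ts = LocalDateTime.ofInstant(Instant.ofEpochMilli(resolvedEpochMillis), zone)
                .format(FORMAT);
        return "START_MODE = " + originalExpression
                + " /* Evaluated to FROM_TIMESTAMP(TIMESTAMP '" + ts + "') at execution */";
    }
}
```

Keeping the original expression preserves the user's intent, while the comment records the concrete instant, so the statement can be replayed deterministically by switching to the FROM_TIMESTAMP form.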
3. Public Interfaces
To support the persistence of the new evolution clauses, the primary public interface to be updated is org.apache.flink.table.catalog.CatalogMaterializedTable.
The new API will rely on two new public, immutable classes, StartMode and StateRetention, and their associated enums (i.e., StartModeType, StateRetentionType), which would be added to the org.apache.flink.table.catalog package.
The new methods will be added as default methods returning Optional.empty() to ensure full backward compatibility with any existing custom catalog implementations.
public interface CatalogMaterializedTable extends CatalogBaseTable {
    //...

    /**
     * Returns the structured object for the {@code START_MODE} clause.
     *
     * <p>This defines the point in the source stream(s) from which a new job should begin
     * processing data upon creation or evolution.
     *
     * <p>If {@link Optional#empty()}, the system default ({@code FROM_BEGINNING}) will be
     * used by the planner.
     *
     * @return The structured StartMode object.
     */
    default Optional<StartMode> getStartMode() {
        return Optional.empty();
    }

    /**
     * Returns the structured object for the {@code STATE_RETENTION} clause.
     *
     * <p>This defines how the Flink job's existing state should be handled during an evolution.
     *
     * <p>If {@link Optional#empty()}, the system default ({@code NONE}) will be used by the
     * planner.
     *
     * @return The structured StateRetention object.
     */
    default Optional<StateRetention> getStateRetention() {
        return Optional.empty();
    }

    //...
}
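The FLIP names the immutable classes StartMode and StateRetention and their enums, but not their fields. The following is one possible shape, offered as a sketch only: the field names, the factory methods, the epoch-millis representation of time arguments, and the operator list for PTF_ONLY are all assumptions.

```java
import java.util.List;
import java.util.Optional;

/** Enum named in the FLIP; constants mirror the START_MODE syntax. */
enum StartModeType {
    FROM_BEGINNING, FROM_NOW, FROM_TIMESTAMP,
    RESUME_OR_FROM_BEGINNING, RESUME_OR_FROM_NOW, RESUME_OR_FROM_TIMESTAMP
}

/** Enum named in the FLIP; constants mirror the STATE_RETENTION syntax. */
enum StateRetentionType { ALL, NONE, PTF_ONLY }

/** Hypothetical field layout for the immutable StartMode class. */
final class StartMode {
    private final StartModeType type;
    private final Long startEpochMillis; // null when the mode carries no time argument

    private StartMode(StartModeType type, Long startEpochMillis) {
        this.type = type;
        this.startEpochMillis = startEpochMillis;
    }

    /** For modes without a time argument, e.g. FROM_BEGINNING or plain FROM_NOW. */
    static StartMode of(StartModeType type) {
        if (type == StartModeType.FROM_TIMESTAMP || type == StartModeType.RESUME_OR_FROM_TIMESTAMP) {
            throw new IllegalArgumentException(type + " requires a timestamp");
        }
        return new StartMode(type, null);
    }

    /** For modes whose time expression the planner has already resolved to epoch millis. */
    static StartMode of(StartModeType type, long startEpochMillis) {
        if (type == StartModeType.FROM_BEGINNING || type == StartModeType.RESUME_OR_FROM_BEGINNING) {
            throw new IllegalArgumentException(type + " does not take a time argument");
        }
        return new StartMode(type, startEpochMillis);
    }

    StartModeType getType() { return type; }

    Optional<Long> getStartEpochMillis() { return Optional.ofNullable(startEpochMillis); }
}

/** Hypothetical field layout for the immutable StateRetention class. */
final class StateRetention {
    private final StateRetentionType type;
    private final List<String> retainedOperators; // non-empty only for PTF_ONLY

    private StateRetention(StateRetentionType type, List<String> retainedOperators) {
        this.type = type;
        this.retainedOperators = List.copyOf(retainedOperators); // defensive, immutable copy
    }

    static StateRetention of(StateRetentionType type) {
        if (type == StateRetentionType.PTF_ONLY) {
            throw new IllegalArgumentException("PTF_ONLY requires the PTFs/operators to retain");
        }
        return new StateRetention(type, List.of());
    }

    static StateRetention ptfOnly(List<String> retainedOperators) {
        if (retainedOperators.isEmpty()) {
            throw new IllegalArgumentException("PTF_ONLY needs at least one PTF/operator");
        }
        return new StateRetention(StateRetentionType.PTF_ONLY, retainedOperators);
    }

    StateRetentionType getType() { return type; }

    List<String> getRetainedOperators() { return retainedOperators; }
}
```

Validating arguments in the factories keeps invalid combinations, such as FROM_TIMESTAMP without a timestamp, from ever reaching a catalog, which suits classes that custom catalogs will persist and read back.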
4. User Journeys
The following table outlines the most common user journeys and how the START_MODE and STATE_RETENTION clauses are combined to achieve them.
Note on STATE_RETENTION = ALL: This option is excluded from standard user journeys because Flink SQL does not currently guarantee stable UIDs across query evolutions. As a result, using ALL will likely lead to state restoration failures for stateful pipelines. It is currently suitable only for stateless pipelines or strictly compatible topology changes.
| User Journey (Intent) | START_MODE | STATE_RETENTION | Outcome & Use Case |
|---|---|---|---|
| Full Reprocessing (Default) | RESUME_OR_FROM_BEGINNING (default) | NONE (default) | Outcome: All state is discarded. A new job tries to resume from the last processed data or otherwise starts from the beginning. Use Case: Fits all use cases where historical data is available and reprocessing is cheap. |
| Reprocess from scratch and discard state | FROM_BEGINNING | NONE | Outcome: All state is discarded. A new job starts from the earliest available source data. Use Case: Fixing a bug in core logic that must be applied to all historical data. This matches the original, backward-compatible ALTER behavior. |
| Reprocess from "Now" and discard state | FROM_NOW | NONE | Outcome: All state is discarded. The job starts processing from the tip of the stream. Use Case: Creating a new materialized table that should only contain new data, ignoring all history. |
| Partial Reprocessing (Time-Bound Bug) | FROM_TIMESTAMP(<timestamp_literal>) or FROM_NOW(<interval_expression>) | NONE | Outcome: All state is discarded. A new job starts processing from the specified timestamp. Use Case: A bug was introduced on a specific date. This is far more efficient than reprocessing the entire history. |
| Efficient Schema Change | RESUME_OR_... | NONE | Outcome: Resumes from offsets and discards operator state. Use Case: Adding columns or logic where history is not needed. |
| Advanced State Migration | RESUME_OR_... | PTF_ONLY | Outcome: Resumes from offsets and loads only the state for the specified operators. Use Case: A complex evolution where one part of the state (e.g., a join PTF) is valid, but another (e.g., a window) must be rebuilt. |
5. Compatibility, Deprecation, and Migration Plan
This proposal is fully backward compatible. The START_MODE and STATE_RETENTION clauses are optional.
If omitted, they default to START_MODE = FROM_BEGINNING and STATE_RETENTION = NONE. This exactly matches the existing, unmodified behavior of the ALTER command.
No migration is needed for existing materialized tables or user scripts. Existing ALTER scripts will continue to function as they always have.
6. Rejected Alternatives
Single REPROCESS Clause: This was rejected because it conflated the separate concerns of data scope and state management, leading to ambiguity (e.g., REPROCESS FROM_TIMESTAMP did not have a clear state handling policy). The two-clause design is more explicit and powerful.
KEEP_STATE/DROP_STATE Keywords: The brainstorming idea of simple KEEP STATE/DROP STATE keywords evolved into the more comprehensive STATE_RETENTION clause, which provides a clear namespace and allows for future extension (e.g., PTF_ONLY).
7. Test Plan
The implementation will be validated with unit and integration tests covering:
Parser: Ensure that statements are parsed correctly both with and without the new clauses.
Planner/Execution: Verify that a materialized table created or altered with the new clauses behaves as described above.
End-to-End Tests: Add tests to validate the user journeys described above.