Status
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
1. Motivation
The introduction of Materialized Tables in FLIP-435 was a significant step toward simplifying data pipelines. However, as users adopt this feature in production, especially in automated deployment pipelines, a key challenge arises: there is no idempotent way to create a materialized table or update one that already exists.
In modern data operations, pipelines are often managed through declarative tools like dbt, Terraform, or custom orchestration frameworks. These tools need to manage the lifecycle of data assets in a predictable, declarative manner. The current CREATE MATERIALIZED TABLE syntax presents a significant barrier to building such integrations.
The current CREATE MATERIALIZED TABLE statement fails if the table already exists. CREATE IF NOT EXISTS would avoid that failure, but it only addresses table creation, not evolution. Modern data tooling must both create new tables and update existing ones when their definitions change. With CREATE IF NOT EXISTS alone, tools would still have to implement their own ALTER logic and error handling. The other workaround is a DROP ... IF EXISTS followed by CREATE, which is risky, can lead to data loss, and complicates the development of reliable data management tools.
This FLIP proposes introducing a CREATE OR ALTER MATERIALIZED TABLE command to provide a native, idempotent way to manage materialized tables, aligning Flink with best practices for declarative, infrastructure-as-code deployments. This pattern is a standard feature in other modern data platforms like Snowflake. This command will unify the existing CREATE and ALTER logic into a single, robust statement.
While the CREATE OR ALTER pattern would also be beneficial for other Flink catalog objects such as views, tables, and functions, the scope of this FLIP is strictly limited to Materialized Tables. Future FLIPs may address these other objects.
2. Proposed Changes
To support idempotent deployments, we will introduce the CREATE OR ALTER MATERIALIZED TABLE syntax. This command functions as a wrapper around the existing CREATE and ALTER logic defined in FLIP-435.
2.1. Core Behavior
If the materialized table does not exist, this command behaves identically to CREATE MATERIALIZED TABLE and creates a new table with all specified properties.
If the materialized table already exists, the command detects changes across all aspects of the table definition and applies the necessary ALTER operations:
The definition query (AS <select_statement>)
The freshness interval
The refresh mode
Table properties in the WITH clause
Partitioning, comments, and constraints
This ensures that running the same script multiple times will result in the same table state without unnecessary operations or errors.
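To illustrate the change detection described above, the following Java sketch compares an existing definition with the desired one, aspect by aspect, and returns the aspects that would need an ALTER. The `MaterializedTableSpec` record, its fields, and `SpecDiff.changedAspects` are hypothetical names used only for this sketch. They are not Flink planner classes. An empty result corresponds to the no-op case that makes re-running a script idempotent.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified view of a materialized table definition (not a Flink class).
record MaterializedTableSpec(
        String query,                 // AS <select_statement>
        Duration freshness,           // FRESHNESS = INTERVAL ...
        String refreshMode,           // CONTINUOUS or FULL, null if not specified
        Map<String, String> options,  // WITH (...) clause
        List<String> partitionKeys,   // PARTITIONED BY (...)
        String comment,               // COMMENT ...
        List<String> primaryKey) {}   // PRIMARY KEY (...) NOT ENFORCED

final class SpecDiff {
    private SpecDiff() {}

    /** Returns the names of the aspects that differ; an empty list means there is nothing to alter. */
    static List<String> changedAspects(MaterializedTableSpec existing, MaterializedTableSpec desired) {
        List<String> changes = new ArrayList<>();
        if (!Objects.equals(existing.query(), desired.query())) changes.add("query");
        if (!Objects.equals(existing.freshness(), desired.freshness())) changes.add("freshness");
        if (!Objects.equals(existing.refreshMode(), desired.refreshMode())) changes.add("refresh mode");
        if (!Objects.equals(existing.options(), desired.options())) changes.add("options");
        if (!Objects.equals(existing.partitionKeys(), desired.partitionKeys())) changes.add("partitioning");
        if (!Objects.equals(existing.comment(), desired.comment())) changes.add("comment");
        if (!Objects.equals(existing.primaryKey(), desired.primaryKey())) changes.add("constraints");
        return changes;
    }
}
```

The sketch compares values with `equals`. It does not decide whether textually different but equivalent definitions, such as a reformatted query, count as a change.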
2.2. Scope: Declarative Definition vs. Imperative Control
The scope of CREATE OR ALTER is intentionally aligned with CREATE MATERIALIZED TABLE. The command defines the desired state of a materialized table and does not manage its runtime execution. Imperative runtime operations such as SUSPEND, RESUME, and REFRESH are not supported and remain exclusive to the standard ALTER command. This keeps a clear separation between defining a table's state and controlling its execution.
3. Public Interfaces
The proposed change is primarily at the SQL parser and planner level.
3.1. New SQL Syntax
The following DDL statement will be added:
-- Idempotent creation and evolution
CREATE OR ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name
[ ([ <table_constraint> ]) ]
[COMMENT table_comment]
[ <distribution> ]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
[WITH (key1=val1, key2=val2, ...)]
FRESHNESS = INTERVAL '<num>' { SECOND[S] | MINUTE[S] | HOUR[S] | DAY[S] }
[REFRESH_MODE = { CONTINUOUS | FULL }]
AS <select_statement>
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<distribution>:
{
DISTRIBUTED BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
| DISTRIBUTED INTO n BUCKETS
}
3.2. Provide the Original Query
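To enable more helpful logging, and to let CREATE OR ALTER detect query changes by comparing against the user's original query, we will extend the CatalogMaterializedTable interface to align with the existing CatalogView interface, which already exposes getOriginalQuery() and getExpandedQuery():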
@PublicEvolving
public interface CatalogMaterializedTable extends CatalogBaseTable {
    /**
     * Original text of the materialized table definition that preserves the original formatting.
     * @return the original string literal provided by the user.
     */
    String getOriginalQuery();

    /**
     * Expanded text of the original materialized table definition with resolved identifiers.
     * This is needed because context such as current DB is lost after the session.
     * @return the materialized table definition in expanded text.
     */
    String getExpandedQuery();

    /**
     * @deprecated Use {@link #getExpandedQuery()} instead.
     */
    @Deprecated
    default String getDefinitionQuery() {
        return getExpandedQuery();
    }
}
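The following self-contained Java sketch shows how the deprecated accessor keeps existing callers working. So that it compiles without Flink on the classpath, it omits the `CatalogBaseTable` supertype and the `@PublicEvolving` annotation. `MaterializedTableQueries` and `SimpleMaterializedTable` are hypothetical names introduced only for this illustration.

```java
// Simplified stand-in for the proposed interface (CatalogBaseTable omitted so it compiles standalone).
interface MaterializedTableQueries {
    String getOriginalQuery();

    String getExpandedQuery();

    /** @deprecated Use {@link #getExpandedQuery()} instead. */
    @Deprecated
    default String getDefinitionQuery() {
        // Old callers transparently receive the expanded (fully qualified) query.
        return getExpandedQuery();
    }
}

// Hypothetical implementation that stores both forms of the query.
record SimpleMaterializedTable(String originalQuery, String expandedQuery)
        implements MaterializedTableQueries {
    @Override
    public String getOriginalQuery() {
        return originalQuery;
    }

    @Override
    public String getExpandedQuery() {
        return expandedQuery;
    }
}
```

Because the deprecated method delegates to `getExpandedQuery()`, code that reads `getDefinitionQuery()` today keeps receiving the expanded query. Change detection, meanwhile, can compare against the user's original text.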
4. User Journeys
4.1. Idempotent Deployment in a CI/CD Pipeline
A developer defines a materialized table in a SQL file that is managed in a Git repository and applied via a CI/CD pipeline.
4.1.1. Create Materialized Table
Step 1: First Deployment
-- file: materialized_tables/high_value_orders.sql
CREATE OR ALTER MATERIALIZED TABLE high_value_orders
FRESHNESS = INTERVAL '5' MINUTE
AS
SELECT
user_id,
product_id,
price
FROM orders
WHERE price > 1000;
Outcome: The materialized table high_value_orders is created and its background refresh job starts, just as if CREATE MATERIALIZED TABLE had been used.
Step 2: Subsequent Deployments (No Changes)
A commit is made to a different file, triggering the CI/CD pipeline again. The same high_value_orders.sql script is executed with no changes.
Outcome: The command succeeds without error. Flink recognizes that the desired state matches the current state, and no action is taken. This demonstrates idempotency.
4.2. Evolving a Query
A developer needs to evolve the query of an existing materialized table. Whether a change is accepted depends on the existing ALTER logic, as the following cases show.
4.2.1. Supported Evolution - Adding Nullable Columns
-- Add a nullable description column
CREATE OR ALTER MATERIALIZED TABLE high_value_orders
FRESHNESS = INTERVAL '5' MINUTE
AS
SELECT
user_id,
product_id,
price,
description -- New nullable column
FROM orders
WHERE price > 1000;
Outcome: Flink detects the query change by comparing the new query with the stored original query. It then applies the change through the existing ALTER MATERIALIZED TABLE … AS <select_statement> logic. As the documentation describes, schema evolution currently only supports appending nullable columns to the end of the original table's schema, so this change is accepted. Flink suspends the current job with a savepoint, updates the schema, and starts a new job from the beginning.
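The evolution rule above (only nullable columns appended at the end) can be sketched as a prefix check over the two schemas. This is a simplified illustration, not Flink's validation code. `Column`, `SchemaEvolutionCheck`, and the error texts are hypothetical, and types are compared as plain strings such as `DECIMAL(10,2)`. The same check also rejects the dropped, reordered, and retyped columns in the unsupported cases of this section.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical column descriptor: name, type as a string, and nullability.
record Column(String name, String type, boolean nullable) {}

final class SchemaEvolutionCheck {
    private SchemaEvolutionCheck() {}

    /** Returns an error message if the derived schema is not a valid evolution, or empty if it is. */
    static Optional<String> validate(List<Column> original, List<Column> derived) {
        // Dropping columns is not allowed: the new schema must be at least as wide.
        if (derived.size() < original.size()) {
            return Optional.of("drop column is unsupported: original has " + original.size()
                    + " columns, new query derives " + derived.size());
        }
        // Existing columns must keep their position, name, and type (no reorder, rename, or retype).
        for (int i = 0; i < original.size(); i++) {
            Column before = original.get(i);
            Column after = derived.get(i);
            if (!before.name().equals(after.name()) || !before.type().equals(after.type())) {
                return Optional.of("column mismatch at position " + i + ": " + before + " vs " + after);
            }
        }
        // Columns appended after the original schema must be nullable.
        for (Column added : derived.subList(original.size(), derived.size())) {
            if (!added.nullable()) {
                return Optional.of("appended column " + added.name() + " must be nullable");
            }
        }
        return Optional.empty();
    }
}
```

Positions are zero-based here, so swapping `product_id` and `price` is reported at position 1. Changing the type of `price` is reported at position 2.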
4.2.2. Supported Evolution - Changing Business Logic
A developer modifies the filtering logic to include mid-value orders.
-- Change the filter condition
CREATE OR ALTER MATERIALIZED TABLE high_value_orders
FRESHNESS = INTERVAL '5' MINUTE
AS
SELECT
user_id,
product_id,
price,
description
FROM orders
WHERE price > 500; -- Changed threshold from 1000 to 500
Outcome: Business logic changes are supported. The job restarts from the beginning to reprocess historical data with the new filter.
4.2.3. Unsupported Evolution - Dropping Columns
A developer attempts to remove the description column.
-- Attempt to remove description column
CREATE OR ALTER MATERIALIZED TABLE high_value_orders
FRESHNESS = INTERVAL '5' MINUTE
AS
SELECT
user_id,
product_id,
price
-- description column removed
FROM orders
WHERE price > 500;
Outcome: ValidationException: "Failed to modify query because drop column is unsupported. When modifying a query, you can only append new columns at the end of original schema. The original schema has 4 columns, but the newly derived schema from the query has 3 columns."
4.2.4. Unsupported Evolution - Reordering Columns
A developer attempts to reorder columns.
-- Attempt to reorder columns
CREATE OR ALTER MATERIALIZED TABLE high_value_orders
FRESHNESS = INTERVAL '5' MINUTE
AS
SELECT
user_id,
price, -- Swapped position with product_id
product_id, -- Swapped position with price
description
FROM orders
WHERE price > 500;
Outcome: ValidationException: "When modifying the query of a materialized table, currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported. Column mismatch at position 1: Original column is [`product_id` INT], but new column is [`price` DECIMAL(10,2)]."
4.2.5. Unsupported Evolution - Changing Column Types
A developer attempts to change the data type of an existing column.
-- Attempt to change price to integer
CREATE OR ALTER MATERIALIZED TABLE high_value_orders
FRESHNESS = INTERVAL '5' MINUTE
AS
SELECT
user_id,
product_id,
CAST(price AS INT) as price, -- Changed type from DECIMAL to INT
description
FROM orders
WHERE price > 500;
Outcome: ValidationException: "Column mismatch at position 2: Original column is [`price` DECIMAL(10,2)], but new column is [`price` INT]."
5. Compatibility, Deprecation, and Migration Plan
Backward Compatibility: This proposal is fully backward compatible. It introduces a new command, and all existing DDL statements for materialized tables continue to function as before.
Migration: No migration is needed for existing materialized tables.
6. Test Plan
The implementation will be validated with unit and integration tests covering:
CREATE OR ALTER routing logic, to ensure it calls CREATE for new tables and the appropriate ALTER logic for existing ones.
End-to-end tests simulating the idempotent deployment and property change user journeys.
7. Rejected Alternatives
7.1. CREATE OR REPLACE MATERIALIZED TABLE
We considered using the CREATE OR REPLACE syntax (introduced in FLIP-492), which is common in other SQL systems like PostgreSQL (for views/functions), Oracle, Snowflake, and BigQuery.
Rejected for the following reasons:
Consistency with Existing Flink Commands: Materialized Tables already provide two distinct commands:
CREATE MATERIALIZED TABLE ... AS <select_statement> - Creates a new table
ALTER MATERIALIZED TABLE ... AS <select_statement> - Modifies an existing table
The CREATE OR ALTER syntax (popularized by Microsoft SQL Server and a key feature of Snowflake) naturally combines these two existing commands, maintaining conceptual consistency with Flink's current DDL patterns. Introducing CREATE OR REPLACE would create a third distinct semantic that does not align with the existing command structure. Neither CREATE OR REPLACE nor CREATE OR ALTER is part of the formal ANSI/ISO SQL standard; both are widely adopted, vendor-specific extensions.
Clear Semantic Distinction:
CREATE OR REPLACE typically implies destructive "drop and recreate" semantics in most SQL systems.
CREATE OR ALTER clearly indicates "create if missing, otherwise modify existing" semantics.
For a complex, stateful object like a Flink Materialized Table with a running job, the "drop and recreate" implication of REPLACE is both misleading and undesirable.
While the full implementation of state-preserving, in-place pipeline modifications is considered future work, our long-term goal is to evolve, not destroy, these pipelines. The CREATE OR ALTER syntax is the correct foundational choice because it clearly signals this non-destructive, "modify existing" intent. It sets the right semantic expectation for users and provides a more resilient command structure that can accommodate more advanced evolution capabilities in the future.
User Confusion Prevention: Since Flink users are already familiar with ALTER MATERIALIZED TABLE ... AS <select_statement> for query changes, introducing CREATE OR REPLACE could create confusion about when to use which command. CREATE OR ALTER makes the relationship explicit.
Implementation Clarity: The CREATE OR ALTER syntax maps directly to the existing implementation paths:
If the table does not exist → delegate to the existing CREATE logic
If the table exists → delegate to the existing ALTER logic
This makes the implementation more straightforward and less error-prone.
Future Resilience: The ALTER semantic is more forward-looking. As Flink's evolution capabilities become more sophisticated, potentially including complex reprocessing strategies that retain historical data, the concept of "altering" a pipeline is more fitting than "replacing" it.
7.2. Extending Existing CREATE MATERIALIZED TABLE with IF EXISTS
Rejected for the following reasons:
IF EXISTS typically means "don't fail if exists," not "modify if exists."
It would break SQL conventions, where CREATE MATERIALIZED TABLE ... IF NOT EXISTS skips creation if the object exists.
It is less clear about the intended behavior than CREATE OR ALTER.
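A toy in-memory catalog makes the semantic difference concrete. The sketch below is hypothetical: `ToyCatalog`, `Outcome`, and the method names are not Flink APIs, and each table's definition is stored as a plain string. It shows what happens on an existing table. Plain CREATE fails, CREATE ... IF NOT EXISTS leaves the old definition in place, and only CREATE OR ALTER converges to the new definition.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical result of executing a DDL variant against the toy catalog.
enum Outcome { CREATED, SKIPPED, ALTERED, UNCHANGED }

final class ToyCatalog {
    // Table name -> definition (the whole DDL body, simplified to a string).
    private final Map<String, String> tables = new HashMap<>();

    /** CREATE MATERIALIZED TABLE: fails if the table already exists. */
    Outcome create(String name, String definition) {
        if (tables.containsKey(name)) {
            throw new IllegalStateException("Materialized table " + name + " already exists");
        }
        tables.put(name, definition);
        return Outcome.CREATED;
    }

    /** CREATE MATERIALIZED TABLE IF NOT EXISTS: silently keeps an existing table untouched. */
    Outcome createIfNotExists(String name, String definition) {
        if (tables.containsKey(name)) {
            return Outcome.SKIPPED;
        }
        tables.put(name, definition);
        return Outcome.CREATED;
    }

    /** CREATE OR ALTER MATERIALIZED TABLE: creates if missing, otherwise converges to the new definition. */
    Outcome createOrAlter(String name, String definition) {
        String existing = tables.get(name);
        if (existing == null) {
            tables.put(name, definition);
            return Outcome.CREATED;
        }
        if (existing.equals(definition)) {
            return Outcome.UNCHANGED; // idempotent re-run: nothing to do
        }
        tables.put(name, definition);
        return Outcome.ALTERED;
    }

    String definitionOf(String name) {
        return tables.get(name);
    }
}
```

A real implementation would also run the schema evolution checks before altering and would reject unsupported changes. The toy catalog skips those checks to keep the routing visible.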