
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In FLIP-435 [1], we introduced the concept of the Materialized Table to streamline the data development process. By specifying query logic and data freshness, users can efficiently build data pipelines, significantly enhancing development productivity.

However, in production environments, modifying data processing logic is a frequent necessity. To better adapt to production changes, we aim to enhance Materialized Table support, allowing users to easily modify their query logic. This ensures flexibility in responding to business adjustments while maintaining stability and manageability of data pipelines.

User Story

Typical Scenarios and Modification Strategies

Changes in Flink SQL jobs often result in state incompatibility, leading to issues such as data flow disruptions, data loss, or inconsistency. The impact depends on downstream requirements for data accuracy and consistency. Below are two common scenarios and their corresponding modification strategies:

Assume the following declaration of a MATERIALIZED TABLE:

CREATE MATERIALIZED TABLE my_materialized_table 
FRESHNESS = INTERVAL '1' MINUTE
AS 
SELECT col1, col2 
FROM paimon_source_table;

Scenario 1: Retaining Historical Data When Modifying Business Logic

This scenario typically applies when downstream systems maintain primary key tables or can tolerate slight data duplication. It also suits jobs without long-period computations (e.g., joins or aggregations over extended time ranges). In that case, the new job only needs to reprocess a small portion of recent data to ensure accuracy.


Without MATERIALIZED TABLE:

1. Stop the old job.

2. Adjust the schema of the data table.

3. Modify the business logic, and set the source's start position to a timestamp shortly before the old job was stopped.

4. Start a new job to backfill data for a short period, ensuring data integrity.


With MATERIALIZED TABLE:

Modify the query logic of the Materialized Table using the ALTER statement. This automatically adjusts the table schema and creates a new refresh task:

ALTER MATERIALIZED TABLE my_materialized_table 
AS 
SELECT col1, col2, col3
FROM paimon_source_table /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
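The hint value `1678883047356` is a Unix epoch timestamp in milliseconds, which is the unit Paimon's `scan.timestamp-millis` option takes. The sketch below computes such a value with `java.time`. It assumes the new job should rewind a fixed margin before the old job's stop time.

The class `BackfillStart`, its methods, the stop time, and the 10-minute margin are all hypothetical. They were chosen only so that the result reproduces the value above.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: derives the start timestamp for the new job's source hint.
final class BackfillStart {

    // Rewinds `margin` before the old job's stop time and returns epoch milliseconds.
    static long startMillis(Instant oldJobStoppedAt, Duration margin) {
        return oldJobStoppedAt.minus(margin).toEpochMilli();
    }

    // Renders the OPTIONS hint in the same form as the ALTER statement above.
    static String hint(long startMillis) {
        return "/*+ OPTIONS('scan.timestamp-millis' = '" + startMillis + "') */";
    }

    public static void main(String[] args) {
        // Assumed stop time of the old job (UTC).
        Instant stoppedAt = Instant.parse("2023-03-15T12:34:07.356Z");
        long start = startMillis(stoppedAt, Duration.ofMinutes(10));
        System.out.println(start); // prints 1678883047356
        System.out.println(hint(start));
    }
}
```

The margin controls how much data is reprocessed. A larger margin costs more backfill but makes it less likely that records are missed around the switchover.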


Scenario 2: Historical Data Not Needed; Only Updated Data Must Be Accurate

This scenario applies to cases where significant business adjustments are made, and preserving historical data is unnecessary. The goal is a quick rebuild and seamless transition to new logic.

Without MATERIALIZED TABLE:

1. Stop the old job.

2. Rebuild the data table.

3. Modify the business logic and submit a new job.


With MATERIALIZED TABLE:

Replace the original table and query logic using the CREATE OR REPLACE statement:

CREATE OR REPLACE MATERIALIZED TABLE my_materialized_table
FRESHNESS = INTERVAL '1' MINUTE
AS
SELECT col1, col2, col3
FROM paimon_source_table /*+ OPTIONS('scan.mode' = 'latest') */;


Public Interfaces

1. Modifying Query Logic: ALTER MATERIALIZED TABLE AS <query>

ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS <query>


Use Case: Modifying query logic while retaining historical data.

Behavior:

1. Stop background refresh tasks.

2. Adjust the schema of the underlying data table.

3. Update refresh tasks with the new query.

4. Resume background refresh tasks.
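The four steps above run in a fixed order. The Java sketch below models that order against a hypothetical `MaterializedTableOps` interface. The interface, its method names, and the recording stub are illustrative assumptions, not Flink classes. Failure handling is deliberately left out because the steps above do not specify it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operations exposed by the catalog and refresh scheduler; not Flink APIs.
interface MaterializedTableOps {
    void stopRefreshTasks(String table);
    void adjustSchema(String table, String newQuery);
    void updateRefreshTasks(String table, String newQuery);
    void resumeRefreshTasks(String table);
}

final class AlterAsQuery {

    // Runs ALTER MATERIALIZED TABLE ... AS <query> in the documented order.
    static void execute(MaterializedTableOps ops, String table, String newQuery) {
        ops.stopRefreshTasks(table);             // 1. stop background refresh tasks
        ops.adjustSchema(table, newQuery);       // 2. adjust the underlying table's schema
        ops.updateRefreshTasks(table, newQuery); // 3. switch refresh tasks to the new query
        ops.resumeRefreshTasks(table);           // 4. resume background refresh tasks
    }

    // Records the calls made by execute() so the ordering can be inspected.
    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        MaterializedTableOps recorder = new MaterializedTableOps() {
            public void stopRefreshTasks(String t) { calls.add("stop"); }
            public void adjustSchema(String t, String q) { calls.add("adjustSchema"); }
            public void updateRefreshTasks(String t, String q) { calls.add("update"); }
            public void resumeRefreshTasks(String t) { calls.add("resume"); }
        };
        execute(recorder, "my_materialized_table",
                "SELECT col1, col2, col3 FROM paimon_source_table");
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [stop, adjustSchema, update, resume]
    }
}
```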


Schema Change Restrictions:

To avoid affecting downstream operations, schema changes are restricted as follows:

Allowed: Appending nullable columns to the end of the schema. New columns must be nullable so that historical data remains compatible.

Disallowed: Deleting or modifying existing columns, rearranging column order, or renaming columns.
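The rules above can be checked by comparing the old and new schemas position by position. The Java sketch below does this using a simplified, hypothetical column model (`ColumnSpec` with name, type, and nullability) rather than Flink's own schema classes.

Any difference in an existing position is treated as a rename, modification, or reordering. A shorter schema is treated as a deletion.

```java
import java.util.List;

// Hypothetical, simplified column model used only for this sketch.
record ColumnSpec(String name, String type, boolean nullable) {}

final class SchemaChangeCheck {

    // Throws IllegalArgumentException if newSchema is not an allowed evolution of oldSchema.
    static void validate(List<ColumnSpec> oldSchema, List<ColumnSpec> newSchema) {
        if (newSchema.size() < oldSchema.size()) {
            throw new IllegalArgumentException("Deleting columns is not allowed");
        }
        // Existing columns must keep their position, name, type, and nullability;
        // a mismatch means a column was renamed, modified, or reordered.
        for (int i = 0; i < oldSchema.size(); i++) {
            if (!oldSchema.get(i).equals(newSchema.get(i))) {
                throw new IllegalArgumentException(
                        "Existing column changed at position " + i + ": " + oldSchema.get(i).name());
            }
        }
        // Everything past the old schema is an appended column and must be nullable,
        // so that historical rows can hold NULL for it.
        for (int i = oldSchema.size(); i < newSchema.size(); i++) {
            if (!newSchema.get(i).nullable()) {
                throw new IllegalArgumentException(
                        "Added column must be nullable: " + newSchema.get(i).name());
            }
        }
    }

    // Boolean convenience wrapper around validate().
    static boolean isAllowed(List<ColumnSpec> oldSchema, List<ColumnSpec> newSchema) {
        try {
            validate(oldSchema, newSchema);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<ColumnSpec> old = List.of(
                new ColumnSpec("col1", "STRING", true), new ColumnSpec("col2", "INT", true));
        List<ColumnSpec> appended = List.of(old.get(0), old.get(1), new ColumnSpec("col3", "BIGINT", true));
        List<ColumnSpec> reordered = List.of(old.get(1), old.get(0));
        System.out.println(isAllowed(old, appended));  // prints true
        System.out.println(isAllowed(old, reordered)); // prints false
    }
}
```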


Refresh Task Behavior:

FULL Mode: The task refreshes data using the new query logic.

Non-partitioned tables: All data is refreshed during the next run.

Partitioned tables:

  • Refresh only the latest partitions if partition hints are effective.
  • Refresh all data if partition hints are absent or invalid.
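The FULL-mode rules above reduce to a small decision function. In this illustrative Java sketch, `RefreshScope` and `FullModeRefresh` are hypothetical names. Whether the partition hint is effective is modeled as a boolean; the real system would derive it from the table's partition configuration.

```java
// Hypothetical model of the FULL-mode refresh rules; not a Flink class.
enum RefreshScope { ALL_DATA, LATEST_PARTITIONS }

final class FullModeRefresh {

    // Decides which data the next FULL-mode run refreshes after the query changes.
    static RefreshScope nextRunScope(boolean partitioned, boolean partitionHintEffective) {
        if (!partitioned) {
            // Non-partitioned tables have no partition to target, so all data is refreshed.
            return RefreshScope.ALL_DATA;
        }
        // Partitioned tables: an effective hint narrows the refresh to the latest partitions;
        // an absent or invalid hint falls back to refreshing all data.
        return partitionHintEffective ? RefreshScope.LATEST_PARTITIONS : RefreshScope.ALL_DATA;
    }

    public static void main(String[] args) {
        System.out.println(nextRunScope(false, true)); // prints ALL_DATA
        System.out.println(nextRunScope(true, true));  // prints LATEST_PARTITIONS
        System.out.println(nextRunScope(true, false)); // prints ALL_DATA
    }
}
```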

Additionally, clearer hints will be provided in DESC MATERIALIZED TABLE to help users understand the behavior of partitioned tables.

Note:

The partition hint takes effect in FULL mode. During each refresh, it is used to generate the partition value corresponding to the latest time, and only that latest partition is refreshed.

For example, if the partition field 'date' is configured with the pattern 'yyyy-MM-dd', then on 2024-12-18 only the partition where date = '2024-12-18' is refreshed.

If this parameter is set incorrectly, the refresh task may be unable to generate valid partition values, leaving the materialized table without data.
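The following Java sketch uses `java.time.format.DateTimeFormatter` to illustrate how a date pattern maps a refresh date to a partition value. It also shows how an invalid pattern yields no value at all. The helper `LatestPartition` is hypothetical, and Flink's actual partition-value generation may differ.

Note that a pattern can be syntactically valid but still wrong for the data; for example, 'yyyy/MM/dd' produces '2024/12/18'. In that case the generated value matches no existing partition, and this check cannot detect it.

```java
import java.time.DateTimeException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Optional;

// Hypothetical helper illustrating latest-partition value generation.
final class LatestPartition {

    // Formats `refreshDate` with the configured pattern to get the partition value to refresh.
    // Returns empty when the pattern is invalid or cannot format a date.
    static Optional<String> valueFor(String pattern, LocalDate refreshDate) {
        try {
            return Optional.of(DateTimeFormatter.ofPattern(pattern).format(refreshDate));
        } catch (IllegalArgumentException | DateTimeException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        LocalDate day = LocalDate.of(2024, 12, 18);
        System.out.println(valueFor("yyyy-MM-dd", day));  // prints Optional[2024-12-18]
        System.out.println(valueFor("yyyy-MM-ddd", day)); // prints Optional.empty
    }
}
```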


CONTINUOUS Mode: Stops the old job and starts a new one with the updated query.

Default Behavior:

The old job is stopped with a savepoint. The new job does not restore from this savepoint, but keeping it allows rolling back to the old job if the modification fails. Because the new job does not use the old job's state, its start offset is typically determined by the connector's defaults or set manually through hints.
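The Java sketch below shows the CONTINUOUS-mode switch: the old job is stopped with a savepoint, and the new job is submitted without restoring any state.

Everything in it is hypothetical, including the `JobGateway` interface, `SwitchResult`, and the in-memory `FakeGateway`; none of these is a Flink API. Automatically rolling back to the old query when the new submission fails is one assumed way of using the retained savepoint, not behavior this proposal specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical cluster-facing operations; not a Flink API.
interface JobGateway {
    String stopWithSavepoint(String jobId);                         // returns the savepoint path
    String submit(String query, Optional<String> restoreSavepoint); // returns the new job id
}

record SwitchResult(String runningJobId, String savepoint, boolean rolledBack) {}

final class ContinuousSwitch {

    static SwitchResult switchJob(JobGateway gw, String oldJobId, String oldQuery, String newQuery) {
        // Stop the old job with a savepoint; the new job will not restore from it.
        String savepoint = gw.stopWithSavepoint(oldJobId);
        try {
            // Start without state: the offset comes from connector defaults or query hints.
            String newJobId = gw.submit(newQuery, Optional.empty());
            return new SwitchResult(newJobId, savepoint, false);
        } catch (RuntimeException e) {
            // Assumed rollback policy: restart the old query from the retained savepoint.
            String restoredJobId = gw.submit(oldQuery, Optional.of(savepoint));
            return new SwitchResult(restoredJobId, savepoint, true);
        }
    }

    public static void main(String[] args) {
        FakeGateway gw = new FakeGateway("SELECT broken");
        SwitchResult r = switchJob(gw, "old-job", "SELECT old", "SELECT broken");
        System.out.println(r.rolledBack() + " " + gw.log);
    }
}

// In-memory fake for demonstration; submitting `failingQuery` throws.
final class FakeGateway implements JobGateway {
    final List<String> log = new ArrayList<>();
    private final String failingQuery;
    private int nextId = 1;

    FakeGateway(String failingQuery) { this.failingQuery = failingQuery; }

    public String stopWithSavepoint(String jobId) {
        log.add("stop " + jobId);
        return "/savepoints/" + jobId;
    }

    public String submit(String query, Optional<String> restoreSavepoint) {
        if (query.equals(failingQuery)) {
            throw new IllegalStateException("submission failed");
        }
        String id = "job-" + nextId++;
        log.add("submit " + id + " restore=" + restoreSavepoint.orElse("none"));
        return id;
    }
}
```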

Examples:

Without hint parameters: By default, the Paimon source [5] first reads all existing data and then reads incrementally.

ALTER MATERIALIZED TABLE my_materialized_table 
AS 
SELECT col1, col2, col3
FROM paimon_source_table;

With hint parameters: Consumption starts from the latest snapshot, as specified by the hint.

ALTER MATERIALIZED TABLE my_materialized_table 
AS 
SELECT col1, col2, col3
FROM paimon_source_table /*+ OPTIONS('scan.mode'='latest') */;


2. Replacing a Table: CREATE OR REPLACE MATERIALIZED TABLE


CREATE [OR REPLACE] MATERIALIZED TABLE [catalog_name.][db_name.]table_name
[ ([ <table_constraint> ]) ]
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
[WITH (key1=val1, key2=val2, ...)]
FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
[REFRESH_MODE = { CONTINUOUS | FULL }]
AS <select_statement>


Use Case: Rebuilding the table without retaining historical data.

Behavior:

1. Stop background refresh tasks.

2. Delete the underlying data table.

3. Create a new table schema.

4. Update background refresh tasks with the new query.

5. Resume background refresh tasks.

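Limitations: Due to operational complexity, this operation is not atomic. If it fails partway, for example after the underlying table has been deleted, the materialized table may be left in an intermediate state.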
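The following Java sketch illustrates why the lack of atomicity matters. It runs the five CREATE OR REPLACE steps against an in-memory map that stands in for the catalog. `ReplaceSimulation` and its injected failure are hypothetical; they model only the step ordering described above and are not Flink classes.

When the failure is injected after the drop step, the old table is already gone and the new one is never created.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the catalog and refresh scheduler; not a Flink API.
final class ReplaceSimulation {
    final Map<String, String> catalog = new HashMap<>(); // table name -> schema
    final List<String> steps = new ArrayList<>();
    private final boolean failBeforeCreate;              // injected fault for the demo

    ReplaceSimulation(boolean failBeforeCreate) { this.failBeforeCreate = failBeforeCreate; }

    void createOrReplace(String table, String newSchema, String newQuery) {
        steps.add("stop refresh tasks");                 // 1. stop background refresh tasks
        catalog.remove(table);                           // 2. delete the table; no undo is kept
        steps.add("drop table");
        if (failBeforeCreate) {
            throw new IllegalStateException("failed before creating the new table");
        }
        catalog.put(table, newSchema);                   // 3. create the new table schema
        steps.add("create table");
        steps.add("update refresh tasks: " + newQuery);  // 4. update tasks with the new query
        steps.add("resume refresh tasks");               // 5. resume background refresh tasks
    }

    public static void main(String[] args) {
        ReplaceSimulation sim = new ReplaceSimulation(true);
        sim.catalog.put("my_materialized_table", "col1, col2");
        try {
            sim.createOrReplace("my_materialized_table", "col1, col2, col3", "SELECT ...");
        } catch (IllegalStateException e) {
            // The old table is already gone and the new one was never created.
            System.out.println(sim.catalog.containsKey("my_materialized_table")); // prints false
        }
    }
}
```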

Proposed Changes

Support Alter Materialized Table As Query & Replace Materialized Table

The behavior follows the description in Public Interfaces. The implementation will add parser support for the new SQL statements, convert the resulting SqlNodes into operations, and execute those operations.

Support Desc Materialized Table


DESC EXTENDED my_materialized_table

The DESC output will display materialized-table-specific information, including refresh mode, refresh handle, definition query, and freshness.


# Materialized Table

definition query
refresh mode        FULL (full table refresh / partition refresh) / CONTINUOUS
refresh handle      {"jobid": ""}


Implementation Plan

Support for Modifying MATERIALIZED TABLE: Planned for Flink 2.0 to provide basic modification capabilities.

Support for CREATE OR REPLACE: Targeted for versions after Flink 2.0.

Compatibility, Deprecation, and Migration Plan

This FLIP introduces only incremental changes and is fully backward compatible.

Test Plan

Unit tests, integration tests, and manual tests will be implemented to validate the changes.

Rejected Alternatives

1. Complex Schema Modifications for MATERIALIZED TABLE:

Operations such as renaming, deleting, or reordering columns were deemed too risky, as they may cause downstream incompatibility. Adding columns is more common and usually does not affect downstream tasks.


2. Retaining Certain Attributes in CREATE OR REPLACE:

Snowflake's replace operation for dynamic tables [4] retains only certain attributes, such as share and authorization, and does not preserve other table properties. Following a similar approach, CREATE OR REPLACE MATERIALIZED TABLE performs a full table replacement and does not retain the original table's attributes.


3. Table Swapping:

ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name 
SWAP WITH [catalog_name.][db_name.]table_name


Enabling table swapping for Materialized Tables would address more complex modification requirements. However, swap operations cannot currently be made atomic, and the feature has limited applicability. A separate FLIP may therefore be needed to explicitly define the behavior and safeguards of the SWAP operation, covering the following aspects:

Atomicity Guarantee:

Ensuring consistency of table states during the swapping process to avoid intermediate states that could impact business logic.

Smooth Transition for Refresh Tasks:

During metadata exchange, refresh tasks may fail to transition smoothly, leading to swap failures.

Downstream Dependency Adjustments:

After completing the table swap, additional configurations or adjustments may be required for downstream systems to adapt to changes in the table’s state or structure.


Reference

  1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-435%3A+Introduce+a+New+Materialized+Table+for+Simplifying+Data+Pipelines

  2. https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/materialized-table/statements/#create-materialized-table

  3. https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-or-replace-table

  4. https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table

  5. https://paimon.apache.org/docs/master/append-table/streaming/#streaming-query


Appendix

  1. https://docs.google.com/document/d/1e-CUddctqj8oqvPDK6BnhWMCFCvEahsvPaR1VQvrhZU/edit?tab=t.0