...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Current data warehouse processing architectures include the Lambda architecture, with separate streaming and batch processing systems, and the Lakehouse architecture, with a processing model built on unified streaming-batch storage. However, both architectures have issues:

...

Hence, starting from the concept of unified streaming and batch processing (One SQL & Unified Table), we propose a new table type based on the Dynamic Tables concept [1], called Materialized Table, to simplify streaming and batch ETL pipelines cost-effectively. With it, users no longer need to grapple with complex concepts such as streaming versus batch processing or the underlying technical details, and can focus solely on business logic and data freshness.

User Story

Let's walk through a user story to see how Materialized Table solves the problem described earlier.

Business Background

Suppose a MySQL instance has three business tables: orders (orders table), orders_pay (order payments table), and products (product category dimension table), which are first ingested in real time into a lake storage such as Paimon via Flink CDC. We are going to perform data widening on these three tables; let's see how users can create a data processing pipeline, and pause and resume it, through Materialized Table.

Create Data Pipeline

orders and order_payments are two base tables in the ODS layer, partitioned by ds. Here we create a DWD wide table dwd_orders.

...

When a CREATE MATERIALIZED TABLE statement is executed, the engine derives the table's schema from the referenced query, first creates the Materialized Table in the Catalog, and then automatically creates a data processing pipeline to refresh the table based on the specified FRESHNESS value. Since FRESHNESS is set to 3 minutes here, a Flink streaming job is launched to refresh the table continuously.
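For illustration, a minimal sketch of what such a statement could look like (the selected columns and join condition are hypothetical; the actual statement is elided above):

Code Block
languagesql
-- Hypothetical sketch: widen orders with payments into the DWD table
-- dwd_orders, kept fresh to within 3 minutes by a continuous refresh job.
CREATE MATERIALIZED TABLE dwd_orders
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '3' MINUTE
AS SELECT
  o.ds,
  o.order_id,
  o.order_number,
  pay.pay_amt
FROM orders AS o
LEFT JOIN order_payments AS pay ON o.order_id = pay.order_id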

Pause & Resume Data Pipeline

Code Block
languagesql
-- 1. Pause table data refresh
ALTER MATERIALIZED TABLE dwd_orders SUSPEND;

-- 2. Resume table data refresh
ALTER MATERIALIZED TABLE dwd_orders RESUME
-- Set table option via WITH clause
WITH(
  'sink.parallelism' = '10'
);

...

  1. Through the SUSPEND statement, the refresh job can be paused.
  2. Through the RESUME statement, the refresh job can be resumed. 

Manual Refresh

Code Block
languagesql
-- Refresh historical partition
ALTER MATERIALIZED TABLE dwd_orders REFRESH PARTITION(ds='20231023')

...

Code Block
languagesql
UPDATE dwd_orders SET order_number = 10 WHERE id = 73635185;

Modify Data Freshness

Code Block
languagesql
ALTER MATERIALIZED TABLE dwd_orders SET FRESHNESS = INTERVAL '1' DAY

...

In summary, we hope that through Materialized Table, with a declared SQL statement and freshness, you can define batch and streaming transformations on your data in the same way, accelerate ETL pipeline development, automatically manage task orchestration, enhance data quality and freshness, and save time and effort.

Downstream Pipeline

Cascading Materialized Table

Code Block
languagesql
CREATE MATERIALIZED TABLE dws_deals(
	PRIMARY KEY (order_id, ds) NOT ENFORCED
)
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '1' HOUR
AS SELECT
  o.ds,
  o.order_id,
  count(*) AS order_cnt,
  sum(pay_amt) AS order_total_amt
  ...
FROM 
  dwd_orders AS o
GROUP BY o.order_id, o.ds

We can create a cascading DWS Materialized Table dws_deals based on the DWD Materialized Table dwd_orders.

OLAP Query

Code Block
languagesql
SELECT * FROM dwd_orders WHERE ds = '20231023' LIMIT 10;

We can also query the Materialized Table dwd_orders directly with an OLAP point query.

Public Interfaces

SQL Statements

To support materialized tables, we need to introduce some new syntax.

Create

Code Block
languagesql
CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name

[ ([ <table_constraint> ]) ]

[COMMENT table_comment]

[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]

[WITH (key1=val1, key2=val2, ...)]

FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }

[REFRESH_MODE = { CONTINUOUS | FULL }]

AS <select_statement>

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
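To make the clause ordering concrete, here is an illustrative statement that exercises every optional clause of the syntax above (the table name, constraint, and option values are hypothetical):

Code Block
languagesql
-- Illustrative only: exercises each optional clause of the CREATE syntax.
CREATE MATERIALIZED TABLE my_catalog.my_db.dwd_orders (
  CONSTRAINT pk PRIMARY KEY (order_id, ds) NOT ENFORCED
)
COMMENT 'DWD wide table of orders'
PARTITIONED BY (ds)
WITH ('sink.parallelism' = '10')
FRESHNESS = INTERVAL '3' MINUTE
REFRESH_MODE = CONTINUOUS
AS SELECT o.ds, o.order_id, o.order_number, pay.pay_amt
FROM orders AS o
LEFT JOIN order_payments AS pay ON o.order_id = pay.order_id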

...

  • Does not support explicitly specifying columns.
  • Does not support modifying the select statement after the materialized table is created.
  • Does not support referencing temporary tables, views, or functions in the SELECT statement currently.

FRESHNESS Clause

For a materialized table, freshness is a critically important attribute, so we highlight its function first. Freshness serves two key functions.

...

  • In continuous mode, it is used to derive checkpoint and minibatch related options that determine the visibility of data, so as to satisfy the defined freshness.
  • In full mode, it is used to derive the scheduling interval of the workflow, i.e., the cron expression, and to calculate the corresponding time partition value (e.g., ds) for partitioned tables, so as to satisfy the defined data freshness of the materialized table.
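For example, the following two definitions illustrate both derivations (table names are illustrative, and the derived checkpoint interval and cron expression are illustrative, not normative):

Code Block
languagesql
-- Continuous mode: a 3-minute freshness might derive checkpoint/minibatch
-- intervals of roughly 3 minutes so that data becomes visible in time.
CREATE MATERIALIZED TABLE mt_continuous
FRESHNESS = INTERVAL '3' MINUTE
AS SELECT * FROM orders;

-- Full mode: a 1-hour freshness might derive an hourly workflow schedule,
-- e.g. the cron expression '0 0 * * * ?'.
CREATE MATERIALIZED TABLE mt_full
FRESHNESS = INTERVAL '1' HOUR
AS SELECT * FROM dwd_orders;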

REFRESH_MODE Clause

In the previous section we explained that the refresh mode is automatically derived by the engine from freshness, so why provide a REFRESH_MODE clause that supports specifying the refresh mode manually, and what is the relationship between the two? In some business scenarios the user knows exactly which refresh mode is needed, and the mode derived from freshness cannot meet that demand, so we provide a more flexible way that allows the refresh mode to be specified manually.

If REFRESH_MODE is specified at table creation time, it takes priority over freshness, which means freshness will no longer be used to infer the refresh mode of the materialized table.
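A sketch of this precedence (the table definition is illustrative):

Code Block
languagesql
-- REFRESH_MODE takes priority over freshness here: although a 1-day freshness
-- would typically derive full mode, this table keeps a continuous refresh job.
CREATE MATERIALIZED TABLE dwd_orders_rt
FRESHNESS = INTERVAL '1' DAY
REFRESH_MODE = CONTINUOUS
AS SELECT * FROM orders;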

Full Refresh Mode

In full refresh mode, the default overwrite behavior for a materialized table is table granularity. If the materialized table is partitioned, and the time partition field format is specified via 'partition.fields.#.date-formatter' when creating the table, the default overwrite behavior is partition granularity; that is, only the latest partition is overwritten on each refresh. For more details on 'partition.fields.#.date-formatter', see the Partitioned Table Full Refresh section.
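A sketch of declaring the time partition format so that each full refresh overwrites only the latest partition (the option key comes from this FLIP; the table definition itself is illustrative):

Code Block
languagesql
CREATE MATERIALIZED TABLE dwd_orders
PARTITIONED BY (ds)
WITH (
  -- declares how the time partition column 'ds' is formatted, so the framework
  -- can compute which partition to overwrite on each scheduled full refresh
  'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '1' HOUR
REFRESH_MODE = FULL
AS SELECT * FROM orders;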

...

Operations on materialized tables are executed through the following Alter statements.

Suspend

Code Block
languagesql
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND

Stops or suspends refreshes on the materialized table, which means the background refresh job is suspended. If the materialized table is used by downstream materialized tables, those downstream tables are not suspended; the statement is effective only for the current materialized table.

Resume

Code Block
languagesql
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESUME

[WITH('key1' = 'val1', 'key2' = 'val2')]

...

You can also specify one or more materialized table options via the WITH keyword to tune the background refresh job; these options are effective only for the current operation and are not persisted.

Refresh

Code Block
languagesql
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name REFRESH

[PARTITION (key1=val1, key2=val2, ...)]

...

The manual REFRESH command and the background refresh job can run in parallel; the framework does not impose any limitation on this. Of course, if the refresh mode is continuous and the background job is running, use the REFRESH command with care, as it can lead to inconsistent data.

Set

Freshness

Code Block
languagesql
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name 

  SET FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }

...

Note: If the refresh mode was manually specified when the materialized table was created, modifying the freshness will not change the refresh mode.
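For instance, assuming dwd_orders was created with REFRESH_MODE = FULL:

Code Block
languagesql
-- The refresh mode stays FULL; only the scheduling interval derived from the
-- freshness changes.
ALTER MATERIALIZED TABLE dwd_orders SET FRESHNESS = INTERVAL '30' MINUTE;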

REFRESH_MODE

Code Block
languagesql
ALTER MATERIALIZED TABLE  [catalog_name.][db_name.]table_name 

  SET REFRESH_MODE = { FULL | CONTINUOUS }

Modify the refresh mode of the materialized table, which may change from continuous mode to full mode, or full mode to continuous mode.

Options

Code Block
languagesql
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name  SET ('key' = 'val');

Reset

Code Block
languagesql
ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESET ('key');

Besides the aforementioned ALTER syntax, materialized tables currently do not support the other ALTER syntaxes that apply to regular tables.

Drop

Code Block
languagesql
DROP MATERIALIZED TABLE [ IF EXISTS ] [catalog_name.][db_name.]table_name

Dropping a materialized table deletes the refresh job first, then drops the table's metadata and data.

Describe Job

Code Block
languagesql
{ DESCRIBE | DESC } JOB ${jobId} 

Describes the specified job in the Flink cluster; the schema of the returned information is the same as that of the SHOW JOBS statement.

Interface Changes

CatalogMaterializedTable

We introduce a new interface CatalogMaterializedTable, which extends CatalogBaseTable. As described above, in addition to the schema, options, and partition keys attributes, CatalogMaterializedTable also encompasses other important metadata: the definition query, freshness, refresh mode, refresh status, and refresh handler.

Code Block
languagejava
/**
 * Represents the unresolved metadata of a materialized table in a {@link Catalog}.
 *
 * <p>Materialized Table definition: In the context of integrated stream-batch data storage, it
 * provides full history data and an incremental changelog. By defining the data's production
 * business logic and freshness, data updates are achieved through a continuous or full refresh
 * mode, while also providing the capability for both batch and incremental consumption.
 *
 * <p>The metadata for {@link CatalogMaterializedTable} also includes the following four main parts:
 *
 * <ul>
 *   <li>Schema, comments, options and partition keys.
 *   <li>Data freshness, which determines when the data is generated and becomes visible to the
 *       user.
 *   <li>Data production business logic, also known as the definition query.
 *   <li>Background refresh pipeline, either a Flink streaming job or a scheduled batch job; it is
 *       initialized after the materialized table is created.
 * </ul>
 * <p>A catalog implementer can either use {@link #of(Schema, String, List, Map, Long, String,
 * Duration, LogicalRefreshMode, RefreshMode, RefreshStatus, String, byte[])} for a basic
 * implementation of this interface or create a custom class that allows passing catalog-specific
 * objects all the way down to the connector creation (if necessary).
 */
@PublicEvolving
public interface CatalogMaterializedTable extends CatalogBaseTable {

    @Override
    default TableKind getTableKind() {
        return TableKind.MATERIALIZED_TABLE;
    }

    /**
     * Check if the table is partitioned or not.
     *
     * @return true if the table is partitioned; otherwise, false
     */
    boolean isPartitioned();

    /**
     * Get the partition keys of the table. This will be an empty set if the table is not
     * partitioned.
     *
     * @return partition keys of the table
     */
    List<String> getPartitionKeys();

    /**
     * Returns a copy of this {@code CatalogMaterializedTable} with given table options {@code options}.
     *
     * @return a new copy of this table with replaced table options
     */
    CatalogMaterializedTable copy(Map<String, String> options);

    /**
     * Returns a copy of this {@code CatalogMaterializedTable} with given refresh info.
     *
     * @return a new copy of this table with replaced refresh info
     */
    CatalogMaterializedTable copy(
            RefreshStatus refreshStatus,
            String refreshHandlerDescription,
            byte[] serializedRefreshHandler);
      
    /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */
    Optional<Long> getSnapshot();

    /**
     * The definition query text of the materialized table; the text is expanded in contrast to the
     * original SQL. This is needed because the context, such as the current database, is lost after
     * the session in which the table was defined is gone. The expanded query text takes care of
     * this.
     *
     * <p>For example, for a materialized table defined in the context of the "default" database
     * with a query {@code select * from test1}, the expanded query text might become {@code select
     * `test1`.`name`, `test1`.`value` from `default`.`test1`}, where table test1 resides in
     * database "default" and has two columns ("name" and "value").
     *
     * @return the materialized table definition in expanded text.
     */
    String getDefinitionQuery();

    /** Get the freshness of the materialized table, which is used to determine the physical refresh mode. */
    Duration getFreshness();

    /** Get the logical refresh mode of materialized table. */
    LogicalRefreshMode getLogicalRefreshMode();

    /** Get the physical refresh mode of materialized table. */
    RefreshMode getRefreshMode();

    /** Get the refresh status of materialized table. */
    RefreshStatus getRefreshStatus();

    /** Return summary description of refresh handler. */
    Optional<String> getRefreshHandlerDescription();

    /**
     * Return the serialized refresh handler of the materialized table. This will not be used for
     * DESCRIBE TABLE.
     */
    byte[] getSerializedRefreshHandler();

    /** The logical refresh mode of materialized table. */
    @PublicEvolving
    enum LogicalRefreshMode {
        /**
         * The refresh pipeline will be executed in continuous mode, corresponding to {@link
         * RefreshMode#CONTINUOUS}.
         */
        CONTINUOUS,

        /**
         * The refresh pipeline will be executed in full mode, corresponding to {@link
         * RefreshMode#FULL}.
         */
        FULL,

        /**
         * The refresh pipeline mode is determined by freshness of materialized table, either {@link
         * RefreshMode#FULL} or {@link RefreshMode#CONTINUOUS}.
         */
        AUTOMATIC
    }

    /** The physical refresh mode of materialized table. */
    @PublicEvolving
    enum RefreshMode {
        CONTINUOUS,
        FULL
    }

    /** Background refresh pipeline status of materialized table. */
    @PublicEvolving
    enum RefreshStatus {
        INITIALIZING,
        ACTIVATED,
        SUSPENDED
    }
}


Catalog

For Catalog, we don't need to introduce any new methods. But we should highlight that a Catalog that supports creating materialized tables represents physical storage, which can provide full historical data or an incremental changelog, so developers need to override the getFactory method so that the planner can find the DynamicTableFactory it needs to compile a query.

Code Block
languagejava
titleCatalog
@PublicEvolving
public interface Catalog {
    
    /**
     * Returns a factory for creating instances from catalog objects.
     *
     * <p>This method enables bypassing the discovery process. Implementers can directly pass
     * internal catalog-specific objects to their own factory. For example, a custom {@link
     * CatalogTable} can be processed by a custom {@link DynamicTableFactory}.
     *
     * <p>If this catalog supports creating materialized tables, you should also override this
     * method to provide a {@link DynamicTableFactory}, which helps the planner find the {@link
     * DynamicTableSource} and {@link DynamicTableSink} correctly during the compile and
     * optimization phase. If you don't override this method, you must specify the physical
     * connector identifier of the storage this catalog represents when creating a materialized
     * table. Otherwise, the planner can't find the {@link DynamicTableFactory}.
     *
     * <p>Because all factories are interfaces, the returned {@link Factory} instance can implement
     * multiple supported extension points. An {@code instanceof} check is performed by the caller
     * that checks whether a required factory is implemented; otherwise the discovery process is
     * used.
     */
    default Optional<Factory> getFactory() {
        return Optional.empty();
    }
}


RefreshHandler

We introduce the interface RefreshHandler, which records the meta information of the materialized table's background refresh job.

Code Block
languagejava
/**
 * This interface represents the meta information of the current materialized table's background
 * refresh pipeline. The refresh mode may be continuous or full. The format of the meta information
 * in the two modes is not consistent, so users need to implement this interface accordingly.
 *
 * <p>This meta information will be serialized to bytes by {@link RefreshHandlerSerializer}, then
 * stored in the Catalog for suspending or dropping the {@link CatalogMaterializedTable}.
 *
 * <p>In continuous mode, the format of the meta information may be { "clusterType": "yarn",
 * "clusterId": "xxx", "jobId": "yyyy" }.
 *
 * <p>In full mode, the meta information format may be { "endpoint": "xxx", "workflowId": "yyy" }.
 * Since you may use different workflow scheduler plugins in this mode, you should implement this
 * interface according to your plugin.
 */
@PublicEvolving
public interface RefreshHandler {

    /** Returns a string that summarizes this refresh handler meta information for printing to a console or log. */
    String asSummaryString();
}


ResolvedCatalogMaterializedTable

We introduce the public class ResolvedCatalogMaterializedTable that represents a validated CatalogMaterializedTable.

Code Block
languagejava
/**
 * A validated {@link CatalogMaterializedTable} that is backed by the original metadata coming from the
 * {@link Catalog} but resolved by the framework.
 *
 * <p>Note: This will be converted to {@link ResolvedCatalogTable} by the framework during the
 * planner's query optimization phase.
 */
@PublicEvolving
public class ResolvedCatalogMaterializedTable
        implements ResolvedCatalogBaseTable<CatalogMaterializedTable>, CatalogMaterializedTable {

    private final CatalogMaterializedTable origin;

    private final ResolvedSchema resolvedSchema;

    public ResolvedCatalogMaterializedTable(CatalogMaterializedTable origin, ResolvedSchema resolvedSchema) {
        this.origin =
                Preconditions.checkNotNull(
                        origin, "Original catalog materialized table must not be null.");
        this.resolvedSchema =
                Preconditions.checkNotNull(resolvedSchema, "Resolved schema must not be null.");
    }

    @Override
    public Map<String, String> getOptions() {
        return origin.getOptions();
    }

    @Override
    public String getComment() {
        return origin.getComment();
    }

    @Override
    public CatalogBaseTable copy() {
        return new ResolvedCatalogMaterializedTable((CatalogMaterializedTable) origin.copy(), resolvedSchema);
    }

    @Override
    public CatalogMaterializedTable copy(
            RefreshStatus refreshStatus,
            String refreshHandlerDescription,
            byte[] serializedRefreshHandler) {
        return new ResolvedCatalogMaterializedTable(
                origin.copy(refreshStatus, refreshHandlerDescription, serializedRefreshHandler),
                resolvedSchema);
    }

    @Override
    public Optional<String> getDescription() {
        return origin.getDescription();
    }

    @Override
    public Optional<String> getDetailedDescription() {
        return origin.getDetailedDescription();
    }

    @Override
    public boolean isPartitioned() {
        return origin.isPartitioned();
    }

    @Override
    public List<String> getPartitionKeys() {
        return origin.getPartitionKeys();
    }

    @Override
    public ResolvedCatalogMaterializedTable copy(Map<String, String> options) {
        return new ResolvedCatalogMaterializedTable(origin.copy(options), resolvedSchema);
    }

    @Override
    public Optional<Long> getSnapshot() {
        return origin.getSnapshot();
    }

    @Override
    public CatalogMaterializedTable getOrigin() {
        return origin;
    }

    @Override
    public ResolvedSchema getResolvedSchema() {
        return resolvedSchema;
    }

    @Override
    public String getDefinitionQuery() {
        return origin.getDefinitionQuery();
    }

    @Override
    public Duration getFreshness() {
        return origin.getFreshness();
    }

    @Override
    public LogicalRefreshMode getLogicalRefreshMode() {
        return origin.getLogicalRefreshMode();
    }

    @Override
    public RefreshMode getRefreshMode() {
        return origin.getRefreshMode();
    }

    @Override
    public RefreshStatus getRefreshStatus() {
        return origin.getRefreshStatus();
    }

    @Override
    public Optional<String> getRefreshHandlerDescription() {
        return origin.getRefreshHandlerDescription();
    }

    @Nullable
    @Override
    public byte[] getSerializedRefreshHandler() {
        return origin.getSerializedRefreshHandler();
    }
}


Configuration

We need to introduce two options to control the background refresh job.

...

Modifying this option may affect the refresh mode of the background refresh jobs of all registered materialized tables in the Catalog, so modify it carefully. If you really need to change a job's refresh mode, prefer doing so by modifying the corresponding materialized table's freshness.
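For example, rather than flipping the catalog-wide option, a single table's refresh behavior can be adjusted in place:

Code Block
languagesql
-- Per-table change: adjusting freshness (or REFRESH_MODE) affects only this
-- table's background refresh job, not every materialized table in the Catalog.
ALTER MATERIALIZED TABLE dwd_orders SET FRESHNESS = INTERVAL '1' DAY;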


Gateway Rest API

v3/materialized-tables/refresh

Verb: POST

Response code: 200 OK

Trigger a refresh operation of materialized tables.

Request

body

{
  "materializedTable": "",
  "isPeriodic": true,
  "scheduleTime": "",
  "scheduleTimeFormat": "",
  "config": {
    "key": "value"
  }
}

Response

body

{
  "jobID": "",
  "clusterInfo": {}
}

This API is used by the workflow scheduler to trigger a refresh operation on a materialized table automatically. This is just a draft API; its main purpose is to give you an idea of the mechanics of the refresh operation. We will give a detailed design in another FLIP that supports a pluggable Workflow Scheduler.

Proposed Design

Integrating the aforementioned context, Materialized Table is a new table type introduced for Flink SQL that enables users to build and manage data pipelines; it is automatically refreshed at the specified freshness interval. This means that a materialized table is bound to a transformation SQL statement and, based on the defined data freshness requirement, a background data refresh pipeline is created to automatically update the data.

...

  1. Metadata: This includes the table's schema, options, definition query, freshness, and the refresh handler that points to the refresh job.
  2. Data: This corresponds to a specific storage that holds the current table's data, which can provide full historical data or an incremental changelog.
  3. Background Refresh Pipeline: This pipeline ensures the production and updating of data in either continuous or full refresh mode.

Overall Architecture

Taking into account the functional design of Materialized Table, the management of its entire lifecycle is primarily supported by the following core components in the architecture:

...

Regarding the details of interfacing with various open source schedulers, we will discuss the relevant interfaces in a separate FLIP. Support for specific schedulers such as Airflow and DolphinScheduler will be implemented and maintained in separate repositories (similar to the connector repos).

Partitioned Table Full Refresh

Based on the previous introduction, we understand that in full refresh mode, batch scheduling tasks are created based on freshness, and each scheduled execution results in a full table refresh. A common scenario in data warehouses is time-partitioned tables, such as daily partitions keyed by 'ds'. If the user expects to refresh only the latest partition on each execution, how can this be handled? We will introduce the implementation details through the following example.

...

If FRESHNESS = INTERVAL '1' HOUR, what is the behavior? First, a task scheduled on an hourly basis is registered with the Scheduler. Then, on each execution, the framework uses the time provided by the scheduling system, combined with the freshness and the 'partition.fields.ds.date-formatter' option, to calculate the 'ds' partition that needs to be refreshed. In this scenario, it would compute ds='2024-03-02', the latest time partition.
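Conceptually, each scheduled execution then behaves like a manual refresh of the computed partition (the statement below illustrates the framework's effective behavior; it is not something the user issues):

Code Block
languagesql
-- With an hourly schedule, a freshness of 1 hour and a 'yyyy-MM-dd' formatter,
-- an execution on 2024-03-02 effectively refreshes the latest partition:
ALTER MATERIALIZED TABLE dwd_orders REFRESH PARTITION (ds = '2024-03-02');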

Compatibility, Deprecation, and Migration Plan

This FLIP only has incremental changes, and thus is fully backwards compatible.

Test Plan

Unit tests, integration tests, and manual tests will be introduced to verify this change.

Rejected Alternatives

Materialized View

Why not use Materialized View? A view is the equivalent of a query result and, according to the definition in the SQL standard, is not supposed to support modification. However, in the lakehouse scenario there is a need to modify data, which leads to a conceptual conflict between the two. Therefore, using a table is more appropriate.

Base class of CatalogMaterializedTable 

Regarding CatalogMaterializedTable, another alternative is extending CatalogTable directly instead of CatalogBaseTable, which would make it straightforward to inherit the attributes of CatalogTable.

...

  1. Materialized Table is a new table type that also includes attributes such as query, freshness and refresh job, which makes it more appropriate to use the new interface.
  2. If CatalogTable adds a new attribute that is not supported by Materialized Table, it will lead to a conflict, so it is more appropriate to represent it using an interface that is parallel to CatalogTable.

Future Improvements

Temporary View Limitation

Since both the materialized table and the query it references need to be persistent, while a temporary view is executed locally and is effective only for the current session, it cannot be persisted and is therefore not supported. If there is a real need for a temporary view, there are currently two workarounds: one is to use a regular View, and the other is to use a Common Table Expression (CTE) [7], which is equivalent in effect to a temporary view. Of course, future support for temporary views can be determined based on user demand and the technical implementation.

Code Block
languagesql
CREATE MATERIALIZED TABLE sink
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '3' MINUTE
AS
-- use cte
WITH cte_source AS (
  SELECT
    user_id,
    shop_id,
    FROM_UNIXTIME(ds, 'yyyy-MM-dd') AS ds,
    SUM(payment_amount_cents) AS payed_buy_fee_sum,
    SUM(1) AS pv
  FROM c_source
  GROUP BY user_id, shop_id, FROM_UNIXTIME(ds, 'yyyy-MM-dd')
)
SELECT * FROM cte_source

Modify Select Statement Limitation

From the standpoint of strict semantics, the current design of Materialized Table does not support altering the query statement. In the lakehouse context, however, a common scenario is the need to add fields due to schema evolution, a legitimate requirement that we intend to support in the future. One possible solution we have conceived is the syntax ALTER MATERIALIZED TABLE xxx ADD query_statement. At the framework level, we would optimize by merging multiple query statements that write to the same Materialized Table.

Code Block
languagesql
-- create a materialized table with two aggregation metrics
CREATE MATERIALIZED TABLE dws_deals(
	PRIMARY KEY (order_id, ds) NOT ENFORCED
)
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '1' HOUR
AS SELECT
  o.ds,
  o.order_id,
  count(*) AS order_cnt,
  sum(pay_amt) AS order_total_amt
FROM 
  dwd_orders AS o
GROUP BY o.order_id, o.ds;

-- add two new aggregation metrics
ALTER MATERIALIZED TABLE dws_deals
ADD 
SELECT
  o.ds,
  o.order_id,
  count(distinct order_id) AS dedup_order_cnt, -- deduplicated order cnt
  sum(goods_cnt) AS total_goods_cnt -- total goods cnt
FROM 
  dwd_orders AS o
GROUP BY o.order_id, o.ds;


Reference

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables

...