Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Today's data warehouse processing architectures are typically built on either the Lambda architecture, with separate streaming and batch processing systems, or the Lakehouse architecture, with a processing model based on unified streaming-batch storage. Both architectures share some issues:

  • Currently, Flink SQL provides two execution modes: streaming and batch. Users must choose the appropriate execution mode based on data freshness requirements: streaming mode uses the INSERT INTO statement, while batch mode uses INSERT OVERWRITE. Partition conditions for partitioned tables also differ between the two modes, so users end up maintaining two sets of code.
  • When integrating with modern streaming-batch unified storage and using an incremental computation model, if there is a need to bulk-refresh certain historical partitions, the streaming job's code logic cannot be reused directly. Batch job code must be redeveloped with explicit partition conditions to avoid a full-table refresh, which can result in data duplication.
  • Jobs must be created and deployed manually. For streaming jobs, users need to fine-tune parameters such as checkpoint intervals and minibatch settings, which can be costly in terms of optimization effort. For batch jobs, a workflow scheduler like Airflow is needed to create jobs. These are two disjoint systems, so users cannot seamlessly switch between them when data freshness requirements change.

In response to the aforementioned issues, Flink SQL introduced the concept of Dynamic Tables[1] long ago in an attempt to solve them, but they remain unresolved. Meanwhile, the industry is also trying to address these challenges. The cloud data warehouse vendor Snowflake has unveiled its Dynamic Tables[2] feature to simplify the creation and management of data pipelines and unify batch and streaming, aiming at lower data latency and more real-time capabilities. Databricks has also launched Delta Live Tables[3], a declarative ETL framework that unifies streaming and batch data processing pipelines under a single API and accelerates ETL development. This indicates that simplifying data processing pipelines and unifying streaming and batch are current technological trends in the data warehouse field.

Hence, starting from the concept of unified streaming and batch processing (One SQL & Unified Table), we propose a new table type based on the Dynamic Tables concept[1], called Materialized Table, to simplify streaming and batch ETL pipelines cost-effectively. Users no longer need to wrestle with complex concepts such as streaming versus batch processing or the underlying technical details; they can focus solely on business logic and data freshness.

User Story

Let's walk through a user story to see how Materialized Table solves the problems described above.

Business Background

Suppose a MySQL instance has three business tables: orders (orders table), orders_pay (order payments table), and products (product category dimension table), which are first ingested in real time into lake storage such as Paimon via Flink CDC. We are going to perform data-widening operations on these three tables; let's see how users can create a data processing pipeline, and pause and resume it, through Materialized Table.

Create Data Pipeline

orders and orders_pay are two base tables at the ODS layer, partitioned by ds. Here we create a DWD wide table dwd_orders.

CREATE MATERIALIZED TABLE dwd_orders
(
   PRIMARY KEY(ds, id) NOT ENFORCED
) 
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '3' MINUTE
AS SELECT 
  o.ds,
  o.id,
  o.order_number,
  o.user_id,
...
FROM 
  orders AS o
  LEFT JOIN products FOR SYSTEM_TIME AS OF proctime() AS prod
    ON o.product_id = prod.id
  LEFT JOIN orders_pay AS pay
    ON o.id = pay.order_id AND o.ds = pay.ds

When a CREATE MATERIALIZED TABLE statement is executed, the framework derives the table schema from the referenced query, first creates the materialized table in the Catalog, and then automatically creates a data processing pipeline that refreshes the table according to the specified FRESHNESS value. Since FRESHNESS is set to 3 minutes here, a Flink streaming job is launched to refresh the table continuously.

Pause & Resume Data Pipeline

-- 1. Pause table data refresh
ALTER MATERIALIZED TABLE dwd_orders SUSPEND;

-- 2. Resume table data refresh
ALTER MATERIALIZED TABLE dwd_orders RESUME
-- Set table option via WITH clause
WITH(
  'sink.parallelism' = '10'
);


  1. Through the SUSPEND statement, the refresh job can be paused.
  2. Through the RESUME statement, the refresh job can be resumed. 

Manual Refresh

-- Refresh historical partition
ALTER MATERIALIZED TABLE dwd_orders REFRESH PARTITION(ds='20231023')


If you need to backfill some historical partition of a partitioned table, you just need to execute a REFRESH statement and specify the partition condition; this launches a Flink batch job that refreshes the specified partition.

Typically, in order to comply with GDPR requirements, it may be necessary to modify certain data within a table; this can be accomplished with an UPDATE statement.

UPDATE dwd_orders SET order_number = 10 WHERE id = 73635185;

Modify Data Freshness

ALTER MATERIALIZED TABLE dwd_orders SET FRESHNESS = INTERVAL '1' DAY

If you want to adjust data freshness, you only need to modify the materialized table's FRESHNESS attribute. The framework first stops the Flink streaming job, then automatically creates a scheduled task in the workflow scheduler, which triggers table refreshes at the specified freshness.

From the above operational process, it can be seen that Materialized Table greatly simplifies the development of data processing pipelines. With a unified declarative SQL API covering both stream and batch pipelines, users do not need to concern themselves with Flink jobs or execution modes; they simply operate on the materialized table.

In summary, we hope that a Materialized Table, declared with a SQL statement and a freshness value, lets you define batch and streaming transformations on your data in the same way, accelerate ETL pipeline development, automatically manage task orchestration, enhance data quality and freshness, and save time and effort.

Downstream Pipeline

Cascading Materialized Table

CREATE MATERIALIZED TABLE dws_deals(
	PRIMARY KEY (order_id, ds) NOT ENFORCED
)
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '1' HOUR
AS SELECT
  o.ds,
  o.order_id,
  count(*) order_cnt,
  sum(pay_amt) order_total_amt
  ...
FROM 
  dwd_orders as o
GROUP BY o.order_id, o.ds

We can create a cascading DWS materialized table dws_deals based on the DWD materialized table dwd_orders.

OLAP Query

SELECT * FROM dwd_orders WHERE ds = '20231023' LIMIT 10;

We can also query the materialized table dwd_orders directly with an OLAP point query.

Public Interfaces

SQL Statements

To support materialized tables, we need to introduce some new syntax.

Create

CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
[ ([ <table_constraint> ]) ]
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
[WITH (key1=val1, key2=val2, ...)]
FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
[REFRESH_MODE = { CONTINUOUS | FULL }]
AS <select_statement>

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

Parameters

  • PRIMARY KEY: An optional list of columns that uniquely identifies each row within the table. Primary key columns must be non-null.
  • PARTITIONED BY: An optional list of columns by which the table is partitioned.
  • WITH: Optionally sets one or more table properties.
  • FRESHNESS: Defines the time measure by which the materialized table's content should lag behind updates to the base tables. This is merely a best-effort behavior, not a guarantee.
  • REFRESH_MODE: An optional clause that specifies the refresh mode of the materialized table. This property cannot be altered after the materialized table is created.
  • AS select_statement: This clause populates the materialized table with the result of the select query. The base tables can be materialized tables, tables, or views.

Limitations:

  • Does not support explicitly specifying columns.
  • Does not support modifying the select statement after the materialized table is created.
  • Does not currently support referencing temporary tables, temporary views, or temporary functions in the select statement.

FRESHNESS Clause

For a materialized table, a very important attribute is freshness, so we highlight its function first. Freshness serves two key purposes.

Firstly, freshness is used to determine the data refresh mode of a materialized table so that the specified data freshness can be satisfied. There are currently two refresh modes:

  • Continuous Refresh Mode: Launches a Flink streaming job that continuously refreshes the materialized table data.
  • Full Refresh Mode: A Workflow Scheduler periodically triggers a Flink batch job to refresh the materialized table data.

See the Configuration section for details on how the refresh mode is determined.

Secondly, after the refresh mode is determined, freshness is further used to derive the parameters of the refresh job:

  • In continuous mode, it is used to derive checkpoint & minibatch related options that determine the visibility of data, so as to satisfy the defined freshness.
  • In full mode, it is used to derive the scheduling interval of the workflow, i.e., the CRON expression, and to calculate the corresponding time partition value (e.g., ds) for partitioned tables, so as to satisfy the defined data freshness of the materialized table (see the sketch after this list).
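
As a rough illustration of the full-mode derivation, the following sketch maps a freshness value to a Quartz-style CRON expression. The concrete mapping and the CRON dialect are assumptions for illustration, not the proposed implementation:

import java.time.Duration;

/** A minimal sketch of deriving a scheduling CRON expression from freshness in full refresh mode. */
public class CronDerivationSketch {

    static String deriveCronExpression(Duration freshness) {
        if (freshness.toDays() >= 1) {
            return "0 0 0 * * ?"; // daily at midnight
        } else if (freshness.toHours() >= 1) {
            return "0 0 */" + freshness.toHours() + " * * ?"; // every N hours
        } else {
            return "0 */" + freshness.toMinutes() + " * * * ?"; // every N minutes
        }
    }

    public static void main(String[] args) {
        System.out.println(deriveCronExpression(Duration.ofDays(1)));     // 0 0 0 * * ?
        System.out.println(deriveCronExpression(Duration.ofHours(1)));    // 0 0 */1 * * ?
        System.out.println(deriveCronExpression(Duration.ofMinutes(30))); // 0 */30 * * * ?
    }
}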

REFRESH_MODE Clause

In the previous section we explained that the refresh mode is automatically derived by the engine based on freshness, so why do we provide the REFRESH_MODE clause for specifying it manually, and what is the relationship between the two? In some business scenarios, the user knows exactly which refresh mode is needed, and the mode derived from freshness cannot meet the demand, so we provide a more flexible way that allows the refresh mode to be specified manually.

If REFRESH_MODE is specified at table creation time, it takes priority over freshness, which means freshness is no longer used to infer the refresh mode of the materialized table.
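
The following sketch shows how the physical refresh mode could be resolved from the declared REFRESH_MODE (if any) and freshness, using the LogicalRefreshMode and RefreshMode enums introduced later in this FLIP. The threshold comparison follows the Configuration section; the exact resolution code is an illustrative assumption:

import java.time.Duration;

/** A minimal sketch of resolving the physical refresh mode; not the final implementation. */
public class RefreshModeResolutionSketch {

    enum LogicalRefreshMode { CONTINUOUS, FULL, AUTOMATIC } // AUTOMATIC if REFRESH_MODE is omitted
    enum RefreshMode { CONTINUOUS, FULL }

    static RefreshMode resolve(LogicalRefreshMode declared, Duration freshness, Duration threshold) {
        switch (declared) {
            case CONTINUOUS:
                return RefreshMode.CONTINUOUS; // explicit REFRESH_MODE takes priority over freshness
            case FULL:
                return RefreshMode.FULL;
            default:
                // AUTOMATIC: compare freshness with materialized-table.refresh-mode.freshness-threshold
                return freshness.compareTo(threshold) < 0
                        ? RefreshMode.CONTINUOUS
                        : RefreshMode.FULL;
        }
    }
}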

Full Refresh Mode

In full refresh mode, the default overwrite behavior for a materialized table is at table granularity. If the materialized table is partitioned and the format of the time partition field is specified via 'partition.fields.#.date-formatter' when creating the table, the default overwrite behavior is at partition granularity, that is, only the latest partition's data is overwritten on each refresh. For more details on 'partition.fields.#.date-formatter', see the Partitioned Table Full Refresh section.


Operations on materialized tables are executed through the following ALTER statements.

Suspend

ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND

Stop or suspend refreshes of the materialized table, which means the background refresh job is suspended. If the materialized table is used by downstream materialized tables, those are not suspended; the operation is effective only for the current materialized table.

Resume

ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESUME
[WITH('key1' = 'val1', 'key2' = 'val2')]


Restart or resume refreshes of the materialized table, which means the background refresh job is resumed. The resume operation only works on the current materialized table; it does not cascade to downstream materialized tables that are suspended.

You can also specify one or more materialized table options via the WITH clause to tune the background refresh job; these options are only effective for the current operation and are not persisted.

Refresh

ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name REFRESH
[PARTITION (key1=val1, key2=val2, ...)]

Manually trigger a refresh of the materialized table. This only works on the current table and does not trigger a cascading refresh of downstream tables by default.

The manual REFRESH command and the background refresh job can run concurrently; the framework does not impose any limitation on this. However, if the refresh mode is continuous and the background job is running, be careful with the REFRESH command, as it can lead to inconsistent data.

Set

Freshness

ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name
  SET FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }

Modify the time measure that the materialized table’s content should lag behind updates to the base tables. Modifying freshness may affect the refresh mode of the background job, which may change from continuous mode to full mode, from full mode to continuous mode, or remain unchanged. In any case, altering freshness will stop the current refresh job and create a new job with the updated freshness.

Note: If the refresh mode is specified by REFRESH_MODE clause when creating a materialized table, modifying freshness will not affect the refresh mode.

REFRESH_MODE

ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name
  SET REFRESH_MODE = { FULL | CONTINUOUS }

Modify the refresh mode of the materialized table, which may change from continuous mode to full mode, or full mode to continuous mode.

Options

ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name  SET ('key' = 'val');

Reset

ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESET ('key');

Besides the aforementioned ALTER syntax, materialized tables currently do not support the other ALTER syntaxes that apply to regular tables.

Drop

DROP MATERIALIZED TABLE [ IF EXISTS ] [catalog_name.][db_name.]table_name

Dropping a materialized table deletes the refresh job first, then drops the metadata and data.

Describe Job

{ DESCRIBE | DESC } JOB ${jobId} 

Describes the specified job in the Flink cluster; the schema of the returned information is the same as that of the SHOW JOBS statement.

Interface Changes

CatalogMaterializedTable

We introduce a new interface CatalogMaterializedTable, which extends CatalogBaseTable. As described above, CatalogMaterializedTable, in addition to the attributes of schema, options, and partition keys, also encompasses other important metadata: the definition query, freshness, refresh mode, refresh status, and refresh handler.

/**
 * Represents the unresolved metadata of a materialized table in a {@link Catalog}.
 *
 * <p>Materialized Table definition: In the context of integrated stream-batch data storage, it
 * provides full history data and an incremental changelog. By defining the data's production
 * business logic and freshness, data updates are achieved through continuous or full refresh mode,
 * while also providing the capability for both batch and incremental consumption.
 *
 * <p>The metadata of {@link CatalogMaterializedTable} includes the following four main parts:
 *
 * <ul>
 *   <li>Schema, comments, options and partition keys.
 *   <li>Data freshness, which determines when the data is generated and becomes visible to users.
 *   <li>Data production business logic, also known as the definition query.
 *   <li>Background refresh pipeline, either a Flink streaming job or a scheduled batch job, which
 *       is initialized after the materialized table is created.
 * </ul>
 * <p>A catalog implementer can either use {@link #of(Schema, String, List, Map, Long, String,
 * Duration, LogicalRefreshMode, RefreshMode, RefreshStatus, String, byte[])} for a basic
 * implementation of this interface or create a custom class that allows passing catalog-specific
 * objects all the way down to the connector creation (if necessary).
*/
@PublicEvolving
public interface CatalogMaterializedTable extends CatalogBaseTable {

    @Override
    default TableKind getTableKind() {
        return TableKind.MATERIALIZED_TABLE;
    }

    /**
     * Check if the table is partitioned or not.
     *
     * @return true if the table is partitioned; otherwise, false
     */
    boolean isPartitioned();

    /**
     * Get the partition keys of the table. This will be an empty set if the table is not
     * partitioned.
     *
     * @return partition keys of the table
     */
    List<String> getPartitionKeys();

    /**
     * Returns a copy of this {@code CatalogMaterializedTable} with given table options {@code options}.
     *
     * @return a new copy of this table with replaced table options
     */
    CatalogMaterializedTable copy(Map<String, String> options);

    /**
     * Returns a copy of this {@code CatalogMaterializedTable} with given refresh info.
     *
     * @return a new copy of this table with replaced refresh info
     */     
    CatalogMaterializedTable copy(
            RefreshStatus refreshStatus,
            String refreshHandlerDescription,
            byte[] serializedRefreshHandler);
      
    /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */
    Optional<Long> getSnapshot();

    /**
     * The definition query text of the materialized table. The text is expanded in contrast to the
     * original SQL, because context such as the current database is lost after the session in which
     * the table was defined is gone. Expanded query text takes care of this.
     *
     * <p>For example, for a materialized table that is defined in the context of "default" database with
     * a query {@code select * from test1}, the expanded query text might become {@code select
     * `test1`.`name`, `test1`.`value` from `default`.`test1`}, where table test1 resides in
     * database "default" and has two columns ("name" and "value").
     *
     * @return the materialized table definition in expanded text.
     */
    String getDefinitionQuery();

    /** Get the freshness of materialized table which is used to determine the physical refresh mode. */
    Duration getFreshness();

    /** Get the logical refresh mode of materialized table. */
    LogicalRefreshMode getLogicalRefreshMode();

    /** Get the physical refresh mode of materialized table. */
    RefreshMode getRefreshMode();

    /** Get the refresh status of materialized table. */
    RefreshStatus getRefreshStatus();

    /** Return summary description of refresh handler. */
    Optional<String> getRefreshHandlerDescription();

    /**
     * Return the serialized refresh handler of materialized table. This will not be used for describe
     * table.
     */
    byte[] getSerializedRefreshHandler();

    /** The logical refresh mode of materialized table. */
    @PublicEvolving
    enum LogicalRefreshMode {
        /**
         * The refresh pipeline will be executed in continuous mode, corresponding to {@link
         * RefreshMode#CONTINUOUS}.
         */
        CONTINUOUS,

        /**
         * The refresh pipeline will be executed in full mode, corresponding to {@link
         * RefreshMode#FULL}.
         */
        FULL,

        /**
         * The refresh pipeline mode is determined by freshness of materialized table, either {@link
         * RefreshMode#FULL} or {@link RefreshMode#CONTINUOUS}.
         */
        AUTOMATIC
    }

    /** The physical refresh mode of materialized table. */
    @PublicEvolving
    enum RefreshMode {
        CONTINUOUS,
        FULL
    }

    /** Background refresh pipeline status of materialized table. */
    @PublicEvolving
    enum RefreshStatus {
        INITIALIZING,
        ACTIVATED,
        SUSPENDED
    }
}


Catalog

For Catalog, we don't need to introduce any new method. However, we should highlight that a Catalog which supports creating materialized tables represents physical storage that can provide full historical data or an incremental changelog, so developers need to override the getFactory method so that the planner can find the DynamicTableFactory when compiling a query.

@PublicEvolving
public interface Catalog {
    
    /**
     * Returns a factory for creating instances from catalog objects.
     *
     * <p>This method enables bypassing the discovery process. Implementers can directly pass
     * internal catalog-specific objects to their own factory. For example, a custom {@link
     * CatalogTable} can be processed by a custom {@link DynamicTableFactory}.
     *
     * <p>If this catalog supports creating materialized tables, you should also override this method
     * to provide a {@link DynamicTableFactory}, which helps the planner find the {@link
     * DynamicTableSource} and {@link DynamicTableSink} correctly during the compilation and
     * optimization phase. If you don't override this method, you must specify the physical connector
     * identifier of the storage this catalog represents when creating a materialized table.
     * Otherwise, the planner can't find the {@link DynamicTableFactory}.
     *
     * <p>Because all factories are interfaces, the returned {@link Factory} instance can implement
     * multiple supported extension points. An {@code instanceof} check is performed by the caller
     * that checks whether a required factory is implemented; otherwise the discovery process is
     * used.
     */
    default Optional<Factory> getFactory() {
        return Optional.empty();
    }
}


RefreshHandler

We introduce the interface RefreshHandler, which records the meta information of the current materialized table's background refresh job.

/**
 * This interface represents the meta information of the current materialized table's background
 * refresh pipeline. The refresh mode may be continuous or full; the meta information differs
 * between the two modes, so users need to implement this interface according to their business
 * scenario.
 *
 * <p>In continuous mode, the meta information may contain { "clusterType": "yarn", "clusterId":
 * "xxx", "jobId": "yyyy" }.
 *
 * <p>In full mode, the meta information may contain { "endpoint": "xxx", "workflowId": "yyy" }.
 * Since users may use different workflow schedulers in this mode, they should implement this
 * interface according to their plugin.
 *
 * <p>This interface will be serialized to bytes by {@link RefreshHandlerSerializer} and then stored
 * in the Catalog for further operations.
 */ 
@PublicEvolving
public interface RefreshHandler {

    /** Returns a string that summarizes this refresh handler meta information for printing to a console or log. */
    String asSummaryString();
}
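
For illustration, a minimal continuous-mode implementation might look like the following sketch. The ContinuousRefreshHandler class name and its fields are assumptions chosen to match the javadoc example above, not part of the proposed API:

import java.io.Serializable;

/** A sketch of a continuous-mode refresh handler; the class name and fields are illustrative. */
public class ContinuousRefreshHandler implements RefreshHandler, Serializable {

    private final String clusterType; // e.g. "yarn"
    private final String clusterId;   // the cluster running the refresh job
    private final String jobId;       // the Flink streaming job that refreshes the table

    public ContinuousRefreshHandler(String clusterType, String clusterId, String jobId) {
        this.clusterType = clusterType;
        this.clusterId = clusterId;
        this.jobId = jobId;
    }

    @Override
    public String asSummaryString() {
        return String.format(
                "{\"clusterType\": \"%s\", \"clusterId\": \"%s\", \"jobId\": \"%s\"}",
                clusterType, clusterId, jobId);
    }
}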

RefreshHandlerSerializer

We introduce the interface RefreshHandlerSerializer, which is used to serialize and deserialize RefreshHandler. The serialized bytes are persisted to CatalogMaterializedTable for further operations.

/** This interface is used to serialize and deserialize the {@link RefreshHandler}. */
@PublicEvolving
public interface RefreshHandlerSerializer<T extends RefreshHandler> {

    /** Serialize the {@link RefreshHandler} to bytes. */
    byte[] serialize(T refreshHandler) throws IOException;

    /** Deserialize the bytes to a {@link RefreshHandler} instance. */
    T deserialize(byte[] serializedBytes, ClassLoader cl)
            throws IOException, ClassNotFoundException;
}
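
Building on the hypothetical ContinuousRefreshHandler above (which implements Serializable), a serializer sketch based on plain Java serialization could look as follows; the actual wire format is left to the implementer:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

/** A sketch serializer for the hypothetical ContinuousRefreshHandler. */
public class ContinuousRefreshHandlerSerializer
        implements RefreshHandlerSerializer<ContinuousRefreshHandler> {

    @Override
    public byte[] serialize(ContinuousRefreshHandler refreshHandler) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(refreshHandler);
        }
        return bytes.toByteArray();
    }

    @Override
    public ContinuousRefreshHandler deserialize(byte[] serializedBytes, ClassLoader cl)
            throws IOException, ClassNotFoundException {
        // Resolve classes against the given classloader, e.g. the user classloader of the session.
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(serializedBytes)) {
                    @Override
                    protected Class<?> resolveClass(ObjectStreamClass desc)
                            throws IOException, ClassNotFoundException {
                        return Class.forName(desc.getName(), false, cl);
                    }
                }) {
            return (ContinuousRefreshHandler) in.readObject();
        }
    }
}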

ResolvedCatalogMaterializedTable

We introduce the public class ResolvedCatalogMaterializedTable that represents a validated CatalogMaterializedTable.

/**
 * A validated {@link CatalogMaterializedTable} that is backed by the original metadata coming from the
 * {@link Catalog} but resolved by the framework.
 *
 * <p>Note: This will be converted to {@link ResolvedCatalogTable} by the framework during the
 * planner's query optimization phase.
 */
@PublicEvolving
public class ResolvedCatalogMaterializedTable
        implements ResolvedCatalogBaseTable<CatalogMaterializedTable>, CatalogMaterializedTable {

    private final CatalogMaterializedTable origin;

    private final ResolvedSchema resolvedSchema;

    public ResolvedCatalogMaterializedTable(CatalogMaterializedTable origin, ResolvedSchema resolvedSchema) {
        this.origin =
                Preconditions.checkNotNull(
                        origin, "Original catalog materialized table must not be null.");
        this.resolvedSchema =
                Preconditions.checkNotNull(resolvedSchema, "Resolved schema must not be null.");
    }

    @Override
    public Map<String, String> getOptions() {
        return origin.getOptions();
    }

    @Override
    public String getComment() {
        return origin.getComment();
    }

    @Override
    public CatalogBaseTable copy() {
        return new ResolvedCatalogMaterializedTable((CatalogMaterializedTable) origin.copy(), resolvedSchema);
    }

    @Override
    public CatalogMaterializedTable copy(
            RefreshStatus refreshStatus,
            String refreshHandlerDescription,
            byte[] serializedRefreshHandler) {
        return new ResolvedCatalogMaterializedTable(
                origin.copy(refreshStatus, refreshHandlerDescription, serializedRefreshHandler),
                resolvedSchema);
    }

    @Override
    public Optional<String> getDescription() {
        return origin.getDescription();
    }

    @Override
    public Optional<String> getDetailedDescription() {
        return origin.getDetailedDescription();
    }

    @Override
    public boolean isPartitioned() {
        return origin.isPartitioned();
    }

    @Override
    public List<String> getPartitionKeys() {
        return origin.getPartitionKeys();
    }

    @Override
    public ResolvedCatalogMaterializedTable copy(Map<String, String> options) {
        return new ResolvedCatalogMaterializedTable(origin.copy(options), resolvedSchema);
    }

    @Override
    public Optional<Long> getSnapshot() {
        return Optional.empty();
    }

    @Override
    public CatalogMaterializedTable getOrigin() {
        return origin;
    }

    @Override
    public ResolvedSchema getResolvedSchema() {
        return resolvedSchema;
    }

    @Override
    public String getDefinitionQuery() {
        return origin.getDefinitionQuery();
    }

    @Override
    public Duration getFreshness() {
        return origin.getFreshness();
    }

    @Override
    public LogicalRefreshMode getLogicalRefreshMode() {
        return origin.getLogicalRefreshMode();
    }

    @Override
    public RefreshMode getRefreshMode() {
        return origin.getRefreshMode();
    }

    @Override
    public RefreshStatus getRefreshStatus() {
        return origin.getRefreshStatus();
    }

    @Override
    public Optional<String> getRefreshHandlerDescription() {
        return origin.getRefreshHandlerDescription();
    }

    @Nullable
    @Override
    public byte[] getSerializedRefreshHandler() {
        return origin.getSerializedRefreshHandler();
    }
}


Configuration

We need to introduce two options to control the background refresh job.

materialized-table.refresh-mode.freshness-threshold
  • Required: Optional
  • Default: 30 minutes
  • Type: Duration
  • Description: Used to derive the refresh mode of the background refresh job that a materialized table statement generates. If FRESHNESS is less than the threshold, the job runs in continuous mode; if it is greater, the job runs in full mode.

partition.fields.#.date-formatter
  • Required: Optional
  • Default: None
  • Type: String
  • Description: Specifies the time partition formatter of a partitioned materialized table, where # represents the string-typed partition field. This hints to the framework which partition to refresh in full refresh mode. It is a common materialized table option that the corresponding Catalog should support.


In the Overall Architecture section, we will introduce the SQL Gateway that is responsible for managing the background refresh jobs of materialized tables, so this option is scoped to the SQL Gateway only. It is read from flink-conf each time the service starts and is not allowed to be modified at runtime. After the service starts, it first looks up all materialized tables and compares the materialized-table.refresh-mode.freshness-threshold value with each materialized table's freshness; if the refresh mode changes, the original refresh job is stopped, a new job is created, and the job information is updated in the materialized table.

Modifying this option may affect the refresh mode of the background refresh jobs of all registered materialized tables in the Catalog, so please modify it carefully. If you really need to change a job's refresh mode, prefer to do so by modifying the materialized table's freshness.
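
To make the startup behavior concrete, here is a rough sketch of the reconciliation loop described above. The gateway-side helper methods and the table-listing input are hypothetical; note that tables whose refresh mode was pinned via the REFRESH_MODE clause are skipped, consistent with the Set Freshness section:

import java.time.Duration;
import org.apache.flink.table.catalog.CatalogMaterializedTable;

/** A sketch of refresh-mode reconciliation at SQL Gateway startup; helper methods are illustrative. */
public class FreshnessThresholdReconciliationSketch {

    void reconcile(Iterable<CatalogMaterializedTable> tables, Duration threshold) {
        for (CatalogMaterializedTable table : tables) {
            // Tables with an explicitly declared refresh mode are not affected by the threshold.
            if (table.getLogicalRefreshMode() != CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC) {
                continue;
            }
            CatalogMaterializedTable.RefreshMode expected =
                    table.getFreshness().compareTo(threshold) < 0
                            ? CatalogMaterializedTable.RefreshMode.CONTINUOUS
                            : CatalogMaterializedTable.RefreshMode.FULL;
            if (expected != table.getRefreshMode()) {
                stopRefreshJob(table);            // hypothetical: stop the original refresh job
                startRefreshJob(table, expected); // hypothetical: create a job in the new mode
                // ... then persist the new refresh handler info via CatalogMaterializedTable#copy
            }
        }
    }

    private void stopRefreshJob(CatalogMaterializedTable table) { /* omitted */ }

    private void startRefreshJob(
            CatalogMaterializedTable table, CatalogMaterializedTable.RefreshMode mode) { /* omitted */ }
}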


Gateway Rest API

v3/materialized-tables/refresh

Verb: POST

Response code: 200 OK

Trigger a refresh operation of materialized tables.

Request

body

{
  "materializedTable": "",
  "isPeriodic": true,
  "scheduleTime": "",
  "scheduleTimeFormat": "",
  "config": {
    "key": "value"
  }
}

Response

body

{
  "jobID": "",
  "clusterInfo": {}
}

This API is used by the workflow scheduler to automatically trigger a refresh operation of a materialized table. This is just a draft API, mainly intended to convey the mechanics of the refresh operation; a detailed design will be given in another FLIP that supports a pluggable Workflow Scheduler.
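
As a usage illustration, a scheduler plugin could invoke the draft endpoint with any HTTP client. The following sketch uses Java's built-in HttpClient, assuming a gateway at localhost:8083; the address, port, and payload values are placeholders:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** A sketch of a scheduler triggering a periodic refresh via the draft REST API. */
public class TriggerRefreshSketch {

    public static void main(String[] args) throws Exception {
        String body =
                "{\n"
                        + "  \"materializedTable\": \"my_catalog.my_db.dwd_orders\",\n"
                        + "  \"isPeriodic\": true,\n"
                        + "  \"scheduleTime\": \"2024-03-02 00:00:00\",\n"
                        + "  \"scheduleTimeFormat\": \"yyyy-MM-dd HH:mm:ss\"\n"
                        + "}";

        HttpRequest request =
                HttpRequest.newBuilder()
                        .uri(URI.create("http://localhost:8083/v3/materialized-tables/refresh"))
                        .header("Content-Type", "application/json")
                        .POST(HttpRequest.BodyPublishers.ofString(body))
                        .build();

        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // e.g. {"jobID": "...", "clusterInfo": {...}}
    }
}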

Proposed Design

Integrating the aforementioned context, a Materialized Table is a new table type introduced for Flink SQL that enables users to build and manage data pipelines; it is automatically refreshed at the specified freshness. This means that a materialized table is bound to a transformation SQL statement, and a background data refresh pipeline is created to automatically update the data according to the defined freshness requirement.

So a materialized table is composed of three parts:

  1. Metadata: This includes the table's schema, options, definition query, freshness, and refresh handler which points to the refresh job.
  2. Data: This corresponds to a specific storage that holds the current table's data, which can provide full historical data or incremental changelog. 
  3. Background Refresh Pipeline: This pipeline ensures the production and updating of data in either a continuous or full refresh mode.

Overall Architecture

Taking into account the functional design of Materialized Table, the management of its entire lifecycle is primarily supported by the following core components in the architecture:

  • SQL CLI: A user interaction interface responsible for submitting materialized table statements. 
  • SQL Gateway: In charge of parsing materialized table statements, creating tables, and submitting jobs to the Flink cluster. Additionally, the workflow scheduler interacts with SQL Gateway to submit batch workflows.
  • Materialized Table Manager: An embedded manager in SQL Gateway that interacts with the Workflow Scheduler through REST APIs. It is responsible for deriving the refresh mode of a materialized table, streaming job runtime parameters, and CRON expressions for batch scheduling, for creating batch-scheduled jobs in the scheduler, and more. 
  • Catalog: Storage for materialized table metadata, which includes schema, options, freshness, definition query, refresh job information, etc. 
  • Pluggable Workflow Scheduler: Interacts bidirectionally with SQL Gateway through REST APIs and is responsible for creating batch-scheduled jobs and triggering their execution. It supports open-source workflow schedulers like Airflow and DolphinScheduler through a pluggable mechanism. 
  • Flink Cluster: Runs Flink jobs, including both streaming and batch jobs.

As illustrated in the architecture diagram above, the workflow for executing a Materialized Table create statement is as follows:

  1. Parse the create statement, deduce the schema of the materialized table based on the select query, and generate a CatalogMaterializedTable instance including schema, table options (if specified), definition query, and freshness.
  2. Create the materialized table in the Catalog.
  3. Determine the refresh mode based on the defined freshness and the materialized-table.refresh-mode.freshness-threshold value, and generate the corresponding type of background refresh job.
  4. If it is the continuous refresh mode:
    • Compile and submit a Flink streaming job, which starts stateless; the source tables' read position defaults to the entire dataset. The refresh handler information is then updated in the CatalogMaterializedTable.
  5. If it is the full refresh mode: 
    • Deduce the scheduling CRON expression from the freshness value, create a scheduling workflow in the scheduler, and then update the refresh job information in the CatalogMaterializedTable.
    • The scheduler periodically triggers the batch scheduling tasks and calls the SQL Gateway to execute the workflow.
    • When the SQL Gateway receives a scheduled task, it retrieves the definition query of the materialized table from the Catalog, calculates the corresponding time partition value (e.g., ds) for a partitioned table, and then submits a Flink batch job.

Among these, the most critical component is the Materialized Table Manager, which handles the most essential functions in the execution process of Materialized Table and interacts with other relevant components. Since the implementation details of Materialized Table Manager are not the focus of this FLIP, they are not elaborated on here.

Regarding the details of interfacing with various open-source schedulers, we will discuss the relevant interfaces in a separate FLIP. Specific schedulers such as Airflow and DolphinScheduler will be implemented and maintained in separate repositories (similar to connector repos).

Partitioned Table Full Refresh

Based on the previous introduction, we understand that in full refresh mode, batch scheduling tasks are created based on freshness, and each trigger results in a full-table refresh. A common scenario in data warehouses is time-partitioned tables, such as daily partitions keyed by 'ds'. If the user expects to refresh only the latest partition on each execution, how can this be handled? We introduce the implementation details through the following example.

CREATE MATERIALIZED TABLE dt_sink
PARTITIONED BY (ds)
WITH(
  -- hint time partition format
  'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '1' DAY
AS SELECT 
  ds,
  id,
  order_number,
  user_id,
...
FROM source

In the given example, both the 'source' and 'dt_sink' tables are partitioned by a 'ds' field. By specifying the format of the time partition field in the WITH clause, the framework is prompted to only refresh the data in the latest 'ds' partition with each batch schedule.

Consider the scenario where freshness is set to '1' DAY, meaning that scheduling occurs once a day. Assuming today's date is March 2, 2024, when batch scheduling is triggered at midnight, the framework uses the time passed by the scheduling system, combined with the freshness and 'partition.fields.ds.date-formatter', to calculate the 'ds' partition that needs to be refreshed. In this case, it determines 'ds=2024-03-01', which corresponds to yesterday's partition. The framework then assembles the following SQL statement and submits a Flink batch job.

INSERT OVERWRITE dt_sink
SELECT * FROM 
(SELECT 
  ds,
  id,
  order_number,
  user_id,
  ...
  FROM source
) AS tmp
-- add partition filter condition
WHERE ds = '2024-03-01'

Since both the 'source' and 'dt_sink' tables are partitioned by the 'ds' field, the optimizer, when generating the execution plan, will push down the 'ds' filter condition to the 'source' table, thereby only reading data from the corresponding partition of the 'source' table.

If FRESHNESS = INTERVAL '1' HOUR, what would the behavior be? First, a task scheduled on an hourly basis is registered with the Scheduler. Then, on each execution, the framework uses the time provided by the scheduling system, combined with the freshness and 'partition.fields.ds.date-formatter', to calculate the 'ds' partition that needs to be refreshed. In this scenario, it computes ds='2024-03-02', which is the latest time partition.
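
To make the derivation concrete, the following sketch shows one plausible way to compute the partition value from the schedule time, the freshness, and the date formatter. Subtracting the freshness from the schedule time is an assumption that reproduces both examples above; the real computation is an implementation detail of the framework:

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** A sketch of deriving the time partition to refresh in full refresh mode. */
public class PartitionDerivationSketch {

    static String derivePartitionValue(
            LocalDateTime scheduleTime, Duration freshness, String dateFormatterPattern) {
        // The window that just completed ends at the schedule time and spans one freshness interval.
        LocalDateTime partitionTime = scheduleTime.minus(freshness);
        return partitionTime.format(DateTimeFormatter.ofPattern(dateFormatterPattern));
    }

    public static void main(String[] args) {
        // FRESHNESS = INTERVAL '1' DAY, triggered at midnight on 2024-03-02 -> yesterday's partition
        System.out.println(derivePartitionValue(
                LocalDateTime.of(2024, 3, 2, 0, 0), Duration.ofDays(1), "yyyy-MM-dd")); // 2024-03-01

        // FRESHNESS = INTERVAL '1' HOUR, triggered at 10:00 on 2024-03-02 -> the latest partition
        System.out.println(derivePartitionValue(
                LocalDateTime.of(2024, 3, 2, 10, 0), Duration.ofHours(1), "yyyy-MM-dd")); // 2024-03-02
    }
}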

Compatibility, Deprecation, and Migration Plan

This FLIP only has incremental changes, and thus is fully backward compatible.

Test Plan

Unit tests, integration tests, and manual tests will be introduced to verify this change.

Rejected Alternatives

Materialized View

Why not use Materialized View? A view is the equivalent of a query result. According to the definition in the SQL standard, a view is not supposed to support modifications. However, in the lakehouse scenario there is a need to modify data, which leads to a conceptual conflict between the two. Therefore, using a table is more appropriate.

Base class of CatalogMaterializedTable 

Regarding CatalogMaterializedTable, another alternative is to extend CatalogTable directly instead of CatalogBaseTable. This would make it straightforward to inherit the attributes of CatalogTable.

@PublicEvolving
public interface CatalogMaterializedTable extends CatalogTable {

    /**
     * The definition query text of the materialized table. The text is expanded in contrast to the
     * original SQL, because context such as the current database is lost after the session in which
     * the table was defined is gone. Expanded query text takes care of this.
     *
     * <p>For example, for a materialized table that is defined in the context of "default" database with
     * a query {@code select * from test1}, the expanded query text might become {@code select
     * `test1`.`name`, `test1`.`value` from `default`.`test1`}, where table test1 resides in
     * database "default" and has two columns ("name" and "value").
     *
     * @return the materialized table definition in expanded text.
     */
    String getDefinitionQuery();

    /** Get the freshness of the materialized table which is used to determine the data refresh mode. */
    Duration getFreshness();

    /** Get the logical refresh mode of materialized table. */
    LogicalRefreshMode getLogicalRefreshMode();

    /** Get the physical refresh mode of materialized table. */
    RefreshMode getRefreshMode();

    /** Get the refresh status of materialized table. */
    RefreshStatus getRefreshStatus();

    /** Return summary description of refresh handler. */
    Optional<String> getRefreshHandlerDescription();

    /**
     * Return the serialized refresh handler of materialized table. This will not be used for describe
     * table.
     */
    byte[] getSerializedRefreshHandler();

    /** The logical refresh mode of materialized table. */
    @PublicEvolving
    enum LogicalRefreshMode {
        /**
         * The refresh pipeline will be executed in continuous mode, corresponding to {@link
         * RefreshMode#CONTINUOUS}.
         */
        CONTINUOUS,

        /**
         * The refresh pipeline will be executed in full mode, corresponding to {@link
         * RefreshMode#FULL}.
         */
        FULL,

        /**
         * The refresh pipeline mode is determined by freshness of materialized table, either {@link
         * RefreshMode#FULL} or {@link RefreshMode#CONTINUOUS}.
         */
        AUTOMATIC
    }

    /** The physical refresh mode of materialized table. */
    @PublicEvolving
    enum RefreshMode {
        CONTINUOUS,
        FULL
    }

    /** Background refresh pipeline status of materialized table. */
    @PublicEvolving
    enum RefreshStatus {
        INITIALIZING,
        ACTIVATED,
        SUSPENDED
    } 
}


There are two main reasons why CatalogTable was not inherited:

  1. Materialized Table is a new table type that also includes attributes such as the definition query, freshness, and refresh job, which makes a new interface more appropriate.
  2. If CatalogTable adds a new attribute that is not supported by Materialized Table, it will lead to a conflict, so it is more appropriate to represent it with an interface parallel to CatalogTable.

Future Improvements

Temporary View Limitation

Since both the materialized table and the query it references need to be persisted, while a temporary view is executed locally and is only effective for the current session, a temporary view cannot be persisted and is therefore not supported. If there is a real need to use a temporary view, there are currently two workarounds: use a View, or use a Common Table Expression (CTE)[7], which is equivalent in effect to a temporary view. Future support for temporary views can be determined based on user demand and technical feasibility.

CREATE MATERIALIZED TABLE sink
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '3' MINUTE
AS
-- use cte
WITH cte_source AS (
  SELECT
    user_id,
    shop_id,
    FROM_UNIXTIME(ds, 'yyyy-MM-dd') AS ds,
    SUM(payment_amount_cents) AS payed_buy_fee_sum,
    SUM(1) AS pv
  FROM c_source
  GROUP BY user_id, shop_id, FROM_UNIXTIME(ds, 'yyyy-MM-dd')
)
SELECT * FROM cte_source

Modify Select Statement Limitation

From the standpoint of strict semantics, the current design of the Materialized Table does not support altering the query statement. In the context of a lakehouse, a common scenario involves the need to add fields due to schema evolution, which is a legitimate requirement that we intend to support in the future. One possible solution we have conceived is to use the syntax Alter Materialized Table xxx ADD query_statement to provide this support. At the framework level, we would optimize by merging multiple query statements that write to the same Materialized Table.

-- create a materialized table with two agg indices
CREATE MATERIALIZED TABLE dws_deals(
	PRIMARY KEY (order_id, ds) NOT ENFORCED
)
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '1' HOUR
AS SELECT
  o.ds,
  o.order_id,
  count(*) order_cnt,
  sum(pay_amt) order_total_amt
FROM 
  dwd_orders as o
GROUP BY o.order_id, o.ds;

-- add two new agg indices statement
ALTER MATERIALIZED TABLE dws_deals
ADD 
SELECT
  o.ds,
  o.order_id,
  count(distinct order_id) dedup_order_cnt, -- deduplicated order cnt
  sum(goods_cnt) total_goods_cnt -- total goods cnt
FROM 
  dwd_orders as o
GROUP BY o.order_id, o.ds;


Reference

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables

[2] https://docs.snowflake.com/en/user-guide/dynamic-tables-about

[3] https://docs.databricks.com/en/delta-live-tables/index.html

[4] https://hudi.apache.org/docs/overview

[5] https://paimon.apache.org/docs/master

[6] PoC: https://github.com/lsyldliu/flink/tree/dynamic_table_poc

[7] https://www.postgresql.org/docs/16/queries-with.html



