...

Discussion thread

https://lists.apache.org/thread/c1gnn3bvbfs8v1trlf975t327s4rsffs

Vote thread

https://lists.apache.org/thread/woj27nsmx5xd7p87ryfo8h6gx37n3wlx

JIRA

FLINK-35187
Release

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

When a CREATE MATERIALIZED TABLE statement is executed, the framework derives the schema of the table from the referenced query statement, first creates a Materialized Table in the Catalog, and then automatically creates a data processing pipeline to refresh the table based on the specified FRESHNESS value. Since FRESHNESS is set to 3 minutes here, a Flink streaming job is launched to refresh the table continuously.
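For reference, a statement of this shape might look as follows (a purely illustrative sketch; the table and column names are ours, not part of the FLIP):

Code Block
languagesql
CREATE MATERIALIZED TABLE dwd_orders
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '3' MINUTE
AS SELECT ds, order_id, user_id, order_amount
   FROM source_orders;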

Pause & Resume Data Pipeline

...

If the REFRESH_MODE is specified at the time of table creation, its priority is higher than freshness, which means freshness will no longer be used to infer the refresh mode of the materialized table.
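For example (illustrative names), an explicitly declared refresh mode wins over whatever the freshness alone would imply:

Code Block
languagesql
CREATE MATERIALIZED TABLE dws_orders_daily
FRESHNESS = INTERVAL '1' MINUTE  -- alone, this would imply continuous mode
REFRESH_MODE = FULL              -- the explicit mode takes precedence
AS SELECT ds, COUNT(*) AS order_cnt FROM dwd_orders GROUP BY ds;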

Full Refresh Mode

In full refresh mode, for materialized tables, the default overwrite behavior is table granularity. If a materialized table is a partitioned table, and the time partition field format is specified by 'partition.fields.#.date-formatter' when creating the table, the default overwrite behavior is partition granularity; that is, only the latest partition data is overwritten on each refresh. For more details on 'partition.fields.#.date-formatter', see the partitioned table full refresh section.
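A sketch of such a partitioned table (names illustrative), where each full refresh would overwrite only the latest ds partition:

Code Block
languagesql
CREATE MATERIALIZED TABLE dm_orders_report
PARTITIONED BY (ds)
WITH ('partition.fields.ds.date-formatter' = 'yyyy-MM-dd')
FRESHNESS = INTERVAL '1' DAY
AS SELECT ds, SUM(order_amount) AS gmv FROM dwd_orders GROUP BY ds;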

...

Manually triggering a materialized table refresh only works on the current table; by default, it does not trigger a cascading refresh of downstream tables.

The manual REFRESH command and the background refresh job can run in parallel; the framework does not impose any limitation on this. Of course, if the refresh mode is continuous and the background job is running, you should be careful with the REFRESH command, as it can lead to inconsistent data.
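An illustrative manual refresh of a single partition, using the ALTER ... REFRESH shape described in this FLIP (table and partition values are hypothetical):

Code Block
languagesql
ALTER MATERIALIZED TABLE dm_orders_report REFRESH PARTITION (ds = '2024-03-02');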

...

Modify the time measure that the materialized table’s content should lag behind updates to the base tables. Modifying freshness may affect the refresh mode of the background job, which may change from continuous mode to full mode, or from full mode to continuous mode, or remain unchanged. In any case, altering freshness will stop the current refresh job and create a new job with the updated freshness.

Note: If the refresh mode was manually specified via the REFRESH_MODE clause when creating a materialized table, modifying freshness will not affect the refresh mode.
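For example (illustrative), relaxing the freshness from minutes to a day, which may switch the background job from continuous to full mode:

Code Block
languagesql
ALTER MATERIALIZED TABLE dwd_orders SET FRESHNESS = INTERVAL '1' DAY;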

REFRESH_MODE

...

Besides the aforementioned ALTER syntax, materialized tables currently do not support the other ALTER syntaxes that apply to regular tables.

Drop

Code Block
languagesql
DROP MATERIALIZED TABLE [ IF EXISTS ] [catalog_name.][db_name.]table_name

...

Describes the specified job information in the Flink cluster; the returned information schema is the same as that of the SHOW JOBS statement.

Interface Changes

...

CatalogMaterializedTable

We introduce a new interface CatalogMaterializedTable, which extends CatalogBaseTable. As described above, in addition to schema, options, and partition keys, CatalogMaterializedTable also encompasses other important metadata: definition query, freshness, refresh mode, refresh status, and refresh handler.

Code Block
languagejava
/**
 * Represents the unresolved metadata of a materialized table in a {@link Catalog}.
 *
 * <p>Materialized Table definition: In the context of integrated stream-batch data storage, it provides
 * full history data and incremental changelog. By defining the data's production business logic and
 * freshness, data update is achieved through continuous or full refresh mode, while also possessing
 * the capability for both batch and incremental consumption.
 *
 * <p>The metadata for {@link CatalogMaterializedTable} also includes the following four main parts:
 *
 * <ul>
 *   <li>Schema, comments, options and partition keys.
 *   <li>Data freshness, which determines when the data is generated and becomes visible to users.
 *   <li>Data production business logic, also known as the definition query.
 *   <li>Background refresh pipeline, either through a flink streaming or scheduled batch job, it is
 *       initialized after materialized table is created.
 * </ul>
 * <p>A catalog implementer can either use {@link #of(Schema, String, List, Map, Long, String,
 * Duration, LogicalRefreshMode, RefreshMode, RefreshStatus, String, byte[])} for a basic
 * implementation of this interface or create a custom class that allows passing catalog-specific
 * objects all the way down to the connector creation (if necessary).
*/
@PublicEvolving
public interface CatalogMaterializedTable extends CatalogBaseTable {

    @Override
    default TableKind getTableKind() {
        return TableKind.MATERIALIZED_TABLE;
    }

    /**
     * Check if the table is partitioned or not.
     *
     * @return true if the table is partitioned; otherwise, false
     */
    boolean isPartitioned();

    /**
     * Get the partition keys of the table. This will be an empty set if the table is not
     * partitioned.
     *
     * @return partition keys of the table
     */
    List<String> getPartitionKeys();

    /**
     * Returns a copy of this {@code CatalogMaterializedTable} with given table options {@code options}.
     *
     * @return a new copy of this table with replaced table options
     */
    CatalogMaterializedTable copy(Map<String, String> options);

    /**
     * Returns a copy of this {@code CatalogMaterializedTable} with given refresh info.
     *
     * @return a new copy of this table with replaced refresh info
     */     
    CatalogMaterializedTable copy(
            RefreshStatus refreshStatus,
            String refreshHandlerDescription,
            byte[] serializedRefreshHandler);
      
    /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */
    Optional<Long> getSnapshot();

    /**
     * The definition query text of the materialized table. The text is expanded in contrast to the
     * original SQL, because context such as the current database is lost once the session in which
     * the table was defined is gone. The expanded query text takes care of this.
     *
     * <p>For example, for a materialized table defined in the context of the "default" database
     * with a query {@code select * from test1}, the expanded query text might become {@code select
     * `test1`.`name`, `test1`.`value` from `default`.`test1`}, where table test1 resides in
     * database "default" and has two columns ("name" and "value").
     *
     * @return the materialized table definition in expanded text.
     */
    String getDefinitionQuery();

    /** Get the freshness of materialized table which is used to determine the physical refresh mode. */
    Duration getFreshness();

    /** Get the logical refresh mode of materialized table. */
    LogicalRefreshMode getLogicalRefreshMode();

    /** Get the physical refresh mode of materialized table. */
    RefreshMode getRefreshMode();

    /** Get the refresh status of materialized table. */
    RefreshStatus getRefreshStatus();

    /** Return summary description of refresh handler. */
    Optional<String> getRefreshHandlerDescription();

    /**
     * Return the serialized refresh handler of materialized table. This will not be used for describe
     * table.
     */
    byte[] getSerializedRefreshHandler();

    /** The logical refresh mode of materialized table. */
    @PublicEvolving
    enum LogicalRefreshMode {
        /**
         * The refresh pipeline will be executed in continuous mode, corresponding to {@link
         * RefreshMode#CONTINUOUS}.
         */
        CONTINUOUS,

        /**
         * The refresh pipeline will be executed in full mode, corresponding to {@link
         * RefreshMode#FULL}.
         */
        FULL,

        /**
         * The refresh pipeline mode is determined by freshness of materialized table, either {@link
         * RefreshMode#FULL} or {@link RefreshMode#CONTINUOUS}.
         */
        AUTOMATIC
    }

    /** The physical refresh mode of materialized table. */
    @PublicEvolving
    enum RefreshMode {
        CONTINUOUS,
        FULL
    }

    /** Background refresh pipeline status of materialized table. */
    @PublicEvolving
    enum RefreshStatus {
        INITIALIZING,
        ACTIVATED,
        SUSPENDED
    }
}
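A minimal sketch of how a gateway component might assemble this metadata, assuming the static factory named in the Javadoc above (the column names, comment, and query are illustrative only):

Code Block
languagejava
import java.time.Duration;
import java.util.Collections;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

// Hypothetical assembly of the unresolved metadata for a freshly created table.
CatalogMaterializedTable table =
        CatalogMaterializedTable.of(
                Schema.newBuilder()
                        .column("ds", DataTypes.STRING())
                        .column("order_cnt", DataTypes.BIGINT())
                        .build(),
                "Daily order count",                 // comment
                Collections.singletonList("ds"),     // partition keys
                Collections.emptyMap(),              // options
                null,                                // snapshot (not specified)
                "SELECT ds, COUNT(*) AS order_cnt FROM orders GROUP BY ds",
                Duration.ofMinutes(3),               // freshness
                CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC,
                CatalogMaterializedTable.RefreshMode.CONTINUOUS,
                CatalogMaterializedTable.RefreshStatus.INITIALIZING,
                null,                                // refresh handler description (not yet known)
                null);                               // serialized refresh handler (not yet known)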

...

For Catalog, we don’t need to introduce any new methods. But we should highlight that the Catalog must support creating a materialized table that represents physical storage, which can provide full historical data or an incremental changelog, so developers need to override the getFactory method to find the DynamicTableFactory for the planner to compile a query.

...

RefreshHandler

We introduce the interface RefreshHandler, which records the meta information of the current materialized table's background refresh job.

Code Block
languagejava
/**
 * This interface represents the meta information of the current materialized table background
 * refresh pipeline. The refresh mode may be continuous or full. The format of the meta information
 * in the two modes is not consistent, so users need to implement this interface according to their
 * business scenario.
 *
 * <p>This meta information will be serialized to bytes by {@link RefreshHandlerSerializer}, then
 * stored to the Catalog for suspending or dropping the {@link CatalogMaterializedTable}.
 *
 * <p>In continuous mode, the meta information may contain { "clusterType": "yarn", "clusterId":
 * "xxx", "jobId": "yyyy" }.
 *
 * <p>In full mode, the meta information may contain { "endpoint": "xxx", "workflowId": "yyy" }.
 * Since users may use different workflow scheduler plugins in this mode, they should implement
 * this interface according to their plugin.
 */
@PublicEvolving
public interface RefreshHandler {

    /**
     * Returns a string that summarizes this refresh handler meta information for printing to a
     * console or log.
     */
    String asSummaryString();
}
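As an illustration, a hypothetical continuous-mode implementation (the class and field names are ours, not part of the FLIP) could look like this:

Code Block
languagejava
import java.io.Serializable;

/** Hypothetical RefreshHandler for continuous mode, recording the cluster and job id. */
public class ContinuousRefreshHandler implements RefreshHandler, Serializable {

    private final String clusterType; // e.g. "yarn"
    private final String clusterId;
    private final String jobId;

    public ContinuousRefreshHandler(String clusterType, String clusterId, String jobId) {
        this.clusterType = clusterType;
        this.clusterId = clusterId;
        this.jobId = jobId;
    }

    @Override
    public String asSummaryString() {
        return String.format(
                "{ clusterType: %s, clusterId: %s, jobId: %s }", clusterType, clusterId, jobId);
    }
}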

RefreshHandlerSerializer

We introduce the interface RefreshHandlerSerializer, which is used to serialize and deserialize RefreshHandler instances. The serialized bytes will be persisted to CatalogMaterializedTable for further operations.

Code Block
languagejava
/** This interface is used to serialize and deserialize the {@link RefreshHandler}. */
@PublicEvolving
public interface RefreshHandlerSerializer<T extends RefreshHandler> {

    /** Serialize the {@link RefreshHandler} to bytes. */
    byte[] serialize(T refreshHandler) throws IOException;

    /** Deserialize the bytes to a {@link RefreshHandler} instance. */
    T deserialize(byte[] serializedBytes, ClassLoader cl)
            throws IOException, ClassNotFoundException;
}
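A minimal sketch of a serializer for the hypothetical handler above, using plain Java serialization (a real plugin might prefer JSON or a versioned format):

Code Block
languagejava
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

/** Hypothetical serializer for ContinuousRefreshHandler based on Java serialization. */
public class ContinuousRefreshHandlerSerializer
        implements RefreshHandlerSerializer<ContinuousRefreshHandler> {

    @Override
    public byte[] serialize(ContinuousRefreshHandler handler) throws IOException {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(handler);
            out.flush();
            return bos.toByteArray();
        }
    }

    @Override
    public ContinuousRefreshHandler deserialize(byte[] serializedBytes, ClassLoader cl)
            throws IOException, ClassNotFoundException {
        // Resolve classes against the given ClassLoader, since the handler class may
        // live in a plugin ClassLoader rather than the system one.
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(serializedBytes)) {
                    @Override
                    protected Class<?> resolveClass(ObjectStreamClass desc)
                            throws IOException, ClassNotFoundException {
                        return Class.forName(desc.getName(), false, cl);
                    }
                }) {
            return (ContinuousRefreshHandler) in.readObject();
        }
    }
}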

ResolvedCatalogMaterializedTable

...

| Key | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| materialized-table.refresh-mode.freshness-threshold | Optional | 30 minutes | Duration | Used to derive the refresh mode of the background refresh job that a Materialized Table statement generates. If FRESHNESS is less than the threshold, the job runs in continuous mode; otherwise, it runs in full mode. |
| partition.fields.#.date-formatter | Optional | None | String | Specifies the time partition formatter of a partitioned materialized table; # represents the string-type partition field. This hints to the framework which partitions to refresh in full refresh mode. It is a common materialized table option that the corresponding Catalog should support. |


In the Overall Architecture section, we will introduce the SQL Gateway that is responsible for managing the background refresh jobs of materialized tables, so this option is scoped to the SQL Gateway only. It is read from the flink-conf each time the service starts and cannot be modified afterwards. After the service starts, it first looks up all Materialized Tables, compares the materialized-table.refresh-mode.freshness-threshold value to each Materialized Table's freshness, and if the refresh mode changes, stops the original refresh job, creates a new job, and updates the job information in the Materialized Table.
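A minimal sketch of that derivation, reusing the RefreshMode enum defined above (the helper class and method names are ours):

Code Block
languagejava
import java.time.Duration;

/** Hypothetical helper showing the threshold comparison described above. */
public final class RefreshModeDeriver {

    /**
     * Derives the physical refresh mode from the declared FRESHNESS and the
     * materialized-table.refresh-mode.freshness-threshold value read from flink-conf.
     */
    public static CatalogMaterializedTable.RefreshMode derive(
            Duration freshness, Duration threshold) {
        return freshness.compareTo(threshold) < 0
                ? CatalogMaterializedTable.RefreshMode.CONTINUOUS
                : CatalogMaterializedTable.RefreshMode.FULL;
    }
}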

...

This API is used by the workflow scheduler to trigger a refresh operation of a materialized table automatically. This is just a draft API; its main purpose is to give you an idea of the mechanics of the refresh operation. We will give a detailed design in another FLIP that supports a pluggable Workflow Scheduler.

...

  1. Metadata: This includes the table's schema, options, definition query, freshness, and refresh handler which points to the refresh job.
  2. Data: This corresponds to a specific storage that holds the current table's data, which can provide full historical data or incremental changelog. 
  3. Background Refresh Pipeline: This pipeline ensures the production and updating of data in either a continuous or full refresh mode.

...

  • SQL CLI: A user interaction interface responsible for submitting materialized table statements. 
  • SQL Gateway: In charge of parsing materialized table statements, creating tables, and submitting jobs to the Flink cluster. Additionally, the workflow scheduler will interact with SQL Gateway to submit batch workflows.
  • Materialized Table Manager: An embedded manager in SQL Gateway that interacts with the Workflow Scheduler through Restful APIs. It is responsible for deriving the data refresh mode for materialized tables, deriving streaming job runtime parameters and CRON expressions for batch scheduling jobs, creating batch scheduling jobs in the scheduler, and more.
  • Catalog: Storage for materialized table metadata, which includes schema, options, freshness, definition query, refresh job information, etc.
  • Pluggable Workflow Scheduler: Interacts bidirectionally with SQL Gateway through Restful APIs, responsible for creating batch-scheduled jobs and triggering their execution. It supports open-source workflow schedulers like Airflow, DolphinScheduler through a pluggable mechanism. 
  • Flink Cluster: Runs Flink Jobs, including both streaming and batch jobs.

...

  1. Parse the create statement, deduce the schema of the materialized table based on the select query, and generate a CatalogMaterializedTable instance, including schema, table options (if specified), definition query, and freshness.
  2. Create the materialized table in Catalog.
  3. Determine the refresh mode based on the defined freshness and the materialized-table.refresh-mode.freshness-threshold value, and generate the corresponding type of background refresh job.
  4. If it is the continuous refresh mode:
    • Compile and submit a Flink streaming job, which starts to run statelessly; the source table data read position defaults to the entire dataset. Then the refresh handler information is updated to CatalogMaterializedTable.
  5. If it is the full refresh mode:
    • Deduce the scheduling CRON expression via the freshness value, create a scheduling workflow in the scheduler, and then update the refresh job information to CatalogMaterializedTable.
    • The scheduler periodically triggers the batch scheduling tasks and then calls the SQL Gateway to execute the workflow.
    • When SQL Gateway receives a scheduled task, it retrieves the definition query of the materialized table from Catalog, calculates the corresponding time partition value (e.g., ds) for a partitioned table, and then submits a Flink batch job (conceptually like the sketch below).
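Conceptually, the batch job submitted in the last step overwrites the derived time partition. An illustrative sketch only; the exact rewritten statement is an implementation detail, and the table and partition values here are hypothetical:

Code Block
languagesql
-- Refresh only the derived partition of the materialized table;
-- <definition query> stands for the stored definition query.
INSERT OVERWRITE dm_orders_report
SELECT * FROM (<definition query>) WHERE ds = '2024-03-02';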

...

Regarding the details of interfacing with various open-source schedulers, we will discuss the relevant interfaces in a separate FLIP. Specific schedulers such as Airflow and DolphinScheduler will be implemented and maintained in a separate repository (similar to connector repos).

...

If FRESHNESS = INTERVAL '1' HOUR, what would the behavior be? First, a task that is scheduled on an hourly basis is registered on the Scheduler. Then, with each execution, the framework uses the time provided by the scheduling system, combined with the freshness and the 'partition.fields.ds.date-formatter' option, to calculate the 'ds' partition that needs to be refreshed. For this scenario, it would compute ds = '2024-03-02', which is the latest time partition.
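One plausible reading of this computation (the helper name and the exact offset rule are our assumptions, not part of the FLIP):

Code Block
languagejava
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper illustrating the time-partition derivation described above. */
public final class PartitionValueCalculator {

    /**
     * Formats the partition value from the scheduler-provided trigger time, shifted
     * back by one freshness interval so the just-completed period is refreshed.
     */
    public static String derivePartition(Instant scheduleTime, Duration freshness, String pattern) {
        return DateTimeFormatter.ofPattern(pattern)
                .withZone(ZoneId.systemDefault())
                .format(scheduleTime.minus(freshness));
    }
}

For a trigger at 2024-03-02 01:00 with FRESHNESS = INTERVAL '1' HOUR and pattern 'yyyy-MM-dd', this yields ds = '2024-03-02', matching the example above.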

...

This FLIP only has incremental changes, and thus is fully backward compatible.

Test Plan

Unit Tests, Integration Tests, and Manual Tests will be introduced to verify this change.

...