
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

From the above operational process, it can be seen that using Materialized Table greatly simplifies the development of data processing pipelines. With a unified declarative SQL API covering both stream and batch pipelines, users do not need to be concerned with Flink jobs or execution modes; they simply operate on the materialized table.
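
As an illustrative sketch of this (the table names, columns, and the 3-minute freshness below are made up; the CREATE MATERIALIZED TABLE syntax is the one proposed by this FLIP), the whole pipeline boils down to a single declarative statement submitted through the SQL CLI or SQL Gateway, with no explicit choice of streaming or batch execution in user code:

Code Block
languagejava
/**
 * Illustrative only: the single statement a user would submit via the SQL CLI /
 * SQL Gateway. The engine derives a continuous (streaming) or full (batch)
 * refresh pipeline from the declared freshness; the user never picks a mode.
 */
public class MaterializedTableStatement {
    public static final String CREATE_DWD_ORDERS =
            "CREATE MATERIALIZED TABLE dwd_orders\n"
                    + "FRESHNESS = INTERVAL '3' MINUTE\n"
                    + "AS SELECT o.order_id, o.order_amount, u.user_name\n"
                    + "   FROM orders AS o JOIN users AS u ON o.user_id = u.user_id";
}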

...

Code Block
languagejava
/**
 * Represents the unresolved metadata of a materialized table in a {@link Catalog}.
 *
 * <p>Materialized Table definition: In the context of integrated stream-batch data storage, it
 * provides full historical data and an incremental changelog. By defining the data's production
 * business logic and freshness, data updates are achieved through a continuous or full refresh
 * mode, while also providing the capability for both batch and incremental consumption.
 *
 * <p>The metadata for {@link CatalogMaterializedTable} also includes the following four main parts:
 *
 * <ul>
 *   <li>Schema, comments, options and partition keys.
 *   <li>Data freshness, which determines when the data is generated and becomes visible to users.
 *   <li>Data production business logic, also known as the definition query.
 *   <li>Background refresh pipeline, either a Flink streaming job or a scheduled batch job,
 *       which is initialized after the materialized table is created.
 * </ul>
 * <p>A catalog implementer can either use {@link #of(Schema, String, List, Map, Long, String,
 * Duration, LogicalRefreshMode, RefreshMode, RefreshStatus, String, byte[])} for a basic
 * implementation of this interface or create a custom class that allows passing catalog-specific
 * objects all the way down to the connector creation (if necessary).
*/
@PublicEvolving
public interface CatalogMaterializedTable extends CatalogBaseTable {

    @Override
    default TableKind getTableKind() {
        return TableKind.MATERIALIZED_TABLE;
    }

    /**
     * Check if the table is partitioned or not.
     *
     * @return true if the table is partitioned; otherwise, false
     */
    boolean isPartitioned();

    /**
     * Get the partition keys of the table. This will be an empty set if the table is not
     * partitioned.
     *
     * @return partition keys of the table
     */
    List<String> getPartitionKeys();

    /**
     * Returns a copy of this {@code CatalogMaterializedTable} with given table options {@code options}.
     *
     * @return a new copy of this table with replaced table options
     */
    CatalogMaterializedTable copy(Map<String, String> options);

    /**
     * Returns a copy of this {@code CatalogMaterializedTable} with given refresh info.
     *
     * @return a new copy of this table with replaced refresh info
     */
    CatalogMaterializedTable copy(
            RefreshStatus refreshStatus,
            String refreshHandlerDescription,
            byte[] serializedRefreshHandler);
      
    /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */
    Optional<Long> getSnapshot();

    /**
     * The definition query text of the materialized table; the text is expanded in contrast to
     * the original SQL. This is needed because the context, such as the current database, is lost
     * once the session in which the table was defined is gone. The expanded query text takes care
     * of this.
     *
     * <p>For example, for a materialized table that is defined in the context of the "default"
     * database with a query {@code select * from test1}, the expanded query text might become
     * {@code select `test1`.`name`, `test1`.`value` from `default`.`test1`}, where table test1
     * resides in database "default" and has two columns ("name" and "value").
     *
     * @return the materialized table definition in expanded text.
     */
    String getDefinitionQuery();

    /** Get the freshness of the materialized table, which is used to determine the physical refresh mode. */
    Duration getFreshness();

    /** Get the logical refresh mode of materialized table. */
    LogicalRefreshMode getLogicalRefreshMode();

    /** Get the physical refresh mode of materialized table. */
    RefreshMode getRefreshMode();

    /** Get the refresh status of materialized table. */
    RefreshStatus getRefreshStatus();

    /** Return summary description of refresh handler. */
    Optional<String> getRefreshHandlerDescription();

    /**
     * Return the serialized refresh handler of the materialized table. This will not be used when
     * describing the table.
     */
    byte[] getSerializedRefreshHandler();

    /** The logical refresh mode of materialized table. */
    @PublicEvolving
    enum LogicalRefreshMode {
        /**
         * The refresh pipeline will be executed in continuous mode, corresponding to {@link
         * RefreshMode#CONTINUOUS}.
         */
        CONTINUOUS,

        /**
         * The refresh pipeline will be executed in full mode, corresponding to {@link
         * RefreshMode#FULL}.
         */
        FULL,

        /**
         * The refresh pipeline mode is determined by freshness of materialized table, either {@link
         * RefreshMode#FULL} or {@link RefreshMode#CONTINUOUS}.
         */
        AUTOMATIC
    }

    /** The physical refresh mode of materialized table. */
    @PublicEvolving
    enum RefreshMode {
        CONTINUOUS,
        FULL
    }

    /** Background refresh pipeline status of materialized table. */
    @PublicEvolving
    enum RefreshStatus {
        INITIALIZING,
        ACTIVATED,
        SUSPENDED
    }
}
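
To make the four metadata parts above concrete, here is a minimal sketch of how a catalog implementer might assemble an instance via the of(...) factory referenced in the Javadoc; the parameter order follows that Javadoc, and the schema, comment, query, and freshness values are illustrative:

Code Block
languagejava
import java.time.Duration;
import java.util.Collections;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogMaterializedTable;

public class CatalogMaterializedTableSketch {

    public static CatalogMaterializedTable build() {
        // Schema with the two columns used in the expanded-query example above.
        Schema schema =
                Schema.newBuilder()
                        .column("name", DataTypes.STRING())
                        .column("value", DataTypes.STRING())
                        .build();

        return CatalogMaterializedTable.of(
                schema,
                "demo materialized table",      // comment
                Collections.emptyList(),        // partition keys: not partitioned
                Collections.emptyMap(),         // options
                null,                           // snapshot: not specified
                "SELECT `test1`.`name`, `test1`.`value` FROM `default`.`test1`",
                Duration.ofMinutes(3),          // freshness
                CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC,
                CatalogMaterializedTable.RefreshMode.CONTINUOUS,
                CatalogMaterializedTable.RefreshStatus.INITIALIZING,
                null,                           // refresh handler description: none yet
                null);                          // serialized refresh handler: none yet
    }
}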

...

Bringing together the context above, Materialized Table is a new table type introduced for Flink SQL that enables users to build and manage data pipelines which are automatically refreshed at a specified freshness. This means that the materialized table is bound to a transformation SQL statement, and based on the defined data freshness requirement, a background refresh pipeline is created to automatically update the data.


So a materialized table is composed of three parts:

  1. Metadata: This includes the table's schema, options, definition query, freshness, and the refresh handler that points to the refresh job.
  2. Data: This corresponds to a specific storage that holds the current table's data, which can provide full historical data or incremental changelog. 
  3. Background Refresh Pipeline: This pipeline ensures the production and updating of data in either continuous or full refresh mode (an illustrative mode deduction is sketched below).
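
To illustrate how the physical refresh mode falls out of the declared freshness, the following is a hypothetical deduction sketch; the 30-minute threshold is an assumption made here for illustration, and in practice the cut-over point would be a configurable option:

Code Block
languagejava
import java.time.Duration;

import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;

public class RefreshModeDeduction {

    // Hypothetical threshold; the real value would come from configuration.
    private static final Duration CONTINUOUS_THRESHOLD = Duration.ofMinutes(30);

    public static RefreshMode deduce(LogicalRefreshMode logicalMode, Duration freshness) {
        switch (logicalMode) {
            case CONTINUOUS:
                return RefreshMode.CONTINUOUS;
            case FULL:
                return RefreshMode.FULL;
            default: // AUTOMATIC
                // A short freshness keeps a streaming job running continuously;
                // a longer freshness degrades to a scheduled batch refresh.
                return freshness.compareTo(CONTINUOUS_THRESHOLD) <= 0
                        ? RefreshMode.CONTINUOUS
                        : RefreshMode.FULL;
        }
    }
}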

...

  • SQL CLI: A user interaction interface responsible for submitting materialized table statements. 
  • SQL Gateway: In charge of parsing materialized table statements, creating tables, and submitting jobs to the Flink cluster. Additionally, the workflow scheduler interacts with SQL Gateway to submit batch workflows.
  • Materialized Table Manager: An embedded manager in SQL Gateway that interacts with the Workflow Scheduler through RESTful APIs. It is responsible for deducing the data refresh mode for a materialized table, deriving streaming job runtime parameters and CRON expressions for batch scheduling jobs, creating batch-scheduled jobs in the scheduler, and more (an illustrative freshness-to-CRON mapping is sketched after this list).
  • Catalog: Storage for materialized table metadata, which includes schema, options, freshness, definition query, refresh job information, etc.
  • Pluggable Workflow Scheduler: Interacts bidirectionally with SQL Gateway through RESTful APIs, responsible for creating batch-scheduled jobs and triggering their execution. It supports open-source workflow schedulers such as Airflow and DolphinScheduler through a pluggable mechanism.
  • Flink Cluster: Runs Flink Jobs, including both streaming and batch jobs.
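
As mentioned in the Materialized Table Manager bullet above, one of its duties is translating the declared freshness into a CRON expression for the batch scheduler. The sketch below shows one hypothetical mapping using Quartz-style expressions; the rounding rules here are assumptions for illustration, not the normative algorithm:

Code Block
languagejava
import java.time.Duration;

public class FreshnessToCron {

    public static String toCron(Duration freshness) {
        long minutes = freshness.toMinutes();
        if (minutes < 1) {
            throw new IllegalArgumentException(
                    "Freshness below one minute cannot be scheduled as a batch job.");
        }
        if (minutes < 60) {
            // e.g. FRESHNESS = INTERVAL '30' MINUTE -> "0 0/30 * * * ?"
            return String.format("0 0/%d * * * ?", minutes);
        }
        long hours = freshness.toHours();
        if (hours < 24) {
            // e.g. FRESHNESS = INTERVAL '4' HOUR -> "0 0 0/4 * * ?"
            return String.format("0 0 0/%d * * ?", hours);
        }
        // Coarser freshness falls back to a daily run at midnight.
        return "0 0 0 * * ?";
    }
}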

(Architecture diagram)

As illustrated in the architecture diagram above, the workflow for executing a Materialized Table create statement is as follows:

...