
CatalogDynamicTable

Code Block
languagejava
/**
 * Represents the unresolved metadata of a dynamic table in a {@link Catalog}.
 *
 * <p>Dynamic table definition: in the context of integrated stream-batch data storage, a dynamic
 * table provides full history data and an incremental changelog. By defining the data's
 * production business logic and freshness, data updates are achieved through continuous or full
 * refresh mode, while the table also supports both batch and incremental consumption.
 *
 * <p>The metadata for {@link CatalogDynamicTable} includes the following four main parts:
 *
 * <ul>
 *   <li>Schema, comments, options and partition keys.
 *   <li>Data freshness, which determines when the data is generated and becomes visible to the
 *       user.
 *   <li>Data production business logic, also known as the definition query.
 *   <li>Background data refresh job, either a Flink streaming job or a scheduled batch job; it
 *       is initialized after the dynamic table is created.
 * </ul>
 *
 * <p>A catalog implementer can either use {@link #of(Schema, String, List, Map, Long, String,
 * String, RefreshMode, RefreshJobHandler)} for a basic implementation of this interface or create
 * a custom class that allows passing catalog-specific objects all the way down to the connector
 * creation (if necessary).
 */
@PublicEvolving
public interface CatalogDynamicTable extends CatalogBaseTable {

    /**
     * Creates an instance of {@link CatalogDynamicTable}.
     *
     * @param schema unresolved schema
     * @param comment optional comment
     * @param partitionKeys list of partition keys or an empty list if not partitioned
     * @param options options to configure the connector
     * @param snapshot table snapshot of the table
     * @param definitionQuery definition sql query
     * @param freshness data freshness
     * @param refreshMode data refresh mode
     * @param refreshJobHandler refresh job handler
     */
    static CatalogDynamicTable of(
            Schema schema,
            @Nullable String comment,
            List<String> partitionKeys,
            Map<String, String> options,
            @Nullable Long snapshot,
            String definitionQuery,
            String freshness,
            RefreshMode refreshMode,
            RefreshJobHandler refreshJobHandler) {
        return new DefaultCatalogDynamicTable(
                schema,
                comment,
                partitionKeys,
                options,
                snapshot,
                definitionQuery,
                freshness,
                refreshMode,
                refreshJobHandler);
    }

    @Override
    default TableKind getTableKind() {
        return TableKind.DYNAMIC_TABLE;
    }

    /**
     * Check if the table is partitioned or not.
     *
     * @return true if the table is partitioned; otherwise, false
     */
    boolean isPartitioned();

    /**
     * Get the partition keys of the table. This will be an empty list if the table is not
     * partitioned.
     *
     * @return partition keys of the table
     */
    List<String> getPartitionKeys();

    /**
     * Returns a copy of this {@code CatalogDynamicTable} with the given table options {@code
     * options}.
     *
     * @return a new copy of this table with replaced table options
     */
    CatalogDynamicTable copy(Map<String, String> options);

    /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */
    default Optional<Long> getSnapshot() {
        return Optional.empty();
    }

    /**
     * The definition query text of the dynamic table. The text is expanded in contrast to the
     * original SQL, because context such as the current database is lost after the session in
     * which the table was defined is gone. The expanded query text takes care of this.
     *
     * <p>For example, for a dynamic table that is defined in the context of the "default"
     * database with a query {@code select * from test1}, the expanded query text might become
     * {@code select `test1`.`name`, `test1`.`value` from `default`.`test1`}, where table test1
     * resides in database "default" and has two columns ("name" and "value").
     *
     * @return the dynamic table definition in expanded text.
     */
    String getDefinitionQuery();

    /** Get the freshness of the dynamic table which is used to determine the data refresh mode. */
    String getFreshness();

    /** Get the refresh mode of the dynamic table. */
    RefreshMode getRefreshMode();

    /** Get the refresh job handler of the dynamic table. */
    RefreshJobHandler getRefreshJobHandler();

    /** The refresh mode of the dynamic table. */
    @PublicEvolving
    enum RefreshMode {
        CONTINUOUS,
        FULL
    }
}
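
For illustration, a sketch of assembling an unresolved dynamic table via the static factory above; the schema, comment, query, and freshness values are hypothetical:

Code Block
languagejava
// Illustrative only: the column names, options and query below are made up.
CatalogDynamicTable table =
        CatalogDynamicTable.of(
                Schema.newBuilder()
                        .column("ds", DataTypes.STRING())
                        .column("id", DataTypes.BIGINT())
                        .build(),
                "daily order sink",                          // comment
                Collections.singletonList("ds"),             // partition keys
                Collections.emptyMap(),                      // options
                null,                                        // snapshot not specified
                "SELECT ds, id FROM `default`.`source`",     // expanded definition query
                "INTERVAL '1' DAY",                          // freshness
                CatalogDynamicTable.RefreshMode.FULL,
                new RefreshJobHandler(
                        CatalogDynamicTable.RefreshMode.FULL,
                        RefreshJobHandler.JobState.INITIALIZING,
                        null));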

Catalog

For Catalog, we don’t need to introduce any new method. But we should highlight that a Catalog that supports creating dynamic tables represents a streaming-batch unified storage, which can provide full historical data and an incremental changelog, so developers need to override the getFactory method so that the planner can find the DynamicTableFactory when compiling a query.

Code Block
languagejava
titleCatalog
@PublicEvolving
public interface Catalog {
    
    /**
     * Returns a factory for creating instances from catalog objects.
     *
     * <p>This method enables bypassing the discovery process. Implementers can directly pass
     * internal catalog-specific objects to their own factory. For example, a custom {@link
     * CatalogTable} can be processed by a custom {@link DynamicTableFactory}.
     *
     * <p>If this catalog supports creating dynamic tables, you should also override this method
     * to provide a {@link DynamicTableFactory} that helps the planner find the {@link
     * DynamicTableSource} and {@link DynamicTableSink} correctly during the compile and
     * optimization phase. If you don't override this method, you must specify the physical
     * connector identifier of the storage this catalog represents when creating a dynamic table.
     * Otherwise, the planner can't find the {@link DynamicTableFactory}.
     *
     * <p>Because all factories are interfaces, the returned {@link Factory} instance can implement
     * multiple supported extension points. An {@code instanceof} check is performed by the caller
     * that checks whether a required factory is implemented; otherwise the discovery process is
     * used.
     */
    default Optional<Factory> getFactory() {
        return Optional.empty();
    }
}
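
For illustration, a minimal sketch of a catalog that overrides getFactory(); MyUnifiedStorageCatalog and MyDynamicTableFactory are hypothetical names, and a real implementation would back the catalog with the unified storage rather than GenericInMemoryCatalog:

Code Block
languagejava
import java.util.Optional;

import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.factories.Factory;

// A minimal sketch: a catalog backed by streaming-batch unified storage overrides
// getFactory() so the planner can resolve DynamicTableSource/DynamicTableSink
// without going through connector discovery.
public class MyUnifiedStorageCatalog extends GenericInMemoryCatalog {

    public MyUnifiedStorageCatalog(String name, String defaultDatabase) {
        super(name, defaultDatabase);
    }

    @Override
    public Optional<Factory> getFactory() {
        // MyDynamicTableFactory (hypothetical) implements DynamicTableSourceFactory
        // and DynamicTableSinkFactory for tables stored in this catalog.
        return Optional.of(new MyDynamicTableFactory());
    }
}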


RefreshJobHandler

We introduce the class RefreshJobHandler that records the meta information of the current dynamic table background refresh job.

Code Block
languagejava
/**
 * This class represents the meta information of the current dynamic table's background refresh
 * job. The refresh mode of the background job may be continuous or full. The format of the meta
 * information in the two modes is not consistent, so we unify it in a structured JSON string
 * jobDetail.
 *
 * <p>In continuous mode, the format of the meta information is { "clusterType": "yarn",
 * "clusterId": "xxx", "jobId": "yyyy" }.
 *
 * <p>In full mode, the meta information format is { "schedulerType": "airflow", "endpoint": "xxx",
 * "workflowId": "yyy" }.
 */
@PublicEvolving
public class RefreshJobHandler {

    private final CatalogDynamicTable.RefreshMode actualRefreshMode;
    private final JobState jobState;
    private final @Nullable String jobDetail;

    public RefreshJobHandler(
            CatalogDynamicTable.RefreshMode actualRefreshMode,
            JobState jobState,
            @Nullable String jobDetail) {
        this.actualRefreshMode = actualRefreshMode;
        this.jobState = jobState;
        this.jobDetail = jobDetail;
    }

    public CatalogDynamicTable.RefreshMode getActualRefreshMode() {
        return actualRefreshMode;
    }

    public JobState getJobState() {
        return jobState;
    }

    public @Nullable String getJobDetail() {
        return jobDetail;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        RefreshJobHandler that = (RefreshJobHandler) o;
        return actualRefreshMode == that.actualRefreshMode
                && jobState == that.jobState
                && Objects.equals(jobDetail, that.jobDetail);
    }

    @Override
    public int hashCode() {
        return Objects.hash(actualRefreshMode, jobState, jobDetail);
    }

    @Override
    public String toString() {
        return "RefreshJobHandler{"
                + "actualRefreshMode="
                + actualRefreshMode
                + ", jobState="
                + jobState
                + ", jobDetail='"
                + jobDetail
                + '\''
                + '}';
    }

    /** Background refresh job state. */
    @PublicEvolving
    public enum JobState {
        INITIALIZING,
        RUNNING,
        SUSPENDED
    }
}
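
For illustration, constructing the handler for a continuous-mode refresh job; the cluster and job identifiers are placeholder values:

Code Block
languagejava
// Illustrative only: meta information of a running continuous-mode refresh job.
// The JSON payload follows the continuous-mode format documented in the class Javadoc.
RefreshJobHandler handler =
        new RefreshJobHandler(
                CatalogDynamicTable.RefreshMode.CONTINUOUS,
                RefreshJobHandler.JobState.RUNNING,
                "{ \"clusterType\": \"yarn\", \"clusterId\": \"application_1709\", \"jobId\": \"a1b2c3\" }");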


ResolvedCatalogDynamicTable

We introduce the public class ResolvedCatalogDynamicTable that represents a validated CatalogDynamicTable.

Code Block
languagejava
/**
 * A validated {@link CatalogDynamicTable} that is backed by the original metadata coming from the
 * {@link Catalog} but resolved by the framework.
 *
 * <p>Note: This will be converted to a {@link ResolvedCatalogTable} by the framework during the
 * planner's query optimization phase.
 */
@PublicEvolving
public class ResolvedCatalogDynamicTable
        implements ResolvedCatalogBaseTable<CatalogDynamicTable>, CatalogDynamicTable {

    private final CatalogDynamicTable origin;

    private final ResolvedSchema resolvedSchema;

    public ResolvedCatalogDynamicTable(CatalogDynamicTable origin, ResolvedSchema resolvedSchema) {
        this.origin =
                Preconditions.checkNotNull(
                        origin, "Original catalog dynamic table must not be null.");
        this.resolvedSchema =
                Preconditions.checkNotNull(resolvedSchema, "Resolved schema must not be null.");
    }

    @Override
    public Map<String, String> getOptions() {
        return origin.getOptions();
    }

    @Override
    public String getComment() {
        return origin.getComment();
    }

    @Override
    public CatalogBaseTable copy() {
        return new ResolvedCatalogDynamicTable((CatalogDynamicTable) origin.copy(), resolvedSchema);
    }

    @Override
    public Optional<String> getDescription() {
        return origin.getDescription();
    }

    @Override
    public Optional<String> getDetailedDescription() {
        return origin.getDetailedDescription();
    }

    @Override
    public boolean isPartitioned() {
        return origin.isPartitioned();
    }

    @Override
    public List<String> getPartitionKeys() {
        return origin.getPartitionKeys();
    }

    @Override
    public ResolvedCatalogDynamicTable copy(Map<String, String> options) {
        return new ResolvedCatalogDynamicTable(origin.copy(options), resolvedSchema);
    }

    @Override
    public CatalogDynamicTable getOrigin() {
        return origin;
    }

    @Override
    public ResolvedSchema getResolvedSchema() {
        return resolvedSchema;
    }

    @Override
    public String getDefinitionQuery() {
        return origin.getDefinitionQuery();
    }

    @Override
    public String getFreshness() {
        return origin.getFreshness();
    }

    @Override
    public RefreshMode getRefreshMode() {
        return origin.getRefreshMode();
    }

    @Override
    public RefreshJobHandler getRefreshJobHandler() {
        return origin.getRefreshJobHandler();
    }
}
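
For illustration, wrapping an unresolved table with its resolved schema; the columns are hypothetical, and in practice the framework performs this resolution:

Code Block
languagejava
// Illustrative only: 'dynamicTable' is a CatalogDynamicTable obtained from the catalog.
ResolvedSchema resolvedSchema =
        ResolvedSchema.of(
                Column.physical("ds", DataTypes.STRING()),
                Column.physical("id", DataTypes.BIGINT()));

ResolvedCatalogDynamicTable resolved =
        new ResolvedCatalogDynamicTable(dynamicTable, resolvedSchema);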


Configuration

We need to introduce two options to control the background refresh job.

Key: dynamic.table.refresh-mode.freshness-threshold
Required: Optional
Default: 30 minutes
Type: Duration
Description: Used to derive the refresh mode for the background refresh job that a Dynamic Table statement generates. If FRESHNESS is less than the threshold, the job runs in continuous mode; if it is greater, it runs in full mode.

Key: partition.fields.#.date-formatter
Required: Optional
Default: None
Type: String
Description: Specifies the time partition formatter of a partitioned dynamic table, where # represents a string-typed partition field. It hints to the framework which partition to refresh in full refresh mode.


In the Overall Architecture section, we introduce the SQL gateway that is responsible for managing the background refresh jobs of dynamic tables, so this option is scoped to the SQL gateway only. It is read from the flink-conf each time the service starts and is not allowed to be modified afterwards. After the service starts, it first looks up all dynamic tables and compares the dynamic.table.refresh-mode.freshness-threshold value to each dynamic table's freshness; if the refresh mode changes, the original refresh job is stopped, a new job is created, and the job information is updated in the dynamic table.

Modifying this option may affect the refresh mode of the background refresh jobs of all dynamic tables registered in the Catalog, so please modify it carefully. If you really need to change a job's refresh mode, prefer to achieve it by modifying the dynamic table's freshness.
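
For illustration, a minimal sketch of the threshold comparison described above; the class and method names are ours and not part of the proposal:

Code Block
languagejava
import java.time.Duration;

public class RefreshModeDerivation {

    enum RefreshMode { CONTINUOUS, FULL }

    // Derives the refresh mode by comparing the declared freshness with the configured
    // dynamic.table.refresh-mode.freshness-threshold, per the semantics described above.
    static RefreshMode deriveRefreshMode(Duration freshness, Duration threshold) {
        return freshness.compareTo(threshold) < 0 ? RefreshMode.CONTINUOUS : RefreshMode.FULL;
    }

    public static void main(String[] args) {
        Duration threshold = Duration.ofMinutes(30); // default value of the option
        System.out.println(deriveRefreshMode(Duration.ofMinutes(5), threshold)); // CONTINUOUS
        System.out.println(deriveRefreshMode(Duration.ofHours(1), threshold));   // FULL
    }
}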


Gateway Rest API

v3/dynamic-tables/refresh

Verb: POST

Response code: 200 OK

Trigger a refresh operation of dynamic tables.

Request body:

{
  "tables": [],
  "scheduleTime": "",
  "configuration": {}
}

Response body:

{
  "jobId": "",
  "clusterInfo": {}
}

This API is used by the workflow scheduler to trigger a refresh operation of dynamic tables automatically.
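
For illustration, a minimal client call against this endpoint, sketched with Java's built-in HTTP client; the gateway address, table name, and schedule time below are hypothetical values, not defined by this FLIP:

Code Block
languagejava
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TriggerDynamicTableRefresh {

    public static void main(String[] args) throws Exception {
        // Hypothetical payload values following the request body format above.
        String body = "{"
                + "\"tables\": [\"mydb.dt_sink\"],"
                + "\"scheduleTime\": \"2024-03-02 00:00:00\","
                + "\"configuration\": {}"
                + "}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/v3/dynamic-tables/refresh"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + ": " + response.body());
    }
}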

Proposed Design

Integrating the aforementioned context, Dynamic Table is a new table type introduced for Flink SQL that enables users to build and manage data pipelines; it is automatically refreshed at the specified freshness interval. This means that the dynamic table is bound to a transformation SQL statement and, based on the defined data freshness requirement, a background data refresh job is created to automatically update the data.


So a dynamic table is composed of three parts:

  1. Metadata: the table's schema, options, definition query, freshness and refresh job information.
  2. Data: a specific streaming-batch unified storage that holds the current table's data and can provide full historical data and an incremental changelog.
  3. Background refresh job: ensures the production and updating of data in either continuous or full refresh mode.

Overall Architecture

Taking into account the functional design of Dynamic Table, the management of its entire lifecycle is primarily supported by the following core components in the architecture:

  • SQL CLI: a user interaction interface responsible for submitting dynamic table statements.
  • SQL Gateway: in charge of parsing dynamic table statements, creating tables, and submitting jobs to the Flink cluster. Additionally, the workflow scheduler interacts with SQL Gateway to submit batch workflows.
  • Dynamic Table Manager: an embedded manager in SQL Gateway that interacts with the workflow scheduler through RESTful APIs. It is responsible for deducing the data refresh mode for a dynamic table, deriving streaming job runtime parameters and CRON expressions for batch scheduling jobs, creating batch-scheduled jobs in the scheduler, and more.
  • Catalog: storage for dynamic table metadata, which includes schema, options, freshness, definition query, refresh job information, etc.
  • Pluggable Workflow Scheduler: interacts bidirectionally with SQL Gateway through RESTful APIs and is responsible for creating batch-scheduled jobs and triggering their execution. It supports open-source workflow schedulers like Airflow and DolphinScheduler through a pluggable mechanism.
  • Flink Cluster: runs Flink jobs, including both streaming and batch jobs.

[Figure: overall architecture for dynamic table lifecycle management]

As illustrated in the architecture diagram above, the workflow for executing a Dynamic Table create statement is as follows:

  1. Parse the create statement, deduce the schema of the dynamic table based on the select query, and generate a CatalogDynamicTable instance, including schema, table options (if specified), definition query and freshness.
  2. Create the dynamic table in the Catalog.
  3. Determine the refresh mode based on the defined freshness and the dynamic.table.refresh-mode.freshness-threshold value, generating the corresponding type of background refresh job.
  4. If it is the continuous refresh mode:
    • Compile and submit a Flink streaming job, which starts to run statelessly; the source table data read position defaults to the entire dataset. Then the refresh job information is updated to CatalogDynamicTable.
  5. If it is the full refresh mode:
    • Deduce the scheduling CRON expression from the freshness value (see the sketch after this list), create a scheduling workflow in the scheduler, and then update the refresh job information to CatalogDynamicTable.
    • The scheduler periodically triggers the batch scheduling tasks, then calls the SQL gateway to execute the workflow.
    • When the SQL gateway receives a scheduled task, it retrieves the definition query of the dynamic table from the Catalog, calculates the corresponding time partition value (e.g., ds) for partitioned tables, then submits a Flink batch job.
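
As a sketch of the CRON derivation in step 5, one plausible mapping from freshness to a Quartz-style CRON expression (this exact rule is our assumption, not specified by the FLIP):

Code Block
languagejava
import java.time.Duration;

public class CronDerivation {

    // One plausible mapping from freshness to a Quartz-style CRON expression:
    // align daily freshness to midnight and hourly freshness to the top of the hour.
    static String freshnessToCron(Duration freshness) {
        if (freshness.compareTo(Duration.ofDays(1)) >= 0) {
            return "0 0 0 * * ?"; // once a day at midnight
        } else if (freshness.compareTo(Duration.ofHours(1)) >= 0) {
            return "0 0 * * * ?"; // every hour on the hour
        } else {
            return "0 */" + freshness.toMinutes() + " * * * ?"; // every N minutes
        }
    }

    public static void main(String[] args) {
        System.out.println(freshnessToCron(Duration.ofDays(1)));     // 0 0 0 * * ?
        System.out.println(freshnessToCron(Duration.ofHours(1)));    // 0 0 * * * ?
        System.out.println(freshnessToCron(Duration.ofMinutes(45))); // 0 */45 * * * ?
    }
}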

Among these, the most critical component is the Dynamic Table Manager, which handles the most essential functions in the execution process of Dynamic Table and interacts with other relevant components. Since the implementation details of Dynamic Table Manager are not the focus of this FLIP, they are not elaborated on here.

Regarding the details of interfacing with various open-source schedulers, we will discuss the relevant interfaces in a separate FLIP. Specific schedulers such as Airflow and DolphinScheduler will be implemented and maintained in separate repositories (similar to the connector repos).

Partitioned Table Full Refresh

Based on the previous introduction, we understand that in full refresh mode, batch scheduling tasks are created based on freshness, and each execution trigger results in a full-table refresh. A common scenario in data warehouses is time-partitioned tables, such as daily partitions keyed by 'ds'. If the user expects to refresh only the latest partition with each execution, how can this be handled? We introduce the implementation details through the following example.

Code Block
languagesql
CREATE DYNAMIC TABLE dt_sink
PARTITIONED BY (ds)
WITH (
  -- hint time partition format
  'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '1' DAY
AS SELECT
  ds,
  id,
  order_number,
  user_id,
  ...
FROM source

In the given example, both the 'source' and 'dt_sink' tables are partitioned by a 'ds' field. By specifying the format of the time partition field in the WITH clause, the framework is prompted to only refresh the data in the latest 'ds' partition with each batch schedule.

In the scenario where freshness is set to INTERVAL '1' DAY, scheduling occurs once a day. Assuming today's date is March 2, 2024, when the batch scheduling is triggered at midnight, the framework will use the time passed by the scheduling system, combined with the freshness and 'partition.fields.ds.date-formatter', to calculate the 'ds' partition that needs to be refreshed. For this case, it will determine 'ds=2024-03-01', which corresponds to yesterday's partition. The framework then assembles the following SQL statement to submit a Flink batch job.

Code Block
languagesql
INSERT OVERWRITE dt_sink
SELECT * FROM
  (SELECT
    ds,
    id,
    order_number,
    user_id,
    ...
  FROM source
  ) AS tmp
-- add partition filter condition
WHERE ds = '2024-03-01'

Since both the 'source' and 'dt_sink' tables are partitioned by the 'ds' field, the optimizer, when generating the execution plan, will push down the 'ds' filter condition to the 'source' table, thereby only reading data from the corresponding partition of the 'source' table.

If FRESHNESS = INTERVAL '1' HOUR, what would the behavior be? First, a task that schedules on an hourly basis is registered in the scheduler. Then, with each execution, the framework uses the time provided by the scheduling system, combined with the freshness and 'partition.fields.ds.date-formatter', to calculate the 'ds' partition that needs to be refreshed. For this scenario, it would compute ds='2024-03-02', which is the latest time partition.
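
For illustration, one plausible derivation of the partition value, consistent with both examples above (the exact rule is our assumption):

Code Block
languagejava
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class PartitionDerivation {

    // Computes the 'ds' partition to refresh: the schedule trigger time minus one
    // freshness interval, rendered with the configured date formatter.
    static String partitionToRefresh(
            LocalDateTime scheduleTime, Duration freshness, String pattern) {
        return scheduleTime.minus(freshness).format(DateTimeFormatter.ofPattern(pattern));
    }

    public static void main(String[] args) {
        // Daily freshness, triggered at midnight on 2024-03-02: refresh yesterday's partition.
        System.out.println(partitionToRefresh(
                LocalDateTime.of(2024, 3, 2, 0, 0), Duration.ofDays(1), "yyyy-MM-dd"));  // 2024-03-01
        // Hourly freshness, triggered at 01:00 on 2024-03-02: refresh the latest partition.
        System.out.println(partitionToRefresh(
                LocalDateTime.of(2024, 3, 2, 1, 0), Duration.ofHours(1), "yyyy-MM-dd")); // 2024-03-02
    }
}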

Compatibility, Deprecation, and Migration Plan

This FLIP only has incremental changes, and thus is fully backwards compatible.

Test Plan

Unit tests, integration tests, and manual tests will be introduced to verify this change.

Rejected Alternatives

Materialized View

Why not use a Materialized View? A view is the equivalent of a query result. According to the definition in the SQL standard, it is not supposed to support modifications. However, in the lakehouse scenario there is a need to modify data, which leads to a conceptual conflict between the two. Therefore, using a table is more appropriate.

Base class of CatalogDynamicTable 

Regarding CatalogDynamicTable, another alternative is extending CatalogTable directly instead of CatalogBaseTable. This makes it straightforward to inherit the attributes of CatalogTable.

Code Block
languagejava
@PublicEvolving
public interface CatalogDynamicTable extends CatalogTable {

    /**
     * Creates an instance of {@link CatalogDynamicTable}.
     *
     * @param schema unresolved schema
     * @param comment optional comment
     * @param partitionKeys list of partition keys or an empty list if not partitioned
     * @param options options to configure the connector
     * @param snapshot table snapshot of the table
     * @param definitionQuery definition sql query
     * @param freshness data freshness
     * @param refreshMode data refresh mode
     * @param refreshJobHandler refresh job handler
     */
    static CatalogDynamicTable of(
            Schema schema,
            @Nullable String comment,
            List<String> partitionKeys,
            Map<String, String> options,
            @Nullable Long snapshot,
            String definitionQuery,
            String freshness,
            RefreshMode refreshMode,
            RefreshJobHandler refreshJobHandler) {
        return new DefaultCatalogDynamicTable(
                schema,
                comment,
                partitionKeys,
                options,
                snapshot,
                definitionQuery,
                freshness,
                refreshMode,
                refreshJobHandler);
    }

    /**
     * The definition query text of dynamic table. Text is expanded in contrast to the original SQL.
     * This is needed because the context such as current DB is lost after the session, in which
     * view is defined, is gone. Expanded query text takes care of this, as an example.
     *
     * <p>For example, for a dynamic table that is defined in the context of "default" database with
     * a query {@code select * from test1}, the expanded query text might become {@code select
     * `test1`.`name`, `test1`.`value` from `default`.`test1`}, where table test1 resides in
     * database "default" and has two columns ("name" and "value").
     *
     * @return the dynamic table definition in expanded text.
     */
    String getDefinitionQuery();

    /** Get the freshness of the dynamic table which is used to determine the data refresh mode. */
    String getFreshness();

    /** Get the refresh mode of the dynamic table. */
    RefreshMode getRefreshMode();

    /** Get the refresh job handler of the dynamic table. */
    RefreshJobHandler getRefreshJobHandler();

    /** The refresh mode of the dynamic table. */
    @PublicEvolving
    enum RefreshMode {
        CONTINUOUS,
        FULL,
    }
}


There are two main reasons why CatalogTable was not inherited:

...

