...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Current data warehouse processing architectures fall into two camps: the Lambda architecture, with separate streaming and batch processing systems, and the Lakehouse architecture, with a unified streaming-batch processing model built on shared storage. However, both architectures have some issues:

...

For Catalog, we don't need to introduce any new method. But we should highlight that a Catalog supporting dynamic table creation represents streaming-batch unified storage, which can provide both full historical data and an incremental changelog. Developers therefore need to override the getFactory method so the planner can find the DynamicTableFactory when compiling a query.


@PublicEvolving
public interface Catalog {

    /**
     * Returns a factory for creating instances from catalog objects.
     *
     * <p>This method enables bypassing the discovery process. Implementers can directly pass
     * internal catalog-specific objects to their own factory. For example, a custom {@link
     * CatalogTable} can be processed by a custom {@link DynamicTableFactory}.
     *
     * <p>If this catalog supports creating dynamic tables, you should also override this method
     * to provide a {@link DynamicTableFactory}, which helps the planner find the {@link
     * DynamicTableSource} and {@link DynamicTableSink} correctly during the compile and
     * optimization phase. If you don't override this method, you must specify the physical
     * connector identifier of the storage this catalog represents when creating a dynamic
     * table. Otherwise, the planner can't find the {@link DynamicTableFactory}.
     *
     * <p>Because all factories are interfaces, the returned {@link Factory} instance can implement
     * multiple supported extension points. An {@code instanceof} check is performed by the caller
     * that checks whether a required factory is implemented; otherwise the discovery process is
     * used.
     */
    default Optional<Factory> getFactory() {
        return Optional.empty();
    }
}
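To illustrate the caller-side behavior described in the Javadoc, the following sketch mirrors the check the planner would perform. Note that `Factory`, `Catalog`, and `MyLakeCatalog` below are simplified stand-ins defined locally for the example, not the real Flink interfaces.

```java
import java.util.Optional;

public class GetFactoryExample {
    // Mirrors the planner-side logic: use the catalog-provided factory if it
    // implements the required extension point, otherwise fall back to discovery.
    static String resolve(Catalog catalog) {
        Optional<Factory> factory = catalog.getFactory();
        if (factory.isPresent() && factory.get() instanceof DynamicTableFactory) {
            return "catalog-factory";
        }
        return "discovery";
    }

    public static void main(String[] args) {
        System.out.println(resolve(new MyLakeCatalog())); // catalog-factory
        System.out.println(resolve(new Catalog() {}));    // discovery
    }
}

// Simplified stand-ins for the Flink interfaces discussed above (not the real API).
interface Factory {}

interface DynamicTableFactory extends Factory {}

interface Catalog {
    // Default: no catalog-provided factory; the planner falls back to discovery.
    default Optional<Factory> getFactory() {
        return Optional.empty();
    }
}

// A hypothetical catalog backed by streaming-batch unified storage.
class MyLakeCatalog implements Catalog {
    @Override
    public Optional<Factory> getFactory() {
        return Optional.of(new DynamicTableFactory() {});
    }
}
```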

RefreshJobHandler

We introduce the class RefreshJobHandler, which records the meta information of a dynamic table's current background refresh job.

/**
 * This class represents the meta information of the current dynamic table background refresh job.
 * The refresh mode of the background job may be continuous or full. The format of the meta
 * information differs between the two modes, so we unify it in a structured JSON string, jobDetail.
 *
 * <p>In continuous mode, the format of the meta information is { "clusterType": "yarn",
 * "clusterId": "xxx", "jobId": "yyyy" }.
 *
 * <p>In full mode, the meta information format is { "schedulerType": "airflow", "endpoint": "xxx",
 * "workflowId": "yyy" }.
 */
@PublicEvolving
public class RefreshJobHandler {

    private final CatalogDynamicTable.RefreshMode actualRefreshMode;
    private final JobState jobState;
    private final @Nullable String jobDetail;

    public RefreshJobHandler(
            CatalogDynamicTable.RefreshMode actualRefreshMode,
            JobState jobState,
            @Nullable String jobDetail) {
        this.actualRefreshMode = actualRefreshMode;
        this.jobState = jobState;
        this.jobDetail = jobDetail;
    }

    public CatalogDynamicTable.RefreshMode getActualRefreshMode() {
        return actualRefreshMode;
    }

    public JobState getJobState() {
        return jobState;
    }

    public @Nullable String getJobDetail() {
        return jobDetail;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        RefreshJobHandler that = (RefreshJobHandler) o;
        return actualRefreshMode == that.actualRefreshMode
                && jobState == that.jobState
                && Objects.equals(jobDetail, that.jobDetail);
    }

    @Override
    public int hashCode() {
        return Objects.hash(actualRefreshMode, jobState, jobDetail);
    }

    @Override
    public String toString() {
        return "RefreshJobHandler{"
                + "actualRefreshMode="
                + actualRefreshMode
                + ", jobState="
                + jobState
                + ", jobDetail='"
                + jobDetail
                + '\''
                + '}';
    }

    /** Background refresh job state. */
    @PublicEvolving
    public enum JobState {
        INITIALIZING,
        RUNNING,
        SUSPENDED
    }
}
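The two jobDetail shapes described in the Javadoc can be illustrated with plain string templates; the cluster, endpoint, and ID values below are hypothetical placeholders, not values from the proposal.

```java
public class JobDetailExample {
    // Continuous mode: the refresh job runs as a long-lived streaming job on a cluster.
    static String continuousDetail(String clusterType, String clusterId, String jobId) {
        return String.format(
                "{ \"clusterType\": \"%s\", \"clusterId\": \"%s\", \"jobId\": \"%s\" }",
                clusterType, clusterId, jobId);
    }

    // Full mode: the refresh job is triggered periodically by an external workflow scheduler.
    static String fullDetail(String schedulerType, String endpoint, String workflowId) {
        return String.format(
                "{ \"schedulerType\": \"%s\", \"endpoint\": \"%s\", \"workflowId\": \"%s\" }",
                schedulerType, endpoint, workflowId);
    }

    public static void main(String[] args) {
        System.out.println(continuousDetail("yarn", "cluster-1", "job-42"));
        System.out.println(fullDetail("airflow", "http://scheduler:8080", "wf-7"));
    }
}
```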

ResolvedCatalogDynamicTable

We introduce the public class ResolvedCatalogDynamicTable that represents a validated CatalogDynamicTable.

/**
 * A validated {@link CatalogDynamicTable} that is backed by the original metadata coming from the
 * {@link Catalog} but resolved by the framework.
 *
 * <p>Note: This will be converted to {@link ResolvedCatalogTable} by the framework when the
 * planner optimizes the query.
 */
@PublicEvolving
public class ResolvedCatalogDynamicTable
        implements ResolvedCatalogBaseTable<CatalogDynamicTable>, CatalogDynamicTable {

    private final CatalogDynamicTable origin;
    private final ResolvedSchema resolvedSchema;

    public ResolvedCatalogDynamicTable(CatalogDynamicTable origin, ResolvedSchema resolvedSchema) {
        this.origin =
                Preconditions.checkNotNull(
                        origin, "Original catalog dynamic table must not be null.");
        this.resolvedSchema =
                Preconditions.checkNotNull(resolvedSchema, "Resolved schema must not be null.");
    }

    @Override
    public Map<String, String> getOptions() {
        return origin.getOptions();
    }

    @Override
    public String getComment() {
        return origin.getComment();
    }

    @Override
    public CatalogBaseTable copy() {
        return new ResolvedCatalogDynamicTable((CatalogDynamicTable) origin.copy(), resolvedSchema);
    }

    @Override
    public Optional<String> getDescription() {
        return origin.getDescription();
    }

    @Override
    public Optional<String> getDetailedDescription() {
        return origin.getDetailedDescription();
    }

    @Override
    public boolean isPartitioned() {
        return origin.isPartitioned();
    }

    @Override
    public List<String> getPartitionKeys() {
        return origin.getPartitionKeys();
    }

    @Override
    public ResolvedCatalogDynamicTable copy(Map<String, String> options) {
        return new ResolvedCatalogDynamicTable(origin.copy(options), resolvedSchema);
    }

    @Override
    public CatalogDynamicTable getOrigin() {
        return origin;
    }

    @Override
    public ResolvedSchema getResolvedSchema() {
        return resolvedSchema;
    }

    @Override
    public String getDefinitionQuery() {
        return origin.getDefinitionQuery();
    }

    @Override
    public String getFreshness() {
        return origin.getFreshness();
    }

    @Override
    public RefreshMode getRefreshMode() {
        return origin.getRefreshMode();
    }

    @Override
    public RefreshJobHandler getRefreshJobHandler() {
        return origin.getRefreshJobHandler();
    }
}


Configuration

We need to introduce two options to control the background refresh job.

Key: dynamic.table.refresh-mode.freshness-threshold
Required: optional
Default: 30 minutes
Type: Duration
Description: Used to derive the refresh mode of the background refresh job that a dynamic table statement generates. If FRESHNESS is less than the threshold, the job runs in continuous mode; if it is greater, it runs in full mode.

Key: partition.fields.#.date-formatter
Required: optional
Default: none
Type: String
Description: Specifies the time partition formatter of a partitioned dynamic table, where # represents a string-type partition field. This hints to the framework which partition to refresh in full refresh mode.
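As a sketch of how such a formatter pattern could be applied when a full-mode refresh fires, the snippet below formats a schedule time into a partition value using java.time. The resolution logic is an illustrative assumption, not Flink's actual implementation.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class PartitionFormatterExample {
    // Given a pattern configured via partition.fields.#.date-formatter
    // (e.g. "yyyy-MM-dd"), compute the partition value that a full-mode
    // refresh triggered at `scheduleTime` would target.
    static String partitionValue(String dateFormatterPattern, LocalDate scheduleTime) {
        return scheduleTime.format(DateTimeFormatter.ofPattern(dateFormatterPattern));
    }

    public static void main(String[] args) {
        // A daily schedule firing on 2024-03-02 refreshes the matching partition.
        System.out.println(partitionValue("yyyy-MM-dd", LocalDate.of(2024, 3, 2))); // 2024-03-02
    }
}
```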


In the Overall Architecture section, we will introduce the SQL gateway that is responsible for managing the background refresh jobs of dynamic tables, so this option is scoped only to the SQL gateway. It is read from flink-conf each time the service starts and may not be modified while the service is running. After the service starts, it first looks up all dynamic tables and compares the dynamic.table.refresh-mode.freshness-threshold value to each dynamic table's freshness; if the refresh mode changes, the original refresh job is stopped, a new job is created, and the job information is updated in the dynamic table.

Modifying this option may affect the refresh mode of the background refresh jobs of all dynamic tables registered in the Catalog, so modify it carefully. If you really need to change a job's refresh mode, prefer to do so by modifying the dynamic table's freshness.
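The derivation rule above can be sketched with java.time.Duration; the method and local enum below are illustrative stand-ins, not part of the proposed API.

```java
import java.time.Duration;

public class RefreshModeDerivation {
    // Local stand-in for CatalogDynamicTable.RefreshMode in the proposal.
    enum RefreshMode { CONTINUOUS, FULL }

    // FRESHNESS below the threshold -> continuous mode; otherwise full mode,
    // mirroring the semantics of dynamic.table.refresh-mode.freshness-threshold.
    static RefreshMode derive(Duration freshness, Duration threshold) {
        return freshness.compareTo(threshold) < 0 ? RefreshMode.CONTINUOUS : RefreshMode.FULL;
    }

    public static void main(String[] args) {
        Duration threshold = Duration.ofMinutes(30); // the documented default
        System.out.println(derive(Duration.ofMinutes(1), threshold)); // CONTINUOUS
        System.out.println(derive(Duration.ofHours(1), threshold));   // FULL
    }
}
```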


Gateway Rest API

v3/dynamic-tables/refresh

Verb: POST

Response code: 200 OK

Trigger a refresh operation of dynamic tables.

Request

body

{
  "tables": [],
  "scheduleTime": "",
  "configuration": {}
}

Response

body

{
  "jobId": "",
  "clusterInfo": {}
}

This API is used by the workflow scheduler to trigger a refresh operation of a dynamic table automatically.
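A scheduler-side call could be constructed as follows with the JDK's built-in HTTP client; the gateway address and the field values in the body are hypothetical placeholders, and the actual send is omitted.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class RefreshRequestExample {
    // Builds the POST request for the v3/dynamic-tables/refresh endpoint.
    static HttpRequest buildRequest(String gatewayAddress, String body) {
        return HttpRequest.newBuilder()
                .uri(URI.create(gatewayAddress + "/v3/dynamic-tables/refresh"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical gateway address and payload values.
        String body = "{ \"tables\": [\"db1.dt1\"], \"scheduleTime\": \"\", \"configuration\": {} }";
        HttpRequest request = buildRequest("http://localhost:8083", body);
        // An HttpClient.send(...) call (omitted here) would return the jobId and
        // clusterInfo described in the response body above.
        System.out.println(request.method() + " " + request.uri().getPath());
    }
}
```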

...

Integrating the aforementioned context, Dynamic Table is a new table type introduced for Flink SQL that enables users to build and manage data pipelines; it is automatically refreshed at a specified freshness interval. This means that a dynamic table is bound to a transformation SQL statement, and based on the defined data freshness requirement, a background refresh job is created to automatically update its data.


So dynamic table is composed of three parts:

...

  • SQL CLI: A user interaction interface responsible for submitting dynamic table statements.
  • SQL Gateway: In charge of parsing dynamic table statements, creating tables, and submitting jobs to the Flink cluster. Additionally, the workflow scheduler interacts with the SQL Gateway to submit batch workflows.
  • Dynamic Table Manager: An embedded manager in the SQL Gateway that interacts with the Workflow Scheduler through RESTful APIs. It is responsible for deducing the data refresh mode for a dynamic table, deriving streaming job runtime parameters and CRON expressions for batch scheduling jobs, creating batch-scheduled jobs in the scheduler, and more.
  • Catalog: Stores dynamic table metadata, including schema, options, freshness, definition query, refresh job information, etc.
  • Pluggable Workflow Scheduler: Interacts bidirectionally with the SQL Gateway through RESTful APIs, responsible for creating batch-scheduled jobs and triggering their execution. It supports open-source workflow schedulers like Airflow and DolphinScheduler through a pluggable mechanism.
  • Flink Cluster: Runs Flink jobs, including both streaming and batch jobs.

[Overall architecture diagram]

As illustrated in the architecture diagram above, the workflow for executing a Dynamic Table create statement is as follows:

...

Regarding CatalogDynamicTable, another alternative is to extend CatalogTable directly instead of CatalogBaseTable, which makes it straightforward to inherit the attributes of CatalogTable.

@PublicEvolving
public interface CatalogDynamicTable extends CatalogTable {

    /**
     * Creates an instance of {@link CatalogDynamicTable}.
     *
     * @param schema unresolved schema
     * @param comment optional comment
     * @param partitionKeys list of partition keys or an empty list if not partitioned
     * @param options options to configure the connector
     * @param snapshot table snapshot of the table
     * @param definitionQuery definition sql query
     * @param freshness data freshness
     * @param refreshMode data refresh mode
     * @param refreshJobHandler refresh job handler
     */
    static CatalogDynamicTable of(
            Schema schema,
            @Nullable String comment,
            List<String> partitionKeys,
            Map<String, String> options,
            @Nullable Long snapshot,
            String definitionQuery,
            String freshness,
            RefreshMode refreshMode,
            RefreshJobHandler refreshJobHandler) {
        return new DefaultCatalogDynamicTable(
                schema,
                comment,
                partitionKeys,
                options,
                snapshot,
                definitionQuery,
                freshness,
                refreshMode,
                refreshJobHandler);
    }

    /**
     * The definition query text of the dynamic table. The text is expanded in contrast to the
     * original SQL. This is needed because the context, such as the current database, is lost
     * after the session in which the table was defined is gone. The expanded query text takes
     * care of this.
     *
     * <p>For example, for a dynamic table that is defined in the context of the "default"
     * database with a query {@code select * from test1}, the expanded query text might become
     * {@code select `test1`.`name`, `test1`.`value` from `default`.`test1`}, where table test1
     * resides in database "default" and has two columns ("name" and "value").
     *
     * @return the dynamic table definition in expanded text.
     */
    String getDefinitionQuery();

    /** Gets the freshness of the dynamic table, which is used to determine the data refresh mode. */
    String getFreshness();

    /** Gets the refresh mode of the dynamic table. */
    RefreshMode getRefreshMode();

    /** Gets the refresh job handler of the dynamic table. */
    RefreshJobHandler getRefreshJobHandler();

    /** The refresh mode of the dynamic table. */
    @PublicEvolving
    enum RefreshMode {
        /**
         * This mode is derived from the freshness when a dynamic table is created; it is not
         * manually specified.
         */
        CONTINUOUS,
        FULL,
    }
}

There are two main reasons why CatalogTable was not inherited:

...