
Status

Current state: [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Today's data warehouse processing architectures fall into two camps: the Lambda architecture, with separate streaming and batch processing systems, and the Lakehouse architecture, built on unified streaming-batch storage. Both share the following issues:

  • Currently, Flink SQL provides two execution modes: streaming and batch. Users must choose the appropriate execution mode based on their data freshness requirements: streaming mode uses the INSERT INTO statement, while batch mode uses the INSERT OVERWRITE statement. Partitioned tables additionally require different partition conditions in each mode, so users end up maintaining two sets of code.
  • When integrating with modern streaming-batch unified storage under an incremental computation model, bulk-refreshing historical partitions cannot directly reuse the streaming job's logic. Separate batch job code with explicit partition conditions must be developed to avoid a full-table refresh, which can otherwise result in data duplication.
  • Jobs must be created and deployed manually. Streaming jobs require fine-tuning parameters such as checkpoint intervals and minibatch settings, which costs significant optimization effort, while batch jobs require a workflow scheduler like Airflow. These are two disjointed systems that cannot switch seamlessly when data freshness requirements change.

In response to the aforementioned issues, Flink SQL long ago introduced the concept of Dynamic Tables[1] in an attempt to solve them, but they remain unresolved. At the same time, the industry is also trying to address these challenges. The cloud data warehouse giant Snowflake has unveiled its Dynamic Tables[2] feature to simplify the creation and management of data pipelines and unify batch and streaming, aiming at lower data latency and more real-time capability. Databricks has also launched Delta Live Tables[3], a declarative ETL framework whose unified API covers both streaming and batch data processing pipelines and accelerates ETL development. This indicates that simplifying data processing pipelines and unifying streaming and batch are current technological trends in the data warehouse field.

Hence, starting from the concept of unified streaming and batch processing (One SQL & Unified Table), it is time to turn Dynamic Table from an abstract concept into a tangible entity. We propose Dynamic Table as a first-class entity that simplifies streaming and batch ETL pipelines cost-effectively, so that users are no longer confused by complex concepts such as streaming versus batch processing and underlying technical details, and can focus solely on business logic and data freshness.

User Story

Let's walk through a user story to see how Dynamic Table solves the problem described earlier.

Business Background

Suppose a MySQL instance has three business tables: orders (orders table), order_pay (order payment table), and products (product category dimension table), which are first ingested in real time into a lake storage such as Paimon via Flink CDC. We are going to widen these three tables into one table; let's see how users create a data processing pipeline, and pause and resume it, through Dynamic Table.

Create Data Pipeline

orders and order_pay are two base tables at the ODS layer, partitioned by ds. Here we create a DWD wide table dwd_orders.

CREATE DYNAMIC TABLE dwd_orders
(
  PRIMARY KEY(ds, id) NOT ENFORCED
)
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '3' MINUTE
AS SELECT
  o.ds,
  o.id,
  o.order_number,
  o.user_id,
  ...
FROM
  orders AS o
  LEFT JOIN products FOR SYSTEM_TIME AS OF proctime() AS prod
    ON o.product_id = prod.id
  LEFT JOIN order_pay AS pay
    ON o.id = pay.order_id AND o.ds = pay.ds

When a CREATE DYNAMIC TABLE statement is executed, the framework deduces the table schema from the referenced query, first creates the Dynamic Table in the Catalog, and then automatically creates a data processing pipeline that refreshes the table according to the specified freshness. Since FRESHNESS is set to 3 minutes here, a Flink streaming job is launched to refresh the table continuously.

Pause & Resume Data Pipeline

-- 1. Pause table data refresh
ALTER DYNAMIC TABLE dwd_orders SUSPEND;

-- 2. Resume table data refresh
ALTER DYNAMIC TABLE dwd_orders RESUME
-- Set table option via WITH clause
WITH(
  'sink.parallelism' = '10'
);


  1. Through the SUSPEND statement, the refresh job can be paused.
  2. Through the RESUME statement, the refresh job can be resumed. 

Manual Refresh

-- Refresh historical partition
ALTER DYNAMIC TABLE dwd_orders REFRESH PARTITION(ds='20231023')


If you need to backfill a historical partition of a partitioned table, you just execute a REFRESH statement with the partition condition; this launches a Flink batch job to refresh that partition.

Typically, to comply with GDPR requirements, it may be necessary to modify certain data within a table; this can be accomplished with the UPDATE statement.

UPDATE dwd_orders SET order_number = 10 WHERE id = 73635185;

Modify Data Freshness

ALTER DYNAMIC TABLE dwd_orders SET FRESHNESS = INTERVAL '1' DAY

If you want to adjust the data freshness, you just modify the dynamic table's freshness attribute. The framework first stops the Flink streaming job, then automatically creates a scheduled task in the workflow scheduler, which triggers table refreshes at the specified freshness.

From the above workflow, it can be seen that Dynamic Table greatly simplifies the development of data processing pipelines. With a unified declarative SQL API for both stream and batch pipelines, users no longer need to be concerned with Flink jobs or execution modes; they simply operate on the dynamic table.

In summary, we hope that with Dynamic Table, defined by a declarative SQL statement plus a freshness, you can express batch and streaming transformations on your data in the same way, accelerate ETL pipeline development, automate task orchestration, improve data quality and freshness, and save time and effort.

Public Interfaces

SQL Statements

To support dynamic tables, we need to introduce some new syntax.

Create

CREATE DYNAMIC TABLE [catalog_name.][db_name.]table_name
(
   [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
)
[COMMENT table_comment]

[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]

[WITH (key1=val1, key2=val2, ...)]

FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY | MONTH | YEAR }

[REFRESH_MODE = { CONTINUOUS | FULL }]

AS <select_statement>

Parameters

  • PRIMARY KEY: An optional list of columns that uniquely identifies each row within the table. Primary key columns must be non-null.
  • PARTITIONED BY: An optional list of columns by which to partition the table.
  • WITH: Optionally sets one or more table properties.
  • FRESHNESS: Defines the time measure by which the dynamic table's content may lag behind updates to the base tables. This is merely a best-effort behavior, not a guarantee.
  • REFRESH_MODE: An optional clause that specifies the refresh mode for the dynamic table. If specified, it takes precedence over the refresh mode derived from FRESHNESS.
  • AS select_statement: This clause populates the dynamic table with the data from the select query. The base tables can be dynamic tables, tables, or views.
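
For illustration, a complete statement combining the optional clauses might look like the following sketch; the catalog, database, table, and column names here are hypothetical.

CREATE DYNAMIC TABLE my_catalog.my_db.dwd_users
(
  PRIMARY KEY (ds, user_id) NOT ENFORCED
)
COMMENT 'daily widened user table'
PARTITIONED BY (ds)
WITH (
  'sink.parallelism' = '4'
)
FRESHNESS = INTERVAL '1' DAY
REFRESH_MODE = FULL
AS SELECT
  u.ds,
  u.user_id,
  u.user_name,
  r.region_name
FROM users AS u
LEFT JOIN regions AS r ON u.region_id = r.id;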

Limitations:

  • Does not support explicitly specifying columns.
  • Does not support modifying the select statement after the dynamic table is created. 
  • Does not currently support referencing temporary tables, views, or functions in the SELECT statement.

FRESHNESS Clause

Freshness is a core attribute of a dynamic table, so we highlight its role first. It serves two key functions.

First, freshness determines the data refresh mode of a dynamic table that can satisfy the specified data freshness. There are currently two refresh modes:

  • Continuous Refresh Mode: Launches a Flink streaming job that continuously refreshes the dynamic table data.
  • Full Refresh Mode: A Workflow Scheduler periodically triggers a Flink batch job to refresh the dynamic table data.

Regarding how to determine the refresh mode, you can see the Configuration section for more detail.

Secondly, after determining the refresh mode, it will be further used to determine the parameters of the refresh job:

  • In continuous mode, it will be used to derive checkpoint and minibatch related options that determine when data becomes visible, so as to satisfy the defined freshness.
  • In full mode, it will be used to derive the scheduling interval of the workflow, i.e., the CRON expression, and to calculate the corresponding time partition value (e.g., ds) for a partitioned table, so as to satisfy the defined data freshness of the dynamic table.
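
For illustration, assuming the default dynamic.table.refresh-mode.freshness-threshold of 30 minutes described in the Configuration section, the declared freshness maps to a refresh mode roughly as in the following sketch (table and source names are hypothetical):

-- 5 minutes is below the 30-minute threshold => continuous mode:
-- a Flink streaming job continuously refreshes the table.
CREATE DYNAMIC TABLE dt_metrics_5min
FRESHNESS = INTERVAL '5' MINUTE
AS SELECT * FROM events;

-- 1 day is above the 30-minute threshold => full mode:
-- the workflow scheduler periodically triggers a Flink batch job to refresh the table.
CREATE DYNAMIC TABLE dt_metrics_daily
FRESHNESS = INTERVAL '1' DAY
AS SELECT * FROM events;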

REFRESH_MODE Clause

The previous section explained that the refresh mode is automatically derived by the engine from freshness, so why provide a REFRESH_MODE clause for specifying it manually, and how do the two relate? In some business scenarios the user knows exactly which refresh mode is needed, and the mode derived from freshness does not meet that demand, so we provide a more flexible way to specify the refresh mode manually.

If REFRESH_MODE is specified at table creation time, it takes priority over freshness, which means freshness is no longer used to infer the refresh mode of the dynamic table.
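
For example (hypothetical table names), a user may want scheduled batch refreshes even though the declared freshness alone would imply continuous mode:

-- Although a 10-minute freshness would normally imply continuous mode under the default
-- threshold, the explicit REFRESH_MODE takes precedence, so the table is refreshed by
-- periodically scheduled Flink batch jobs.
CREATE DYNAMIC TABLE dt_report
FRESHNESS = INTERVAL '10' MINUTE
REFRESH_MODE = FULL
AS SELECT * FROM orders;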


Operations on dynamic tables are performed through the following ALTER statements.

Suspend

ALTER DYNAMIC TABLE [catalog_name.][db_name.]table_name SUSPEND

Stops or suspends refreshes of the dynamic table, which means the background refresh job is suspended. If the dynamic table is consumed by other downstream dynamic tables, those downstream tables are not suspended; the statement is effective only for the current dynamic table.

Resume

ALTER DYNAMIC TABLE [catalog_name.][db_name.]table_name RESUME

[WITH('key1' = 'val1', 'key2' = 'val2')]


Restarts or resumes refreshes of the dynamic table, which means the background refresh job is resumed. The RESUME operation only applies to the current dynamic table; it does not cascade to downstream dynamic tables that are suspended.

You can also specify one or more dynamic table options via the WITH clause to tune the background refresh job; these options are only effective for the current operation and are not persisted.

Refresh

ALTER DYNAMIC TABLE [catalog_name.][db_name.]table_name REFRESH

[PARTITION (key1=val1, key2=val2, ...)]

Manually triggers a refresh of the dynamic table. It only applies to the current table and does not cascade a refresh to downstream tables by default.

Note: if the refresh mode is continuous and the background job is running, use the REFRESH command with care, as it can lead to inconsistent data.

Set

Freshness

ALTER DYNAMIC TABLE [catalog_name.][db_name.]table_name 

  SET FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY | MONTH | YEAR}

Modifies the time measure by which the dynamic table's content may lag behind updates to the base tables. Changing the freshness may change the refresh mode of the background job from continuous to full, from full to continuous, or leave it unchanged. In any case, altering freshness stops the current refresh job and creates a new job with the updated freshness.

Note: if the refresh mode was manually specified when the dynamic table was created, modifying freshness will not change the refresh mode.
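
For illustration, assuming the default 30-minute freshness threshold, the following sketch (hypothetical table name) shows a freshness change that also flips the refresh mode:

-- Created with a 3-minute freshness, so it is refreshed by a continuous Flink streaming job.
CREATE DYNAMIC TABLE dt_orders_wide
FRESHNESS = INTERVAL '3' MINUTE
AS SELECT * FROM orders;

-- Relaxing the freshness to 1 day exceeds the threshold: the streaming job is stopped
-- and a periodically scheduled batch workflow takes over the refresh.
ALTER DYNAMIC TABLE dt_orders_wide SET FRESHNESS = INTERVAL '1' DAY;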

REFRESH_MODE

ALTER DYNAMIC TABLE  [catalog_name.][db_name.]table_name 

  SET REFRESH_MODE = { FULL | CONTINUOUS }

Modifies the refresh mode of the dynamic table, either from continuous to full or from full to continuous.
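
For example, switching the user-story table dwd_orders to periodically scheduled batch refreshes:

ALTER DYNAMIC TABLE dwd_orders SET REFRESH_MODE = FULL;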

Options

ALTER DYNAMIC TABLE [catalog_name.][db_name.]table_name  SET ('key' = 'val');

Reset

ALTER DYNAMIC TABLE [catalog_name.][db_name.]table_name RESET ('key');

Besides the ALTER syntax above, dynamic tables currently do not support the other ALTER statements that apply to regular tables.

Drop

DROP DYNAMIC TABLE [ IF EXISTS ] [catalog_name.][db_name.]table_name

Dropping a dynamic table first deletes its refresh job, then drops the metadata and data.

Describe Job

{ DESCRIBE | DESC } JOB ${jobId} 

Describes the specified job in the Flink cluster; the returned information has the same schema as the SHOW JOBS statement.

Interface Changes

CatalogDynamicTable

We introduce a new interface CatalogDynamicTable that extends CatalogBaseTable. As described above, in addition to the schema, options, and partition keys, CatalogDynamicTable also carries three important pieces of metadata: the definition query, the freshness, and the refresh job.

/**
 * Represents the unresolved metadata of a dynamic table in a {@link Catalog}.
 *
 * <p>Dynamic Table definition: in the context of integrated stream-batch data storage, it provides
 * full history data and an incremental changelog. By defining the data's production business logic
 * and freshness, data updates are achieved through the continuous or full refresh mode, while the
 * table also supports both batch and incremental consumption.
 *
 * <p>The metadata for {@link CatalogDynamicTable} includes the following four main parts:
 *
 * <ul>
 *   <li>Schema, comments, options and partition keys.
 *   <li>Data freshness, which determines when the data is generated and becomes visible to the user.
 *   <li>Data production business logic, also known as the definition query.
 *   <li>Background data refresh job, either a Flink streaming job or a scheduled batch job; it is
 *       initialized after the dynamic table is created.
 * </ul>
 *
 * <p>A catalog implementer can either use {@link #of(Schema, String, List, Map, Long, String,
 * Duration, RefreshMode, RefreshHandler)} for a basic implementation of this interface or create a
 * custom class that allows passing catalog-specific objects all the way down to the connector
 * creation (if necessary).
 */
@PublicEvolving
public interface CatalogDynamicTable extends CatalogBaseTable {

    @Override
    default TableKind getTableKind() {
        return TableKind.DYNAMIC_TABLE;
    }

    /**
     * Check if the table is partitioned or not.
     *
     * @return true if the table is partitioned; otherwise, false
     */
    boolean isPartitioned();

    /**
     * Get the partition keys of the table. This will be an empty set if the table is not
     * partitioned.
     *
     * @return partition keys of the table
     */
    List<String> getPartitionKeys();

    /**
     * Returns a copy of this {@code CatalogDynamicTable} with given table options {@code options}.
     *
     * @return a new copy of this table with replaced table options
     */
    CatalogDynamicTable copy(Map<String, String> options);

    /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */
    default Optional<Long> getSnapshot() {
        return Optional.empty();
    }

    /**
     * The definition query text of the dynamic table; the text is expanded in contrast to the
     * original SQL. This is needed because the context, such as the current database, is lost after
     * the session in which the dynamic table was defined is gone. The expanded query text takes
     * care of this.
     *
     * <p>For example, for a dynamic table that is defined in the context of "default" database with
     * a query {@code select * from test1}, the expanded query text might become {@code select
     * `test1`.`name`, `test1`.`value` from `default`.`test1`}, where table test1 resides in
     * database "default" and has two columns ("name" and "value").
     *
     * @return the dynamic table definition in expanded text.
     */
    String getDefinitionQuery();

    /** Get the freshness of dynamic table which is used to determine the data refresh mode. */
    Duration getFreshness();

    /** Get the refresh mode of dynamic table. */
    RefreshMode getRefreshMode();

    /** Get the refresh handler of dynamic table. */
    RefreshHandler getRefreshHandler();

    /** The refresh mode of dynamic table. */
    @PublicEvolving
    enum RefreshMode {
        CONTINUOUS,
        FULL
    }
}


Catalog

For Catalog, we don't need to introduce any new methods. However, we should highlight that a Catalog that supports creating dynamic tables represents a physical storage that can provide full historical data or an incremental changelog, so developers need to override the getFactory method so that the planner can find the DynamicTableFactory when compiling a query.

@PublicEvolving
public interface Catalog {
    
    /**
     * Returns a factory for creating instances from catalog objects.
     *
     * <p>This method enables bypassing the discovery process. Implementers can directly pass
     * internal catalog-specific objects to their own factory. For example, a custom {@link
     * CatalogTable} can be processed by a custom {@link DynamicTableFactory}.
     *
     * <p>If this catalog supports creating dynamic tables, you should also override this method to
     * provide a {@link DynamicTableFactory}, which helps the planner find the {@link DynamicTableSource}
     * and {@link DynamicTableSink} correctly during the compile and optimization phase. If you don't
     * override this method, you must specify the physical connector identifier of the storage this
     * catalog represents when creating a dynamic table; otherwise, the planner can't find the {@link
     * DynamicTableFactory}.
     *
     * <p>Because all factories are interfaces, the returned {@link Factory} instance can implement
     * multiple supported extension points. An {@code instanceof} check is performed by the caller
     * that checks whether a required factory is implemented; otherwise the discovery process is
     * used.
     */
    default Optional<Factory> getFactory() {
        return Optional.empty();
    }
}


RefreshHandler

We introduce the class RefreshHandler that records the meta information of the current dynamic table background refresh job.

/**
 * This interface represents the meta information of a dynamic table's current background refresh
 * job. The refresh mode of the background job may be continuous or full. The format of the meta
 * information differs between the two modes, so we unify it with a structured JSON string, refreshDetail.
 *
 * <p>In continuous mode, the format of the meta information is { "clusterType": "yarn",
 * "clusterId": "xxx", "jobId": "yyyy" }.
 *
 * <p>In full mode, the meta information format is { "schedulerType": "airflow", "endpoint": "xxx",
 * "workflowId": "yyy" }.
 */
@PublicEvolving
public class RefreshHandler {

    private final CatalogDynamicTable.RefreshMode actualRefreshMode;
    private final State state;
    private final @Nullable String refreshDetail;

    public RefreshHandler(
            CatalogDynamicTable.RefreshMode actualRefreshMode,
            State state,
            @Nullable String refreshDetail) {
        this.actualRefreshMode = actualRefreshMode;
        this.state = state;
        this.refreshDetail = refreshDetail;
    }

    public CatalogDynamicTable.RefreshMode getActualRefreshMode() {
        return actualRefreshMode;
    }

    public State getState() {
        return state;
    }

    public String getRefreshDetail() {
        return refreshDetail;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        RefreshHandler that = (RefreshHandler) o;
        return actualRefreshMode == that.actualRefreshMode
                && state == that.state
                && Objects.equals(refreshDetail, that.refreshDetail);
    }

    @Override
    public int hashCode() {
        return Objects.hash(actualRefreshMode, state, refreshDetail);
    }

    @Override
    public String toString() {
        return "RefreshHandler{"
                + "actualRefreshMode="
                + actualRefreshMode
                + ", state="
                + state
                + ", refreshDetail='"
                + refreshDetail
                + '\''
                + '}';
    }

    /** Background refresh job state. */
    @PublicEvolving
    public enum State {
        INITIALIZING,
        ACTIVATED,
        SUSPENDED
    }
}


ResolvedCatalogDynamicTable

We introduce the public class ResolvedCatalogDynamicTable that represents a validated CatalogDynamicTable.

/**
 * A validated {@link CatalogDynamicTable} that is backed by the original metadata coming from the
 * {@link Catalog} but resolved by the framework.
 *
 * <p>Note: this will be converted to {@link ResolvedCatalogTable} by the framework during the
 * planner's query optimization phase.
 */
@PublicEvolving
public class ResolvedCatalogDynamicTable
        implements ResolvedCatalogBaseTable<CatalogDynamicTable>, CatalogDynamicTable {

    private final CatalogDynamicTable origin;

    private final ResolvedSchema resolvedSchema;

    public ResolvedCatalogDynamicTable(CatalogDynamicTable origin, ResolvedSchema resolvedSchema) {
        this.origin =
                Preconditions.checkNotNull(
                        origin, "Original catalog dynamic table must not be null.");
        this.resolvedSchema =
                Preconditions.checkNotNull(resolvedSchema, "Resolved schema must not be null.");
    }

    @Override
    public Map<String, String> getOptions() {
        return origin.getOptions();
    }

    @Override
    public String getComment() {
        return origin.getComment();
    }

    @Override
    public CatalogBaseTable copy() {
        return new ResolvedCatalogDynamicTable((CatalogDynamicTable) origin.copy(), resolvedSchema);
    }

    @Override
    public Optional<String> getDescription() {
        return origin.getDescription();
    }

    @Override
    public Optional<String> getDetailedDescription() {
        return origin.getDetailedDescription();
    }

    @Override
    public boolean isPartitioned() {
        return origin.isPartitioned();
    }

    @Override
    public List<String> getPartitionKeys() {
        return origin.getPartitionKeys();
    }

    @Override
    public ResolvedCatalogDynamicTable copy(Map<String, String> options) {
        return new ResolvedCatalogDynamicTable(origin.copy(options), resolvedSchema);
    }

    @Override
    public CatalogDynamicTable getOrigin() {
        return origin;
    }

    @Override
    public ResolvedSchema getResolvedSchema() {
        return resolvedSchema;
    }

    @Override
    public String getDefinitionQuery() {
        return origin.getDefinitionQuery();
    }

    @Override
    public Duration getFreshness() {
        return origin.getFreshness();
    }

    @Override
    public RefreshMode getRefreshMode() {
        return origin.getRefreshMode();
    }

    @Override
    public RefreshHandler getRefreshHandler() {
        return origin.getRefreshHandler();
    }
}


Configuration

We need to introduce two options to control the background refresh job.

  • dynamic.table.refresh-mode.freshness-threshold (optional, type: Duration, default: 30 minutes): Used to derive the refresh mode of the background refresh job generated by a Dynamic Table statement. If FRESHNESS is less than the threshold, the job runs in continuous mode; if it is greater, it runs in full mode.
  • partition.fields.#.date-formatter (optional, type: String, default: none): Specifies the time partition formatter of a partitioned dynamic table, where # represents a string-typed partition field. It hints to the framework which partition to refresh in full refresh mode.


In the Overall Architecture section we will introduce the SQL Gateway, which is responsible for managing the background refresh jobs of dynamic tables, so the dynamic.table.refresh-mode.freshness-threshold option is scoped to the SQL Gateway only. It is read from flink-conf each time the service starts and cannot be modified at runtime. After the service starts, it first looks up all dynamic tables and compares the dynamic.table.refresh-mode.freshness-threshold value to each dynamic table's freshness; if the refresh mode changes, the original refresh job is stopped, a new job is created, and the job information is updated in the dynamic table.

Modifying this option may affect the refresh mode of the background refresh jobs of all registered dynamic tables in the Catalog, so modify it carefully. If you really need to change a job's refresh mode, prefer doing so by modifying the dynamic table's freshness.


Gateway Rest API

v3/dynamic-tables/refresh

Verb: POST

Response code: 200 OK

Trigger a refresh operation of dynamic tables.

Request

body

{
  "tables": [],
  "scheduleTime": "",
  "configuration": {}
}

Response

body

{
  "jobId": "",
  "clusterInfo": {}
}

This API is used by the workflow scheduler to trigger a refresh operation of a dynamic table automatically.

Proposed Design

Putting the above together, Dynamic Table is a new table type introduced for Flink SQL that enables users to build and manage data pipelines; it is automatically refreshed at the specified freshness. This means that a dynamic table is bound to a transformation SQL statement, and, based on the defined data freshness requirement, a background refresh job is created to automatically update the data.

So a dynamic table is composed of three parts:

  1. Metadata: This includes the table's schema, options, definition query, freshness and refresh job information.
  2. Data: This corresponds to a specific storage that holds the current table's data, which can provide full historical data or incremental changelog. 
  3. Background Refresh Job: This job ensures the production and updating of data in either a continuous or full refresh way.

Overall Architecture

Taking into account the functional design of Dynamic Table, the management of its entire lifecycle is primarily supported by the following core components in the architecture:

  • SQL CLI: A user interaction interface responsible for submitting dynamic table statements. 
  • SQL Gateway: In charge of parsing dynamic table statements, creating tables, and submitting jobs to the Flink cluster. Additionally, the workflow scheduler interacts with the SQL Gateway to submit batch workflows.
  • Dynamic Table Manager: An embedded manager in the SQL Gateway that interacts with the Workflow Scheduler through RESTful APIs. It is responsible for deducing the refresh mode of a dynamic table, deriving streaming job runtime parameters and CRON expressions for batch scheduling jobs, creating batch-scheduled jobs in the scheduler, and more.
  • Catalog: Stores dynamic table metadata, which includes schema, options, freshness, definition query, refresh job information, etc.
  • Pluggable Workflow Scheduler: Interacts bidirectionally with the SQL Gateway through RESTful APIs and is responsible for creating batch-scheduled jobs and triggering their execution. It supports open-source workflow schedulers such as Airflow and DolphinScheduler through a pluggable mechanism.
  • Flink Cluster: Runs Flink Jobs, including both streaming and batch jobs.

As illustrated in the architecture diagram above, the workflow for executing a Dynamic Table create statement is as follows:

  1. Parse the create statement, deduce the schema of the dynamic table from the select query, and generate a CatalogDynamicTable instance, including the schema, table options (if specified), definition query, and freshness.
  2. Create the dynamic table in the Catalog.
  3. Determine the refresh mode from the defined freshness and the dynamic.table.refresh-mode.freshness-threshold value, and generate the corresponding type of background refresh job.
  4. If it is the continuous refresh mode:
    • Compile and submit a Flink streaming job, which starts stateless; by default the source tables are read from the full dataset. The refresh job information is then updated in the CatalogDynamicTable.
  5. If it is the full refresh mode:
    • Deduce the scheduling CRON expression from the freshness value, create a scheduling workflow in the scheduler, and then update the refresh job information in the CatalogDynamicTable.
    • The scheduler periodically triggers the batch scheduling tasks and then calls the SQL Gateway to execute the workflow.
    • When the SQL Gateway receives a scheduled task, it retrieves the definition query of the dynamic table from the Catalog, calculates the corresponding time partition value (e.g., ds) for a partitioned table, and then submits a Flink batch job.

Among these, the most critical component is the Dynamic Table Manager, which handles the most essential functions in the execution process of Dynamic Table and interacts with other relevant components. Since the implementation details of Dynamic Table Manager are not the focus of this FLIP, they are not elaborated on here.

Regarding the details of interfacing with various open-source schedulers, we will discuss the relevant interfaces in a separate FLIP. Support for specific schedulers such as Airflow and DolphinScheduler will be implemented and maintained in separate repositories (similar to connector repos).

Partitioned Table Full Refresh

Based on the previous introduction, we understand that in full refresh mode, batch scheduling tasks are created based on freshness, and each execution triggers a full-table refresh. A common scenario in data warehouses is time-partitioned tables, such as daily partitions keyed by 'ds'. If the user expects to refresh only the latest partition with each execution, how can this be handled? We introduce the implementation details through the following example.

CREATE DYNAMIC TABLE dt_sink
PARTITIONED BY (ds)
WITH (
  -- hint time partition format
  'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '1' DAY
AS SELECT
  ds,
  id,
  order_number,
  user_id,
  ...
FROM source

In the given example, both the 'source' and 'dt_sink' tables are partitioned by a 'ds' field. By specifying the format of the time partition field in the WITH clause, the framework is prompted to only refresh the data in the latest 'ds' partition with each batch schedule.

Given that the freshness is set to INTERVAL '1' DAY, scheduling occurs once a day. Assuming today's date is March 2, 2024, when the batch schedule is triggered at midnight, the framework uses the time passed by the scheduling system, combined with the freshness and 'partition.fields.ds.date-formatter', to calculate the 'ds' partition that needs to be refreshed. In this case it determines 'ds=2024-03-01', which corresponds to yesterday's partition. The framework then assembles the following SQL statement and submits a Flink batch job.

INSERT OVERWRITE dt_sink
SELECT * FROM
(SELECT
    ds,
    id,
    order_number,
    user_id,
    ...
  FROM source
) AS tmp
-- add partition filter condition
WHERE ds = '2024-03-01'

Since both the 'source' and 'dt_sink' tables are partitioned by the 'ds' field, the optimizer, when generating the execution plan, will push down the 'ds' filter condition to the 'source' table, thereby only reading data from the corresponding partition of the 'source' table.

If FRESHNESS = INTERVAL '1' HOUR, what would the behavior be? First, a task that runs on an hourly schedule is registered in the Scheduler. Then, with each execution, the framework uses the time provided by the scheduling system, combined with the freshness and 'partition.fields.ds.date-formatter', to calculate the 'ds' partition that needs to be refreshed. In this scenario it computes ds='2024-03-02', which is the latest time partition.
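
For illustration, the statement assembled in this hourly case would be analogous to the daily example above:

INSERT OVERWRITE dt_sink
SELECT * FROM
(SELECT
    ds,
    id,
    order_number,
    user_id,
    ...
  FROM source
) AS tmp
-- partition filter condition derived from the schedule time, freshness and date formatter
WHERE ds = '2024-03-02'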

Compatibility, Deprecation, and Migration Plan

This FLIP only has incremental changes, and thus is fully backwards compatible.

Test Plan

Unit tests, integration tests, and manual tests will be introduced to verify this change.

Rejected Alternatives

Materialized View

Why not use Materialized View? A view is equivalent to a query result; according to the SQL standard, it is not supposed to support modifications. However, in the lakehouse scenario there is a need to modify data, which creates a conceptual conflict between the two. Therefore, using a table is more appropriate.

Base class of CatalogDynamicTable 

Regarding CatalogDynamicTable, another alternative is to extend CatalogTable directly instead of CatalogBaseTable, which would make it straightforward to inherit the attributes of CatalogTable.

@PublicEvolving
public interface CatalogDynamicTable extends CatalogTable {

    /**
     * Creates an instance of {@link CatalogDynamicTable}.
     *
     * @param schema unresolved schema
     * @param comment optional comment
     * @param partitionKeys list of partition keys or an empty list if not partitioned
     * @param options options to configure the connector
     * @param snapshot table snapshot of the table
     * @param definitionQuery definition sql query
     * @param freshness data freshness
     * @param refreshMode data refresh mode
     * @param refreshJobHandler refresh job handler
     */
    static CatalogDynamicTable of(
            Schema schema,
            @Nullable String comment,
            List<String> partitionKeys,
            Map<String, String> options,
            @Nullable Long snapshot,
            String definitionQuery,
            String freshness,
            RefreshMode refreshMode,
            RefreshJobHandler refreshJobHandler) {
        return new DefaultCatalogDynamicTable(
                schema,
                comment,
                partitionKeys,
                options,
                snapshot,
                definitionQuery,
                freshness,
                refreshMode,
                refreshJobHandler);
    }

    /**
     * The definition query text of the dynamic table. The text is expanded in contrast to the
     * original SQL. This is needed because the context, such as the current database, is lost after
     * the session in which the dynamic table was defined is gone. The expanded query text takes
     * care of this.
     *
     * <p>For example, for a dynamic table that is defined in the context of "default" database with
     * a query {@code select * from test1}, the expanded query text might become {@code select
     * `test1`.`name`, `test1`.`value` from `default`.`test1`}, where table test1 resides in
     * database "default" and has two columns ("name" and "value").
     *
     * @return the dynamic table definition in expanded text.
     */
    String getDefinitionQuery();

    /** Get the freshness of the dynamic table which is used to determine the data refresh mode. */
    String getFreshness();

    /** Get the refresh mode of the dynamic table. */
    RefreshMode getRefreshMode();

    /** Get the refresh handler of the dynamic table. */
    RefreshJobHandler getRefreshHandler();

    /** The refresh mode of the dynamic table. */
    @PublicEvolving
    enum RefreshMode {
        CONTINUOUS,
        FULL,
    }
}


There are two main reasons why CatalogTable was not inherited:

  1. Dynamic Table is a new conceptual entity and table type that also includes attributes such as query, freshness and refresh job, which makes it more appropriate to use the new interface.
  2. If CatalogTable adds a new attribute that is not supported by Dynamic Table, it will lead to a conflict, so it is more appropriate to represent it using an interface that is parallel to CatalogTable.


Reference

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables

[2] https://docs.snowflake.com/en/user-guide/dynamic-tables-about

[3] https://docs.databricks.com/en/delta-live-tables/index.html

[4] https://hudi.apache.org/docs/overview

[5] https://paimon.apache.org/docs/master

[6] PoC: https://github.com/lsyldliu/flink/tree/dynamic_table_poc



