...

Code Block
languagesql
CREATE DYNAMIC TABLE dwd_orders
PRIMARY KEY(ds, id)
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '3' MINUTE
AS SELECT
  o.ds,
  o.id,
  o.order_number,
  o.user_id,
...
FROM
  orders AS o
  LEFT JOIN products FOR SYSTEM_TIME AS OF proctime() AS prod
    ON o.product_id = prod.id
  LEFT JOIN order_pay AS pay
    ON o.id = pay.order_id AND o.ds = pay.ds

...

Pause & Resume Data Pipeline

Code Block
languagesql
-- 1. Pause table data refresh
ALTER TABLE dwd_orders SUSPEND;

-- 2. Resume table data refresh
ALTER TABLE dwd_orders RESUME
-- Set table option via WITH clause
WITH(
  'sink.parallelism' = '10'
);


  1. The SUSPEND statement pauses the refresh job.
  2. The RESUME statement resumes the refresh job.

Manual Refresh

Code Block
languagesql
-- Refresh historical partition
ALTER TABLE dwd_orders REFRESH PARTITION(ds='20231023')


To backfill a historical partition of a partitioned table, execute a REFRESH statement that specifies the partition condition. This launches a Flink batch job to refresh the table.

To comply with requirements such as GDPR, it may be necessary to modify certain data in a table. This can be done with an UPDATE statement.

...

Code Block
languagesql
UPDATE dwd_orders SET order_number = 10 WHERE id = 73635185;

Modify Data Freshness

Code Block
languagesql
ALTER TABLE dwd_orders SET FRESHNESS = INTERVAL '1' DAY

To adjust the data freshness, modify the freshness attribute of the dynamic table. This first stops the Flink streaming job and then automatically creates a scheduled task in the workflow scheduler. The workflow triggers the table refresh at the specified freshness interval.

...

To support dynamic tables, we need to introduce some new syntax.

Create

Code Block
languagesql
CREATE DYNAMIC TABLE [catalog_name.][db_name.]table_name

[COMMENT table_comment]

[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]

[WITH (key1=val1, key2=val2, ...)]

FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY | MONTH | YEAR }

[REFRESH_MODE = { CONTINUOUS | FULL }]

AS <select_statement>

Parameters

  • PRIMARY KEY: An optional list of columns that uniquely identifies each row in the table. Primary key columns must be non-null.
  • PARTITIONED BY: An optional list of columns by which to partition the table.
  • WITH: Optionally sets one or more table properties.
  • FRESHNESS: Defines how far, as a time interval, the dynamic table’s content should lag behind updates to the base tables. This is best-effort behavior, not a guarantee.
  • REFRESH_MODE: An optional clause that specifies the refresh mode of the dynamic table. This property cannot be altered after the dynamic table is created.
  • AS select_statement: Populates the dynamic table with the data from the select query. The base tables can be dynamic tables, tables, or views.
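
As an illustration of the FRESHNESS parameter, the following is a minimal sketch of how an interval literal such as INTERVAL '3' MINUTE could be turned into a java.time.Duration. The class FreshnessIntervals and its parse method are hypothetical names used only for this example and are not part of the proposed API. MONTH and YEAR have no exact length, so the sketch assumes the estimated durations that ChronoUnit provides.

```java
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of the proposed API. */
final class FreshnessIntervals {

    // Matches literals such as: INTERVAL '3' MINUTE
    private static final Pattern INTERVAL =
            Pattern.compile(
                    "\\s*INTERVAL\\s+'(\\d+)'\\s+(SECOND|MINUTE|HOUR|DAY|MONTH|YEAR)\\s*",
                    Pattern.CASE_INSENSITIVE);

    private FreshnessIntervals() {}

    /** Converts a freshness interval literal into a Duration. */
    static Duration parse(String literal) {
        Matcher m = INTERVAL.matcher(literal);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a freshness interval: " + literal);
        }
        long amount = Long.parseLong(m.group(1));
        ChronoUnit unit;
        switch (m.group(2).toUpperCase(Locale.ROOT)) {
            case "SECOND":
                unit = ChronoUnit.SECONDS;
                break;
            case "MINUTE":
                unit = ChronoUnit.MINUTES;
                break;
            case "HOUR":
                unit = ChronoUnit.HOURS;
                break;
            case "DAY":
                unit = ChronoUnit.DAYS;
                break;
            case "MONTH":
                unit = ChronoUnit.MONTHS; // assumption: estimated length of a month
                break;
            case "YEAR":
                unit = ChronoUnit.YEARS; // assumption: estimated length of a year
                break;
            default:
                throw new IllegalStateException("Unexpected unit: " + m.group(2));
        }
        return unit.getDuration().multipliedBy(amount);
    }
}
```

For example, FreshnessIntervals.parse("INTERVAL '3' MINUTE") yields the same value as Duration.ofMinutes(3).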

...

If REFRESH_MODE is specified when the table is created, it takes priority over FRESHNESS, which means that freshness is no longer used to infer the refresh mode of the dynamic table.
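
The priority rule above can be sketched as follows. This is an illustration under stated assumptions, not the proposed implementation: the class RefreshModeInference, the method resolve, and in particular the 30-minute cut-off ASSUMED_THRESHOLD are hypothetical, since this section does not say how the mode is derived from freshness.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical sketch of how the refresh mode could be resolved. */
final class RefreshModeInference {

    enum RefreshMode {
        CONTINUOUS,
        FULL
    }

    // Assumed cut-off for illustration only; the text above does not define one.
    static final Duration ASSUMED_THRESHOLD = Duration.ofMinutes(30);

    private RefreshModeInference() {}

    /**
     * An explicitly specified REFRESH_MODE wins. Otherwise the mode is inferred
     * from the freshness: short intervals map to CONTINUOUS, long ones to FULL.
     */
    static RefreshMode resolve(Optional<RefreshMode> explicitMode, Duration freshness) {
        if (explicitMode.isPresent()) {
            return explicitMode.get();
        }
        return freshness.compareTo(ASSUMED_THRESHOLD) < 0
                ? RefreshMode.CONTINUOUS
                : RefreshMode.FULL;
    }
}
```

Under these assumptions, a freshness of 3 minutes resolves to CONTINUOUS and a freshness of 1 day resolves to FULL, unless a mode is given explicitly.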

Suspend

Code Block
languagesql
ALTER TABLE [catalog_name.][db_name.]table_name SUSPEND

Suspends refreshes of the dynamic table, which means that the background refresh job is suspended. The operation applies only to the current dynamic table: if other downstream dynamic tables depend on it, they are not suspended.

Resume

Code Block
languagesql
ALTER TABLE [catalog_name.][db_name.]table_name RESUME

[WITH('key1' = 'val1', 'key2' = 'val2')]


Resumes refreshes of the dynamic table, which means that the background refresh job is resumed. The operation applies only to the current dynamic table and does not cascade to suspended downstream dynamic tables.

You can also specify one or more dynamic table options with the WITH keyword to optimize the background refresh job. These options take effect only for the current operation and are not persisted.
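
The suspend and resume semantics described above can be modelled with a small in-memory sketch. The class RefreshControlSketch and all of its methods are hypothetical and exist only for this illustration. The sketch assumes that each table's refresh state is tracked independently, so downstream tables are never touched, and that options passed on resume are merged into the job's effective options without changing the persisted ones.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory model of SUSPEND and RESUME; not the proposed implementation. */
final class RefreshControlSketch {

    // Options stored with the table definition (persisted).
    private final Map<String, Map<String, String>> persistedOptions = new HashMap<>();
    // Options used by the currently running refresh job of each table.
    private final Map<String, Map<String, String>> runningJobOptions = new HashMap<>();
    private final Set<String> suspended = new HashSet<>();

    void register(String table, Map<String, String> options) {
        persistedOptions.put(table, new HashMap<>(options));
        runningJobOptions.put(table, new HashMap<>(options));
    }

    /** Suspends only the given table; no other table is affected. */
    void suspend(String table) {
        requireKnown(table);
        suspended.add(table);
        runningJobOptions.remove(table);
    }

    /** Resumes only the given table; one-off options apply to this job and are not persisted. */
    void resume(String table, Map<String, String> oneOffOptions) {
        requireKnown(table);
        Map<String, String> effective = new HashMap<>(persistedOptions.get(table));
        effective.putAll(oneOffOptions);
        runningJobOptions.put(table, effective);
        suspended.remove(table);
    }

    boolean isSuspended(String table) {
        return suspended.contains(table);
    }

    Map<String, String> persisted(String table) {
        requireKnown(table);
        return Collections.unmodifiableMap(persistedOptions.get(table));
    }

    Map<String, String> jobOptions(String table) {
        return runningJobOptions.getOrDefault(table, Collections.emptyMap());
    }

    private void requireKnown(String table) {
        if (!persistedOptions.containsKey(table)) {
            throw new IllegalArgumentException("Unknown dynamic table: " + table);
        }
    }
}
```

In this model, suspending dwd_orders leaves any other registered table running, and resuming it with 'sink.parallelism' = '10' changes the job options for that run while the persisted table options stay as they were.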

Refresh

Code Block
languagesql
ALTER TABLE [catalog_name.][db_name.]table_name REFRESH

[PARTITION (key1=val1, key2=val2, ...)]

Manually triggers a refresh of the dynamic table. The operation applies only to the current table and, by default, does not trigger a cascading refresh of downstream tables.

Note: If the refresh mode is continuous and the background job is running, use the REFRESH command with care, because it can lead to inconsistent data.

Set Freshness

Code Block
languagesql
ALTER TABLE [catalog_name.][db_name.]table_name

  SET FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY | MONTH | YEAR }

Modifies how far the dynamic table’s content should lag behind updates to the base tables. Modifying freshness may affect the refresh mode of the background job: it may change from continuous mode to full mode, from full mode to continuous mode, or remain unchanged. In every case, altering freshness stops the current refresh job and creates a new job with the updated freshness.

Note: If the refresh mode was specified manually when the dynamic table was created, modifying freshness does not change the refresh mode.

Set REFRESH_MODE

Code Block
languagesql
ALTER TABLE [catalog_name.][db_name.]table_name

  SET REFRESH_MODE = { FULL | CONTINUOUS }

Modifies the refresh mode of the dynamic table, either from continuous mode to full mode or from full mode to continuous mode.

Show Job

Code Block
languagesql
SHOW JOB ${jobId} 

Shows the specified job in the Flink cluster. The schema of the returned information is the same as that of the SHOW JOBS statement.

...

We introduce a new interface, CatalogDynamicTable, which extends CatalogBaseTable. As described above, in addition to the schema, options and partition keys, CatalogDynamicTable carries three important pieces of metadata: the definition query, the freshness and the refresh job.

Code Block
languagejava
/**
 * Represents the unresolved metadata of a dynamic table in a {@link Catalog}.
 *
 * <p>Dynamic Table definition: In the context of integrated stream-batch data storage, it provides
 * full history data and incremental changelog. By defining the data's production business logic and
 * freshness, data update is achieved through continuous or full refresh mode, while also possessing
 * the capability for both batch and incremental consumption.
 *
 * <p>The metadata for {@link CatalogDynamicTable} also includes the following four main parts:
 *
 * <ul>
 *   <li>Schema, comments, options and partition keys.
 *   <li>Data freshness, which determines when the data is generated and becomes visible for the user.
 *   <li>Data production business logic, also known as the definition query.
 *   <li>Background data refresh job, either through a flink streaming or scheduled batch job, it is
 *       initialized after the dynamic table is created.
 * </ul>
 *
 * <p>A catalog implementer can either use {@link #of(Schema, String, List, Map, Long, String,
 * String, RefreshMode, RefreshJobHandler)} for a basic implementation of this interface or create a
 * custom class that allows passing catalog-specific objects all the way down to the connector
 * creation (if necessary).
 */
@PublicEvolving
public interface CatalogDynamicTable extends CatalogBaseTable {

    /**
     * Creates an instance of {@link CatalogDynamicTable}.
     *
     * @param schema unresolved schema
     * @param comment optional comment
     * @param partitionKeys list of partition keys or an empty list if not partitioned
     * @param options options to configure the connector
     * @param snapshot table snapshot of the table
     * @param definitionQuery definition sql query
     * @param freshness data freshness
     * @param refreshMode data refresh mode
     * @param refreshJobHandler refresh job handler
     */
    static CatalogDynamicTable of(
            Schema schema,
            @Nullable String comment,
            List<String> partitionKeys,
            Map<String, String> options,
            @Nullable Long snapshot,
            String definitionQuery,
            String freshness,
            RefreshMode refreshMode,
            RefreshJobHandler refreshJobHandler) {
        return new DefaultCatalogDynamicTable(
                schema,
                comment,
                partitionKeys,
                options,
                snapshot,
                definitionQuery,
                freshness,
                refreshMode,
                refreshJobHandler);
    }

    @Override
    default TableKind getTableKind() {
        return TableKind.DYNAMIC_TABLE;
    }

    /**
     * Check if the table is partitioned or not.
     *
     * @return true if the table is partitioned; otherwise, false
     */
    boolean isPartitioned();

    /**
     * Get the partition keys of the table. This will be an empty set if the table is not
     * partitioned.
     *
     * @return partition keys of the table
     */
    List<String> getPartitionKeys();

    /**
     * Returns a copy of this {@code CatalogDynamicTable} with given table options {@code options}.
     *
     * @return a new copy of this table with replaced table options
     */
    CatalogDynamicTable copy(Map<String, String> options);

    /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */
    default Optional<Long> getSnapshot() {
        return Optional.empty();
    }

    /**
     * The definition query text of dynamic table, text is expanded in contrast to the original SQL.
     * This is needed because the context such as current DB is lost after the session, in which
     * view is defined, is gone. Expanded query text takes care of this, as an example.
     *
     * <p>For example, for a dynamic table that is defined in the context of "default" database with
     * a query {@code select * from test1}, the expanded query text might become {@code select
     * `test1`.`name`, `test1`.`value` from `default`.`test1`}, where table test1 resides in
     * database "default" and has two columns ("name" and "value").
     *
     * @return the dynamic table definition in expanded text.
     */
    String getDefinitionQuery();

    /** Get the freshness of the dynamic table which is used to determine the data refresh mode. */
    String getFreshness();

    /** Get the refresh mode of the dynamic table. */
    RefreshMode getRefreshMode();

    /** Get the refresh job handler of the dynamic table. */
    RefreshJobHandler getRefreshJobHandler();

    /** The refresh mode of the dynamic table. */
    @PublicEvolving
    enum RefreshMode {
        CONTINUOUS,
        FULL
    }
}


Catalog

For Catalog, we don’t need to introduce any new methods. However, we should highlight that a Catalog that supports creating dynamic tables represents a stream-batch unified storage, which can provide full historical data and an incremental changelog. Developers therefore need to override the getFactory method so that the planner can find the DynamicTableFactory when compiling a query.

...