...

Hence, starting from the concept of unified streaming and batch processing (One SQL & Unified Table), it's time to turn Dynamic Table from an abstract concept into a tangible entity. We propose the Dynamic Table entity to simplify streaming and batch ETL pipelines cost-effectively. Users no longer need to be confused by complex concepts such as streaming and batch processing or by underlying technical details, and can focus solely on business logic and data freshness.

...

Code Block
languagesql
CREATE DYNAMIC TABLE dwd_orders
(
  PRIMARY KEY(ds, id) NOT ENFORCED
)
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '3' MINUTE
AS SELECT 
  o.ds,
  o.id,
  o.order_number,
  o.user_id,
...
FROM 
  orders as o
  LEFT JOIN products FOR SYSTEM_TIME AS OF proctime() AS prod
    ON o.product_id = prod.id
  LEFT JOIN order_pay AS pay
    ON o.id = pay.order_id AND o.ds = pay.ds

...

Code Block
languagesql
-- 1. Pause table data refresh
ALTER DYNAMIC TABLE dwd_orders SUSPEND;

-- 2. Resume table data refresh
ALTER DYNAMIC TABLE dwd_orders RESUME
-- Set table option via WITH clause
WITH(
  'sink.parallelism' = '10'
);

...

If REFRESH_MODE is specified at table creation, it takes priority over freshness: freshness is then no longer used to infer the refresh mode of the dynamic table.
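The priority rule can be sketched in a few lines of Java. This is only an illustration: the class name RefreshModeResolver, the method resolve, and the 30-minute cut-off ASSUMED_THRESHOLD are hypothetical and not part of the proposal, and the inference rule (short freshness means continuous, otherwise full) is an assumption made for the sketch.

```java
import java.time.Duration;
import java.util.Optional;

/** Sketch: an explicitly specified REFRESH_MODE wins over the mode inferred from FRESHNESS. */
final class RefreshModeResolver {

    /** Mirrors the RefreshMode enum of the proposal. */
    enum RefreshMode {
        CONTINUOUS,
        FULL
    }

    /** Hypothetical cut-off used only by this sketch; it is not defined in the text above. */
    static final Duration ASSUMED_THRESHOLD = Duration.ofMinutes(30);

    static RefreshMode resolve(Optional<RefreshMode> explicitMode, Duration freshness) {
        // An explicitly specified mode takes priority; freshness is not consulted at all.
        if (explicitMode.isPresent()) {
            return explicitMode.get();
        }
        // Otherwise the mode is inferred from freshness (the rule itself is an assumption).
        return freshness.compareTo(ASSUMED_THRESHOLD) < 0
                ? RefreshMode.CONTINUOUS
                : RefreshMode.FULL;
    }

    public static void main(String[] args) {
        // No explicit mode: inferred from a 3-minute freshness, prints CONTINUOUS.
        System.out.println(resolve(Optional.empty(), Duration.ofMinutes(3)));
        // Explicit FULL: freshness is ignored, prints FULL.
        System.out.println(resolve(Optional.of(RefreshMode.FULL), Duration.ofMinutes(3)));
    }
}
```

Under this model, changing only the freshness of a table that was created with an explicit mode leaves the result of resolve unchanged, because the explicit branch returns first.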


Operations on dynamic tables are executed through the following ALTER statements.

Suspend

Code Block
languagesql
ALTER DYNAMIC TABLE [catalog_name.][db_name.]table_name SUSPEND

...

Note: If the refresh mode is continuous and the background job is running, be careful with the refresh command, because it can lead to inconsistent data.

Set

Freshness

Code Block
languagesql
ALTER DYNAMIC TABLE [catalog_name.][db_name.]table_name 

  SET FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY | MONTH | YEAR}

...

Note: If the refresh mode was manually specified when the dynamic table was created, modifying the freshness will not change the refresh mode.

...

REFRESH_MODE

Code Block
languagesql
ALTER DYNAMIC TABLE [catalog_name.][db_name.]table_name

  SET REFRESH_MODE = { FULL | CONTINUOUS }

Modifies the refresh mode of the dynamic table, either from continuous mode to full mode or from full mode to continuous mode.

Options

Code Block
languagesql
ALTER DYNAMIC TABLE [catalog_name.][db_name.]table_name  SET ('key' = 'val');

Reset

Code Block
languagesql
ALTER DYNAMIC TABLE [catalog_name.][db_name.]table_name RESET ('key');

Apart from the ALTER syntax above, dynamic tables currently do not support the other ALTER statements that apply to regular tables.

Drop

Code Block
languagesql
DROP DYNAMIC TABLE [ IF EXISTS ] [catalog_name.][db_name.]table_name

...

Code Block
languagejava
/**
 * Represents the unresolved metadata of a dynamic table in a {@link Catalog}.
 *
 * <p>Dynamic Table definition: In the context of integrated stream-batch data storage, it provides
 * full history data and incremental changelog. By defining the data's production business logic and
 * freshness, data update is achieved through continuous or full refresh mode, while also possessing
 * the capability for both batch and incremental consumption.
 *
 * <p>The metadata for {@link CatalogDynamicTable} also includes the following four main parts:
 *
 * <ul>
 *   <li>Schema, comments, options and partition keys.
 *   <li>Data freshness, which determines when the data is generated and becomes visible to the
 *       user.
 *   <li>Data production business logic, also known as the definition query.
 *   <li>Background data refresh job, either a Flink streaming job or a scheduled batch job. It is
 *       initialized after the dynamic table is created.
 * </ul>
 *
 * <p>A catalog implementer can either use {@link #of(Schema, String, List, Map, Long, String,
 * Duration, RefreshMode, RefreshHandler)} for a basic implementation of this interface or create a
 * custom class that allows passing catalog-specific objects all the way down to the connector
 * creation (if necessary).
 */
@PublicEvolving
public interface CatalogDynamicTable extends CatalogBaseTable {

    @Override
    default TableKind getTableKind() {
        return TableKind.DYNAMIC_TABLE;
    }

    /**
     * Check if the table is partitioned or not.
     *
     * @return true if the table is partitioned; otherwise, false
     */
    boolean isPartitioned();

    /**
     * Get the partition keys of the table. This will be an empty set if the table is not
     * partitioned.
     *
     * @return partition keys of the table
     */
    List<String> getPartitionKeys();

    /**
     * Returns a copy of this {@code CatalogDynamicTable} with given table options {@code options}.
     *
     * @return a new copy of this table with replaced table options
     */
    CatalogDynamicTable copy(Map<String, String> options);

    /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */
    default Optional<Long> getSnapshot() {
        return Optional.empty();
    }

    /**
     * The definition query text of the dynamic table. The text is expanded in contrast to the
     * original SQL. This is needed because context such as the current database is lost once the
     * session in which the dynamic table was defined is gone. The expanded query text takes care
     * of this.
     *
     * <p>For example, for a dynamic table that is defined in the context of "default" database with
     * a query {@code select * from test1}, the expanded query text might become {@code select
     * `test1`.`name`, `test1`.`value` from `default`.`test1`}, where table test1 resides in
     * database "default" and has two columns ("name" and "value").
     *
     * @return the dynamic table definition in expanded text.
     */
    String getDefinitionQuery();

    /** Get the freshness of dynamic table which is used to determine the data refresh mode. */
    Duration getFreshness();

    /** Get the refresh mode of dynamic table. */
    RefreshMode getRefreshMode();

    /** Get the refresh handler of dynamic table. */
    RefreshHandler getRefreshHandler();

    /** The refresh mode of dynamic table. */
    @PublicEvolving
    enum RefreshMode {
        CONTINUOUS,
        FULL
    }
}

...

Code Block
languagejava
titleCatalog
@PublicEvolving
public interface Catalog {
    
    /**
     * Returns a factory for creating instances from catalog objects.
     *
     * <p>This method enables bypassing the discovery process. Implementers can directly pass
     * internal catalog-specific objects to their own factory. For example, a custom {@link
     * CatalogTable} can be processed by a custom {@link DynamicTableFactory}.
     *
     * <p>If this catalog supports creating dynamic tables, you should also override this method to
     * provide a {@link DynamicTableFactory} that helps the planner find the {@link
     * DynamicTableSource} and {@link DynamicTableSink} correctly during the compile optimization
     * phase. If you don't override this method, you must specify the physical connector identifier
     * of the storage that this catalog represents when creating the dynamic table. Otherwise, the
     * planner can't find the {@link DynamicTableFactory}.
     *
     * <p>Because all factories are interfaces, the returned {@link Factory} instance can implement
     * multiple supported extension points. An {@code instanceof} check is performed by the caller
     * that checks whether a required factory is implemented; otherwise the discovery process is
     * used.
     */
    default Optional<Factory> getFactory() {
        return Optional.empty();
    }
}
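The lookup order described in the Javadoc above (catalog-provided factory first, checked with instanceof, then discovery by connector identifier) can be sketched as follows. The nested interfaces are simplified stand-ins for the Flink types of the same name, and FactoryLookupSketch, findTableFactory and discover are hypothetical names used only for illustration; the real planner logic may differ.

```java
import java.util.Optional;

/** Sketch: prefer the factory returned by the catalog, otherwise fall back to discovery. */
final class FactoryLookupSketch {

    /** Stand-ins for Flink's Factory and DynamicTableFactory; only the type relation matters. */
    interface Factory {}

    interface DynamicTableFactory extends Factory {}

    /** Stand-in for Catalog, reduced to the single method discussed above. */
    interface Catalog {
        default Optional<Factory> getFactory() {
            return Optional.empty();
        }
    }

    /** Hypothetical placeholder for the discovery process driven by the connector identifier. */
    static DynamicTableFactory discover(String connectorIdentifier) {
        if (connectorIdentifier == null) {
            throw new IllegalStateException(
                    "Catalog provides no DynamicTableFactory and no connector identifier is set.");
        }
        return new DynamicTableFactory() {};
    }

    static DynamicTableFactory findTableFactory(Catalog catalog, String connectorIdentifier) {
        return catalog.getFactory()
                // The instanceof check: the returned Factory may implement other extension points.
                .filter(DynamicTableFactory.class::isInstance)
                .map(DynamicTableFactory.class::cast)
                // No usable factory from the catalog: use discovery instead.
                .orElseGet(() -> discover(connectorIdentifier));
    }
}
```

A catalog that overrides getFactory() with a DynamicTableFactory never reaches discover, which is why such a catalog does not need a connector identifier when a dynamic table is created.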

...


RefreshHandler

We introduce the class RefreshHandler that records the meta information of the current dynamic table background refresh job.

Code Block
languagejava
/**
 * This class represents the meta information of the current dynamic table background refresh job.
 * The refresh mode of the background job may be continuous or full. The format of the meta
 * information differs between the two modes, so we unify it as a structured JSON string
 * refreshDetail.
 *
 * <p>In continuous mode, the format of the meta information is { "clusterType": "yarn",
 * "clusterId": "xxx", "jobId": "yyyy" }.
 *
 * <p>In full mode, the meta information format is { "schedulerType": "airflow", "endpoint": "xxx",
 * "workflowId": "yyy" }.
 */
@PublicEvolving
public class RefreshHandler {

    private final CatalogDynamicTable.RefreshMode actualRefreshMode;
    private final State state;
    private final @Nullable String refreshDetail;

    public RefreshHandler(
            CatalogDynamicTable.RefreshMode actualRefreshMode,
            State state,
            @Nullable String refreshDetail) {
        this.actualRefreshMode = actualRefreshMode;
        this.state = state;
        this.refreshDetail = refreshDetail;
    }

    public CatalogDynamicTable.RefreshMode getActualRefreshMode() {
        return actualRefreshMode;
    }

    public State getState() {
        return state;
    }

    public String getRefreshDetail() {
        return refreshDetail;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        RefreshHandler that = (RefreshHandler) o;
        return actualRefreshMode == that.actualRefreshMode
                && state == that.state
                && Objects.equals(refreshDetail, that.refreshDetail);
    }

    @Override
    public int hashCode() {
        return Objects.hash(actualRefreshMode, state, refreshDetail);
    }

    @Override
    public String toString() {
        return "RefreshHandler{"
                + "actualRefreshMode="
                + actualRefreshMode
                + ", state="
                + state
                + ", refreshDetail='"
                + refreshDetail
                + '\''
                + '}';
    }

    /** Background refresh job state. */
    @PublicEvolving
    public enum State {
        INITIALIZING,
        ACTIVATED,
        SUSPENDED
    }
}
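To make the role of the three states concrete, here is a minimal sketch of how SUSPEND and RESUME could move a refresh job between them. The proposal only lists the states and the two statements; the transition table below (INITIALIZING to ACTIVATED, ACTIVATED to SUSPENDED, SUSPENDED to ACTIVATED) is an assumption, and RefreshStateSketch with its methods is a hypothetical helper, not part of the proposed API.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Sketch: assumed state transitions of the background refresh job. */
final class RefreshStateSketch {

    /** Same constants as the State enum above. */
    enum State {
        INITIALIZING,
        ACTIVATED,
        SUSPENDED
    }

    /** Assumed transition table; the proposal does not spell these rules out. */
    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);

    static {
        ALLOWED.put(State.INITIALIZING, EnumSet.of(State.ACTIVATED));
        ALLOWED.put(State.ACTIVATED, EnumSet.of(State.SUSPENDED));
        ALLOWED.put(State.SUSPENDED, EnumSet.of(State.ACTIVATED));
    }

    static State transition(State current, State target) {
        if (!ALLOWED.get(current).contains(target)) {
            throw new IllegalStateException("Cannot go from " + current + " to " + target);
        }
        return target;
    }

    /** What ALTER DYNAMIC TABLE ... SUSPEND would do to the state under these assumptions. */
    static State suspend(State current) {
        return transition(current, State.SUSPENDED);
    }

    /** What ALTER DYNAMIC TABLE ... RESUME would do to the state under these assumptions. */
    static State resume(State current) {
        return transition(current, State.ACTIVATED);
    }
}
```

Under these assumptions, suspending a job that is still INITIALIZING is rejected, while suspend followed by resume returns an ACTIVATED job to ACTIVATED.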

...

Code Block
languagejava
/**
 * A validated {@link CatalogDynamicTable} that is backed by the original metadata coming from the
 * {@link Catalog} but resolved by the framework.
 *
 * <p>Note: This will be converted to {@link ResolvedCatalogTable} by the framework during the
 * planner's query optimization phase.
 */
@PublicEvolving
public class ResolvedCatalogDynamicTable
        implements ResolvedCatalogBaseTable<CatalogDynamicTable>, CatalogDynamicTable {

    private final CatalogDynamicTable origin;

    private final ResolvedSchema resolvedSchema;

    public ResolvedCatalogDynamicTable(CatalogDynamicTable origin, ResolvedSchema resolvedSchema) {
        this.origin =
                Preconditions.checkNotNull(
                        origin, "Original catalog dynamic table must not be null.");
        this.resolvedSchema =
                Preconditions.checkNotNull(resolvedSchema, "Resolved schema must not be null.");
    }

    @Override
    public Map<String, String> getOptions() {
        return origin.getOptions();
    }

    @Override
    public String getComment() {
        return origin.getComment();
    }

    @Override
    public CatalogBaseTable copy() {
        return new ResolvedCatalogDynamicTable((CatalogDynamicTable) origin.copy(), resolvedSchema);
    }

    @Override
    public Optional<String> getDescription() {
        return origin.getDescription();
    }

    @Override
    public Optional<String> getDetailedDescription() {
        return origin.getDetailedDescription();
    }

    @Override
    public boolean isPartitioned() {
        return origin.isPartitioned();
    }

    @Override
    public List<String> getPartitionKeys() {
        return origin.getPartitionKeys();
    }

    @Override
    public ResolvedCatalogDynamicTable copy(Map<String, String> options) {
        return new ResolvedCatalogDynamicTable(origin.copy(options), resolvedSchema);
    }

    @Override
    public CatalogDynamicTable getOrigin() {
        return origin;
    }

    @Override
    public ResolvedSchema getResolvedSchema() {
        return resolvedSchema;
    }

    @Override
    public String getDefinitionQuery() {
        return origin.getDefinitionQuery();
    }

    @Override
    public Duration getFreshness() {
        return origin.getFreshness();
    }

    @Override
    public RefreshMode getRefreshMode() {
        return origin.getRefreshMode();
    }

    @Override
    public RefreshHandler getRefreshHandler() {
        return origin.getRefreshHandler();
    }
}


Configuration

We need to introduce two options to control the background refresh job.

...

Code Block
languagejava
@PublicEvolving
public interface CatalogDynamicTable extends CatalogTable {

    /**
     * Creates an instance of {@link CatalogDynamicTable}.
     *
     * @param schema unresolved schema
     * @param comment optional comment
     * @param partitionKeys list of partition keys or an empty list if not partitioned
     * @param options options to configure the connector
     * @param snapshot table snapshot of the table
     * @param definitionQuery definition sql query
     * @param freshness data freshness
     * @param refreshMode data refresh mode
     * @param refreshHandler refresh handler
     */
    static CatalogDynamicTable of(
            Schema schema,
            @Nullable String comment,
            List<String> partitionKeys,
            Map<String, String> options,
            @Nullable Long snapshot,
            String definitionQuery,
            String freshness,
            RefreshMode refreshMode,
            RefreshHandler refreshHandler) {
        return new DefaultCatalogDynamicTable(
                schema,
                comment,
                partitionKeys,
                options,
                snapshot,
                definitionQuery,
                freshness,
                refreshMode,
                refreshHandler);
    }

    /**
     * The definition query text of the dynamic table. The text is expanded in contrast to the
     * original SQL. This is needed because context such as the current database is lost once the
     * session in which the dynamic table was defined is gone. The expanded query text takes care
     * of this.
     *
     * <p>For example, for a dynamic table that is defined in the context of "default" database with
     * a query {@code select * from test1}, the expanded query text might become {@code select
     * `test1`.`name`, `test1`.`value` from `default`.`test1`}, where table test1 resides in
     * database "default" and has two columns ("name" and "value").
     *
     * @return the dynamic table definition in expanded text.
     */
    String getDefinitionQuery();

    /** Get the freshness of the dynamic table which is used to determine the data refresh mode. */
    String getFreshness();

    /** Get the refresh mode of the dynamic table. */
    RefreshMode getRefreshMode();

    /** Get the refresh handler of the dynamic table. */
    RefreshHandler getRefreshHandler();

    /** The refresh mode of the dynamic table. */
    @PublicEvolving
    enum RefreshMode {
        CONTINUOUS,
        FULL,
    }
}

...