Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagesql
-- 1. Pause table data refresh
ALTER DYNAMIC TABLE dwd_orders SUSPEND;

-- 2. Resume table data refresh
ALTER DYNAMIC TABLE dwd_orders RESUME

-- Set table option via WITH clause (here: the sink parallelism of the refresh job)
WITH(
  'sink.parallelism' = '10'
);

...

Code Block
languagesql
-- Refresh historical partition
ALTER DYNAMIC TABLE dwd_orders REFRESH PARTITION(ds='20231023')

...

Code Block
languagesql
ALTER DYNAMIC TABLE dwd_orders SET FRESHNESS = INTERVAL '1' DAY

...

Code Block
languagesql
ALTER DYNAMIC TABLE [catalog_name.][db_name.]table_name SUSPEND

...

Code Block
languagesql
ALTER DYNAMIC TABLE [catalog_name.][db_name.]table_name RESUME

[WITH('key1' = 'val1', 'key2' = 'val2')]

...

Code Block
languagesql
ALTER DYNAMIC TABLE [catalog_name.][db_name.]table_name REFRESH

[PARTITION (key1=val1, key2=val2, ...)]

...

Code Block
languagesql
ALTER DYNAMIC TABLE [catalog_name.][db_name.]table_name 

  SET FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY | MONTH | YEAR}

...

Code Block
languagesql
ALTER DYNAMIC TABLE  [catalog_name.][db_name.]table_name 

  SET REFRESH_MODE = { FULL | CONTINUOUS }

Modify the refresh mode of the dynamic table, which may change from continuous mode to full mode, or full mode to continuous mode.

...

Drop

Code Block
languagesql
DROP DYNAMIC TABLE [ IF EXISTS ] [catalog_name.][db_name.]table_name

Dropping a dynamic table deletes the refresh job first, and then drops the metadata and data.

Describe Job

Code Block
languagesql
{ DESCRIBE | DESC } JOB ${jobId}

Describe the specified job in the Flink cluster; the schema of the returned information is the same as that of the SHOW JOBS statement.

Other operations supported by dynamic tables leverage the operations already supported on existing tables.

Interface Changes

CatalogDynamicTable

We introduce a new interface CatalogDynamicTable which extends CatalogBaseTable. As described above, the CatalogDynamicTable, in addition to having the attributes of schema, options and partition keys, also encompasses three important pieces of metadata: definition query, freshness and refresh job.

Code Block
languagejava
/**
 * Represents the unresolved metadata of a dynamic table in a {@link Catalog}.
 *
 * <p>Dynamic Table definition: In the context of integrated stream-batch data storage, it provides
 * full history data and incremental changelog. By defining the data's production business logic and
 * freshness, data update is achieved through continuous or full refresh mode, while also possessing
 * the capability for both batch and incremental consumption.
 *
 * <p>The metadata for {@link CatalogDynamicTable} also includes the following four main parts:
 *
 * <ul>
 *   <li>Schema, comments, options and partition keys.
 *   <li>Data freshness, which determines when the data is generated and becomes visible for user.
 *   <li>Data production business logic, also known as the definition query.
 *   <li>Background data refresh job, either through a flink streaming or scheduled batch job, it is
 *       initialized after dynamic table is created.
 * </ul>
 *
 * <p>A catalog implementer can either use {@link #of(Schema, String, List, Map, Long, String,
 * String, RefreshMode, RefreshJobHandler)} for a basic implementation of this interface or create a
 * custom class that allows passing catalog-specific objects all the way down to the connector
 * creation (if necessary).
 */
@PublicEvolving
public interface CatalogDynamicTable extends CatalogBaseTable {

    /**
     * Creates an instance of {@link CatalogDynamicTable}.
     *
     * @param schema unresolved schema
     * @param comment optional comment
     * @param partitionKeys list of partition keys or an empty list if not partitioned
     * @param options options to configure the connector
     * @param snapshot table snapshot of the table
     * @param definitionQuery definition sql query
     * @param freshness data freshness
     * @param refreshMode data refresh mode
     * @param refreshJobHandler refresh job handler
     */
    static CatalogDynamicTable of(
            Schema schema,
            @Nullable String comment,
            List<String> partitionKeys,
            Map<String, String> options,
            @Nullable Long snapshot,
            String definitionQuery,
            String freshness,
            RefreshMode refreshMode,
            RefreshJobHandler refreshJobHandler) {
        return new DefaultCatalogDynamicTable(
                schema,
                comment,
                partitionKeys,
                options,
                snapshot,
                definitionQuery,
                freshness,
                refreshMode,
                refreshJobHandler);
    }
 * </ul>
 *
 * <p>A catalog implementer can either use {@link #of(Schema, String, List, Map, Long, String,
 * Duration, RefreshMode, RefreshJobHandler)} for a basic implementation of this interface or create a
 * custom class that allows passing catalog-specific objects all the way down to the connector
 * creation (if necessary).
 */
@PublicEvolving
public interface CatalogDynamicTable extends CatalogBaseTable {

    @Override
    default TableKind getTableKind() {
        return TableKind.DYNAMIC_TABLE;
    }

    /**
     * Check if the table is partitioned or not.
     *
     * @return true if the table is partitioned; otherwise, false
     */
    boolean isPartitioned();

    /**
     * Get the partition keys of the table. This will be an empty set if the table is not
     * partitioned.
     *
     * @return partition keys of the table
     */
    List<String> getPartitionKeys();

    /**
     * Returns a copy of this {@code CatalogDynamicTable} with given table options {@code options}.
     *
     * @return a new copy of this table with replaced table options
     */
    CatalogDynamicTable copy(Map<String, String> options);

    /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */
    default Optional<Long> getSnapshot() {
        return Optional.empty();
    }

    /**
     * The definition query text of dynamic table, text is expanded in contrast to the original SQL.
     * This is needed because the context such as current DB is lost after the session, in which
     * view is defined, is gone. Expanded query text takes care of this, as an example.
     *
     * <p>For example, for a dynamic table that is defined in the context of "default" database with
     * a query {@code select * from test1}, the expanded query text might become {@code select
     * `test1`.`name`, `test1`.`value` from `default`.`test1`}, where table test1 resides in
     * database "default" and has two columns ("name" and "value").
     *
     * @return the dynamic table definition in expanded text.
     */
    String getDefinitionQuery();

    /** Get the freshness of dynamic table which is used to determine the data refresh mode. */
    StringDuration getFreshness();

    /** Get the refresh mode of dynamic table. */
    RefreshMode getRefreshMode();

    /** Get the refresh job handler of dynamic table. */
    RefreshJobHandler getRefreshJobHandler();

    /** The refresh mode of dynamic table. */
    @PublicEvolving
    enum RefreshMode {
        CONTINUOUS,
        FULL
    }
}

...

For Catalog, we don’t need to introduce any new method. But we should highlight that a Catalog which supports creating dynamic tables represents a physical storage, which can provide full historical data or incremental changelog, so developers need to override the getFactory method to find the DynamicTableFactory for the planner to compile a query.

...

Code Block
languagejava
/**
 * A validated {@link CatalogDynamicTable} that is backed by the original metadata coming from the
 * {@link Catalog} but resolved by the framework.
 *
 * <p>Every metadata accessor delegates to the wrapped original table; only the schema is served
 * from the {@link ResolvedSchema} supplied at construction time.
 *
 * <p>Note: This will be converted to {@link ResolvedCatalogTable} by framework during planner
 * optimize query phase.
 */
@PublicEvolving
public class ResolvedCatalogDynamicTable
        implements ResolvedCatalogBaseTable<CatalogDynamicTable>, CatalogDynamicTable {

    private final CatalogDynamicTable origin;

    private final ResolvedSchema resolvedSchema;

    /**
     * Wraps an unresolved dynamic table together with its resolved schema.
     *
     * @param origin the original catalog dynamic table, must not be null
     * @param resolvedSchema the schema resolved by the framework, must not be null
     */
    public ResolvedCatalogDynamicTable(CatalogDynamicTable origin, ResolvedSchema resolvedSchema) {
        this.origin =
                Preconditions.checkNotNull(
                        origin, "Original catalog dynamic table must not be null.");
        this.resolvedSchema =
                Preconditions.checkNotNull(resolvedSchema, "Resolved schema must not be null.");
    }

    /** Returns the options of the original table. */
    @Override
    public Map<String, String> getOptions() {
        return origin.getOptions();
    }

    /** Returns the comment of the original table. */
    @Override
    public String getComment() {
        return origin.getComment();
    }

    /** Returns a new resolved table wrapping a copy of the original and the same resolved schema. */
    @Override
    public CatalogBaseTable copy() {
        return new ResolvedCatalogDynamicTable((CatalogDynamicTable) origin.copy(), resolvedSchema);
    }

    /** Returns the description of the original table. */
    @Override
    public Optional<String> getDescription() {
        return origin.getDescription();
    }

    /** Returns the detailed description of the original table. */
    @Override
    public Optional<String> getDetailedDescription() {
        return origin.getDetailedDescription();
    }

    /** Returns whether the original table is partitioned. */
    @Override
    public boolean isPartitioned() {
        return origin.isPartitioned();
    }

    /** Returns the partition keys of the original table. */
    @Override
    public List<String> getPartitionKeys() {
        return origin.getPartitionKeys();
    }

    /**
     * Returns a new resolved table whose original is copied with the given options; the resolved
     * schema is reused as is.
     */
    @Override
    public ResolvedCatalogDynamicTable copy(Map<String, String> options) {
        return new ResolvedCatalogDynamicTable(origin.copy(options), resolvedSchema);
    }

    /** Returns the wrapped, unresolved dynamic table. */
    @Override
    public CatalogDynamicTable getOrigin() {
        return origin;
    }

    /** Returns the schema resolved by the framework. */
    @Override
    public ResolvedSchema getResolvedSchema() {
        return resolvedSchema;
    }

    /** Returns the definition query of the original table. */
    @Override
    public String getDefinitionQuery() {
        return origin.getDefinitionQuery();
    }

    /** Returns the freshness of the original table. */
    @Override
    public Duration getFreshness() {
        return origin.getFreshness();
    }

    /** Returns the refresh mode of the original table. */
    @Override
    public RefreshMode getRefreshMode() {
        return origin.getRefreshMode();
    }

    /** Returns the refresh job handler of the original table. */
    @Override
    public RefreshJobHandler getRefreshJobHandler() {
        return origin.getRefreshJobHandler();
    }
}

...