Code Block
/**
 * Represents the unresolved metadata of a dynamic table in a {@link Catalog}.
 *
 * <p>Dynamic Table definition: In the context of integrated stream-batch data storage, it provides
 * full history data and incremental changelog. By defining the data's production business logic and
 * freshness, data update is achieved through continuous or full refresh mode, while also possessing
 * the capability for both batch and incremental consumption.
 *
 * <p>The metadata for {@link CatalogDynamicTable} includes the following four main parts:
 *
 * <ul>
 *   <li>Schema, comments, options and partition keys.
 *   <li>Data freshness, which determines when the data is generated and becomes visible to users.
 *   <li>Data production business logic, also known as the definition query.
 *   <li>Background data refresh job, either a Flink streaming job or a scheduled batch job, which
 *       is initialized after the dynamic table is created.
 * </ul>
 *
 * <p>A catalog implementer can either use {@link #of(Schema, String, List, Map, Long, String,
 * Duration, RefreshMode, RefreshHandler)} for a basic implementation of this interface or create a
 * custom class that allows passing catalog-specific objects all the way down to the connector
 * creation (if necessary).
 */
public interface CatalogDynamicTable extends CatalogBaseTable {

    @Override
    default TableKind getTableKind() {
        return TableKind.DYNAMIC_TABLE;
    }

    /**
     * Check if the table is partitioned or not.
     *
     * @return true if the table is partitioned; otherwise, false
     */
    boolean isPartitioned();

    /**
     * Get the partition keys of the table. This will be an empty list if the table is not
     * partitioned.
     *
     * @return partition keys of the table
     */
    List<String> getPartitionKeys();

    /**
     * Returns a copy of this {@code CatalogDynamicTable} with given table options {@code options}.
     *
     * @return a new copy of this table with replaced table options
     */
    CatalogDynamicTable copy(Map<String, String> options);

    /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */
    default Optional<Long> getSnapshot() {
        return Optional.empty();
    }

    /**
     * The definition query text of the dynamic table. The text is expanded in contrast to the
     * original SQL. This is needed because context such as the current database is lost once the
     * session in which the dynamic table was defined is gone. The expanded query text preserves
     * that context.
     *
     * <p>For example, for a dynamic table that is defined in the context of the "default" database
     * with a query {@code select * from test1}, the expanded query text might become {@code select
     * `test1`.`name`, `test1`.`value` from `default`.`test1`}, where table test1 resides in
     * database "default" and has two columns ("name" and "value").
     *
     * @return the dynamic table definition in expanded text.
     */
    String getDefinitionQuery();

    /** Get the freshness of dynamic table which is used to determine the physical refresh mode. */
    Duration getFreshness();

    /** Get the logical refresh mode of dynamic table. */
    LogicalRefreshMode getLogicalRefreshMode();

    /** Get the physical refresh mode of dynamic table. */
    RefreshMode getRefreshMode();

    /** Get the refresh status of dynamic table. */
    RefreshStatus getRefreshStatus();

    /** Return summary description of refresh handler. */
    Optional<String> getRefreshHandlerDescription();

    /** Return the serializable refresh handler of dynamic table. */
}


We introduce the interface RefreshHandler, which records the meta information of the background refresh job of the current dynamic table.

Code Block
/**
 * This interface represents the meta information of the current dynamic table background refresh
 * pipeline. The refresh mode may be continuous or full. The format of the meta information differs
 * between the two modes, so users need to implement this interface for the mode they use.
 *
 * <p>This meta information will be serialized to bytes by {@link RefreshHandlerSerializer} and
 * then stored in the Catalog so that the {@link CatalogDynamicTable} can later be suspended or
 * dropped.
 *
 * <p>In continuous mode, the meta information may look like { "clusterType": "yarn",
 * "clusterId": "xxx", "jobId": "yyyy" }.
 *
 * <p>In full mode, the meta information may look like { "endpoint": "xxx", "workflowId": "yyy" }.
 * Because you may use a different workflow scheduler plugin in this mode, you should implement
 * this interface according to your plugin.
 */
public interface RefreshHandler {

    /** Returns a string that summarizes this refresh handler meta information. */
    String asSummaryString();
}
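
To make the two metadata shapes described in the Javadoc concrete, the following is a minimal sketch of one handler per refresh mode. The class names ContinuousRefreshHandler and WorkflowRefreshHandler, their fields, and the summary format are assumptions made for illustration only; the proposal does not prescribe them. The RefreshHandler interface is repeated as a stand-in so that the sketch compiles on its own.

```java
// Demo entry point; placed first so the file can be launched directly as a single source file.
class RefreshHandlerSketch {
    public static void main(String[] args) {
        RefreshHandler continuous = new ContinuousRefreshHandler("yarn", "xxx", "yyyy");
        RefreshHandler full = new WorkflowRefreshHandler("xxx", "yyy");
        System.out.println(continuous.asSummaryString());
        System.out.println(full.asSummaryString());
    }
}

// Stand-in copy of the RefreshHandler interface from this proposal.
interface RefreshHandler {
    String asSummaryString();
}

// Hypothetical continuous-mode handler: records which streaming job refreshes the table.
final class ContinuousRefreshHandler implements RefreshHandler {
    private final String clusterType;
    private final String clusterId;
    private final String jobId;

    ContinuousRefreshHandler(String clusterType, String clusterId, String jobId) {
        this.clusterType = clusterType;
        this.clusterId = clusterId;
        this.jobId = jobId;
    }

    @Override
    public String asSummaryString() {
        return String.format(
                "{ \"clusterType\": \"%s\", \"clusterId\": \"%s\", \"jobId\": \"%s\" }",
                clusterType, clusterId, jobId);
    }
}

// Hypothetical full-mode handler: records the workflow registered in an external scheduler.
final class WorkflowRefreshHandler implements RefreshHandler {
    private final String endpoint;
    private final String workflowId;

    WorkflowRefreshHandler(String endpoint, String workflowId) {
        this.endpoint = endpoint;
        this.workflowId = workflowId;
    }

    @Override
    public String asSummaryString() {
        return String.format(
                "{ \"endpoint\": \"%s\", \"workflowId\": \"%s\" }", endpoint, workflowId);
    }
}
```

Keeping one handler class per mode means that each class carries only the fields its refresh job needs, and a scheduler plugin can supply its own implementation without touching the continuous-mode one.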


We introduce the public class ResolvedCatalogDynamicTable that represents a validated CatalogDynamicTable.

Code Block
/**
 * A validated {@link CatalogDynamicTable} that is backed by the original metadata coming from the
 * {@link Catalog} but resolved by the framework.
 *
 * <p>Note: This will be converted to {@link ResolvedCatalogTable} by the framework during the
 * planner's query optimization phase.
 */
public class ResolvedCatalogDynamicTable
        implements ResolvedCatalogBaseTable<CatalogDynamicTable>, CatalogDynamicTable {

    private final CatalogDynamicTable origin;

    private final ResolvedSchema resolvedSchema;

    public ResolvedCatalogDynamicTable(CatalogDynamicTable origin, ResolvedSchema resolvedSchema) {
        this.origin =
                Preconditions.checkNotNull(
                        origin, "Original catalog dynamic table must not be null.");
        this.resolvedSchema =
                Preconditions.checkNotNull(resolvedSchema, "Resolved schema must not be null.");
    }

    @Override
    public Map<String, String> getOptions() {
        return origin.getOptions();
    }

    @Override
    public String getComment() {
        return origin.getComment();
    }

    @Override
    public CatalogBaseTable copy() {
        return new ResolvedCatalogDynamicTable((CatalogDynamicTable) origin.copy(), resolvedSchema);
    }

    @Override
    public Optional<String> getDescription() {
        return origin.getDescription();
    }
}

We need to introduce two options to control the background refresh job.
