...

Code Block
languagejava
/**
 * Represents the unresolved metadata of a dynamic table in a {@link Catalog}.
 *
 * <p>Dynamic Table definition: In the context of integrated stream-batch data storage, a dynamic
 * table provides both full historical data and an incremental changelog. Users define the data
 * production business logic and the data freshness; the data is then updated through continuous or
 * full refresh mode, and the table supports both batch and incremental consumption.
 *
 * <p>The metadata for {@link CatalogDynamicTable} also includes the following four main parts:
 *
 * <ul>
 *   <li>Schema, comments, options and partition keys.
 *   <li>Data freshness, which determines when the data is generated and becomes visible to users.
 *   <li>Data production business logic, also known as the definition query.
 *   <li>Background data refresh job, either a Flink streaming job or a scheduled batch job, which
 *       is initialized after the dynamic table is created.
 * </ul>
 *
 * <p>A catalog implementer can either use {@link #of(Schema, String, List, Map, Long, String,
 * Duration, RefreshMode, RefreshHandler)} for a basic implementation of this interface or create a
 * custom class that allows passing catalog-specific objects all the way down to the connector
 * creation (if necessary).
 */
@PublicEvolving
public interface CatalogDynamicTable extends CatalogBaseTable {

    @Override
    default TableKind getTableKind() {
        return TableKind.DYNAMIC_TABLE;
    }

    /**
     * Checks whether the table is partitioned.
     *
     * @return true if the table is partitioned; otherwise, false
     */
    boolean isPartitioned();

    /**
     * Gets the partition keys of the table. This will be an empty list if the table is not
     * partitioned.
     *
     * @return partition keys of the table
     */
    List<String> getPartitionKeys();

    /**
     * Returns a copy of this {@code CatalogDynamicTable} with the given table options {@code
     * options}.
     *
     * @return a new copy of this table with replaced table options
     */
    CatalogDynamicTable copy(Map<String, String> options);

    /** Returns the snapshot specified for the table, or {@code Optional.empty()} if not specified. */
    default Optional<Long> getSnapshot() {
        return Optional.empty();
    }

    /**
     * Returns the definition query text of the dynamic table. In contrast to the original SQL, the
     * text is expanded. This is needed because context such as the current database is lost once
     * the session in which the dynamic table was defined is gone; the expanded query text keeps
     * that context.
     *
     * <p>For example, for a dynamic table that is defined in the context of the "default" database
     * with the query {@code select * from test1}, the expanded query text might become {@code
     * select `test1`.`name`, `test1`.`value` from `default`.`test1`}, where table test1 resides in
     * database "default" and has two columns ("name" and "value").
     *
     * @return the dynamic table definition in expanded text.
     */
    String getDefinitionQuery();

    /**
     * Gets the freshness of the dynamic table, which is used to determine the physical refresh
     * mode.
     */
    Duration getFreshness();

    /** Gets the logical refresh mode of the dynamic table. */
    LogicalRefreshMode getLogicalRefreshMode();

    /** Gets the physical refresh mode of the dynamic table. */
    RefreshMode getRefreshMode();

    /** Gets the refresh status of the dynamic table. */
    RefreshStatus getRefreshStatus();

    /** Returns a summary description of the refresh handler. */
    Optional<String> getRefreshHandlerDescription();

    /**
     * Returns the serializable refresh handler of the dynamic table. This is not used for DESCRIBE
     * TABLE.
     */
    byte[] getSerializableRefreshHandler();

    /** The logical refresh mode of the dynamic table. */
    @PublicEvolving
    enum LogicalRefreshMode {
        /**
         * The refresh pipeline will be executed in continuous mode, corresponding to {@link
         * RefreshMode#CONTINUOUS}.
         */
        CONTINUOUS,

        /**
         * The refresh pipeline will be executed in full mode, corresponding to {@link
         * RefreshMode#FULL}.
         */
        FULL,

        /**
         * The refresh pipeline mode is determined by the freshness of the dynamic table, either
         * {@link RefreshMode#FULL} or {@link RefreshMode#CONTINUOUS}.
         */
        AUTOMATIC
    }

    /** The physical refresh mode of the dynamic table. */
    @PublicEvolving
    enum RefreshMode {
        CONTINUOUS,
        FULL
    }

    /** Background refresh pipeline status of the dynamic table. */
    @PublicEvolving
    enum RefreshStatus {
        INITIALIZING,
        ACTIVATED,
        SUSPENDED
    }
}
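To make the AUTOMATIC mode more concrete, the following is a minimal sketch of how a planner might derive the physical RefreshMode from the LogicalRefreshMode and the table's freshness. The proposal only states that AUTOMATIC is decided by freshness; the `RefreshModeResolver` class, the `continuousThreshold` parameter and the "fresher than the threshold means continuous" rule are hypothetical assumptions of this sketch, and the two enums are local copies so that the code compiles on its own.

```java
import java.time.Duration;

/** Local copy of CatalogDynamicTable.LogicalRefreshMode for this standalone sketch. */
enum LogicalRefreshMode { CONTINUOUS, FULL, AUTOMATIC }

/** Local copy of CatalogDynamicTable.RefreshMode for this standalone sketch. */
enum RefreshMode { CONTINUOUS, FULL }

/** Hypothetical helper that derives the physical refresh mode of a dynamic table. */
final class RefreshModeResolver {

    private RefreshModeResolver() {}

    /**
     * CONTINUOUS and FULL map one to one. AUTOMATIC compares the freshness with a threshold,
     * which is an assumption of this sketch and not part of the proposal.
     */
    static RefreshMode resolve(
            LogicalRefreshMode logicalMode, Duration freshness, Duration continuousThreshold) {
        switch (logicalMode) {
            case CONTINUOUS:
                return RefreshMode.CONTINUOUS;
            case FULL:
                return RefreshMode.FULL;
            case AUTOMATIC:
                // Freshness stricter than the threshold needs a continuously running job;
                // anything coarser is served by periodically scheduled full refreshes.
                return freshness.compareTo(continuousThreshold) < 0
                        ? RefreshMode.CONTINUOUS
                        : RefreshMode.FULL;
            default:
                throw new IllegalArgumentException("Unknown logical refresh mode: " + logicalMode);
        }
    }
}
```

For example, with a 30-minute threshold, AUTOMATIC with a freshness of one minute resolves to CONTINUOUS, while a freshness of one hour resolves to FULL.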


Catalog

For Catalog, we don’t need to introduce any new methods. However, we should highlight that a Catalog that supports creating dynamic tables represents physical storage, which can provide full historical data or an incremental changelog. Developers therefore need to override the getFactory method so that the planner can find the DynamicTableFactory when compiling a query.

Code Block
languagejava
titleCatalog
@PublicEvolving
public interface Catalog {
    
    /**
     * Returns a factory for creating instances from catalog objects.
     *
     * <p>This method enables bypassing the discovery process. Implementers can directly pass
     * internal catalog-specific objects to their own factory. For example, a custom {@link
     * CatalogTable} can be processed by a custom {@link DynamicTableFactory}.
     *
     * <p>If this catalog supports creating dynamic tables, you should also override this method to
     * provide a {@link DynamicTableFactory}, which helps the planner find the {@link
     * DynamicTableSource} and {@link DynamicTableSink} correctly during the compile and optimization
     * phase. If you don't override this method, you must specify the identifier of the physical
     * connector that backs this catalog's storage when creating a dynamic table. Otherwise, the
     * planner can't find the {@link DynamicTableFactory}.
     *
     * <p>Because all factories are interfaces, the returned {@link Factory} instance can implement
     * multiple supported extension points. An {@code instanceof} check is performed by the caller
     * that checks whether a required factory is implemented; otherwise the discovery process is
     * used.
     */
    default Optional<Factory> getFactory() {
        return Optional.empty();
    }
}
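The lookup contract described in the `getFactory` Javadoc (use the catalog's factory when it passes an `instanceof` check, otherwise fall back to discovery by connector identifier) can be sketched as below. `Factory`, `DynamicTableFactory` and `Catalog` here are heavily simplified stand-ins, not Flink's real interfaces; discovery is modeled as a plain map from identifier to factory instead of Flink's service-loader-based discovery; and `FactoryLookup` is a hypothetical name.

```java
import java.util.Map;
import java.util.Optional;

/** Simplified stand-in for Flink's Factory: only the identifier is modeled. */
interface Factory {
    String factoryIdentifier();
}

/** Simplified stand-in for the DynamicTableFactory extension point. */
interface DynamicTableFactory extends Factory {}

/** Simplified stand-in for Catalog, keeping only the default getFactory() shown above. */
interface Catalog {
    default Optional<Factory> getFactory() {
        return Optional.empty();
    }
}

/** Hypothetical planner-side helper that resolves the factory for a dynamic table. */
final class FactoryLookup {

    private FactoryLookup() {}

    static DynamicTableFactory findDynamicTableFactory(
            Catalog catalog,
            String connectorIdentifier,
            Map<String, DynamicTableFactory> discoverableFactories) {
        // Prefer the catalog-provided factory, but only if it implements the required extension point.
        Optional<Factory> catalogFactory = catalog.getFactory();
        if (catalogFactory.isPresent() && catalogFactory.get() instanceof DynamicTableFactory) {
            return (DynamicTableFactory) catalogFactory.get();
        }
        // Otherwise fall back to discovery, which requires an explicit connector identifier.
        if (connectorIdentifier == null) {
            throw new IllegalStateException(
                    "The catalog provides no DynamicTableFactory and no connector identifier is set.");
        }
        DynamicTableFactory discovered = discoverableFactories.get(connectorIdentifier);
        if (discovered == null) {
            throw new IllegalStateException(
                    "No DynamicTableFactory found for connector identifier: " + connectorIdentifier);
        }
        return discovered;
    }
}
```

The third case (no override and no connector identifier) is the situation the Javadoc warns about: the planner has no way to find a DynamicTableFactory.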


RefreshHandler

We introduce the interface RefreshHandler, which records the meta information of the current dynamic table's background refresh pipeline.

Code Block
languagejava
/**
 * This interface represents the meta information of the current dynamic table's background
 * refresh pipeline. The refresh mode may be continuous or full. The format of the meta information
 * differs between the two modes, so users need to implement this interface accordingly.
 *
 * <p>This meta information will be serialized to bytes by {@link RefreshHandlerSerializer} and
 * then stored in the Catalog, so that the {@link CatalogDynamicTable} can later be suspended or
 * dropped.
 *
 * <p>In continuous mode, the meta information may look like { "clusterType": "yarn",
 * "clusterId": "xxx", "jobId": "yyyy" }.
 *
 * <p>In full mode, the meta information may look like { "endpoint": "xxx", "workflowId": "yyy" }.
 * Because different workflow scheduler plugins may be used in this mode, you should implement this
 * interface according to your plugin.
 */
@PublicEvolving
public interface RefreshHandler {

    /** Returns a string that summarizes this refresh handler meta information. */
    String asSummaryString();
}
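As an illustration of the continuous-mode meta information above, the sketch below implements `RefreshHandler` for a job identified by cluster type, cluster ID and job ID, and round-trips it through bytes. `ContinuousRefreshHandler` and `ContinuousRefreshHandlerSerializer` are hypothetical names; the API of `RefreshHandlerSerializer` is not shown in this section, so a hand-written `DataOutputStream` encoding is used as a stand-in, and the interface is copied so the code compiles on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;

/** Local copy of the RefreshHandler interface from the proposal. */
interface RefreshHandler {
    /** Returns a string that summarizes this refresh handler meta information. */
    String asSummaryString();
}

/** Hypothetical handler for a continuous refresh job, using the fields from the Javadoc example. */
final class ContinuousRefreshHandler implements RefreshHandler {

    private final String clusterType;
    private final String clusterId;
    private final String jobId;

    ContinuousRefreshHandler(String clusterType, String clusterId, String jobId) {
        this.clusterType = Objects.requireNonNull(clusterType);
        this.clusterId = Objects.requireNonNull(clusterId);
        this.jobId = Objects.requireNonNull(jobId);
    }

    String getClusterType() {
        return clusterType;
    }

    String getClusterId() {
        return clusterId;
    }

    String getJobId() {
        return jobId;
    }

    @Override
    public String asSummaryString() {
        return String.format(
                "{\"clusterType\": \"%s\", \"clusterId\": \"%s\", \"jobId\": \"%s\"}",
                clusterType, clusterId, jobId);
    }
}

/** Hypothetical serializer producing the bytes that would be stored in the catalog. */
final class ContinuousRefreshHandlerSerializer {

    private ContinuousRefreshHandlerSerializer() {}

    static byte[] serialize(ContinuousRefreshHandler handler) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            // Fields are written in a fixed order and must be read back in the same order.
            out.writeUTF(handler.getClusterType());
            out.writeUTF(handler.getClusterId());
            out.writeUTF(handler.getJobId());
        }
        return bytes.toByteArray();
    }

    static ContinuousRefreshHandler deserialize(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            // Java evaluates constructor arguments left to right, matching the write order.
            return new ContinuousRefreshHandler(in.readUTF(), in.readUTF(), in.readUTF());
        }
    }
}
```

A full-mode handler for a workflow scheduler plugin would follow the same pattern with fields such as `endpoint` and `workflowId`.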


ResolvedCatalogDynamicTable

We introduce the public class ResolvedCatalogDynamicTable that represents a validated CatalogDynamicTable.

Code Block
languagejava
/**
 * A validated {@link CatalogDynamicTable} that is backed by the original metadata coming from the
 * {@link Catalog} but resolved by the framework.
 *
 * <p>Note: This will be converted to {@link ResolvedCatalogTable} by the framework during the
 * planner's query optimization phase.
 */
@PublicEvolving
public class ResolvedCatalogDynamicTable
        implements ResolvedCatalogBaseTable<CatalogDynamicTable>, CatalogDynamicTable {

    private final CatalogDynamicTable origin;

    private final ResolvedSchema resolvedSchema;

    public ResolvedCatalogDynamicTable(CatalogDynamicTable origin, ResolvedSchema resolvedSchema) {
        this.origin =
                Preconditions.checkNotNull(
                        origin, "Original catalog dynamic table must not be null.");
        this.resolvedSchema =
                Preconditions.checkNotNull(resolvedSchema, "Resolved schema must not be null.");
    }

    @Override
    public Map<String, String> getOptions() {
        return origin.getOptions();
    }

    @Override
    public String getComment() {
        return origin.getComment();
    }

    @Override
    public CatalogBaseTable copy() {
        return new ResolvedCatalogDynamicTable((CatalogDynamicTable) origin.copy(), resolvedSchema);
    }

    @Override
    public Optional<String> getDescription() {
        return origin.getDescription();
    }

    @Override
    public Optional<String> getDetailedDescription() {
        return origin.getDetailedDescription();
    }

    @Override
    public boolean isPartitioned() {
        return origin.isPartitioned();
    }

    @Override
    public List<String> getPartitionKeys() {
        return origin.getPartitionKeys();
    }

    @Override
    public ResolvedCatalogDynamicTable copy(Map<String, String> options) {
        return new ResolvedCatalogDynamicTable(origin.copy(options), resolvedSchema);
    }

    @Override
    public Optional<Long> getSnapshot() {
        // Delegate to the origin so that a snapshot specified on the table is not dropped.
        return origin.getSnapshot();
    }

    @Override
    public CatalogDynamicTable getOrigin() {
        return origin;
    }

    @Override
    public ResolvedSchema getResolvedSchema() {
        return resolvedSchema;
    }

    @Override
    public String getDefinitionQuery() {
        return origin.getDefinitionQuery();
    }

    @Override
    public Duration getFreshness() {
        return origin.getFreshness();
    }

    @Override
    public LogicalRefreshMode getLogicalRefreshMode() {
        return origin.getLogicalRefreshMode();
    }

    @Override
    public RefreshMode getRefreshMode() {
        return origin.getRefreshMode();
    }

    @Override
    public RefreshStatus getRefreshStatus() {
        return origin.getRefreshStatus();
    }

    @Override
    public Optional<String> getRefreshHandlerDescription() {
        return origin.getRefreshHandlerDescription();
    }

    @Nullable
    @Override
    public byte[] getSerializableRefreshHandler() {
        return origin.getSerializableRefreshHandler();
    }
}


Configuration

We need to introduce two options to control the background refresh job.

...