...

Similar to classes such as Expression/ResolvedExpression or UnresolvedDataType/DataType, a schema should be provided in unresolved and resolved variants.

Currently, TableSchema is a hybrid between a resolved and an unresolved schema, which leads to methods such as

...

  • make it visible whether a schema is resolved or unresolved
  • offer a unified API for FLIP-129, FLIP-136, and catalogs
  • allow arbitrary data types and expressions in the schema for watermark specs or computed columns
  • have access to other catalogs for declaring a data type or expression via CatalogManager
  • clean up TableSchema
  • remain backwards compatible in the persisted properties and API

Public Interfaces


  • org.apache.flink.table.api.Schema
  • org.apache.flink.table.catalog.ResolvedSchema
  • CatalogBaseTable#getUnresolvedSchema(): Schema
  • org.apache.flink.table.catalog.SchemaResolver
  • org.apache.flink.table.catalog.ResolvedCatalogTable
  • org.apache.flink.table.catalog.ResolvedCatalogTable#getResolvedSchema(): ResolvedSchema
  • org.apache.flink.table.catalog.ResolvedCatalogView
  • org.apache.flink.table.catalog.ResolvedCatalogView#getResolvedSchema(): ResolvedSchema
  • org.apache.flink.table.factories.DynamicTableFactory.Context#getCatalogTable(): ResolvedCatalogTable
  • org.apache.flink.table.api.Table#getResolvedSchema(): ResolvedSchema
  • org.apache.flink.table.operations.QueryOperation#getResolvedSchema(): ResolvedSchema
  • org.apache.flink.table.api.TableResult#getResolvedSchema(): ResolvedSchema

Proposed Changes

Schema

API class used for programmatic DDL, DataStream API conversion, and the Catalog API.

Notes:

  • Stores Expression and AbstractDataType
  • Offers convenience methods for adopting columns etc. from other resolved or unresolved schema instances
  • It replaces TableSchema.Builder


Schema {
    
    static Schema.Builder newBuilder();

    List<UnresolvedColumn> getColumns();

    List<UnresolvedWatermarkSpec> getWatermarkSpecs();

    Optional<UnresolvedPrimaryKey> getPrimaryKey();

    ResolvedSchema resolve(SchemaResolver);

    Builder {
        Builder fromSchema(Schema);

        Builder fromResolvedSchema(ResolvedSchema);

        Builder column(String, AbstractDataType<?>);

        Builder columnByExpression(String, Expression);

        Builder columnByMetadata(String, AbstractDataType<?>);

        Builder columnByMetadata(String, AbstractDataType<?>, boolean);

        Builder columnByMetadata(String, AbstractDataType<?>, String);

        Builder columnByMetadata(String, AbstractDataType<?>, String, boolean);

        Builder watermark(String, Expression);

        Builder primaryKey(String... columnNames);

        Builder primaryKeyNamed(String, String...);

        Schema build();
    }
}
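
For illustration, a schema with physical, computed, and metadata columns as well as a watermark and a named primary key could be declared roughly as follows. This is a sketch only; the column names and the expression DSL calls ($, lit) are illustrative and not part of this proposal:

Schema schema = Schema.newBuilder()
    .column("order_id", DataTypes.BIGINT().notNull())             // physical column
    .column("price", DataTypes.DECIMAL(10, 2))                    // physical column
    .column("quantity", DataTypes.INT())                          // physical column
    .column("ts", DataTypes.TIMESTAMP(3))                         // physical column
    .columnByExpression("total", $("price").times($("quantity"))) // computed column
    .columnByMetadata("offset", DataTypes.BIGINT(), true)         // virtual metadata column
    .watermark("ts", $("ts").minus(lit(5).seconds()))             // watermark expression (illustrative)
    .primaryKeyNamed("pk_order", "order_id")
    .build();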

ResolvedSchema

Class exposed by the framework at various locations to represent a validated and complete schema.

Notes:

  • Stores ResolvedExpression and DataType
  • It replaces TableSchema
  • Expressions that originated from SQL will have an implementation of ResolvedExpression#asSerializableString, which makes it possible to serialize them into properties for the catalog

ResolvedSchema {

    int getColumnCount();

    List<Column> getColumns();

    Optional<Column> getColumn(int);

    Optional<Column> getColumn(String);

    List<WatermarkSpec> getWatermarkSpecs();

    Optional<UniqueConstraint> getPrimaryKey();

    DataType toRowDataType();

    DataType toPhysicalRowDataType();

    DataType toPersistedRowDataType();
}
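
A resolved schema can then be queried for validated information, e.g. to derive the row type that a connector should produce. Again a sketch; resolvedSchema stands for any instance obtained from the framework:

// full row type including computed and metadata columns
DataType rowType = resolvedSchema.toRowDataType();

// only physical columns, e.g. for reading from a connector
DataType physicalRowType = resolvedSchema.toPhysicalRowDataType();

// columns can be looked up by position or by name
Optional<Column> totalColumn = resolvedSchema.getColumn("total");

Optional<UniqueConstraint> primaryKey = resolvedSchema.getPrimaryKey();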

SchemaResolver

References parser, data type factory, catalog manager, expression resolver, etc. for resolving SQL and Table API expressions and data types.

It works similarly to CatalogTableSchemaResolver and performs validation.

Notes:

  • Depending on the context, the resolver is configured in streaming or batch mode
  • Depending on the context, the resolver supports metadata columns (not supported for DataStream API conversion)
  • Instances are provided by the CatalogManager

SchemaResolver {

    ResolvedSchema resolve(Schema schema);

    boolean supportsMetadata();

    boolean isStreamingMode();
}
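
The intended resolution flow could then look as follows. This is a sketch; the accessor on CatalogManager is assumed here, since the proposal only states that instances are provided by it:

// the CatalogManager provides a properly configured resolver
// (accessor name is assumed)
SchemaResolver resolver = catalogManager.getSchemaResolver();

// contexts without metadata support (e.g. DataStream API conversion)
// can be distinguished beforehand
if (!resolver.supportsMetadata()) {
    // reject schemas that declare metadata columns
}

// resolution validates the schema and turns Expression/AbstractDataType
// into ResolvedExpression/DataType
ResolvedSchema resolvedSchema = schema.resolve(resolver);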

Updated Catalog Class Hierarchy

Due to the updated schema classes, we also need to give better semantics to the existing Catalog class hierarchy while maintaining backwards compatibility.

Some assumptions that came up during the offline discussions:

→ A CatalogView must have a schema.

  • This is defined by the SQL standard (see 4.15.7 Table descriptors).
  • Hive also stores a view schema, but the schema is not validated when executing queries on the view, and the stored schema is not updated even if the underlying table schema changes.
  • We also treat the schema purely as metadata and do not validate it yet. However, platforms that use our catalog interfaces can already do so.

→ A CatalogTable and CatalogView are metadata objects. Thus, they are unresolved and return an unresolved schema.

→ Only validated, and thus resolved, CatalogTables/CatalogViews should be put into a catalog.

We suggest the following interface additions:
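
Based on the additions listed under Public Interfaces, the resolved variants could be sketched roughly as follows; exact supertypes, constructors, and further methods are left out here:

ResolvedCatalogTable implements CatalogTable {

    ResolvedSchema getResolvedSchema();
}

ResolvedCatalogView implements CatalogView {

    ResolvedSchema getResolvedSchema();
}

A DynamicTableFactory would then only receive resolved metadata via DynamicTableFactory.Context#getCatalogTable(): ResolvedCatalogTable.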



Compatibility, Deprecation, and Migration Plan

...