

Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Released: 1.13

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Schema information is necessary at different locations in the Table API for defining tables and/or views. In particular, it is necessary to define the schema in a programmatic DDL (FLIP-129) and when converting a DataStream to a Table (FLIP-136).

We need similar APIs in the Catalog interfaces such that catalog implementations can define tables/views in a unified way.

Furthermore, a catalog implementation needs a way to encode/decode a schema into properties.

Similar to classes such as Expression/ResolvedExpression or UnresolvedDataType/DataType, a schema should be provided in an unresolved and resolved variant.

Currently, TableSchema is a hybrid of a resolved and an unresolved schema, which leads to methods such as

CatalogTableSchemaResolver.resolve(TableSchema): TableSchema

where the updated content of the schema is not directly visible in the signature.

This FLIP updates the class hierarchy to achieve the following goals:

  • make it visible whether a schema is resolved or unresolved
  • offer a unified API for FLIP-129, FLIP-136, and catalogs
  • allow arbitrary data types and expressions in the schema for watermark specs or columns
  • have access to other catalogs for declaring a data type or expression via CatalogManager
  • clean up TableSchema
  • remain backwards compatible in the persisted properties and API

Public Interfaces

  • org.apache.flink.table.api.Schema
  • org.apache.flink.table.catalog.ResolvedSchema
  • CatalogBaseTable#getUnresolvedSchema(): Schema
  • org.apache.flink.table.catalog.SchemaResolver
  • org.apache.flink.table.catalog.ResolvedCatalogTable
  • org.apache.flink.table.catalog.ResolvedCatalogTable#getResolvedSchema(): ResolvedSchema
  • org.apache.flink.table.catalog.ResolvedCatalogView
  • org.apache.flink.table.catalog.ResolvedCatalogView#getResolvedSchema(): ResolvedSchema
  • org.apache.flink.table.factories.DynamicTableFactory.Context#getCatalogTable(): ResolvedCatalogTable
  • org.apache.flink.table.api.Table#getResolvedSchema(): ResolvedSchema
  • org.apache.flink.table.operations.QueryOperation#getResolvedSchema(): ResolvedSchema
  • org.apache.flink.table.api.TableResult#getResolvedSchema(): ResolvedSchema

Proposed Changes

Schema

API class used by programmatic DDL, DataStream API conversion, and Catalog API.

Notes:

  • Stores Expression and AbstractDataType
  • Offers convenient methods to adopt columns etc. from other resolved or unresolved schema instances
  • It replaces TableSchema.Builder


Schema {
    
    static Schema.Builder newBuilder();

    List<UnresolvedColumn> getColumns();

    List<UnresolvedWatermarkSpec> getWatermarkSpecs();

    Optional<UnresolvedPrimaryKey> getPrimaryKey();

    ResolvedSchema resolve(SchemaResolver);

    Builder {
        Builder fromSchema(Schema);

        Builder fromResolvedSchema(ResolvedSchema);

        Builder column(String, AbstractDataType<?>);

        Builder columnByExpression(String, Expression);

        Builder columnByMetadata(String, AbstractDataType<?>);

        Builder columnByMetadata(String, AbstractDataType<?>, boolean);

        Builder columnByMetadata(String, AbstractDataType<?>, String);

        Builder columnByMetadata(String, AbstractDataType<?>, String, boolean);

        Builder watermark(String, Expression);

        Builder primaryKey(String... columnNames);

        Builder primaryKeyNamed(String, String...);

        Schema build();
    }
}
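
For illustration, a table with physical, computed, and metadata columns could be declared with the proposed builder as follows. This is a sketch, not part of the proposal: the column names, the DataTypes factory calls, and the $()/lit() expression helpers from the existing Table API are assumptions.

Schema schema = Schema.newBuilder()
    // physical columns
    .column("user_id", DataTypes.BIGINT())
    .column("price", DataTypes.DOUBLE())
    .column("quantity", DataTypes.INT())
    .column("ts", DataTypes.TIMESTAMP(3))
    // computed column derived from other columns via an (unresolved) Expression
    .columnByExpression("cost", $("price").times($("quantity")))
    // virtual metadata column (boolean flag marks it as virtual, i.e. not persisted)
    .columnByMetadata("offset", DataTypes.BIGINT(), true)
    // watermark declared as an (unresolved) Expression on the rowtime column
    .watermark("ts", $("ts").minus(lit(2).seconds()))
    .primaryKeyNamed("pk_user", "user_id")
    .build();

All parts of the schema remain unresolved at this point; data types and expressions are validated only once the schema is passed to a SchemaResolver.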

ResolvedSchema

Class that is exposed by the framework at different locations for a validated and complete schema.

Notes:

  • Stores ResolvedExpression and DataType
  • It replaces TableSchema
  • Expressions that originated from SQL will have an implementation of ResolvedExpression#asSerializableString which makes it possible to serialize them into properties for the catalog
ResolvedSchema {

    int getColumnCount();

    List<Column> getColumns();

    Optional<Column> getColumn(int);

    Optional<Column> getColumn(String);

    List<WatermarkSpec> getWatermarkSpecs();

    Optional<UniqueConstraint> getPrimaryKey();

    DataType toRowDataType();

    DataType toPhysicalRowDataType();

    DataType toPersistedRowDataType();
}
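
The three row data type conversions differ in which column kinds they include. The following sketch illustrates the intended behavior for a schema that mixes physical, computed, and virtual metadata columns (the column kinds follow FLIP-107; the exact field sets shown are assumptions):

// given a resolved schema with:
//   id BIGINT                        -- physical
//   cost AS price * quantity        -- computed
//   offset BIGINT METADATA VIRTUAL  -- metadata, virtual

resolvedSchema.toRowDataType();          // all columns, including computed and metadata
resolvedSchema.toPhysicalRowDataType();  // physical columns only
resolvedSchema.toPersistedRowDataType(); // physical + persisted metadata columns;
                                         // computed and virtual metadata columns are excluded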

SchemaResolver

References parser, data type factory, catalog manager, expression resolver, etc. for resolving SQL and Table API expressions and data types.

It works similarly to CatalogTableSchemaResolver and will perform validation.

Notes:

  • Depending on the context, the resolver is configured in streaming or batch mode
  • Depending on the context, the resolver supports metadata columns (not supported for DataStream API conversion)
  • Instances are provided by the CatalogManager
SchemaResolver {

    ResolvedSchema resolve(Schema schema);

    boolean supportsMetadata();

    boolean isStreamingMode();
}
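
A sketch of how framework code might use the resolver (the getSchemaResolver() accessor on CatalogManager is a hypothetical name, not part of this proposal):

// instances are provided by the CatalogManager, never instantiated directly
SchemaResolver resolver = catalogManager.getSchemaResolver();

// a context that cannot handle metadata columns can reject them up front
if (!resolver.supportsMetadata()) {
    // e.g. DataStream API conversion: fail or ignore metadata columns
}

// validation happens here; data types and expressions become fully resolved
ResolvedSchema resolved = resolver.resolve(schema);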

Updated Catalog Class Hierarchy

Due to the updated schema classes, we also need to give better semantics to the existing Catalog class hierarchy while maintaining backwards compatibility.

Some assumptions that came up during the offline discussions:

→ A CatalogView must have a schema.

  • This is defined by the SQL standard (see 4.15.7 Table descriptors).
  • Hive also stores a view schema, but the schema is not validated when executing queries on the view, and the stored schema is not updated even if the underlying table schema changes.
  • We also just consider the schema as metadata and don't validate it yet. However, platforms that use our catalog interfaces can do that already.

→ A CatalogTable and CatalogView are metadata objects. Thus, they are unresolved and return an unresolved schema.

→ Only validated and thus resolved CatalogTables/CatalogViews should be put into a catalog.
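
The intended flow can be sketched as follows (the ResolvedCatalogTable constructor shown is an assumption for illustration; Catalog#createTable is the existing API):

// 1. DDL or programmatic API produces an unresolved CatalogTable carrying a Schema
Schema unresolvedSchema = catalogTable.getUnresolvedSchema();

// 2. the framework resolves and validates the schema before persisting
ResolvedSchema resolvedSchema = schemaResolver.resolve(unresolvedSchema);
ResolvedCatalogTable resolvedTable = new ResolvedCatalogTable(catalogTable, resolvedSchema);

// 3. only the resolved variant is put into the catalog
catalog.createTable(objectPath, resolvedTable, false);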

We suggest the interface additions listed under Public Interfaces above.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
