...
Similar to classes such as Expression/ResolvedExpression or UnresolvedDataType/DataType, a schema should be provided in an unresolved and resolved variant.
Currently, TableSchema is a hybrid between a resolved and an unresolved schema, which leads to methods such as
...
- make it visible whether a schema is resolved or unresolved
- offer a unified API for FLIP-129, FLIP-136, and catalogs
- allow arbitrary data types and expressions in the schema for watermark specs or computed columns
- have access to other catalogs for declaring a data type or expression via CatalogManager
- clean up TableSchema
- remain backwards compatible in the persisted properties and API
Public Interfaces
org.apache.flink.table.api.Schema
org.apache.flink.table.catalog.ResolvedSchema
CatalogBaseTable#getUnresolvedSchema(): Schema
org.apache.flink.table.catalog.SchemaResolver
org.apache.flink.table.catalog.ResolvedCatalogTable
org.apache.flink.table.catalog.ResolvedCatalogTable#getResolvedSchema(): ResolvedSchema
org.apache.flink.table.catalog.ResolvedCatalogView
org.apache.flink.table.catalog.ResolvedCatalogView#getResolvedSchema(): ResolvedSchema
org.apache.flink.table.factories.DynamicTableFactory.Context#getCatalogTable(): ResolvedCatalogTable
org.apache.flink.table.api.Table#getResolvedSchema(): ResolvedSchema
org.apache.flink.table.operations.QueryOperation#getResolvedSchema(): ResolvedSchema
org.apache.flink.table.api.TableResult#getResolvedSchema(): ResolvedSchema
Proposed Changes
Schema
API class used by programmatic DDL, DataStream API conversion, and Catalog API.
Notes:
- Stores Expression and AbstractDataType
- Offers convenience methods to adopt columns etc. from other resolved or unresolved schema instances
- It replaces TableSchema.Builder
Schema {
    static Schema.Builder newBuilder();

    List<UnresolvedColumn> getColumns();
    List<UnresolvedWatermarkSpec> getWatermarkSpecs();
    Optional<UnresolvedPrimaryKey> getPrimaryKey();

    ResolvedSchema resolve(SchemaResolver);

    Builder {
        Builder fromSchema(Schema);
        Builder fromResolvedSchema(ResolvedSchema);

        Builder column(String, AbstractDataType<?>);
        Builder columnByExpression(String, Expression);
        Builder columnByMetadata(String, AbstractDataType<?>);
        Builder columnByMetadata(String, AbstractDataType<?>, boolean);
        Builder columnByMetadata(String, AbstractDataType<?>, String);
        Builder columnByMetadata(String, AbstractDataType<?>, String, boolean);

        Builder watermark(String, Expression);

        Builder primaryKey(String... columnNames);
        Builder primaryKeyNamed(String, String...);

        Schema build();
    }
}
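To make the unresolved side concrete, the following is a small self-contained sketch in plain Java. All types here (Schema, Builder, UnresolvedColumn) are simplified stand-ins invented for illustration, not the actual proposed classes: the real builder stores AbstractDataType and Expression instances instead of raw strings. The point it shows is that an unresolved schema is pure, unvalidated metadata.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the proposed org.apache.flink.table.api.Schema.
// Real physical columns carry AbstractDataType, computed columns carry Expression;
// here both are modeled as raw strings to keep the sketch self-contained.
final class Schema {

    // An unresolved column only carries the raw, unvalidated declaration.
    static final class UnresolvedColumn {
        final String name;
        final String typeOrExpression;

        UnresolvedColumn(String name, String typeOrExpression) {
            this.name = name;
            this.typeOrExpression = typeOrExpression;
        }
    }

    private final List<UnresolvedColumn> columns;

    private Schema(List<UnresolvedColumn> columns) {
        this.columns = columns;
    }

    List<UnresolvedColumn> getColumns() {
        return columns;
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        private final List<UnresolvedColumn> columns = new ArrayList<>();

        // Physical column declared with a (still unresolved) type.
        Builder column(String name, String type) {
            columns.add(new UnresolvedColumn(name, type));
            return this;
        }

        // Computed column declared by a raw expression.
        Builder columnByExpression(String name, String expression) {
            columns.add(new UnresolvedColumn(name, expression));
            return this;
        }

        Schema build() {
            return new Schema(new ArrayList<>(columns));
        }
    }
}

public class SchemaSketch {
    public static void main(String[] args) {
        Schema schema = Schema.newBuilder()
            .column("id", "BIGINT")
            .column("ts", "TIMESTAMP(3)")
            .columnByExpression("ts_plus", "ts + INTERVAL '1' SECOND")
            .build();
        // Nothing has been validated yet; the schema is unresolved metadata.
        System.out.println(schema.getColumns().size()); // prints 3
    }
}
```

Note how build() performs no validation at all; only resolution against a SchemaResolver (see below) turns this into something the planner can trust.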
ResolvedSchema
Class that is exposed by the framework at different locations for a validated and complete schema.
Notes:
- Stores ResolvedExpression and DataType
- It replaces TableSchema
- Expressions that originated from SQL will have an implementation of ResolvedExpression#asSerializableString, which makes it possible to serialize them into properties for the catalog
ResolvedSchema {
    int getColumnCount();

    List<Column> getColumns();
    Optional<Column> getColumn(int);
    Optional<Column> getColumn(String);

    List<WatermarkSpec> getWatermarkSpecs();
    Optional<UniqueConstraint> getPrimaryKey();

    DataType toRowDataType();
    DataType toPhysicalRowDataType();
    DataType toPersistedRowDataType();
}
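The three row data types differ in which kinds of columns they include. The sketch below illustrates one plausible reading (an assumption for illustration, using invented stand-in types and column names rather than the real DataType machinery): toRowDataType covers all columns as seen by a query, toPhysicalRowDataType only the physical columns of the external system, and toPersistedRowDataType physical plus non-virtual metadata columns, since computed columns are derived on the fly and never persisted.

```java
import java.util.List;
import java.util.stream.Collectors;

// Simplified stand-in for the proposed ResolvedSchema (not the real Flink class).
// Column data types are elided; only names are projected, to show the filtering.
final class ResolvedSchema {

    enum Kind { PHYSICAL, COMPUTED, METADATA }

    static final class Column {
        final String name;
        final Kind kind;
        final boolean virtual; // only meaningful for metadata columns

        Column(String name, Kind kind, boolean virtual) {
            this.name = name;
            this.kind = kind;
            this.virtual = virtual;
        }
    }

    private final List<Column> columns;

    ResolvedSchema(List<Column> columns) {
        this.columns = columns;
    }

    // All columns, as seen by queries reading from the table.
    List<String> toRowDataType() {
        return columns.stream().map(c -> c.name).collect(Collectors.toList());
    }

    // Only physical columns, i.e. the layout of the external system's records.
    List<String> toPhysicalRowDataType() {
        return columns.stream()
            .filter(c -> c.kind == Kind.PHYSICAL)
            .map(c -> c.name)
            .collect(Collectors.toList());
    }

    // Physical plus persisted (non-virtual) metadata columns; computed columns
    // are excluded because they are evaluated at read time.
    List<String> toPersistedRowDataType() {
        return columns.stream()
            .filter(c -> c.kind == Kind.PHYSICAL
                || (c.kind == Kind.METADATA && !c.virtual))
            .map(c -> c.name)
            .collect(Collectors.toList());
    }
}

public class ResolvedSchemaSketch {
    public static void main(String[] args) {
        ResolvedSchema schema = new ResolvedSchema(List.of(
            new ResolvedSchema.Column("id", ResolvedSchema.Kind.PHYSICAL, false),
            new ResolvedSchema.Column("ts_plus", ResolvedSchema.Kind.COMPUTED, false),
            new ResolvedSchema.Column("offset", ResolvedSchema.Kind.METADATA, true),
            new ResolvedSchema.Column("topic", ResolvedSchema.Kind.METADATA, false)));
        System.out.println(schema.toRowDataType());          // [id, ts_plus, offset, topic]
        System.out.println(schema.toPhysicalRowDataType());  // [id]
        System.out.println(schema.toPersistedRowDataType()); // [id, topic]
    }
}
```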
SchemaResolver
References parser, data type factory, catalog manager, expression resolver, etc. for resolving SQL and Table API expressions and data types.
It works similarly to CatalogTableSchemaResolver and will perform validation.
Notes:
- Depending on the context, the resolver is configured in streaming or batch mode
- Depending on the context, the resolver supports metadata columns (metadata columns are not supported for the DataStream API conversion)
- Instances are provided by the CatalogManager
SchemaResolver {
    ResolvedSchema resolve(Schema schema);

    boolean supportsMetadata();
    boolean isStreamingMode();
}
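The essence of resolution is validating raw declarations and replacing them with fully resolved counterparts, failing fast on anything the context does not allow. The following self-contained sketch models that with invented stand-in types (raw type strings instead of AbstractDataType, a plain map instead of the data type factory and catalog manager); it is an illustration of the idea, not the proposed implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Simplified stand-in for the proposed SchemaResolver (not the real Flink class).
final class SchemaResolver {

    private final boolean supportsMetadata;
    private final Map<String, String> knownTypes; // stand-in for the DataTypeFactory

    SchemaResolver(boolean supportsMetadata, Map<String, String> knownTypes) {
        this.supportsMetadata = supportsMetadata;
        this.knownTypes = knownTypes;
    }

    boolean supportsMetadata() {
        return supportsMetadata;
    }

    // An unresolved column: name, raw type string, and whether it is a metadata column.
    static final class UnresolvedColumn {
        final String name;
        final String type;
        final boolean isMetadata;

        UnresolvedColumn(String name, String type, boolean isMetadata) {
            this.name = name;
            this.type = type;
            this.isMetadata = isMetadata;
        }
    }

    // Resolution validates each column and produces "name: resolvedType" entries.
    // Unknown types and unsupported features are rejected here, which is exactly
    // the guarantee an unresolved Schema on its own cannot give.
    List<String> resolve(List<UnresolvedColumn> columns) {
        return columns.stream()
            .map(c -> {
                if (c.isMetadata && !supportsMetadata) {
                    throw new IllegalStateException(
                        "Metadata column '" + c.name + "' is not supported in this context.");
                }
                String resolved = knownTypes.get(c.type);
                if (resolved == null) {
                    throw new IllegalArgumentException("Unknown type: " + c.type);
                }
                return c.name + ": " + resolved;
            })
            .collect(Collectors.toList());
    }
}

public class SchemaResolverSketch {
    public static void main(String[] args) {
        SchemaResolver resolver = new SchemaResolver(
            true, Map.of("BIGINT", "BIGINT NOT NULL", "STRING", "VARCHAR(2147483647)"));
        List<String> resolved = resolver.resolve(List.of(
            new SchemaResolver.UnresolvedColumn("id", "BIGINT", false),
            new SchemaResolver.UnresolvedColumn("topic", "STRING", true)));
        System.out.println(resolved);
    }
}
```

A resolver configured for the DataStream API conversion would be constructed with metadata support disabled and would reject the "topic" column above instead of resolving it.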
Updated Catalog Class Hierarchy
Due to the updated schema classes, we also need to give better semantics to the existing Catalog class hierarchy while maintaining backwards compatibility.
Some assumptions that came up during the offline discussions:
→ A CatalogView must have a schema.
- This is defined by the SQL standard (see 4.15.7 Table descriptors).
- Hive also stores a view schema, but the schema is not validated when executing queries on the view, and the stored schema is not updated even if the underlying table schema changes.
- We also just consider the schema as metadata and don't validate it yet. However, platforms that use our catalog interfaces can do that already.
→ A CatalogTable and a CatalogView are metadata objects. Thus, they are unresolved and return an unresolved schema.
→ Only validated, and thus resolved, CatalogTables/CatalogViews should be put into a catalog.
We suggest the following interface additions:
Compatibility, Deprecation, and Migration Plan
...