Status

Discussion thread: https://lists.apache.org/thread/sls26s8y55tfh59j2dqkgczml6km49jx
Vote thread
JIRA

FLINK-27365

Release

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In FLINK-21634, we plan to support many schema changes in Flink SQL. But with the current Table Store, they may result in wrong data and unclear evolution behavior.

In general, the user has these operations for schema:

  • Add column: add a column to a table.
  • Modify column type: change the type of an existing column.
  • Drop column: drop a column.
  • Rename column: for example, rename the "name_1" column to "name_2".

Another schema change concerns partition keys. Data changes over time; for example, for a table partitioned by day, as the business continues to grow, the daily partitions become larger and larger, and the business may want to switch to hourly partitions.


A simple approach is to rewrite all the existing data when modifying the schema.

But this is too expensive to be acceptable to users, so we need to support schema evolution and define its semantics clearly.

Modifying the schema does not rewrite the existing data; when reading, the original data needs to be evolved to the current schema.

(The underlying storage does not perceive computed columns and watermarks, so changes to them are already supported.)

User Behavior

Table Store, as a storage, is required to support schema evolution. It does not rewrite the existing data, and the semantics are:

  • Add column: the existing data must return null for the new column.
  • Modify column type: the existing data needs to be evolved from the old type to the new type.
  • Drop column: this column must be ignored in the existing data, without affecting other fields.
  • Rename column: read the existing data by the old name and new data by the new name, without affecting other fields.

A simple example for a table T(a STRING, b STRING, c STRING):

  1. INSERT INTO T VALUES ('a1', 'b1', 'c1');
  2. ALTER TABLE T DROP COLUMN c;
  3. ALTER TABLE T ADD COLUMN c STRING;
  4. INSERT INTO T VALUES ('a2', 'b2', 'c2');
  5. SELECT * FROM T: ('a1', 'b1', NULL), ('a2', 'b2', 'c2')

Users can also make the following changes to the partition fields:

  • Add partition column
  • Drop partition column

Partition modifications do not affect data correctness.

Limitations:

  • Primary keys cannot be changed: Modify/Drop/Rename on primary key columns is not supported.
  • Modify/Drop/Rename on partition keys is not supported. (Partition fields must be contained in a valid schema.)
  • ADD COLUMN does not support adding a NOT NULL column.

Proposed Design

How do we support the above behaviors?

  1. The fields in the schema must carry id information:
    1. Each newly added column is given a new id.
    2. When a column is modified, its id remains the same.
    3. This way, the same field can be identified across different names,
    4. and two different fields with the same name can be distinguished.
  2. The storage must be able to hold multiple versions of the schema.
  3. Each data file records the schema id it was written with.

So when a schema change has occurred and the existing data is read: get the schema of the existing data, compare it with the read schema, and evolve the existing data to the read schema.
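The comparison step above can be sketched as an index mapping built by field id. This is an illustrative sketch, not the actual Table Store API; class and method names are invented for the example:

```java
import java.util.List;

/**
 * Illustrative sketch (NOT the actual Table Store API): evolve rows that
 * were written with an old file schema into the schema requested by the
 * reader, matching fields by id rather than by name or position.
 */
public class IndexMappingSketch {

    /** Minimal stand-in for DataField: only id and name matter here. */
    public record Field(int id, String name) {}

    /**
     * For each field of the read schema, find its position in the file
     * schema by id; -1 means the field does not exist in the file (e.g. a
     * column added after the file was written) and must be read as null.
     */
    public static int[] createIndexMapping(List<Field> readFields, List<Field> fileFields) {
        int[] mapping = new int[readFields.size()];
        for (int i = 0; i < readFields.size(); i++) {
            mapping[i] = -1;
            for (int j = 0; j < fileFields.size(); j++) {
                if (fileFields.get(j).id() == readFields.get(i).id()) {
                    mapping[i] = j;
                    break;
                }
            }
        }
        return mapping;
    }

    /** Project a row stored in the file schema into the read schema. */
    public static Object[] evolveRow(Object[] fileRow, int[] mapping) {
        Object[] result = new Object[mapping.length];
        for (int i = 0; i < mapping.length; i++) {
            result[i] = mapping[i] >= 0 ? fileRow[mapping[i]] : null;
        }
        return result;
    }
}
```

With the DROP COLUMN c / ADD COLUMN c example above, the file's old "c" has an id that no longer appears in the read schema, and the re-added "c" has an id that never appears in the old file, so old rows return null for it.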

DataType

The fields in the schema must carry id information, but Flink's current LogicalType does not hold such an id, and Flink itself does not need it.

We will introduce a DataType in Table Store. Just as Flink's DataType stores a conversionClass on top of LogicalType, Table Store's DataType stores fieldId information on top of LogicalType.

public final class DataField {

    // id for schema evolution
    private final int id;

    private final String name;

    private final DataType type;

    private final @Nullable String description;

    .....
}

Schema

The storage must have the ability to hold multiple versions of the schema.

Each version of the schema is stored in the FileStore:

table_path/

  • manifest/
    • ...
  • snapshot/
    • ...
  • schema/
    • schema-1
    • schema-2
    • ...

Each schema version has an id; a data file records the schema id that was current when the file was created.
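As a small illustration of the layout above (the path handling is hypothetical, not the actual implementation), resolving the file that stores a given schema version could look like:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Illustrative sketch (hypothetical helper): each schema version lives in
 * its own file under table_path/schema/, named by its id.
 */
public class SchemaPathSketch {
    public static Path schemaPath(Path tablePath, long schemaId) {
        return tablePath.resolve("schema").resolve("schema-" + schemaId);
    }
}
```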

Schema class:

/** Schema of table store. */
public class Schema {

    private final long id;

    private final List<DataField> fields;

    /** Cannot be derived from fields, as some fields may have been dropped. */
    private final int highestFieldId;

    private final List<String> partitionKeys;

    private final List<String> primaryKeys;

    private final Map<String, String> options;
}
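To show why highestFieldId cannot be derived from the current fields, here is a hedged sketch (class and method names are illustrative, not the actual SchemaManager) of how add/drop could assign ids: after DROP COLUMN c followed by ADD COLUMN c, the re-added column must get a fresh id so that old files are not misread as containing it.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch of id assignment during schema changes. */
public class SchemaManagerSketch {

    public record Field(int id, String name) {}

    public record Schema(long id, List<Field> fields, int highestFieldId) {}

    /** Add a column: assign a fresh field id and bump the schema version. */
    public static Schema addColumn(Schema current, String name) {
        int newFieldId = current.highestFieldId() + 1;
        List<Field> fields = new ArrayList<>(current.fields());
        fields.add(new Field(newFieldId, name));
        return new Schema(current.id() + 1, fields, newFieldId);
    }

    /** Drop a column: the ids of the remaining fields are untouched. */
    public static Schema dropColumn(Schema current, String name) {
        List<Field> fields = new ArrayList<>();
        for (Field f : current.fields()) {
            if (!f.name().equals(name)) {
                fields.add(f);
            }
        }
        // highestFieldId is kept, even though max(field ids) may now be lower
        return new Schema(current.id() + 1, fields, current.highestFieldId());
    }
}
```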

Manifest

There are two adjustments required on the Manifest:

  • ManifestEntry (DataFileMeta) needs to record the schemaId it belongs to.
  • Schema-independent serialization for the Manifest: the current Manifest stores data statistics, which are schema-related, so a schema change makes merging manifests difficult. We can make the serialization of the Manifest independent of the data schema by saving the statistics in binary form. The advantage is that after deserializing the Manifest, we know each file's schemaId from its file information, and can then deserialize the statistics with that schema.
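The second point can be sketched as follows. This is an illustrative sketch, not the actual manifest format: the entry keeps the file's schemaId plus the statistics as opaque bytes, so manifests written under different schemas can be merged without decoding the stats, which are decoded lazily once the schema for schemaId is known.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Function;

/** Illustrative sketch of schema-independent manifest serialization. */
public class ManifestEntrySketch {

    /** The stats stay opaque bytes until a schema-aware decoder is applied. */
    public record Entry(String fileName, long schemaId, byte[] statsBytes) {}

    /** Decode the statistics with a decoder chosen by the file's schemaId. */
    public static String decodeStats(
            Entry entry, Map<Long, Function<byte[], String>> decodersBySchemaId) {
        return decodersBySchemaId.get(entry.schemaId()).apply(entry.statsBytes());
    }
}
```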

Schema Evolution

Based on the above capabilities, when filtering and reading files we detect that a file's schema differs from the read schema and perform schema evolution on that file.

For type evolution, we currently only support the types allowed by implicit conversions (from Flink's LogicalTypeCasts).

Three modes can be supported in the future for the user to select:

  • Default implicit conversions.
  • Allow implicit and explicit conversions:
    • Throw an exception when the cast fails.
    • Return null when the cast fails.
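The three modes can be sketched as below. The mode names and the method are illustrative assumptions, not the proposed API; a STRING to INT evolution stands in for a cast that can fail:

```java
/** Illustrative sketch of the three proposed cast behaviours. */
public class CastModeSketch {

    public enum Mode { IMPLICIT_ONLY, EXPLICIT_THROW, EXPLICIT_NULL }

    public static Integer castStringToInt(String value, Mode mode) {
        if (mode == Mode.IMPLICIT_ONLY) {
            // STRING -> INT is not an implicit conversion, so it is rejected up front
            throw new UnsupportedOperationException("STRING to INT is not an implicit cast");
        }
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException e) {
            if (mode == Mode.EXPLICIT_THROW) {
                throw new RuntimeException("Cast failed for value: " + value, e);
            }
            return null; // EXPLICIT_NULL: failed casts become null
        }
    }
}
```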

Compatibility

This change is intended to stabilize the stored metadata format, which may cause incompatibility with the 0.1 preview version.

Rejected Alternatives

Add id to LogicalType

We could also change LogicalType in Flink itself, but this may pollute Flink's design (including its asSerializableString contract) and cause unnecessary incompatibilities.