Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Flink SQL supports a CREATE TABLE AS SELECT (CTAS) statement (proposed by FLIP-218) that allows users to create tables by deriving their schema from the result of a specified query. However, it does not support defining a schema directly within the statement. The schema of the resulting table is derived solely from the SELECT clause, which limits user control over the resulting table's structure. Some SQL systems, such as MySQL, PostgreSQL, and Oracle, allow schema definitions within their CTAS statements.
Introducing schema definition support in the CTAS statement will provide users with greater control and flexibility, making Apache Flink more user-friendly and aligned with widely accepted SQL practices.
Public Interfaces
The changes apply to the CREATE TABLE AS syntax, which already accepts the same schema clause as CREATE TABLE.
However, internal validations currently reject a schema definition in a CTAS statement. This proposal removes those validations so that new columns can be added in the CTAS statement.
Proposed Changes
The proposal changes the CREATE TABLE AS and REPLACE TABLE AS statements to support a schema definition in the CREATE TABLE part.
This change will allow users to define new columns, primary keys, watermarks, partition keys, and table distribution for the table being created.
This FLIP also covers the remaining subtasks of FLINK-26942:
FLINK-28770: Support EXPLAIN in CTAS
FLINK-31533: Support partition definition in CTAS
FLINK-31534: Support primary key definition in CTAS
The schema definition supported in the CTAS/RTAS syntax includes:
Physical, computed, and metadata columns
Watermarks and primary keys
Partition keys and table distribution
CTAS syntax, including the clauses that will be supported in the statement (the same syntax as CREATE TABLE):
CREATE TABLE table_name [( { <schema_definition> | <column_list> } )]
  [PARTITIONED BY (partition_column [, ...n])]
  [DISTRIBUTED BY [HASH|RANGE|RANDOM](distribution_columns) [INTO n BUCKETS]]
  [WITH (table_properties)]
  AS SELECT query_expression;

<schema_definition>:
  [ <column_definition>[, ...n] ],
  [ <watermark_definition> ],
  [ <table_constraint>[, ...n] ]

<column_list>:
  col_name1 [, col_name2, ...]
RTAS syntax, including the clauses that will be supported in the statement (the same syntax as REPLACE TABLE):
[CREATE OR] REPLACE TABLE table_name [( { <schema_definition> | <column_list> } )]
  [PARTITIONED BY (partition_column [, ...n])]
  [DISTRIBUTED BY [HASH|RANGE|RANDOM](distribution_columns) [INTO n BUCKETS]]
  WITH (table_properties)
  AS SELECT query_expression;

<schema_definition>:
  [ <column_definition>[, ...n] ],
  [ <watermark_definition> ],
  [ <table_constraint>[, ...n] ]

<column_list>:
  col_name1 [, col_name2, ...]
Note: The rest of this document shows only CTAS examples, but the same behavior applies to RTAS statements.
Column definition
The CTAS statement will allow users to define new columns, as well as watermarks and primary keys. New columns may be of any supported kind: physical, metadata, or computed.
When columns are defined in the CREATE TABLE part, CTAS follows the semantics of the MySQL CREATE TABLE ... SELECT statement, which are as follows:
In a table resulting from CREATE TABLE ... SELECT, columns named only in the CREATE TABLE part come first. Columns named in both parts or only in the SELECT part come after that. The data type of SELECT columns can be overridden by also specifying the column in the CREATE TABLE part.
Examples:
Columns named only in the CREATE TABLE part come first.
> CREATE TABLE t1(a INT, b INT, c INT);
> CREATE TABLE s1(z INT) AS SELECT * FROM t1;
> DESCRIBE s1;
+-------------+-----------+----------+--------+
| Column Name | Data Type | Nullable | Extras |
+-------------+-----------+----------+--------+
| z           | INT       | NULL     |        |
| a           | INT       | NULL     |        |
| b           | INT       | NULL     |        |
| c           | INT       | NULL     |        |
+-------------+-----------+----------+--------+
Columns named in both parts or only in the SELECT part come after that. The order of the columns in the SELECT part is kept, regardless of the order specified in the CREATE part.
> CREATE TABLE t1(a INT, b INT, c INT);
> CREATE TABLE s1(a INT, b INT, c INT, z INT) AS SELECT * FROM t1;
> DESCRIBE s1;
+-------------+-----------+----------+--------+
| Column Name | Data Type | Nullable | Extras |
+-------------+-----------+----------+--------+
| z           | INT       | NULL     |        |
| a           | INT       | NULL     |        |
| b           | INT       | NULL     |        |
| c           | INT       | NULL     |        |
+-------------+-----------+----------+--------+
CTAS also allows reordering the columns derived from the SELECT part by listing all of their names, without data types, in the CREATE part. This works like the column list of an INSERT INTO statement.
The listed columns must match the names and number of columns in the SELECT part. A column list cannot be combined with new column definitions, because those require data types.
Examples:
> CREATE TABLE t1(a INT, b INT, c INT);
> CREATE TABLE s1(c, b, a) AS SELECT * FROM t1;
> DESCRIBE s1;
+-------------+-----------+----------+--------+
| Column Name | Data Type | Nullable | Extras |
+-------------+-----------+----------+--------+
| c           | INT       | NULL     |        |
| b           | INT       | NULL     |        |
| a           | INT       | NULL     |        |
+-------------+-----------+----------+--------+
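The constraints on a type-less column list can be sketched as a small validation function. This is a hypothetical model under stated assumptions: columns are (name, type) pairs, reorder_by_column_list is an illustrative name, and the error messages are invented rather than Flink's. The list must name every SELECT column exactly once; types still come from the query, and only the order changes.

```python
from typing import List, Tuple

# Hypothetical column model: (name, data type).
Column = Tuple[str, str]


def reorder_by_column_list(column_list: List[str], select_cols: List[Column]) -> List[Column]:
    """Reorder SELECT-derived columns by a type-less column list (sketch)."""
    select_types = dict(select_cols)
    if len(column_list) != len(select_cols):
        raise ValueError(
            f"Column list has {len(column_list)} columns, "
            f"but the query produces {len(select_cols)}"
        )
    if len(set(column_list)) != len(column_list):
        raise ValueError("Column list contains duplicate names")
    unknown = [name for name in column_list if name not in select_types]
    if unknown:
        raise ValueError(f"Columns not found in the query: {unknown}")
    # Same names, same count, no duplicates: a permutation of the query columns.
    return [(name, select_types[name]) for name in column_list]
```

With the t1 columns, reorder_by_column_list(["c", "b", "a"], ...) returns c, b, a, all INT, as in the DESCRIBE output above.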
Overriding data types of the resulting schema
It is possible to override the data type of a column derived from the query schema by using implicit and/or explicit casting.
Implicit Casting
> CREATE TABLE t1(a INT);
> CREATE TABLE s1(a DOUBLE) AS SELECT a FROM t1;
> DESCRIBE s1;
+-------------+-----------+----------+--------+
| Column Name | Data Type | Nullable | Extras |
+-------------+-----------+----------+--------+
| a           | DOUBLE    | NULL     |        |
+-------------+-----------+----------+--------+
When implicit casting is used, CTAS applies schema compatibility rules similar to those of an INSERT INTO ... SELECT statement. This check ensures that the CTAS query can run without type conversion issues.
Example of a failing compatibility check:
> CREATE TABLE t1(a STRING);
> CREATE TABLE s1(a INT) AS SELECT a FROM t1;
Column types of query result and sink for 's1' do not match.
Cause: Incompatible types for sink column 'a' at position 1.
Query schema: [a: STRING]
Sink schema: [a: INT]
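The compatibility check for implicit casting can be pictured with the following sketch. It is illustrative only: IMPLICIT_CASTS is a hypothetical, deliberately small widening table covering the types in the examples above, not Flink's actual implicit-cast rules, and check_overrides is a made-up name. Only columns named in both the CREATE and SELECT parts are checked, since the other columns either keep their derived types or receive no value from the query. The error text mirrors the example above, but the position numbering is this sketch's own choice.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical column model: (name, data type).
Column = Tuple[str, str]

# Hypothetical, simplified implicit-cast table: source type -> allowed targets.
# Flink's real casting rules are far more extensive than this.
IMPLICIT_CASTS: Dict[str, Set[str]] = {
    "INT": {"INT", "BIGINT", "FLOAT", "DOUBLE"},
    "BIGINT": {"BIGINT", "FLOAT", "DOUBLE"},
    "FLOAT": {"FLOAT", "DOUBLE"},
    "DOUBLE": {"DOUBLE"},
    "STRING": {"STRING"},
}


def check_overrides(table: str, create_cols: List[Column], select_cols: List[Column]) -> None:
    """Raise TypeError if a CREATE-part type cannot accept the query type (sketch)."""
    create_types = dict(create_cols)
    for pos, (name, query_type) in enumerate(select_cols, start=1):
        sink_type = create_types.get(name)
        if sink_type is None:
            continue  # not overridden: the derived type is used as-is
        # Types missing from the table are treated as compatible only with themselves.
        if sink_type not in IMPLICIT_CASTS.get(query_type, {query_type}):
            raise TypeError(
                f"Column types of query result and sink for '{table}' do not match. "
                f"Cause: Incompatible types for sink column '{name}' at position {pos}. "
                f"Query type: {query_type}, sink type: {sink_type}"
            )
```

Under these assumptions, overriding INT with DOUBLE passes, while overriding STRING with INT raises, as in the failing example.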
Explicit Casting
> CREATE TABLE t1(a INT);
> CREATE TABLE s1 AS SELECT CAST(a AS DOUBLE) AS a FROM t1;
> DESCRIBE s1;
+-------------+-----------+----------+--------+
| Column Name | Data Type | Nullable | Extras |
+-------------+-----------+----------+--------+
| a           | DOUBLE    | NULL     |        |
+-------------+-----------+----------+--------+
Note: When explicit casting is used, CTAS does not perform schema compatibility checks. The resulting schema is derived from the query after the explicit cast is applied.
Watermarks
Watermarks may also be defined in the CREATE part and may reference columns named in either the CREATE part or the SELECT part.
Watermark using columns from the SELECT part.
> CREATE TABLE t1(a INT, b TIMESTAMP(3));
> CREATE TABLE s1 (WATERMARK FOR b AS b - INTERVAL '5' SECOND) AS SELECT * FROM t1;
Watermark using columns from the CREATE part.
> CREATE TABLE t1(a INT, b INT);
> CREATE TABLE s1 (
    z TIMESTAMP(3),
    WATERMARK FOR z AS z - INTERVAL '5' SECOND
  ) AS SELECT * FROM t1;
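Resolving a watermark's time column against both parts of the statement can be sketched as follows. Assumptions, all hypothetical: columns are modeled as (name, type) pairs, a CREATE definition overrides a SELECT-derived type of the same name, validate_watermark is an illustrative name, and only TIMESTAMP(3) and TIMESTAMP_LTZ(3) (the precision used in the examples) are accepted as time column types, which may be narrower than Flink's actual rule.

```python
from typing import List, Tuple

# Hypothetical column model: (name, data type).
Column = Tuple[str, str]

# Assumption: simplified set of accepted time column types.
ROWTIME_TYPES = {"TIMESTAMP(3)", "TIMESTAMP_LTZ(3)"}


def validate_watermark(rowtime: str, create_cols: List[Column], select_cols: List[Column]) -> str:
    """Resolve the watermark column in the merged CTAS schema and return its type."""
    # Start from the SELECT-derived types, then let CREATE definitions override them.
    resolved = dict(select_cols)
    resolved.update(dict(create_cols))
    if rowtime not in resolved:
        raise ValueError(
            f"Watermark column '{rowtime}' is not defined in the CREATE or SELECT part"
        )
    if resolved[rowtime] not in ROWTIME_TYPES:
        raise ValueError(
            f"Watermark column '{rowtime}' has type {resolved[rowtime]}, "
            f"expected one of {sorted(ROWTIME_TYPES)}"
        )
    return resolved[rowtime]
```

Both watermark examples above pass this check: b is found among the SELECT columns and z among the CREATE columns.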
Primary keys, Partition keys, and Table distribution
Primary keys, partition keys, and table distribution are defined with the same behavior and constraints as in a regular CREATE TABLE statement; there are no special considerations. These clauses must reference columns that are either named in the CREATE part or derived from the SELECT part.
Note: Primary keys cannot be defined on nullable columns when creating a table, and the same rule applies to CTAS: a column, whether named in the CREATE part or the SELECT part, must be NOT NULL to be used as a primary key. CTAS currently inherits the nullability of columns from the SELECT part. If those columns are NOT NULL, the resulting schema contains NOT NULL columns on which a primary key can be defined. If all of them are nullable, a primary key cannot be defined unless a NOT NULL column is declared in the CREATE part.
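The two rules above, key clauses referencing only known columns and primary key columns being NOT NULL, can be sketched as one validation step. This is a hypothetical model, not Flink code: Col and validate_keys are illustrative names, and the sketch assumes that a CREATE definition (including its nullability) replaces a SELECT-derived column of the same name, a case the proposal does not spell out.

```python
from typing import NamedTuple, Sequence


class Col(NamedTuple):
    """Hypothetical column model with nullability."""
    name: str
    data_type: str
    nullable: bool = True


def validate_keys(
    create_cols: Sequence[Col],
    select_cols: Sequence[Col],
    primary_key: Sequence[str] = (),
    partition_keys: Sequence[str] = (),
    distribution_keys: Sequence[str] = (),
) -> None:
    """Check key clauses against the merged CTAS schema (sketch)."""
    # Nullability is inherited from the SELECT part unless the CREATE part
    # redefines the column (assumption, see lead-in).
    schema = {c.name: c for c in select_cols}
    schema.update({c.name: c for c in create_cols})
    clauses = (
        ("PRIMARY KEY", primary_key),
        ("PARTITIONED BY", partition_keys),
        ("DISTRIBUTED BY", distribution_keys),
    )
    for clause, names in clauses:
        for name in names:
            if name not in schema:
                raise ValueError(
                    f"{clause} column '{name}' is not defined in the CREATE or SELECT part"
                )
    for name in primary_key:
        if schema[name].nullable:
            raise ValueError(f"PRIMARY KEY column '{name}' is nullable; it must be NOT NULL")
```

In this model, a primary key on a NOT NULL column from the SELECT part, or on a new NOT NULL column from the CREATE part, is accepted; a key on a nullable inherited column, or on an unknown column, is rejected.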
Primary key and partition key definitions:
Primary key using columns from the SELECT part.
> CREATE TABLE t1(a INT, b INT, c INT);
> CREATE TABLE s1(PRIMARY KEY (a) NOT ENFORCED) PARTITIONED BY (a) AS SELECT * FROM t1;
Primary key using columns from the CREATE part.
> CREATE TABLE t1(a INT, b INT, c INT);
> CREATE TABLE s1(z INT PRIMARY KEY NOT ENFORCED) PARTITIONED BY (z) AS SELECT * FROM t1;
Table distribution definition:
Table distribution using columns from the SELECT part.
> CREATE TABLE t1(a INT, b INT, c INT);
> CREATE TABLE s2(PRIMARY KEY (a) NOT ENFORCED) DISTRIBUTED BY HASH(a) INTO 3 BUCKETS AS SELECT * FROM t1;
Table distribution using columns from the CREATE part.
> CREATE TABLE t1(a INT, b INT, c INT);
> CREATE TABLE s2(z INT PRIMARY KEY NOT ENFORCED) DISTRIBUTED BY HASH(z) INTO 3 BUCKETS AS SELECT * FROM t1;
Compatibility, Deprecation, and Migration Plan
This is a new feature with no backward-compatibility implications.
Test Plan
All existing tests for the current CTAS behavior (without a schema definition) will be run to verify that nothing is broken.
In addition, new positive and negative tests will be automated, and also verified manually in the SQL CLI, for the following cases:
Define new columns that extend the schema of the CTAS statement
Define columns that exist in the resulting schema and override their data types
Define watermarks that use columns from the resulting schema and new columns added in the CTAS statement
Define primary keys that use columns from the resulting schema and new columns added in the CTAS statement
Define partition keys and table distribution that use columns from the resulting schema and new columns added in the CTAS statement
Rejected Alternatives
An existing alternative is to use the ALTER TABLE statement to modify the schema of the newly created table. However, this proposal aims to avoid that second statement by extending the CREATE TABLE AS statement to provide the schema from the beginning.