Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink SQL supports a CREATE TABLE AS SELECT (CTAS) statement (proposed by FLIP-218) that lets users create a table whose schema is derived from the result of a query. However, the statement does not support defining a schema directly. The schema of the resulting table is derived solely from the SELECT clause, which limits user control over the table's structure. Some SQL systems, such as MySQL, PostgreSQL, and Oracle, allow schema definitions within their CTAS statements.

Introducing schema definition support in the CTAS statement will provide users with greater control and flexibility, making Apache Flink more user-friendly and aligned with widely accepted SQL practices.

Public Interfaces

The changes apply to the CREATE TABLE AS syntax. This syntax already accepts the same clauses as CREATE TABLE, which allows a schema to be provided in the CREATE clause.

However, internal validations currently reject a schema definition in a CTAS statement. This proposal removes those validations so that columns can be defined in the CTAS statement.

Proposed Changes

The proposal changes the CREATE TABLE AS and REPLACE TABLE AS statements to support a schema definition in the CREATE TABLE part.

This change will allow users to define new columns, primary keys, watermarks, partition keys, and table distribution for the table being created.

This FLIP also covers the remaining subtasks of FLINK-26942:

  • FLINK-28770: Support EXPLAIN in CTAS

  • FLINK-31533: Support partition definition in CTAS

  • FLINK-31534: Support primary key definition in CTAS

The schema definition to support in the CTAS/RTAS syntax includes:

  • Physical, computed, and metadata columns

  • Watermarks and primary keys

  • Partition keys and table distribution

Example of CTAS syntax with properties that will be supported in the statements (same syntax as CREATE TABLE):

CREATE TABLE AS (CTAS)
CREATE TABLE table_name [( { <schema_definition> | <column_list> } )]
[PARTITIONED BY (partition_column [, ...n])]
[DISTRIBUTED BY [HASH|RANGE|RANDOM](distribution_columns) [INTO n BUCKETS]]
[WITH (table_properties)]
AS SELECT query_expression;

<schema_definition>:
  [ <column_definition>[, ...n] ], 
  [ <watermark_definition> ], 
  [ <table_constraint>[, ...n] ]

<column_list>:
  column_name1 [, column_name2, ...]

Example of RTAS syntax with properties that will be supported in the statements (same syntax as REPLACE TABLE):

REPLACE TABLE AS (RTAS)
[CREATE OR] REPLACE TABLE table_name [( { <schema_definition> | <column_list> } )]
[PARTITIONED BY (partition_column [, ...n])]
[DISTRIBUTED BY [HASH|RANGE|RANDOM](distribution_columns) [INTO n BUCKETS]]
WITH (table_properties)
AS SELECT query_expression;

<schema_definition>:
  [ <column_definition>[, ...n] ], 
  [ <watermark_definition> ], 
  [ <table_constraint>[, ...n] ]

<column_list>:
  column_name1 [, column_name2, ...]

Note: The rest of this document mentions only CTAS examples. However, these will apply to RTAS statements as well.

Column definition

The CTAS statement will allow the user to define new columns, as well as watermarks and primary keys. The columns may be of any supported kind: physical, metadata, or computed.

The behavior of CTAS when columns are defined in the CREATE TABLE part follows the semantics of the MySQL CREATE TABLE ... SELECT statement, which are as follows:

In a table resulting from CREATE TABLE ... SELECT, columns named only in the CREATE TABLE part come first.
Columns named in both parts or only in the SELECT part come after that. The data type of SELECT columns can
be overridden by also specifying the column in the CREATE TABLE part.

Examples:

  1. Columns named only in the create table part come first.

    > CREATE TABLE t1(a INT, b INT, c INT);
    > CREATE TABLE s1(z INT) AS SELECT * FROM t1;
    > DESCRIBE s1;
    +-------------+-----------+----------+--------+
    | Column Name | Data Type | Nullable | Extras |
    +-------------+-----------+----------+--------+
    | z           | INT       | NULL     |        |
    | a           | INT       | NULL     |        |
    | b           | INT       | NULL     |        |
    | c           | INT       | NULL     |        |
    +-------------+-----------+----------+--------+
  2. Columns named in both parts or only in the SELECT part come after that. The order of the columns in the SELECT part is kept the same regardless of the order specified in the CREATE part.

    > CREATE TABLE t1(a INT, b INT, c INT);
    > CREATE TABLE s1(a INT, b INT, c INT, z INT) AS SELECT * FROM t1;
    > DESCRIBE s1;
    +-------------+-----------+----------+--------+
    | Column Name | Data Type | Nullable | Extras |
    +-------------+-----------+----------+--------+
    | z           | INT       | NULL     |        |
    | a           | INT       | NULL     |        |
    | b           | INT       | NULL     |        |
    | c           | INT       | NULL     |        |
    +-------------+-----------+----------+--------+
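
The ordering rule illustrated by the two examples above can be modeled in a few lines of Python. This is only a sketch of the rule as described here, not Flink's implementation; the function name `merge_ctas_schema` and the representation of a column as a `(name, type)` tuple are assumptions made for illustration.

```python
from typing import List, Tuple

# Simplified stand-in for a column: (name, data type). Hypothetical, not a Flink class.
Column = Tuple[str, str]


def merge_ctas_schema(create_cols: List[Column], select_cols: List[Column]) -> List[Column]:
    """Merge the CREATE-part columns with the SELECT-part columns of a CTAS statement."""
    select_names = {name for name, _ in select_cols}
    create_types = dict(create_cols)

    # Columns named only in the CREATE part come first, in declaration order.
    merged = [(name, dtype) for name, dtype in create_cols if name not in select_names]
    # SELECT columns follow in query order; a type given in the CREATE part
    # overrides the type derived from the query.
    merged += [(name, create_types.get(name, dtype)) for name, dtype in select_cols]
    return merged


t1 = [("a", "INT"), ("b", "INT"), ("c", "INT")]
print(merge_ctas_schema([("z", "INT")], t1))
print(merge_ctas_schema([("a", "INT"), ("b", "INT"), ("c", "INT"), ("z", "INT")], t1))
```

Both calls yield the column order z, a, b, c, matching the DESCRIBE output in the examples.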

CTAS may also allow reordering the columns of the SELECT part by listing all column names, without data types, in the CREATE part. This feature is equivalent to the column list of the INSERT INTO statement.

The listed columns must match the names and the number of columns in the SELECT part. A column list cannot be combined with new columns, which require data types.

Examples:

> CREATE TABLE t1(a INT, b INT, c INT);
> CREATE TABLE s1(c, b, a) AS SELECT * FROM t1;
> DESCRIBE s1;
+-------------+-----------+----------+--------+
| Column Name | Data Type | Nullable | Extras |
+-------------+-----------+----------+--------+
| c           | INT       | NULL     |        |
| b           | INT       | NULL     |        |
| a           | INT       | NULL     |        |
+-------------+-----------+----------+--------+
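
As a rough illustration of the column-list rule, the sketch below reorders the query columns and rejects lists that do not match the SELECT part. It is a simplified model under stated assumptions: `reorder_by_column_list` is a hypothetical helper, columns are `(name, type)` tuples, and the error messages are invented for the example.

```python
from typing import List, Tuple

# Simplified stand-in for a column: (name, data type). Hypothetical, not a Flink class.
Column = Tuple[str, str]


def reorder_by_column_list(column_list: List[str], select_cols: List[Column]) -> List[Column]:
    """Reorder the SELECT-part columns according to a type-less column list."""
    select_types = dict(select_cols)

    if len(column_list) != len(set(column_list)):
        raise ValueError("column list contains duplicate names")
    # The list must name exactly the columns produced by the query.
    if len(column_list) != len(select_cols) or set(column_list) != set(select_types):
        raise ValueError("column list must match the names and number of SELECT columns")

    # Types are always taken from the query, since the list carries none.
    return [(name, select_types[name]) for name in column_list]


t1 = [("a", "INT"), ("b", "INT"), ("c", "INT")]
print(reorder_by_column_list(["c", "b", "a"], t1))
```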


Overriding the resulting schema data types

It is possible to override the data type of a column derived from the query schema by using implicit and/or explicit casting.

Implicit Casting


> CREATE TABLE t1(a INT);
> CREATE TABLE s1(a DOUBLE) AS SELECT a FROM t1;
> DESCRIBE s1;
+-------------+-----------+----------+--------+
| Column Name | Data Type | Nullable | Extras |
+-------------+-----------+----------+--------+
| a           | DOUBLE    | NULL     |        |
+-------------+-----------+----------+--------+

When implicit casting is used, the CTAS operation will follow schema compatibility rules similar to those of the INSERT INTO ... SELECT statement. This check ensures that the CTAS query can run without conversion issues.

Example of a failing compatibility check:

> CREATE TABLE t1(a STRING);
> CREATE TABLE s1(a INT) AS SELECT a FROM t1;
Column types of query result and sink for 's1' do not match. 
Cause: Incompatible types for sink column 'a' at position 1. 
Query schema: [a: STRING] 
Sink schema: [a: INT]
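
The compatibility check can be pictured as a position-by-position comparison of the query schema with the sink schema. The Python sketch below is a toy model only: the `IMPLICIT_CASTS` table is a deliberately tiny, hypothetical subset chosen for this example and does not reproduce Flink's actual implicit-cast rules, and `check_sink_compatibility` is an assumed name.

```python
from typing import List, Tuple

# Simplified stand-in for a column: (name, data type). Hypothetical, not a Flink class.
Column = Tuple[str, str]

# ASSUMPTION: a tiny illustrative widening table, not Flink's real cast rules.
IMPLICIT_CASTS = {
    "INT": {"INT", "BIGINT", "DOUBLE"},
    "BIGINT": {"BIGINT", "DOUBLE"},
    "DOUBLE": {"DOUBLE"},
    "STRING": {"STRING"},
}


def check_sink_compatibility(query_schema: List[Column], sink_schema: List[Column]) -> List[str]:
    """Return one message per sink column that the query type cannot be implicitly cast to."""
    errors = []
    pairs = zip(query_schema, sink_schema)
    for position, ((_, query_type), (sink_name, sink_type)) in enumerate(pairs, start=1):
        # Unknown query types are only compatible with themselves.
        allowed = IMPLICIT_CASTS.get(query_type, {query_type})
        if sink_type not in allowed:
            errors.append(
                f"Incompatible types for sink column '{sink_name}' at position {position}: "
                f"{query_type} cannot be implicitly cast to {sink_type}"
            )
    return errors


print(check_sink_compatibility([("a", "INT")], [("a", "DOUBLE")]))
print(check_sink_compatibility([("a", "STRING")], [("a", "INT")]))
```

Under this toy table, the INT to DOUBLE case passes with no errors, while the STRING to INT case is reported, mirroring the two examples above.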


Explicit Casting


> CREATE TABLE t1(a INT);
> CREATE TABLE s1 AS SELECT CAST(a AS DOUBLE) AS a FROM t1;
> DESCRIBE s1;
+-------------+-----------+----------+--------+
| Column Name | Data Type | Nullable | Extras |
+-------------+-----------+----------+--------+
| a           | DOUBLE    | NULL     |        |
+-------------+-----------+----------+--------+

Note: When explicit casting is used, the CTAS operation does not perform schema compatibility checks. The resulting schema is derived from the query after the explicit cast is applied.

Watermarks

Watermarks may also be defined in the CREATE part and may reference columns named in the CREATE part or the SELECT part.

  1. Watermark using columns from SELECT part.

    > CREATE TABLE t1(a INT, b TIMESTAMP(3));
    > CREATE TABLE s1 (WATERMARK FOR b AS b - INTERVAL '5' SECOND) 
      AS SELECT * FROM t1;
  2. Watermark using columns from the CREATE part.

    > CREATE TABLE t1(a INT, b INT);
    > CREATE TABLE s1 (
        z TIMESTAMP(3), 
        WATERMARK FOR z AS z - INTERVAL '5' SECOND
      ) 
      AS SELECT * FROM t1;

Primary keys, Partition keys, and Table distribution

The definition of primary keys, partition keys, and table distribution has the same behavior and constraints as in regular CREATE TABLE statements. There are no special considerations for defining them. These clauses must reference columns that are either named in the CREATE part or derived from the SELECT part.

Note: Primary keys cannot be defined on nullable columns when creating a table, and the same rule applies to CTAS. A column, whether it comes from the CREATE part or the SELECT part, must be NOT NULL to be used as a primary key. CTAS currently inherits the nullability of each column in the SELECT part. If the selected columns are NOT NULL, the resulting schema contains NOT NULL columns, so a primary key can be defined on them. If all selected columns are nullable, a primary key definition is not allowed unless a NOT NULL column is defined in the CREATE part.
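
The nullability rule in the note can be sketched as a small validation step over the resulting schema. This is a minimal model, assuming the resulting schema is available as a mapping from column name to a nullable flag; `validate_primary_key` and its error messages are hypothetical and do not correspond to an existing Flink API.

```python
from typing import Dict, List


def validate_primary_key(pk_columns: List[str], nullable: Dict[str, bool]) -> None:
    """Raise ValueError if a primary key column is missing from the schema or nullable.

    `nullable` maps each column of the resulting schema (CREATE part plus
    SELECT part) to True when the column accepts NULL values.
    """
    for name in pk_columns:
        if name not in nullable:
            raise ValueError(f"primary key column '{name}' is not part of the resulting schema")
        if nullable[name]:
            raise ValueError(f"primary key column '{name}' is nullable")


# 'a' is inherited as NOT NULL from the query, while 'b' stays nullable.
resulting_schema = {"a": False, "b": True}
validate_primary_key(["a"], resulting_schema)  # accepted, returns None

try:
    validate_primary_key(["b"], resulting_schema)
except ValueError as error:
    print(error)
```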


Primary and partition keys definition:

  1. Primary key using columns from the SELECT part.

    > CREATE TABLE t1(a INT NOT NULL, b INT, c INT);
    > CREATE TABLE s1(PRIMARY KEY (a) NOT ENFORCED)
        PARTITIONED BY (a)
        AS SELECT * FROM t1;
  2. Primary key using columns from the CREATE part.

    > CREATE TABLE t1(a INT, b INT, c INT);
    > CREATE TABLE s1(z INT PRIMARY KEY NOT ENFORCED)
        PARTITIONED BY (z)
        AS SELECT * FROM t1;

Table distribution definition:

  1. Table distribution using columns from the SELECT part.

    > CREATE TABLE t1(a INT NOT NULL, b INT, c INT);
    > CREATE TABLE s2(PRIMARY KEY (a) NOT ENFORCED)
        DISTRIBUTED BY HASH(a) INTO 3 BUCKETS
        AS SELECT * FROM t1;
  2. Table distribution using columns from the CREATE part.

    > CREATE TABLE t1(a INT, b INT, c INT);
    > CREATE TABLE s2(z INT PRIMARY KEY NOT ENFORCED) 
        DISTRIBUTED BY HASH(z) INTO 3 BUCKETS 
        AS SELECT * FROM t1;

Compatibility, Deprecation, and Migration Plan

This is a new feature with no implications for backward compatibility.

Test Plan

The plan is to run all existing tests for the existing CTAS design (without a schema definition) to verify nothing is broken.

In addition, new positive and negative tests will be automated, and the behavior will be verified manually on the CLI, for the following cases:

  • Define new columns that extend the schema of the CTAS statement

  • Define columns that exist in the resulting schema and override the column data type

  • Define watermarks that use columns from the resulting schema and new columns added in the CTAS statement

  • Define primary keys that use columns from the resulting schema and new columns added in the CTAS statement

  • Define partition keys and table distribution that use columns from the resulting schema and new columns added in the CTAS statement

Rejected Alternatives

An existing alternative is to use the ALTER TABLE statement to modify the schema of the newly created table. However, the intention of this proposal is to avoid that second statement by extending the CREATE TABLE AS statement so that the schema can be provided from the beginning.