Current state: Under Discussion
Discussion thread: https://lists.apache.org/thread/39mwckdsdgck48tzsdfm66hhnxorjtz3
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
With FLIP-218 and FLIP-305, Flink supports the CREATE TABLE AS SELECT statement, which lets users create a new table from existing tables or query results. This is convenient for data analysts and data scientists who manage their own data. However, Flink does not currently support the REPLACE TABLE AS SELECT statement, which replaces an existing table with new data. With REPLACE TABLE AS SELECT, users no longer need to drop the table first and then run CREATE TABLE AS SELECT; a single REPLACE TABLE AS SELECT statement meets their needs.
This FLIP therefore aims to support the REPLACE TABLE AS SELECT statement in Flink.
We propose adding the following syntax for the REPLACE TABLE AS SELECT statement:
We also propose a CREATE OR REPLACE TABLE AS statement that wraps CREATE TABLE AS SELECT and REPLACE TABLE AS SELECT: it creates the table if the table to be replaced doesn't exist.
Public Interfaces Change
To support atomicity, we propose adding the following to the SupportsStaging interface proposed in FLIP-305:
We also propose renaming the option "table.ctas.atomicity-enabled" proposed in FLIP-305 to "table.rtas-ctas.atomicity-enabled".
REPLACE TABLE AS statement:
1: Construct the table to be created.
2: Check whether the table exists. If it doesn't, throw TableException(String.format("The table %s to be replaced doesn't exist. You may want to use CREATE TABLE AS statement or CREATE OR REPLACE TABLE AS statement.", tableIdentifier)).
3: Check whether atomicity is enabled. This requires both that the option table.rtas-ctas.atomicity-enabled is set to true and that the corresponding table sink implements SupportsStaging.
a: If atomicity is enabled, the external connector implementation is expected to guarantee it. Flink generates an insert job from the query in the REPLACE TABLE AS statement, calls StagedTable#begin before the insert job starts, calls StagedTable#commit after the job finishes, and calls StagedTable#abort if the job fails or is canceled.
b: Otherwise, atomicity cannot be guaranteed. Flink performs the replace operations one by one: it drops the old table, creates the new table, and inserts the data into the new table.
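The steps above can be sketched roughly as follows. This is only an illustration under stated assumptions, not Flink's implementation: the catalog is modeled as an in-memory map, the insert job as a Runnable, the local StagedTable interface merely mirrors the begin/commit/abort hooks named above, and IllegalStateException stands in for TableException. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only; none of these types are Flink APIs. */
final class ReplaceTableAsSketch {

    /** Hypothetical stand-in mirroring the StagedTable hooks named in the text. */
    interface StagedTable {
        void begin();
        void commit();
        void abort();
    }

    /**
     * @param catalog hypothetical in-memory catalog (table name to rows)
     * @param insertJob stand-in for the insert job generated from the query
     * @param atomicityOptionEnabled value of table.rtas-ctas.atomicity-enabled
     * @param stagedTable non-null only if the sink supports staging (assumption)
     */
    static void replaceTableAs(
            Map<String, List<String>> catalog,
            String table,
            Runnable insertJob,
            boolean atomicityOptionEnabled,
            StagedTable stagedTable) {
        // Step 2: the table to be replaced must already exist.
        if (!catalog.containsKey(table)) {
            throw new IllegalStateException(
                    String.format("The table %s to be replaced doesn't exist.", table));
        }
        // Step 3: atomicity needs both the option and a staging-capable sink.
        boolean atomic = atomicityOptionEnabled && stagedTable != null;
        if (atomic) {
            // 3a: the connector guarantees atomicity through the staging hooks.
            stagedTable.begin();
            try {
                insertJob.run();
            } catch (RuntimeException e) {
                stagedTable.abort();
                throw e;
            }
            stagedTable.commit();
        } else {
            // 3b: drop, create, insert, one by one; a failure leaves the old data gone.
            catalog.remove(table);
            catalog.put(table, new ArrayList<>());
            insertJob.run();
        }
    }
}
```

In this sketch, a failing job on the atomic path triggers only begin and abort and leaves the catalog entry untouched, whereas a failing job on the non-atomic path leaves an empty table behind. That difference is the motivation for the staging hooks.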
CREATE OR REPLACE TABLE AS statement: if the table exists, it is treated as a REPLACE TABLE AS statement; otherwise, it is treated as a CREATE TABLE AS statement.
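As a rough illustration of this dispatch rule, the sketch below resolves a statement to the behavior it should follow. The class, enum, and method names are hypothetical, and IllegalStateException again stands in for TableException; this is not how the Flink planner is structured.

```java
/** Illustrative sketch only; these names are hypothetical, not Flink APIs. */
final class CreateOrReplaceSketch {

    enum Action { CREATE_TABLE_AS, REPLACE_TABLE_AS }

    /**
     * @param orCreate true for CREATE OR REPLACE TABLE AS, false for plain REPLACE TABLE AS
     * @param tableExists whether the target table is already in the catalog
     */
    static Action resolve(boolean orCreate, boolean tableExists) {
        if (tableExists) {
            // Both statements replace an existing table.
            return Action.REPLACE_TABLE_AS;
        }
        if (orCreate) {
            // CREATE OR REPLACE falls back to CREATE TABLE AS when nothing exists yet.
            return Action.CREATE_TABLE_AS;
        }
        // Plain REPLACE TABLE AS requires the table to exist.
        throw new IllegalStateException("The table to be replaced doesn't exist.");
    }
}
```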
Note: Again, the proposed changes depend heavily on FLIP-305. For more details, see the Proposed Changes section of FLIP-305.
Compatibility, Deprecation, and Migration Plan
There are no compatibility problems.
Unit tests (UT) and integration tests (IT).