

Current state: Implemented

Discussion thread

JIRA: FLINK-14485

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


As part of FLIP-30 a Catalog API was introduced that enables storing table meta objects permanently. At the same time, the majority of the current APIs create temporary objects that cannot be serialized. This FLIP aims to clarify the creation of meta objects (tables, views, functions) in a unified way.

Another current problem in the API is that all the temporary objects are stored in a special built-in catalog, which is not very intuitive for many users, as they must be aware of that catalog to reference temporary objects.

Lastly, different APIs have different ways of providing object paths. Either via

  • String... path
  • String path, String... pathContinued
  • String name

We should choose one approach and unify it across all APIs.

Public Interfaces

registerTable & registerDataStream

The naming of Table objects is actually quite misleading. The Table object represents a relational query and is therefore a view rather than a table. The primary difference between a view and a table is that a table is a physical storage of data, whereas a view is a virtual table on top of tables that does not materialize data. Thus Flink's org.apache.flink.table.api.Table object is actually a SQL view. The same applies to a DataStream, which is also a way to extract data from persistent storage and apply transformations on top of it.

Moreover, for DataStream we should only support temporary views, as there is (as of now) no way to persist them.

We use the “create” prefix rather than “register” to be closer to SQL DDL.

Suggested methods:

void createTemporaryView(String path, Table view);
void createTemporaryView(String path, DataStream view);

registerTableSink & registerTableSource

We suggest dropping those methods entirely as they are misleading about what they actually do. Long-term, TableSource & TableSink are meant to support the physical representation of the data without the logical part, e.g. computed columns (watermarks etc.). Those will be part of the CatalogTable abstraction.

In other words, TableSource & TableSink are too physical to be exposed for inline declaration. They are meant for predefined connectors. If a user wants to read from some inline source, it can be done with from/toDataStream.

This should be replaced with the properties approach (DDL, descriptor).

registerScalarFunction, registerAggregateFunction & registerTableFunction

As a result of the rework of the type system in UDFs, we will be able to merge the three methods into a single one. Moreover, for permanent functions we need users to register a class instead of an instance. To keep this method in sync with the SQL DDL we should encourage users to use a class name for temporary functions as well.

We use the “create” prefix rather than “register” to be closer to SQL DDL.

Suggested methods:

void createTemporaryFunction(String path, Class<? extends UserDefinedFunction> functionClass);

Concerns were raised about whether we need a variant that registers instances. This FLIP makes no assumption about whether we should introduce that method; we will revisit it after FLIP-65.

The method to be discussed:

void createTemporaryFunction(String path, UserDefinedFunction function);


The table descriptor describes the properties of an external system, the physical data format, and the logical type of the data. Therefore it represents a table concept. For queries that do not want to modify the metastore permanently, it makes sense to introduce a temporary table concept.

IMPORTANT: Flink does not own data, just the meta information. Temporary table means that the meta information is available for the session only. If some data was written to a temporary table, the data will not be dropped at the end of the session.

We use the “create” prefix rather than “register” to be closer to SQL DDL.

Suggested methods:

class ConnectTableDescriptor {
    void createTemporaryTable(String path);
}

The method would store the properties only. The source & sink lookup would happen later, when reading from or writing to the table. Therefore we do not need separate source & sink registration.
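This store-properties-only behavior can be modeled with a minimal sketch in plain Java (the class and method names below are illustrative, not Flink API): registration stores a bag of properties, and the connector lookup is deferred until read/write time.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model (not Flink code): a temporary table is stored as string
// properties; no TableSource/TableSink is instantiated at registration time.
class TempTableRegistry {

    private final Map<String, Map<String, String>> tables = new HashMap<>();

    // Registration stores the descriptor's properties only.
    void createTemporaryTable(String path, Map<String, String> properties) {
        tables.put(path, properties);
    }

    // The source/sink lookup happens lazily, when the table is read or written.
    Map<String, String> lookupForReadOrWrite(String path) {
        Map<String, String> properties = tables.get(path);
        if (properties == null) {
            throw new IllegalArgumentException("No temporary table under: " + path);
        }
        return properties;
    }
}
```

Because only properties are stored, the same registered table can later serve both as a source and as a sink, which is why a single createTemporaryTable method suffices.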

NOTE: We should not support the CREATE TEMPORARY TABLE … AS SELECT syntax. As mentioned above, Flink does not own the data, therefore this statement should not be supported in Flink. In Flink, such a query can be expressed with CREATE TEMPORARY VIEW.

Dropping temporary objects

Temporary objects can shadow permanent objects. Therefore it is vital to enable dropping them in order to switch from temporary objects (usually used for experimentation) to permanent ones. We suggest introducing separate methods for temporary objects to make it really clear which objects are dropped. The dropTemporary* methods would remove only temporary objects and would not take permanent objects into consideration. The same applies to the regular drop methods: they should only apply to permanent objects, but should throw an exception if a temporary object with the same identifier exists. The methods return true if an object existed under the given path and was removed.

Suggested methods:

boolean dropTemporaryView(String path);
boolean dropTemporaryTable(String path);
boolean dropTemporaryFunction(String path);

Listing temporary objects

As discussed in FLIP-57, we need a method for listing temporary functions, to be able to list temporary system functions. To have consistent behavior for all temporary objects, we suggest introducing similar methods for all other temporary objects:

String[] listTemporaryTables();
String[] listTemporaryViews();
String[] listTemporaryFunctions();

The current methods such as listTables/listFunctions would not list any of the temporary objects, but only the persistent objects.


Methods of TableEnvironment

Current call → new suggested call:

  • void registerTable(String name, Table table) → (Deprecate) → replaced by void createTemporaryView(String path, Table view). For the non temporary part we need to make `QueryOperation` string serializable.
  • void registerTableSource(String name, TableSource<?> tableSource) → (Deprecate) → to be removed
  • void registerTableSink(String name, TableSink<?> configuredSink) → (Deprecate) → to be removed
  • void registerFunction (for scalar, table & aggregate functions) → (Deprecate) → replaced by void createTemporaryFunction(String path, Class<? extends UserDefinedFunction> functionClass). We can unify the 3 methods into one once we rework type inference for UDFs.
  • void registerDataStream(String name, DataStream<T> dataStream) → (Deprecate) → replaced by void createTemporaryView(String path, DataStream view)
Methods of ConnectTableDescriptor

Current call → new suggested call (SQL equivalent: CREATE TEMPORARY TABLE):

  • public void registerTableSource(String name) → (Deprecate) → to be removed
  • public void registerTableSink(String name) → (Deprecate) → to be removed
  • public void registerTableSourceAndSink(String name) → (Deprecate) → to be removed

All three are replaced by the single void createTemporaryTable(String path);

Persistent API - not part of the FLIP

Neither the implementation nor the design of those API calls is part of this FLIP. They are listed only to show that the permanent API is a separate concept that requires further work.

Calls to add, with their SQL equivalents:

  • Creating a permanent view (CREATE VIEW): we need to make `QueryOperation`s string serializable.
  • Creating a permanent table (CREATE TABLE): we need to rework TableDescriptor. We can temporarily use ConnectTableDescriptor#connect.
  • Creating a permanent function (CREATE FUNCTION): we need a serializable function representation.

Referencing objects

We suggest changing the way we address objects in the API to unify it across SQL & Table API and across the different object kinds. We should always specify the path as a single string and parse it into its catalog/database/object-name parts.

Affected APIs

We should deprecate:

  • TableEnvironment
    • Table scan(String... tablePath);
    • void insertInto(Table table, String sinkPath, String... sinkPathContinued);
  • Table
    • void insertInto(QueryConfig conf, String tablePath, String... tablePathContinued);
    • void insertInto(String tablePath, String... tablePathContinued);

We suggest replacing those calls with:

  • TableEnvironment
    • Table from(String tablePath)
    • void insertInto(String sinkPath, Table table);
  • Table
    • we need to immediately drop “void insertInto(String tablePath, String... tablePathContinued);” for this to work. Otherwise a call with a single string would be ambiguous: it would be unclear whether the argument should be parsed as a full multi-part path or treated as a plain single name:

      Table t = …
      t.insertInto("sinkPath");

    • void insertInto(String sinkPath)

Parsing logic

Parsing logic should follow the SQL standard rules for identifiers:

  • An identifier consists of 1-3 parts
  • Parts are delimited with a dot (.)
  • Users can escape parts of an identifier with backticks (`)
  • A backtick inside an escaped part is expressed by duplicating it
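The four rules above can be sketched as a small standalone parser (plain Java; an illustration of the rules, not Flink's actual parser):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative parser (not Flink code): splits a 1-3 part identifier on
// unescaped dots, honoring backtick escaping and doubled backticks.
class IdentifierParser {

    static List<String> parse(String path) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean escaped = false;
        for (int i = 0; i < path.length(); i++) {
            char c = path.charAt(i);
            if (c == '`') {
                if (escaped && i + 1 < path.length() && path.charAt(i + 1) == '`') {
                    current.append('`'); // doubled backtick -> literal backtick
                    i++;
                } else {
                    escaped = !escaped; // enter or leave an escaped part
                }
            } else if (c == '.' && !escaped) {
                parts.add(current.toString()); // unescaped dot delimits parts
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        parts.add(current.toString());
        if (parts.size() > 3) {
            throw new IllegalArgumentException("Identifier must have 1-3 parts");
        }
        return parts;
    }
}
```

For example, `cat.db.tab` yields three parts, while an escaped part such as `` `a.b`.c `` keeps the dot inside the first part.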

Proposed Changes   


All objects are identified with 3 part identifiers (catalog name, database name, object name).

We suggest enabling the overriding of catalog objects with temporary objects. This means it would be possible to register a temporary table with the identifier `cat1`.`db1`.`tmpTab` even if `tmpTab` already exists in the `db1` database of the `cat1` catalog. Moreover, we suggest allowing the registration of temporary objects in a path that does not exist, i.e. a user can register e.g. a table in a catalog or database that does not exist.

The benefit of this approach compared to having a specialized temp db (as in SQL Server, Oracle, Postgres) is that it makes experiments way easier. For exploratory purposes a user can use a temporary table. Once the results are verified, the user can drop the temporary table and rerun the exact same query on top of the permanent table.

The tables & views are always identified with a 3 part path.

The user provided path is always (both for registering & looking up an object) first expanded to a full 3-part path.

createTemporaryView("cat.db.temp", ...) → registers function with an identifier `cat`.`db`.`temp`
createTemporaryView("db.temp", ...) → registers function with an identifier `current_cat`.`db`.`temp`
createTemporaryView("temp", ...) → registers function with an identifier `current_cat`.`current_db`.`temp`

The same logic applies for looking up objects:
tEnv.from("cat.db.temp") → scans a view/table with an identifier `cat`.`db`.`temp`
tEnv.from("db.temp") → scans a view/table with an identifier `current_cat`.`db`.`temp`
tEnv.from("temp") → scans a view/table with an identifier `current_cat`.`current_db`.`temp`

The resolution order between temporary & persistent objects is as follows:

  1. Temporary tables/views
  2. Persistent Catalog tables/views
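A minimal model of this two-step resolution order (plain Java with illustrative names, not Flink code): a temporary object with the same fully expanded identifier shadows the persistent one.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model (not Flink code): temporary tables/views shadow
// persistent catalog objects under the same fully-expanded identifier.
class TableResolver {

    final Map<String, String> temporary = new HashMap<>();
    final Map<String, String> persistent = new HashMap<>();

    String resolve(String identifier) {
        if (temporary.containsKey(identifier)) {
            return temporary.get(identifier);  // 1. Temporary tables/views
        }
        return persistent.get(identifier);     // 2. Persistent catalog tables/views
    }
}
```

Dropping the temporary object makes the permanent one visible again, which is what enables the experiment-then-switch workflow described above.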

Temporary function identifier resolution:

Temporary function identifiers were discussed as part of FLIP-57. To summarize the outcome of that discussion: temporary functions can shadow both system and catalog functions. This implies the following resolution order for the read path:

  1. Temporary system functions
  2. System functions
  3. Temporary catalog functions, in the current catalog and current database of the session
  4. Catalog functions, in the current catalog and current database of the session
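The four-step lookup can be sketched as follows (plain Java with illustrative names; in Flink the actual resolution would live in the FunctionCatalog mentioned below):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model (not Flink code) of the four-step function lookup order
// for a function referenced by simple name in the current session.
class FunctionResolver {

    final Map<String, String> temporarySystem = new HashMap<>();  // keyed by name
    final Map<String, String> system = new HashMap<>();           // keyed by name
    final Map<String, String> temporaryCatalog = new HashMap<>(); // keyed by full identifier
    final Map<String, String> catalog = new HashMap<>();          // keyed by full identifier

    String resolve(String name, String currentCatalog, String currentDatabase) {
        String full = currentCatalog + "." + currentDatabase + "." + name;
        if (temporarySystem.containsKey(name)) {
            return temporarySystem.get(name);    // 1. Temporary system functions
        }
        if (system.containsKey(name)) {
            return system.get(name);             // 2. System functions
        }
        if (temporaryCatalog.containsKey(full)) {
            return temporaryCatalog.get(full);   // 3. Temporary catalog functions
        }
        return catalog.get(full);                // 4. Catalog functions
    }
}
```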

For the write path, if a user registers a temporary function with just a name, it will be registered as a temporary system function:

createTemporaryFunction("temp", Function.class) → registers function with an identifier `temp`

If a user uses either a 3-part or a 2-part path, it is registered as a temporary catalog function, possibly expanded with the current catalog:

createTemporaryFunction("cat.db.temp", Function.class) → registers function with an identifier `cat`.`db`.`temp`
createTemporaryFunction("db.temp", Function.class) → registers function with an identifier `current_cat`.`db`.`temp`

Temporary objects should be stored in memory in the CatalogManager/FunctionCatalog.

Rejected alternatives:

  1. 1-part path
    1. no other system has such semantics; all systems assign temporary tables & views to some schema (either with the same rules as regular objects or a special temporary schema)
  2. Require special names for temporary objects, e.g. #name as in SQL Server, or PTT_name as in Oracle
  3. Register temporary objects in a special DB (as in SQL Server, Oracle, Postgres)
  4. Always assign temporary functions to some namespace (see FLIP-57).

Compatibility, Deprecation, and Migration Plan

  • Methods of TableEnvironment to be deprecated:
    • void registerTable(String name, Table table)
    • void registerTableSource(String name, TableSource<?> tableSource);
    • void registerTableSink(String name, TableSink<?> configuredSink);
    • Table scan(String... tablePath)
    • void registerFunction(String name, ScalarFunction function)
    • <T> void registerFunction(String name, TableFunction<T> tableFunction)
    • <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction)
    • <T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction)
    • <T> void registerDataStream(String name, DataStream<T> dataStream)
    • <T> void registerDataStream(String name, DataStream<T> dataStream, String fields)
    • <T> void registerDataSet(String name, DataSet<T> dataSet)
    • <T> void registerDataSet(String name, DataSet<T> dataSet, String fields)
  • Methods of ConnectTableDescriptor to be deprecated:
    • public void registerTableSource(String name) 
    • public void registerTableSink(String name) 
    • public void registerTableSourceAndSink(String name)
  • Methods of TableEnvironment to be dropped:
    • void insertInto(Table table, StreamQueryConfig queryConfig, String sinkPath, String... sinkPathContinued)
    • void insertInto(Table table, BatchQueryConfig queryConfig, String sinkPath, String... sinkPathContinued)
    • void insertInto(Table table, String sinkPath, String... sinkPathContinued)
  • Methods of Table to be dropped
    • void insertInto(QueryConfig conf, String tablePath, String... tablePathContinued)
    • void insertInto(String tablePath, String... tablePathContinued)

Implementation plan

The implementation of the changes described for functions in this FLIP has to be postponed until type inference is exposed for UserDefinedFunctions.


How other systems handle temporary objects:


MySQL:

MySQL allows creating temporary tables in any schema (even if the target database does not exist). Those temporary tables take precedence over permanent tables with the same name (and schema). Therefore a user cannot access the permanent table until the temporary table is dropped.

MySQL adds also a special syntax for dropping temporary tables (DROP TEMPORARY TABLE) in order to prohibit dropping permanent tables when the intention was to drop a temporary one.


Hive:

Hive implements behavior similar to MySQL's. The difference is that the database must exist. Hive also does not add the DROP TEMPORARY TABLE syntax.

In Hive, temporary functions must not have a database.

SQL Server:

SQL Server reserves a special schema for temporary tables (dbo). It also forces users to prefix temporary table names with the ‘#’ character; this is needed to differentiate whether a table should be temporary or permanent. Therefore, it is not possible to override a permanent table.

If a user includes a schema_name when creating or accessing a temporary table, it is ignored. All temporary tables are created in the dbo schema.


Oracle:

Oracle implements behavior similar to SQL Server's.


Postgres:

Postgres allows overriding permanent tables with temporary objects, but does not allow arbitrary schemas. Object identifiers consist of only a single object name.
