Status

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, the Table & SQL API is implemented in Scala. This decision was made a long time ago when the initial code base was created as part of a master's thesis. The community kept Scala because of the nice language features that enable a fluent Table API like table.select('field.trim()) and because Scala allows for quick prototyping (e.g. multi-line strings for code generation). The committers enforced not splitting the code base into two programming languages. Additionally, the API is defined in a single module and tightly coupled to the DataSet and DataStream APIs. The flink-table module has been treated as a library on top of Flink’s main APIs so far.

Nowadays, the flink-table module is more and more becoming an important and independent part of the Flink ecosystem. Connectors, formats, and the SQL Client are actually implemented in Java but need to interoperate with flink-table, which makes these modules dependent on Scala. As mentioned in an earlier mail thread, using Scala for API classes also exposes member variables and methods in Java that should not be exposed to users. Java is still the most important API language, and right now we treat it as a second-class citizen.

Furthermore, many users are using (or are supposed to use) only the Table & SQL API with table connectors and catalogs. So a tight coupling with the DataSet and DataStream APIs is not necessary anymore.

Since bigger code contributions to flink-table are in the pipeline, we need to ensure a smooth transition/replacement of the underlying core at any time without affecting current users. The current module and class structure is not flexible enough without breaking backwards compatibility.

FYI: This FLIP replaces FLIP-28 as it not only reworks the module structure but also discusses future visions of the entire table ecosystem.

Goals

The summarized goals of this document are:

  • Make flink-table Scala-free (long-term goal!).
    We focus on porting the API to Java and keep this goal in mind for future contributions. Scala code can still be merged. But new components can be implemented with a potentially larger/more skilled contributor base and we avoid introducing more technical debt.

  • Remove annoying API design flaws
    Currently, the flink-table module contains 7 different TableEnvironments (3 base classes + 4 batch/stream environments for Java and Scala). Let’s make this simpler. We also rely on TypeInformation instead of SQL types with proper nullability, decimal precision/scale, and CHAR support. Let’s make this consistent across SQL and Table API.

  • Decouple table programs from DataStream/DataSet API
    Allow table programs to be self-contained. No need for a Stream/ExecutionEnvironment entrypoint anymore. A table program definition is just API that reads and writes to catalog tables.

  • Decouple API from optimizer and runtime
    An underlying planner should be exchangeable at any time, enabling future runtime changes and big code contributions such as Blink SQL.

  • Unify batch/streaming
    For pure table programs that read and write to catalog tables, a user should be able to use a unified API with a unified TableEnvironment instance and unified sources and sinks. A dedicated batch table environment will not be necessary anymore.

  • Remain backwards compatible across multiple releases
    In general, we should aim for remaining backwards compatible. However, if there is API that limits us in achieving the goals above, we should deprecate this part of the API, provide an alternative, and remove it one or two releases later. Ideally, we identify these parts for 1.8 such that we can remove them already in Flink 1.9. The flink-table module has no @Public(Evolving) annotations so we can perform changes quickly but selectively. For example, we need to remove the static methods in TableEnvironment. Flink’s BatchTableEnvironment must not be compatible with Blink’s runtime for now.

  • Merge Blink SQL’s features and architectural improvements
    Merge the Blink SQL planner given that necessary Flink core/runtime changes have been completed. The merging will happen in stages (e.g. basic planner framework, then operator by operator). The exact merging plan still needs to be determined.

Disclaimer

This document covers only a general vision. There are multiple subproblems for which we need to find a solution step by step. There might be temporary/intermediate steps to be taken in order to keep reworking efforts low or remain backwards compatible. For example, we can add temporary dependencies to other modules in the dependency structure that is presented below. Of course we should avoid that if possible. But this document does not cover every little subproblem.

Task Dependencies

In order to parallelize the work and unblock people in contributing new features, the following diagram illustrates the tasks and their dependencies. A more detailed implementation plan is attached to this document.


Blue: Table API & SQL tasks

Red: Flink Runtime/Core tasks

New Module Structure

As discussed in FLIP-28, a first step to shape the future of the Table & SQL API is to get the module and dependency structure right. We suggest the following structure. Every module is explained in a separate section.

??? = we don’t know yet
* = all modules

flink-table
  flink-table-common
    (dependencies: flink-core)

  flink-table-api-base
    (dependencies: flink-table-common)

  flink-table-api-java
    (dependencies: flink-table-api-base)

  flink-table-api-scala
    (dependencies: flink-table-api-base)

  flink-table-api-java-bridge
    (dependencies: flink-table-api-java, flink-streaming-java)

  flink-table-api-scala-bridge
    (dependencies: flink-table-api-scala, flink-streaming-scala)

  flink-table-planner
    (dependencies: flink-table-api-java-bridge,
                   flink-streaming-scala,
                   Apache Calcite)

  flink-table-planner-blink
    (dependencies: flink-table-runtime-blink, flink-???)

  flink-table-runtime-blink
    (dependencies: flink-???)

  flink-table-dist
    (dependencies: flink-table-*, not flink-table-*-blink?)

At first glance the structure might appear bloated, but depending on the use case only a few dependencies need to be added, as shown in the next examples.

A planner module always contains the runtime as well. So a user just needs to add the right planner and API. Additionally, the Blink code is “logically” split into two modules. Thus, in the future it is possible to submit more lightweight table programs to the cluster by just submitting the JobGraph and flink-table-runtime-blink.

Examples:

User wants to write a plugin with external catalog and bunch of sources and sinks.

→ use flink-table-common (vision)
→ use flink-table-api-java-bridge (until TableSources/Sink interfaces have been reworked)

User wants to write a table program in the table ecosystem using Java.

→ use flink-table-api-java

User wants to write a table program in the table ecosystem using Java + execute it in the IDE.

→ use flink-table-api-java + flink-table-planner

User wants to write a table program + translate to Java DataStream API + execute it in the IDE.

→ use flink-table-api-java-bridge + flink-table-planner

Modules

flink-table

Moved out of flink-libraries as a top-level parent module.

flink-table-common

Contains interfaces and common classes that need to be shared across different Flink modules. This module was introduced in Flink 1.7 and its name integrates nicely with the existing Flink naming scheme. A name containing `spi` would also be confusing for users looking for `api`.

There was a discussion about merging flink-table-common with flink-table-api-base. However, it is good software engineering practice to separate API and SPI. It also reduces pollution of other non-table modules with Table API classes.

Connectors, formats, catalogs, and UDFs can use this without depending on the entire Table API stack or Scala. The module contains interface classes such as descriptors, table sink, table source. It will also contain the table factory discovery service such that connectors can discover formats.

The module should only contain Java classes and should have no external dependencies on other modules. A dependency on flink-core should be sufficient. In theory, once we have reworked the type system, a dependency on flink-core would not be necessary anymore. However, we will need some connection to the core in order to define sources/sinks. Currently, this is the DataStream/DataSet API but it might be a new source interface in flink-core in the future.

Currently, we cannot add interfaces for connectors into the module as classes such as `StreamTableSource` or `BatchTableSource` require a dependency to `DataStream` or `DataSet` API classes. This might change in the future once we reworked our TableSource and TableSink interfaces. For now, extension points for connectors are located in `flink-table-api-*-bridge` and tightly integrate with the target API.

In the future, we might need to add some basic expression representations (for <, >, ==, !=, field references, literals, symbols, generic call) in order to push down filter predicates into sources without adding a dependency on `flink-table-api-base` or Calcite.
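Such a representation could stay small and dependency-free. A minimal, illustrative sketch (all class names are hypothetical and only demonstrate the idea):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical, dependency-free expressions for predicate push-down into sources. */
public final class PushDownExpressions {

    /** Base interface for expressions handed to a filterable table source. */
    public interface Expression {
        List<Expression> getChildren();
    }

    /** Reference to a field of the source's schema. */
    public static final class FieldReference implements Expression {
        public final String name;
        public FieldReference(String name) { this.name = name; }
        @Override public List<Expression> getChildren() { return Collections.emptyList(); }
    }

    /** A constant value. */
    public static final class Literal implements Expression {
        public final Object value;
        public Literal(Object value) { this.value = value; }
        @Override public List<Expression> getChildren() { return Collections.emptyList(); }
    }

    /** Generic call covering <, >, ==, != and arbitrary functions. */
    public static final class Call implements Expression {
        public final String function;
        public final List<Expression> operands;
        public Call(String function, Expression... operands) {
            this.function = function;
            this.operands = Arrays.asList(operands);
        }
        @Override public List<Expression> getChildren() { return operands; }
    }

    // Example: the predicate "amount > 100" pushed into a source
    public static Expression example() {
        return new Call(">", new FieldReference("amount"), new Literal(100));
    }
}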

flink-table-api-base

Contains API classes such as expressions, TableConfig and base interfaces and classes for Table and TableEnvironment. It contains most classes from `org.apache.flink.table.api.*` plus some additional classes. It contains subclasses of `org.apache.flink.table.plan.logical.LogicalNode`.

Users will not depend directly on this module. This module will be used by language-specific modules. Users will depend on "flink-table-api-java" or "flink-table-api-scala" when writing a table program.

The module should only contain Java classes.

The module will not contain a Calcite dependency. This means that LogicalNode and Expression classes won’t contain RelNodes and RexNodes anymore. LogicalNode and Expression become the output of the Table API and need to be converted into Calcite optimizer RelNodes and RexNodes by the planner.

This module defines a Planner interface for passing the API plan to a planner for optimization and execution. The base table environment performs a Java Service Provider based discovery to find a planner matching the requirements expressed through the API. This mechanism is explained in the next part of this document.
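A rough, illustrative sketch of how such a discovery could work (hypothetical helper class; Planner and TableConfig refer to the types sketched in the Implementation Details section of this document):

import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical discovery helper used by the base table environment.
public final class PlannerDiscovery {

    @SuppressWarnings("rawtypes")
    public static Planner discover(TableConfig config) {
        List<Planner> matching = new ArrayList<>();
        // planners register themselves via META-INF/services entries
        for (Planner planner : ServiceLoader.load(Planner.class)) {
            if (planner.matches(config)) {
                matching.add(planner);
            }
        }
        if (matching.size() != 1) {
            throw new IllegalStateException(
                "Expected exactly one planner for the given TableConfig, but found " + matching.size());
        }
        return matching.get(0);
    }
}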

Regarding the large expression code base: this structure means that all API expression representations need to be reworked. This is a lot of work and will not happen quickly. As a temporary solution there are different alternatives:

  1. We would let the expression case classes be located in the planner module for now. Since users have added a planner module dependency anyway, the translation still works.

  2. We translate to just a generic `Call(“functionname”, expr*)` expression and the resolution is done by the planner.
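For illustration, with alternative 2 an API expression such as table.select('field.trim()) would be handed over in an unresolved form, roughly like this (reusing the hypothetical expression classes sketched in the flink-table-common section above):

// unresolved call produced by the API; the function is identified by name only
Expression apiExpr = new Call("trim", new FieldReference("field"));

// the planner later resolves "trim" against its function catalog and translates
// the call into its own expression representation (e.g. RexNodes)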

flink-table-api-java

Contains API classes with interfaces targeted at Java users. All classes here will be prefixed with `org.apache.flink.table.api.java`.

A prerequisite for a unified table environment in this module is that the Table API type system has been reworked. Otherwise we need Java type extraction and return type information.

TODO If Java and Scala shared the same type extraction logic, we could simplify the architecture a lot. We would only need one TableEnvironment, because the biggest difference between the Scala and Java table environments is the type extraction. In the past this was already very confusing for people. For example, sometimes case classes go through the Java type extractor and sometimes they go through the Scala type analyzer. If we unify the type system, we could unify api-java and api-base into a single module. If we don’t rework the type system, the api-scala module needs a dependency on flink-scala only for the Scala type analyzer macro.

flink-table-api-scala

Contains API classes with interfaces targeted at Scala users such as the Scala implicit expression conversions (expressionDsl.scala). All classes here will be prefixed with `org.apache.flink.table.api.scala`.

The module should only contain Scala classes.

There were opinions about letting `flink-table-api-scala` depend on `flink-table-api-java` and removing the base module. However, the problem with this approach is that classes such as `BatchTableEnvironment` or `Tumble` would be twice in the classpath. In the past, this led to confusion because people were not always paying attention to their imports and were mixing Java API with Scala API. The current module structure avoids ambiguity.

A prerequisite for a unified table environment in this module is that the Table API type system has been reworked. Otherwise we need Scala macros for type information extraction. These macros are located in `flink-scala` right now and would pull in DataSet dependencies.

flink-table-api-java-bridge

Bridges the table ecosystem with the Java DataSet and Java DataStream API. It provides classes for the non-unified table environments `org.apache.flink.table.api.java.BatchTableEnvironment` and `org.apache.flink.table.api.java.StreamTableEnvironment` that can convert back and forth between the target API. This means that toAppendStream, fromAppendStream etc. are only available in those bridging table environments. Bridging functionality includes integration with TypeInformation.

Until table sources and sink interfaces have been reworked, this module will also contain StreamTableSource, BatchTableSource, etc. because those interfaces are tightly coupled with DataStream/DataSet API right now.

flink-table-api-scala-bridge

Bridges the table ecosystem with the Scala DataSet and Scala DataStream API. It provides classes for the non-unified table environments `org.apache.flink.table.api.scala.BatchTableEnvironment` and `org.apache.flink.table.api.scala.StreamTableEnvironment` that can convert back and forth between the target API. This means that toAppendStream, fromAppendStream etc. are only available in those bridging table environments. Bridging functionality includes integration with TypeInformation.

flink-table-planner

Contains the main logic for converting a logical representation into a DataStream/DataSet program, based on the old Flink flink-table logic. The planner module bridges the `api` modules and the runtime code, similar to how it is done in the DataSet API of Flink. A user has to add `flink-table-api-scala/java*` and a planner in order to execute a program in an IDE.
This module contains the original `flink-table` module classes until it is replaced with better code :-) API classes will gradually be converted into Java and distributed to their future location in `flink-table-api-*`.

Compared to the future planner, this planner contains the runtime code already. The reason for this is that we are currently using Calcite functions during runtime, e.g. for LIKE or timestamp conversion. For the future we should aim to separate the runtime and planning phases clearly, as explained below.

flink-table-planner-blink

Contains the work-in-progress Blink planner code. It contains the main logic for converting a logical representation into a StreamTransformation program. Once this module is stable, it will be renamed to flink-table-planner such that users don’t need to update their dependencies. This module will be the only module with an Apache Calcite dependency.

flink-table-runtime-blink

Contains the work-in-progress Blink runtime code. It contains the main logic for executing a table program. It aims to keep the JAR files that need to be submitted to the cluster small. It has no dependencies on Calcite.

flink-table-dist

This module just bundles the table ecosystem into one JAR file that can be moved from /opt into the /lib directory of a Flink distribution.

The module does not contain the Blink code as class names could clash otherwise. If we want to include all planners here, we need to shade/relocate the flink-table-planner, flink-table-planner-blink, and flink-table-runtime-blink packages.

The SQL Client logically belongs to `flink-table` and should be moved under this module.

User Code Examples

→ User wants to write a table program in the table ecosystem using Java.

Instead of passing a certain execution environment, the user is required to specify the desired execution mode. Under the hood, the table environment tries to discover exactly one planner that can satisfy this requirement; otherwise an exception is thrown. The planner takes care of instantiating an execution environment, configuring it, and triggering the execution of a job if requested to do so.

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;

public static void main(String[] args) {
    TableConfig config = TableConfig.builder()
        .asStreamingExecution()
        // example of providing configuration that was in StreamExecutionEnvironment before
        .watermarkInterval(100)
        .build();

    TableEnvironment tEnv = TableEnvironment.create(config);

    tEnv.scan("MyTable").select("*").insertInto("MyOtherTable");

    tEnv.execute();
}


Internal Steps:


  1. Discover StreamPlanner

  2. StreamPlanner configures itself with the given TableConfig (e.g. watermark interval)

  3. Table environment calls are forwarded to the planner

For users that want to try out experimental planner implementations, we can add more methods to the config:


TableConfig config = TableConfig.builder()
        .asUnifiedExecution() // experimental!
        .watermarkInterval(100)
        .build();


→ User wants to write a table program + translate to Java DataStream API.


Compared to the previous example, a user can use a table environment that is specific to the current execution environment. Any configuration can be applied directly on the execution environment. Under the hood, the table environment wraps the execution environment into a specific TableConfig that a matching planner can use.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public static void main(String[] args) {
    StreamExecutionEnvironment exec = StreamExecutionEnvironment.createRemoteEnvironment(...);
    exec.getConfig().setAutoWatermarkInterval(...);

    StreamTableEnvironment tEnv = StreamTableEnvironment.create(exec);

    DataStream<?> ds = tEnv.fromAppendStream(...).select("*").toAppendStream(Row.class);

    exec.execute(); // or tEnv.execute();
}

Internal Steps:

  1. Wrap execution environment in a specific TableConfig (e.g. StreamTableConfig)

  2. Discover StreamPlanner

  3. StreamPlanner configures itself with the given execution environment wrapped in the config

  4. Table environment calls are forwarded to the planner

→ User wants to write a table program in the table ecosystem using Java in a unified runtime.

Once we have unified the runtime, we can make passing a TableConfig instance optional. It will still be needed to specify things like the watermark interval etc., but not the execution mode anymore.

import org.apache.flink.table.api.TableEnvironment;

public static void main(String[] args) {

    TableEnvironment tEnv = TableEnvironment.create();

    tEnv.scan("MyTable").select("*").insertInto("MyOtherTable");

    tEnv.execute();
}

Implementation Details

The following code shows how a planner interface could look. It abstracts the main functionality that is currently implemented in the different table environments.

The planner interface is the bridge between the base API and a planner module. We can add and remove methods whenever we notice that the Flink and Blink interfaces differ. This code snippet is just an example:

interface Planner<TABLEENV, API> {

    // check if the given table config is supported by this planner
    boolean matches(TableConfig config);

    void initialize(TableConfig config, CatalogManager manager);

    TABLEENV createTableEnvironment();

    // for sqlUpdate()
    void translateSqlUpdate(String statement, QueryConfig queryConfig);

    // for insertInto()
    void translateInsertInto(Table apiPlan, String targetSink, QueryConfig queryConfig);

    // for sqlQuery()
    Table translateSqlQuery(String query);

    // for explain()
    String explain(Table logicalPlan);

    // for getCompletionHints()
    String[] completeSqlStatement(String statement, int pos);

    // for registerTableSource()
    // checks if a StreamTableSource or BatchTableSource can be used for this planner
    boolean isCompatibleTableSource(TableSource source);

    // for registerTableSink()
    boolean isCompatibleTableSink(TableSink sink);

    // for internal fromDataStream() and fromDataSet()
    // implementers must prepare for internal type (i.e. CRow)
    // this interface is meant only for bridging environments, a unified table environment 
    // would not use this.
    Table fromBridgingApi(UpdateMode updateMode, API bridgingApi, TableSchema schema);

    // for internal toAppendStream() and toRetractStream()
    // implementers must wrap the internal type (i.e. CRow) into Java/Scala API
    // this interface is meant only for bridging environments, a unified table environment 
    // would not use this.
    API toBridgingApi(UpdateMode updateMode, Table logicalPlan);

    void execute(Optional<String> jobName);
}

class StreamPlanner implements Planner<StreamTableEnvironment, DataStream<CRow>> {
    private StreamExecutionEnvironment env;
    // ...
}

class BatchPlanner implements Planner<BatchTableEnvironment, DataSet<Row>> {
    private ExecutionEnvironment env;
    // ...
}

class BlinkPlanner implements Planner<TableEnvironment, StreamTransformation<???>> {
    // ...
}


Public Interfaces

Public interfaces remain mostly the same. Some exceptions are mentioned in the implementation plan.

Implementation Plan

The previous sections have shown a vision of how the Table & SQL API could look. This section names concrete implementation steps that can be converted into JIRA issues.

The order of this list represents the implementation order unless marked with [parallelizable] which means it can be implemented at the same time as the previous task.

Module Split

  1. [Flink 1.8] Perform module split in Flink

    1. In order to not force users to update their dependencies again, the split should not break compatibility. An example PoC split without breaking compatibility can be found here.

    2. Classes are not changed but only moved to flink-table-planner.

  2. [Blink] Match against new module structure

Minor API Changes

  1. [Flink 1.8] Deprecate static methods in TableEnvironments

    1. Direct users to the `Batch/StreamTableEnvironment.create()` approach. The `create()` method does not necessarily have to perform a planner discovery yet; we can hard-code the target table environment for now (see the sketch after this list).

    2. `TableEnvironment.create()` is not supported.

  2. [parallelizable] [Flink 1.8] Deprecate "new Table()" with a different solution

    1. Once Table is an interface, we can easily replace the underlying implementation at any time. The constructor call prevents us from converting it into an interface.
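To illustrate item 1 above, the migration for a streaming program could roughly look as follows (a sketch only; it assumes the existing static `TableEnvironment.getTableEnvironment()` method is the one being deprecated in favor of `create()`):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class CreateMethodExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // deprecated style: static factory method on TableEnvironment
        // StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // new style: create() on the specific table environment; it may later
        // perform the planner discovery described in this document
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    }
}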

API Preparation

  1. [parallelizable] [Flink & Blink] Uncouple the `Expression` class from `RexNode`s

    1. This separates API expressions from Calcite’s RexNode.

    2. The Expression class can be moved to api-base and will contain validation logic only.

    3. Possible implementation: every expression will contain a `toPlannerExpression` method that translates it into `PlannerExpression` instances. For example, `PlannerCall(OperatorTable.TRIM, PlannerLiteral(...), PlannerLiteral(...))`. Introduce a visitor pattern in Flink and Blink to translate `PlannerExpression` to `RexNode`s.
      Alternative implementation: we use a big visitor pattern for every expression and don’t introduce planner expressions.

    4. Expressions can still remain implemented in Scala for now. TableEnvironment depends on Table, which depends on Expression, so we should start moving expressions first.

  2. [Flink] Make the `Table, GroupedTable, WindowedTable, WindowGroupedTable, OverWindowedTable, Window, OverWindow` classes interfaces

    1. Implemented in Java in `api-base`.

    2. We can keep the "Table" Scala implementation in a planner module until it has been converted to Java.

    3. We can add a method to the planner later to give us a concrete instance. This is one possibility to have a smooth transition period instead of changing all classes at once.

    4. Move TemporalTableFunction into flink-table-api-base.

  3. [parallelizable] [Flink] Port and move TableSource and TableSink to flink-table-common

    1. This step only unblocks the TableEnvironment interfaces task. Stream/BatchTableSource/Sink remain in flink-table-api-java-bridge for now until they have been reworked.

  4. [parallelizable] [Flink] Port and move ExternalCatalog, ExternalCatalogTable, TableNotExistException, CatalogNotExistException to flink-table-common

    1. Unblocks TableEnvironment interface task and catalog contribution.

  5. [parallelizable] [Flink] Move QueryConfig, BatchQueryConfig, StreamQueryConfig, TableDescriptor to flink-table-api-base

    1. Unblocks TableEnvironment interface task.

  6. [Flink] Turn TableEnvironment and the Java/Scala Batch/StreamTableEnvironments into interfaces

Pluggable Planner

  1. [Flink] Introduce the planner interface and perform a planner discovery.

    1. The previous tasks should have split the API from the Planner so we should be able to make it pluggable.

  2. [Blink] Match Blink runtime code against new planner interface, operator interface, sorted map state & other potential differences

    1. At this step Blink doesn’t need to support UDFs, TableSources, TableSinks, ExternalCatalogs, TableSchemas, … The only supported data input would be DataStream. Thanks to this scope limitation, we do not have to unify the type system, nor do we have to agree on the UDF/TableSource/Sink/Catalog APIs before merging planner/runtime code.

    2. Until the type system has been reworked, the Blink code needs to convert TypeInformation to Blink’s type abstraction.

    3. Work on the TypeSystem and further API changes can be done independently and in parallel to the merging planner/runtime code.

  3. [parallelizable] [Blink] Reduce Blink’s Maven dependencies to a minimum

    1. Remove dependencies to:
      flink-python, pyrolite, orc, parquet, hadoop, hive

    2. Do not use a custom Calcite version.
      In the past we have already forked and changed a couple of Calcite classes to fix bugs. If Calcite becomes a bottleneck or limits us too much, we can think about forking Calcite in the future. However, forking Calcite also means that we will tend to introduce non-standard-compliant syntax and semantics. Every shift from standard SQL should be discussed with the community beforehand. So let’s disable Blink-specific SQL features for now.

Blink Merge

  1. [Flink] Merge Blink planner/runtime code

    1. Blink runtime/planner will be merged against the existing Table API, modulo the previous implementation plan steps.

    2. Merging can start from streaming or batch, depending on which prerequisites will be completed first (depending on the flink-runtime changes).

    3. Merging should happen in steps as small as possible (details to be worked out in a separate discussion), for example:

      1. statistics package

      2. cost calculator

      3. planner framework

      4. ...

    4. Non-core features such as Hive UDFs, Python UDFs, and queryable state integration are out of scope for now. Let’s focus on main features such as making TPC-DS work on streaming.

    5. Throwing hard exceptions for unsupported features is fine at this stage as users can always fall back to the old Flink planner.

Advance the API and Unblock New Features

  1. [Flink] Rework Table & SQL API type system

    1. This includes a lot of API changes and requires a new design document.

    2. Compared to the changes in Blink, we should aim for backwards compatibility and unification of Scala/Java type extraction.

  2. [Flink] Finalize unified table connector story

    1. Design new connector interfaces for unified connections to batch and streaming data.

    2. The discussion might have started earlier but we need the implementation ready at this stage.

    3. The design also influences DDL and catalog work.

    4. It touches TableSchema, sources and sinks.

  3. [Flink] Cleanup table environments and port to Java

    1. Split the table environments into subcomponents (Catalog, API, Planner).

  4. [Flink] Introduce new unified table environment `TableEnvironment.create()` in `api-base`

    1. First, true unification of batch/streaming with unified connectors and environment.

    2. Goal: End-to-end SQL TPC-DS on both batch and streaming sources.

  5. [parallelizable] [Flink] Port expressions to Java to make `api-base` Scala-free

    1. The `api-base` module might have a temporary Scala dependency at this stage.

    2. Make the API Scala-free.

Future Planning

  1. [Flink] Plan long-term goals

    1. Make Flink code and Blink code feature equivalent. For example:

      1. Table API enhancements (e.g. map(), flatMap())

      2. Compatibility with Hive UDFs

      3. Python UDFs?

    2. Our goal is to unblock people with new API features as quickly as possible.

    3. Add columnar connectors such as Parquet and ORC to their corresponding connector/format modules

    4. Make planner Scala-free

  2. [Flink] Discuss & agree on core changes

    1. This is a prerequisite to merging Blink’s runtime.

    2. Currently, Blink touches components such as:
      TypeSerializer, IOManager, MemoryManager, and StreamGraphGenerator

  3. [Flink] Discuss & agree on operator interface changes

    1. This is a prerequisite to merging Blink’s batch runtime.

  4. [Flink] Discuss & agree on Sorted Map State

    1. This is a prerequisite to merging Blink’s streaming runtime.


→ A more detailed description and discussion should happen in one or more separate design documents. This document only covers a schedule for flink-table related changes.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Existing tests and additional unit and IT cases will verify the changes.

Rejected Alternatives

The question was: How far should a unification go?

In theory, we could have just one TableEnvironment for users of the table ecosystem plus bridging Batch/Stream environments for Scala and Java for users of DataSet/DataStream:

org.apache.flink.table.api.TableEnvironment (api-java)
org.apache.flink.table.api.java.StreamTableEnvironment (api-java-bridge)

org.apache.flink.table.api.scala.StreamTableEnvironment (api-scala-bridge)

org.apache.flink.table.api.java.BatchTableEnvironment (api-java-bridge)

org.apache.flink.table.api.scala.BatchTableEnvironment (api-scala-bridge)

→ 5 table environments in total

If this unification is too much, we would end up with 2 API language-specific TableEnvironments plus bridging Batch/Stream environments for Scala and Java users of DataSet/DataStream:

org.apache.flink.table.api.TableEnvironment (api-base)

org.apache.flink.table.api.java.TableEnvironment (api-java)

org.apache.flink.table.api.scala.TableEnvironment (api-scala)
org.apache.flink.table.api.java.StreamTableEnvironment (api-java-bridge)

org.apache.flink.table.api.scala.StreamTableEnvironment (api-scala-bridge)

org.apache.flink.table.api.java.BatchTableEnvironment (api-java-bridge)

org.apache.flink.table.api.scala.BatchTableEnvironment (api-scala-bridge)

→ 7 table environments in total

→ Decision: We will rework the type system but at a later stage. As can be seen in the implementation plan, a unified TableEnvironment class will come afterwards.

The question was: Should we allow a custom Calcite version?

→ Decision: We can still think about forking Calcite for the features of Blink such as:

1. support temporal table join, i.e. FOR SYSTEM_TIME AS OF syntax

2. support EMIT syntax, used for window early-fire and late-arrival

3. support implicit type cast

4. support dynamic table parameter
   (like ‘select * from t (key1=value1, key2=value2)’ for Kafka offsets)

5. support more features in MATCH_RECOGNIZE

But every change of SQL syntax and semantics should be discussed thoroughly with the community. These discussions should not block the MVP Blink runtime merging so let's postpone them for now.

Appendix: Porting Guidelines


Some general guidelines when we start porting code to Java or developing new code.


This section is copied from the outdated FLIP-28.


We suggest the following steps to unblock people from developing new features but also start porting existing code either when touching the corresponding class or as a voluntary contribution.


Porting of Existing Classes


In order to clarify the terms here, "porting" means that Scala code is rewritten to Java code without changing the original logic. Breaking existing APIs should be avoided.


Due to different class member visibility principles in Scala and Java, it might be necessary to adapt class structures. For example, `private[flink]` is used quite often and would be `public` in Java, which is not always intended; thus, we need to find a reasonable abstraction for these cases.


Since migrating code is a good chance for a code base review, an implementer should pay attention to code deduplication, exposing methods/fields, and proper annotations with `@Internal`, `@PublicEvolving` when performing the migration.


Tests should be migrated in a separate commit. This makes it possible to validate the ported code first before touching test classes.


Development of New Classes


New classes should always be implemented in Java if the surrounding code does not force Scala-specific code.


Examples:


A runtime class that only depends on `ProcessFunction` should be implemented in Java.


A new planner rule or node that only depends on Calcite and runtime classes should be implemented in Java.


If the surrounding code requires Scala, we leave it up to the implementer and committer to decide if related classes should be adapted or even migrated to Java. If they are not adapted/migrated, a Jira issue should track such a shortcoming and the new class can still be implemented in Scala.


Examples:


A new class needs to implement a trait that requires a Scala collection or `Option` in parameters. The Java code should not depend on Scala classes. Therefore, the trait should be adapted to require Java collections or Java `Optional`. This should happen in a separate commit. If adapting the signature for this trait is too much work because it touches a lot of classes and thus is out of scope for the actual issue, implement a Scala class for now. But open an issue for it to track bigger migration blockers.
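A minimal sketch of such an adaptation (the interface and its methods are hypothetical and only illustrate the signature change):

import java.util.List;
import java.util.Optional;

// Hypothetical adapted interface: instead of Scala's Seq and Option, the
// signature uses java.util.List and java.util.Optional so that implementations
// can be written in Java without a Scala dependency.
public interface PartitionableTableSource {
    List<String> getPartitionFieldNames();
    Optional<String> getDefaultPartitionName();
}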


A new code generating class needs to be implemented but there are no utility methods for Java so far. Doing multiline code generation with variables and expressions inside is inconvenient in Java. We need to introduce proper tooling for this first, so it is acceptable to implement this in Scala for now.


Porting Priorities


The following steps should enable a smooth migration from Scala to Java.


  1. Migrate `flink-table-runtime` classes
    All runtime classes have few dependencies on other classes.

  2. Migrate connector classes to `flink-table-api-*`
    Once we have implemented improvements to the unified connector interface, we can also migrate the classes. Among others, this requires a refactoring of the timestamp extractors, which are the biggest blockers because they transitively depend on expressions.

  3. Migrate remaining `flink-table-common` classes
    While doing tasks for the new external catalog integration or improvements to the unified connector interfaces, we can migrate the remaining classes.

  4. Migrate remaining API classes
    This includes expressions, logical nodes etc.

  5. Load Scala in `flink-table-planner` into a separate classloader
    After this stage, `flink-table` would be Scala-free from a dependency perspective.

  6. Migrate `flink-table-planner` classes
    Final goal of Scala-free `flink-table`.