Status

Current state: "Accepted"

Discussion thread: https://lists.apache.org/thread.html/e1eca89862d59f78ba86c9a2596a810fdb1d42bcba7356fbda51e60e@%3Cdev.flink.apache.org%3E

JIRA: FLINK-12251

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, the Table & SQL API relies on Flink’s TypeInformation at different positions in the code base. The API uses it for conversion between the DataSet/DataStream API, for casting, and for table schema representation. The planner uses it for code generation and for serialization of runtime operators.

The past has shown that TypeInformation is useful when converting between the DataSet/DataStream API and the Table API. However, it does not integrate nicely with SQL's type system, and it depends on the programming language that is used.

For example, if users have implemented a TableFunction:

import org.apache.flink.table.functions.TableFunction

case class SimpleUser(name: String, age: Int)

class TableFunc0 extends TableFunction[SimpleUser] {
  // make sure input element's format is "<string>#<int>"
  def eval(user: String): Unit = {
    if (user.contains("#")) {
      val splits = user.split("#")
      // emit one SimpleUser row per well-formed input element
      collect(SimpleUser(splits(0), splits(1).toInt))
    }
  }
}

The return type of this table function does not only depend on the function class itself but also on the table environment that is used:

org.apache.flink.table.api.java.StreamTableEnvironment#registerFunction

Uses the Java type extraction stack and extracts TypeInformation by using the reflection-based TypeExtractor.

org.apache.flink.table.api.scala.StreamTableEnvironment#registerFunction

Uses the Scala type extraction stack and extracts TypeInformation by using a Scala macro.

Depending on the table environment, the example above might be serialized using a case class serializer or a Kryo serializer (assuming the case class is not recognized as a POJO).

The inflexibility and inconsistency have also been raised by other major contributors such as Uber. See:
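The difference between the two extraction stacks can be observed directly on the DataSet/DataStream level. The following is a minimal sketch, assuming a Flink 1.x release with the flink-scala module on the classpath; the object name TypeExtractionComparison is made up for illustration. It only prints which kind of TypeInformation each stack produces and does not register any function.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TypeExtractor}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo

// Hypothetical helper object, not part of Flink.
object TypeExtractionComparison {

  case class SimpleUser(name: String, age: Int)

  // Java stack: reflection-based extraction via TypeExtractor.
  val javaInfo: TypeInformation[SimpleUser] =
    TypeExtractor.createTypeInfo(classOf[SimpleUser])

  // Scala stack: extraction at compile time via the createTypeInformation macro.
  val scalaInfo: TypeInformation[SimpleUser] = createTypeInformation[SimpleUser]

  def main(args: Array[String]): Unit = {
    // A generic type falls back to Kryo serialization.
    println(s"Java stack yields generic type:   ${javaInfo.isInstanceOf[GenericTypeInfo[_]]}")
    // A case class type uses the dedicated case class serializer.
    println(s"Scala stack yields case class type: ${scalaInfo.isInstanceOf[CaseClassTypeInfo[_]]}")
  }
}
```

If the case class is indeed not recognized as a POJO, both checks should report true, which means the same function result would be serialized differently depending on the stack that performed the extraction.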

[FLINK-9484] Improve generic type inference for User-Defined Functions

[FLINK-9294] Improve type inference for UDFs with composite parameter or result type

[FLINK-9501] Allow Object or Wildcard type in user-define functions as parameter types but not result types

[FLINK-9502] Use generic parameter search for user-define functions when argument contains Object type

[FLINK-9430] Support Casting of Object to Primitive types for Flink SQL UDF

The current type system has several shortcomings:

  • It is not aligned with SQL. For example, precision and scale cannot be defined for DECIMALs.

  • The difference between CHAR/VARCHAR is not supported (FLINK-10257, FLINK-9559).

  • Physical type and logical type are tightly coupled.

  • The physical type is represented by type information instead of a type serializer.
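To make these shortcomings more concrete, the following sketch shows what a SQL-aligned logical type that is decoupled from its physical representation could look like. It is plain Scala without Flink dependencies, and every name in it (LogicalType, DecimalType, CharType, VarCharType, DataType, LogicalTypeSketch) is hypothetical and chosen for illustration only; it is not taken from the design document.

```scala
// Hypothetical logical types carrying SQL parameters; not Flink API.
sealed trait LogicalType {
  def asSqlString: String
}

final case class DecimalType(precision: Int, scale: Int) extends LogicalType {
  require(precision >= 1, "precision must be positive")
  require(scale >= 0 && scale <= precision, "scale must be within [0, precision]")
  def asSqlString: String = s"DECIMAL($precision, $scale)"
}

final case class CharType(length: Int) extends LogicalType {
  def asSqlString: String = s"CHAR($length)"
}

final case class VarCharType(length: Int) extends LogicalType {
  def asSqlString: String = s"VARCHAR($length)"
}

// The physical side is only a hint attached to the logical type,
// so the same logical type can be paired with different classes.
final case class DataType(logical: LogicalType, conversionClass: Class[_])

object LogicalTypeSketch {
  def main(args: Array[String]): Unit = {
    val price = DataType(DecimalType(10, 2), classOf[java.math.BigDecimal])
    val code = DataType(CharType(3), classOf[String])
    val name = DataType(VarCharType(3), classOf[String])

    println(price.logical.asSqlString)
    // CHAR and VARCHAR stay distinct even though both map to String.
    println(code.logical != name.logical)
  }
}
```

In this sketch, precision and scale are part of the type, CHAR and VARCHAR are distinguishable, and the conversion class can be swapped without touching the logical type.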

Goals

This document proposes a complete rework of the Table & SQL API type system in order to make the API stable and comparable to, or better than, existing SQL processors.

This document helps in aligning API, planners, and connectors/formats/catalogs.

Part 2 will discuss how to integrate the type system with UDFs and expressions.

Concepts

The design document is quite large and difficult to convert into the Confluence wiki format. The original discussion can be found here:

https://docs.google.com/document/d/1a9HUb6OaBIoj9IRfbILcMFPrOL7ALeZ3rVI66dvA2_U/edit#heading=h.xv6wpu93ptb


A PDF version is available here:
