Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The motivation of this FLIP is to further extend the range of operations and expressions that are SQL serializable. In the past, a number of FLIPs have made certain parts of the Table API expressible in SQL strings.

After FLIP-393 and FLIP-502, expressions are fully serializable.

Table API supports consuming and producing structured types from UDFs. These structured types are fully supported by the planner's type system and can be printed or collected:

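import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// A UDF whose return type is the structured type derived from MyPojo
public class PojoScalarFunction extends ScalarFunction {
  public MyPojo eval(String name, int score) {
    return new MyPojo(name, score);
  }
}

// The structured type: public fields plus a constructor that assigns all of them
public class MyPojo {
  public String name;
  public int score;
  public MyPojo(String name, int score) {
    this.name = name;
    this.score = score;
  }
}

env.createFunction("PojoScalarFunction", PojoScalarFunction.class);

env.executeSql("SELECT PojoScalarFunction('Bob', 42)").print();
// Prints: (name=Bob, score=42)

env.executeSql("SELECT PojoScalarFunction('Bob', 42)").collect();
// Returns an iterator of rows whose single field is an instance of MyPojo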

However, structured types derived from an inline class definition still lack a representation in SQL and the Table API. Thus, it is not possible to declare them in a CREATE TABLE statement, to create structured values outside of UDFs, to use them in a CAST, or to declare them as part of a @DataTypeHint function annotation. Table API's table descriptors already support structured types.

While the RAW type has a serializable string representation, RAW('c', 's'), where c is the originating class and s is the serialized TypeSerializerSnapshot, structured types have no equivalent and are therefore even more limited in usage than RAW types.

Note: This FLIP focuses on a single evolution step for structured types. There are various other improvements that could be proposed but are out of scope and might be covered in future FLIPs.

Public Interfaces

SQL Data Type

Update FlinkSqlParser, LogicalTypeParser, and StructuredType#asSerializableString to support:

<structured_type> ::= STRUCTURED '<' <name> ',' <field_list> '>'

<name> ::= <string_literal>

<field_list> ::= <field> { ',' <field> }*

<field> ::= <identifier> <type> [ <comment> ]
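To make the grammar more concrete, the following is a minimal plain-Java sketch of how a STRUCTURED type string could be split into its name and fields. It is not FlinkSqlParser or LogicalTypeParser: the names StructuredTypeStringSketch, parse, and splitTopLevel are hypothetical, the name is assumed to be a quoted string literal as in the examples of this FLIP, and each field's type text (including any comment) is kept verbatim instead of being parsed into a LogicalType.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not FlinkSqlParser or LogicalTypeParser): splits a string such as
 * STRUCTURED<'com.example.User', name STRING, age INT NOT NULL> into its name and fields.
 */
public class StructuredTypeStringSketch {

  /** A field name plus the raw text of its type (and optional comment), kept verbatim. */
  record Field(String name, String typeText) {}

  record Parsed(String name, List<Field> fields) {}

  static Parsed parse(String s) {
    String prefix = "STRUCTURED<";
    String text = s.trim();
    if (!text.startsWith(prefix) || !text.endsWith(">")) {
      throw new IllegalArgumentException("Not a STRUCTURED type: " + s);
    }
    List<String> parts = splitTopLevel(text.substring(prefix.length(), text.length() - 1));
    if (parts.size() < 2) {
      throw new IllegalArgumentException("Expected a name and at least one field: " + s);
    }
    String quoted = parts.get(0).trim();
    if (quoted.length() < 2 || !quoted.startsWith("'") || !quoted.endsWith("'")) {
      throw new IllegalArgumentException("Expected a quoted name: " + quoted);
    }
    // SQL string literals escape a single quote by doubling it.
    String name = quoted.substring(1, quoted.length() - 1).replace("''", "'");
    List<Field> fields = new ArrayList<>();
    for (String part : parts.subList(1, parts.size())) {
      String p = part.trim();
      int space = p.indexOf(' ');
      if (space <= 0) {
        throw new IllegalArgumentException("Expected '<identifier> <type>': " + p);
      }
      fields.add(new Field(p.substring(0, space), p.substring(space + 1).trim()));
    }
    return new Parsed(name, fields);
  }

  /** Splits on commas that are not nested inside <...>, (...), or a quoted literal. */
  static List<String> splitTopLevel(String body) {
    List<String> parts = new ArrayList<>();
    int depth = 0;
    boolean inQuote = false;
    int start = 0;
    for (int i = 0; i < body.length(); i++) {
      char c = body.charAt(i);
      if (c == '\'') {
        inQuote = !inQuote; // a doubled '' toggles twice and leaves the state unchanged
      } else if (!inQuote && (c == '<' || c == '(')) {
        depth++;
      } else if (!inQuote && (c == '>' || c == ')')) {
        depth--;
      } else if (!inQuote && depth == 0 && c == ',') {
        parts.add(body.substring(start, i));
        start = i + 1;
      }
    }
    parts.add(body.substring(start));
    return parts;
  }
}
```

Tracking the nesting depth is what keeps nested types such as ARRAY<STRING> or DECIMAL(10, 2) intact as a single field type.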

Additions to StructuredType:

public class StructuredType extends UserDefinedType {

  /**
   * Creates a builder for a {@link StructuredType} that is not stored in a catalog and is
   * identified by a class name. Use {@link #newBuilder(Class)} if the class is available on the classpath.
   */
  public static StructuredType.Builder newBuilder(String className) { ... }

  /**
   * Returns the class name for an explicit structured type. Empty if the type is stored in a catalog.
   */
  public Optional<String> getClassName() { ... }
}

Additions to DataTypes:

/**
 * Data type of a user-defined object structured type that is not stored in a catalog and is
 * identified by a class name.
 */
public static DataType STRUCTURED(String implementationClass, Field... fields) { ... }
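The following plain-Java model sketches the information an inline, class-name-identified structured type would carry and how it could be printed in the proposed string form. It is a hypothetical stand-in, not Flink's StructuredType: the names InlineStructuredTypeSketch, StructuredTypeModel, and Field are made up, field types are plain strings, and field names are printed unquoted for readability (a real asSerializableString would likely escape them).

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Hypothetical model (not Flink's StructuredType): an inline structured type identified by a
 * class name, printed in the STRUCTURED<'name', field type, ...> form proposed by this FLIP.
 */
public class InlineStructuredTypeSketch {

  record Field(String name, String type) {}

  /** className is empty for types that would instead be identified by a catalog object. */
  record StructuredTypeModel(Optional<String> className, List<Field> fields) {

    String asSerializableString() {
      String name = className.orElseThrow(
          () -> new UnsupportedOperationException("Catalog-registered types are out of scope"));
      String fieldList = fields.stream()
          // Simplification: field names are assumed to be plain identifiers.
          .map(f -> f.name() + " " + f.type())
          .collect(Collectors.joining(", "));
      // Escape single quotes in the name the way SQL string literals do.
      return "STRUCTURED<'" + name.replace("'", "''") + "', " + fieldList + ">";
    }
  }
}
```

Keeping the class name as an Optional mirrors the proposed getClassName(): only types that are not stored in a catalog have a name that can be printed in this form.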

Value Construction

SQL:

OBJECT_OF(<name> [, <key>, <value> [, <key>, <value>, ...]])

Table API:

Expressions.objectOf(String, Object... kv);

Expressions.objectOf(Class, Object... kv);

Expressions.objectOf(DataType, Object... kv);

Value Modification

SQL:

OBJECT_UPDATE(<object>, <key>, <value> [, <key>, <value>, ...])

Table API:

BaseExpressions.objectUpdate(Object... kv);

Proposed Changes

SQL Data Type

  • Introduce a string representation for the existing unregistered, inline StructuredType following the pattern:
    STRUCTURED< <name> , <field_list> >

  • User-defined structured types that are registered in a catalog are out of scope and left for future work.

Example:

CREATE TABLE MyTable (
  uid BIGINT,
  `user` STRUCTURED<'com.example.User', name STRING, age INT NOT NULL>
);

-- Casts a row type into a structured type
INSERT INTO MyTable SELECT 1, CAST(('Bob', 42) AS STRUCTURED<'com.example.User', name STRING, age INT>);
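A row literal such as ('Bob', 42) only carries generated field names, so the CAST in the INSERT statement presumably maps row fields to structured fields by position. The plain-Java sketch below models that assumption at the value level; RowToStructuredCastSketch and castRowToStructured are hypothetical names, and type coercion and nullability are deliberately ignored.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical value-level sketch of CAST(<row> AS STRUCTURED<...>), assuming that row fields
 * are matched to the declared structured fields by position. Types and nullability are ignored.
 */
public class RowToStructuredCastSketch {

  static Map<String, Object> castRowToStructured(Object[] row, List<String> fieldNames) {
    if (row.length != fieldNames.size()) {
      throw new IllegalArgumentException(
          "Row has " + row.length + " fields but the structured type declares " + fieldNames.size());
    }
    // LinkedHashMap keeps the declaration order of the structured type.
    Map<String, Object> result = new LinkedHashMap<>();
    for (int i = 0; i < row.length; i++) {
      // The i-th row value becomes the i-th declared field; the row's own field names play no role.
      result.put(fieldNames.get(i), row[i]);
    }
    return result;
  }
}
```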

Notes:

  • The name STRUCTURED matches the existing StructuredType.

  • The list of arguments matches org.apache.flink.table.api.DataTypes#STRUCTURED(Class<T>, Field...).

  • The <name> literal is metadata that the engine does not read; it is only interpreted at the edges (e.g., in UDFs or the Table API).

  • We do not enforce that the name is a valid class name.
  • Each API language decides how to interpret the <name>. For Java and Python, the name should represent a fully qualified class name.
  • If the given <name> cannot be resolved to a class, the existing StructuredType#FALLBACK_CONVERSION applies (i.e. Row.class).
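The fallback rule can be sketched with plain Java reflection. resolveConversionClass is a hypothetical helper, not Flink code, and the caller-supplied fallback stands in for Row.class so that the example has no Flink dependency; Class.forName(String, boolean, ClassLoader) is standard JDK API.

```java
/**
 * Hypothetical sketch: picks the conversion class for a structured type name. The FLIP only
 * states that unresolvable names fall back to StructuredType#FALLBACK_CONVERSION (Row.class);
 * here a caller-supplied fallback stands in for Row.
 */
public class ConversionClassSketch {

  static Class<?> resolveConversionClass(String name, Class<?> fallback, ClassLoader loader) {
    try {
      // initialize=false: only check that the class exists, do not run static initializers.
      return Class.forName(name, false, loader);
    } catch (ClassNotFoundException e) {
      // The name is only metadata, so an unknown class is not an error.
      return fallback;
    }
  }
}
```

Because the lookup only happens at the edges, the same STRUCTURED type can be used in a job whose classpath does not contain the named class.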

Handling Structured Types in SQL

  • Introduce built-in functions OBJECT_OF and OBJECT_UPDATE to construct and modify values of structured types in SQL.

  • This removes the need for UDFs to modify structured types.

  • Additionally, the functions make objects serializable in TableEnvironment.fromValues and in all other locations that accept API expressions.

Example:

-- Instantiate the structured type
SELECT OBJECT_OF(
  'com.example.User',
  'name', 'Bob',
  'age', 42
);
-- Returns: (name=Bob, age=42)


-- Mutate existing fields in a structured type
SELECT OBJECT_UPDATE(
  OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 42),
  'name', 'Alice'
);
-- Returns: (name=Alice, age=42)

Notes:

  • The name OBJECT aligns with other vendors such as Oracle and Snowflake (see "Using PL/SQL Object Types" in the Oracle documentation and OBJECT_CONSTRUCT in the Snowflake documentation).

  • The name OBJECT also aligns with the built-in function JSON_OBJECT, which already exists in Flink.

  • OBJECT_OF was chosen to reserve OBJECT for semi-structured types; the _OF suffix marks it as the explicitly typed variant of OBJECT.

  • Functions such as OBJECT_INSERT or OBJECT_REMOVE are not supported in the first version because they would make an object incompatible with its corresponding class (i.e., additional fields would exist or fields would be missing).

  • Keys in both functions must be string literals.

  • Keys in OBJECT_UPDATE must exist in the input object.
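The rules listed above can be modeled at the value level with an ordered map. This is a hypothetical sketch, not Flink's runtime implementation: the names ObjectFunctionsSketch, Obj, objectOf, and objectUpdate are illustrative, String-typed keys stand in for "string literal" keys, and rejecting duplicate keys in OBJECT_OF is an assumption the FLIP does not state.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

/** Hypothetical value-level model of OBJECT_OF and OBJECT_UPDATE (not Flink's runtime). */
public class ObjectFunctionsSketch {

  /** A structured value: the type's class name plus field values in declaration order. */
  record Obj(String className, Map<String, Object> fields) {
    @Override
    public String toString() {
      return fields.entrySet().stream()
          .map(e -> e.getKey() + "=" + e.getValue())
          .collect(Collectors.joining(", ", "(", ")"));
    }
  }

  /** Mirrors OBJECT_OF(<name> [, <key>, <value> ...]): keys must be strings. */
  static Obj objectOf(String className, Object... kv) {
    Map<String, Object> fields = new LinkedHashMap<>();
    forEachPair(kv, (key, value) -> {
      // Assumption: a key may appear only once.
      if (fields.containsKey(key)) {
        throw new IllegalArgumentException("Duplicate key: " + key);
      }
      fields.put(key, value);
    });
    return new Obj(className, Collections.unmodifiableMap(fields));
  }

  /** Mirrors OBJECT_UPDATE(<object>, <key>, <value> ...): every key must already exist. */
  static Obj objectUpdate(Obj obj, Object... kv) {
    if (kv.length == 0) {
      throw new IllegalArgumentException("OBJECT_UPDATE needs at least one key-value pair");
    }
    // Copy so that the input object stays unchanged.
    Map<String, Object> fields = new LinkedHashMap<>(obj.fields());
    forEachPair(kv, (key, value) -> {
      if (!fields.containsKey(key)) {
        throw new IllegalArgumentException("Unknown field: " + key);
      }
      fields.put(key, value); // replacing an existing key keeps its position
    });
    return new Obj(obj.className(), Collections.unmodifiableMap(fields));
  }

  private static void forEachPair(Object[] kv, BiConsumer<String, Object> action) {
    if (kv.length % 2 != 0) {
      throw new IllegalArgumentException("Expected key-value pairs, got " + kv.length + " arguments");
    }
    for (int i = 0; i < kv.length; i += 2) {
      if (!(kv[i] instanceof String key)) {
        throw new IllegalArgumentException("Key at position " + i + " must be a string");
      }
      action.accept(key, kv[i + 1]);
    }
  }
}
```

With the values from the SQL example above, objectOf("com.example.User", "name", "Bob", "age", 42) prints (name=Bob, age=42), and objectUpdate on that object with "name", "Alice" prints (name=Alice, age=42) while the original object is left unchanged.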

For Table API, we provide the following functionality:

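import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.objectOf;

TableEnvironment env = ...

Table objects = env.fromValues(objectOf(MyPojo.class, "name", "Bob", "score", 42)).as("obj");

objects.execute().print();
// Prints: (name=Bob, score=42)

objects.select($("obj").objectUpdate("name", "Alice")).execute().print();
// Prints: (name=Alice, score=42)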

Examples

Using TYPEOF in SQL


-- Always returns `STRUCTURED<'com.example.User', name STRING, age INT>`, regardless of whether the class exists or not
SELECT TYPEOF(OBJECT_OF(
  'com.example.User',
  'name', 'Bob',
  'age', 42
));

Combining SQL and Table API


TableEnvironment env = ...

Table t = env.sqlQuery("SELECT OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 42)");

// Resolves `com.example.User` from the classpath if present; otherwise the structured value is returned as a `Row`
t.execute().collect();


Compatibility, Deprecation, and Migration Plan

Not applicable.

Test Plan

  • The engine already supports serialization and casting of structured types.
  • Type handling will be tested with unit tests and ITCases.
  • Functions will be tested with BuiltInFunctionTestBase.

Rejected Alternatives

None