Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The motivation of this FLIP is to further extend the range of operations and expressions that are SQL serializable. Several earlier FLIPs made parts of the Table API expressible as SQL strings:
- FLIP-502: QueryOperation SQL Serialization customization
- FLIP-393: Make QueryOperations SQL serializable
- FLIP-51: Rework of the Expression Design
After FLIP-393 and FLIP-502, expressions are fully serializable.
Table API supports consuming and producing structured types from UDFs. Those structured types are fully supported by the planner's type system and can be printed or collected:
TableEnvironment env = ...

public class PojoScalarFunction extends ScalarFunction {
    public MyPojo eval(String name, int score) {
        return new MyPojo(name, score);
    }
}

public class MyPojo {
    public String name;
    public int score;

    public MyPojo(String name, int score) {
        this.name = name;
        this.score = score;
    }
}

env.createFunction("PojoScalarFunction", PojoScalarFunction.class);

env.executeSql("SELECT PojoScalarFunction('Bob', 42)").print();
// Returns: (name=Bob, score=42)

env.executeSql("SELECT PojoScalarFunction('Bob', 42)").collect();
// Returns instances of MyPojo.class
However, structured types derived from an inline class definition still lack a SQL and Table API representation. Thus, it is not possible to declare a table with such a type using CREATE TABLE, to create structured types outside of UDFs, to use them in a CAST, or to declare them as part of a @DataTypeHint function annotation. Table API's table descriptors support structured types already.
While the RAW type has a serializable representation, RAW('c', 's') where c is the originating class and s is the serialized TypeSerializerSnapshot, structured types have no such representation and are therefore more limited in usage than raw types.
Note: This FLIP focuses on a single evolution step for structured types. There are various other improvements that could be proposed but are out of scope and might be covered in future FLIPs.
Public Interfaces
SQL Data Type
Update of FlinkSqlParser, LogicalTypeParser, and StructuredType.asSerializableString for:
<structured_type> ::= STRUCTURED '<' <name> ',' <field_list> '>'
<name> ::= <identifier>
<field_list> ::= <field> { ',' <field> }*
<field> ::= <identifier> <type> <comment>
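To make the grammar concrete, the following is a minimal sketch of how the string form could be assembled from a name and an ordered field list. `StructuredTypeStrings` and `format` are hypothetical names, not Flink API. The sketch assumes the name is written as a single-quoted literal, as in the examples of this FLIP, omits the optional comment, and performs no escaping of names or identifiers.

```java
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not part of Flink) that renders the STRUCTURED<...> string form. */
final class StructuredTypeStrings {

    private StructuredTypeStrings() {}

    /**
     * Renders STRUCTURED<'name', field type, ...>.
     *
     * @param name class name or other identifying name (assumed to need no escaping)
     * @param fields field name mapped to its SQL type string, in declaration order
     */
    static String format(String name, Map<String, String> fields) {
        // The grammar requires at least one field in <field_list>.
        if (fields.isEmpty()) {
            throw new IllegalArgumentException("A structured type needs at least one field");
        }
        StringJoiner joiner = new StringJoiner(", ", "STRUCTURED<", ">");
        joiner.add("'" + name + "'");
        // Each field is rendered as "<identifier> <type>"; comments are left out in this sketch.
        fields.forEach((field, type) -> joiner.add(field + " " + type));
        return joiner.toString();
    }
}
```

Passing an insertion-ordered map (for example a `LinkedHashMap`) with `name -> STRING` and `age -> INT NOT NULL` for `com.example.User` yields the same string that is used in the `CREATE TABLE` example of this FLIP.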
Additions to StructuredType:
public class StructuredType extends UserDefinedType {
    /**
     * Creates a builder for a {@link StructuredType} that is not stored in a catalog and is
     * identified by a class name. Use {@link #newBuilder(Class)} if the class is available on the classpath.
     */
    public static StructuredType.Builder newBuilder(String className) { ... }

    /**
     * Returns the class name for an explicit structured type. Empty if the type is stored in a catalog.
     */
    Optional<String> getClassName() { ... }
}
Additions to DataTypes:
/**
* Data type of a user-defined object structured type that is not stored in a catalog and is
* identified by a class name.
*/
public static <T> DataType STRUCTURED(String implementationClass, Field... fields) { ... }
Value Construction
SQL:
OBJECT_OF(<name>, [<key>, <value> [, <key>, <value> , ...]] )
Table API:
Expressions.objectOf(String, Object... kv);
Expressions.objectOf(Class, Object... kv);
Expressions.objectOf(DataType, Object... kv);
Value Modification
SQL:
OBJECT_UPDATE(<object>, <key>, <value> [, <key>, <value> , ...] )
Table API:
BaseExpressions.objectUpdate(Object... kv);
Proposed Changes
SQL Data Type
Introduce a string representation for the existing unregistered, inline StructuredType following the pattern:
STRUCTURED< <name> , <field_list> >
User-defined structured types that are registered in a catalog are not considered here and are left for future work.
Example:
CREATE TABLE MyTable (
  uid BIGINT,
  user STRUCTURED<'com.example.User', name STRING, age INT NOT NULL>
);
-- Casts a row type into a structured type
INSERT INTO MyTable SELECT CAST(('Bob', 42) AS STRUCTURED<'com.example.User', name STRING, age INT>);
Notes:
- The name STRUCTURED matches the existing StructuredType.
- The list of arguments matches org.apache.flink.table.api.DataTypes#STRUCTURED(Class<T>, Field...).
- The <name> literal is metadata and not read by the engine, but only at the edges (e.g. in UDFs or Table API).
  - We are not enforcing that the name is a valid class name.
  - The API language can decide what to do with the <name>. For Java and Python, the name should represent a fully qualified class name.
- If the given <name> cannot be resolved to a class, the existing StructuredType#FALLBACK_CONVERSION applies (i.e. Row.class).
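The resolution behavior described in the notes above can be sketched as follows. This is an illustration only, not the planner's actual code: `ClassNameResolution` and `resolveOrFallback` are hypothetical names, and the fallback class is passed in as a parameter so that the sketch has no Flink dependency (in Flink the fallback would be `Row.class`).

```java
/** Hypothetical helper (not part of Flink) illustrating name resolution with a fallback. */
final class ClassNameResolution {

    private ClassNameResolution() {}

    /**
     * Tries to load the class for the given name without initializing it.
     * Returns the fallback class if the name cannot be resolved.
     */
    static Class<?> resolveOrFallback(String className, Class<?> fallback) {
        try {
            // 'false' avoids running static initializers of the resolved class.
            return Class.forName(className, false, ClassNameResolution.class.getClassLoader());
        } catch (ClassNotFoundException | LinkageError e) {
            // The name is metadata only, so an unresolvable name is not an error.
            return fallback;
        }
    }
}
```

With this shape, a name that is not a valid class name simply falls through to the fallback conversion instead of failing the query, which matches the statement that the name is not enforced to be a valid class name.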
Handling Structured Types in SQL
Introduce built-in functions OBJECT_OF and OBJECT_UPDATE to create and modify structured types in SQL. This removes the need for UDFs to perform mutations on structured types.
Additionally, the functions allow serializability of objects in TableEnvironment.fromValues and in all locations that allow API expressions.
Example:
-- Instantiate the structured type
SELECT OBJECT_OF(
'com.example.User',
'name', 'Bob',
'age', 42
);
-- Returns: (name=Bob, age=42)
-- Mutate existing fields in a structured type
SELECT OBJECT_UPDATE(
OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 42),
'name', 'Alice'
);
-- Returns: (name=Alice, age=42)
Notes:
- The name OBJECT aligns with other vendors such as Oracle or Snowflake:
  - OBJECT_CONSTRUCT | Snowflake Documentation
  - 12 Using PL/SQL Object Types
- The name OBJECT also aligns with the built-in function JSON_OBJECT which already exists in Flink.
- OBJECT_OF was chosen to reserve OBJECT for semi-structured types, making _OF a more explicitly typed version of OBJECT.
- Methods such as OBJECT_INSERT or OBJECT_REMOVE make no sense to support in the first version because they would be incompatible with the corresponding class (i.e. if additional fields exist or fields are missing).
- Keys in both methods must be string literals.
- Keys in OBJECT_UPDATE must exist in the input object.
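The key rules in the notes above can be illustrated with a small model in plain Java. This is a sketch of the described semantics, not the built-in function implementation: `ObjectFunctionsSketch` is a hypothetical name, an object is modeled as an insertion-ordered map, the class name argument is left out, and the string-literal requirement is approximated by a runtime check that keys are strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model (not Flink code) of the OBJECT_OF / OBJECT_UPDATE key rules. */
final class ObjectFunctionsSketch {

    private ObjectFunctionsSketch() {}

    /** Builds an object from alternating key/value arguments. */
    static Map<String, Object> objectOf(Object... kv) {
        Map<String, Object> fields = new LinkedHashMap<>();
        putPairs(fields, false, kv);
        return fields;
    }

    /** Returns a copy of the object with the given fields replaced; unknown keys are rejected. */
    static Map<String, Object> objectUpdate(Map<String, Object> object, Object... kv) {
        Map<String, Object> updated = new LinkedHashMap<>(object);
        putPairs(updated, true, kv);
        return updated;
    }

    private static void putPairs(Map<String, Object> target, boolean keyMustExist, Object... kv) {
        if (kv.length % 2 != 0) {
            throw new IllegalArgumentException("Expected alternating keys and values");
        }
        for (int i = 0; i < kv.length; i += 2) {
            if (!(kv[i] instanceof String)) {
                throw new IllegalArgumentException("Key at position " + i + " must be a string");
            }
            String key = (String) kv[i];
            // OBJECT_UPDATE may only touch fields that already exist in the input object.
            if (keyMustExist && !target.containsKey(key)) {
                throw new IllegalArgumentException("Unknown field: " + key);
            }
            // Replacing an existing key keeps its original position in a LinkedHashMap.
            target.put(key, kv[i + 1]);
        }
    }
}
```

Because updates never add or remove keys, the field set of the result always equals that of the input, which is the property that keeps the object compatible with its corresponding class.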
For Table API, we provide the following functionality:
TableEnvironment env = ...
Table objects = env.fromValues(objectOf(MyPojo.class, "name", "Bob", "score", 42)).as("obj");
objects.execute().print();
// Returns: (name=Bob, score=42)
objects.select($("obj").objectUpdate("name", "Alice")).execute().print();
// Returns: (name=Alice, score=42)
Examples
Using TYPEOF in SQL
-- Always returns `STRUCTURED<'com.example.User', name STRING, age INT>` regardless of whether the class exists
SELECT TYPEOF(OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 42));
Combining SQL and Table API
TableEnvironment env = ...
Table t = env.sqlQuery("SELECT OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 42)");
// Tries to resolve `com.example.User` in the classpath, if not present returns `Row`
t.execute().collect();
Compatibility, Deprecation, and Migration Plan
Not applicable.
Test Plan
- The engine already supports serialization and casting of structured types.
- Type handling will be tested with unit tests and ITCases.
- Functions will be tested with BuiltInFunctionTestBase.
Rejected Alternatives
None