Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In Flink 1.9 we started to update the Table API type system to be semantically closer to SQL standard types. FLIP-37 introduced a well-defined set of data types with specification of boundaries and conversion classes from/to Java. However, the new type system has not been exposed for user-defined functions which is the goal of this document.
Currently, input, output, and accumulator types of UDFs are extracted in a programming language dependent manner within the table environment. Either Flink's type extractor for Java or Scala's experimental macro architecture is used for extracting type information, which is then converted into a SQL data type. Thus, UDFs rely on DataSet/DataStream API utilities and are not 100% modular/self-contained.
The current architecture has a couple of issues that have been reported on the mailing list and in Jira in the past. They cause unintended behavior and confusion for users. Some examples:
- The `Row` type is the most common type in Table API but it is extracted as a generic type by default without any notice. SQL users cannot access fields and some operations like using table functions are also not possible unless they override some `getParameterType()/getResultType()` method.
- In SQL queries, users can access fields of Scala case classes or fields of POJO classes. But case classes within POJOs are treated as generic types and cannot be accessed. POJOs in case classes are fine. The reason is that once we leave the Scala world for extraction, there is no way back from the Java world.
- TableFunction/AggregateFunction uses Scala extraction. ScalarFunction always uses Java extraction.
- SQL supports the `Map` type but there is no way of extracting this type from a UDF because every change in the type extractor might break DataStream API state backwards compatibility.
- Immutable POJO types are not supported.
- POJOs always have a non-deterministic field order, which is why we need to force users to specify all fields again just to define an order. With POJOs of 30+ fields this is annoying.
- If a single field in a POJO cannot be extracted, a user needs to specify the entire POJO manually. With POJOs of 30+ fields this is annoying.
- It is not possible to accept a variety of data types in a single method. E.g. an `eval(Object... args)` is not supported. Users might need to implement a list of `eval` methods for all accepted data types.
- Scala users often forget the `org.apache.flink.api.scala._` import for the macro and are confronted with weird exceptions.
- Useful information about nullability is lost for the optimizer.
Many other shortcomings… (see FLINK-9484)
Public Interfaces
- Deprecation of `(Stream)TableEnvironment.registerFunction`
- New `TableEnvironment.createTemporaryFunction`
- New annotations `FunctionHint` and `DataTypeHint`
Proposed Changes
We propose a three-level approach from which users can choose depending on their use case: from an extraction-based declaration to more powerful semantics using a custom inference.
- New extraction - Simple things should be simple.
- Enrich the extraction with type annotations - Should solve 80% of all use cases.
- Access the full type inference universe - Implement features like built-in functions.
Each level broadens the set of use cases and has higher precedence than the level before it:
Type inference > Annotations > Extraction
Each level is explained in detail below.
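The precedence rule can be pictured with a small plain-Java sketch. It is not part of the proposed API: the class `PrecedenceSketch`, the method `resolveOutputType`, and the use of plain strings for data types are assumptions made only for illustration.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Illustrative sketch of "Type inference > Annotations > Extraction" (hypothetical names). */
final class PrecedenceSketch {

    /**
     * Returns the output type from the highest-precedence level that provides one.
     * Data types are modeled as plain strings to keep the sketch self-contained.
     */
    static String resolveOutputType(
            Optional<String> customInference,
            Optional<String> annotationHint,
            Supplier<String> defaultExtraction) {
        // A custom type inference wins over an annotation hint,
        // and an annotation hint wins over the default extraction.
        return customInference.orElseGet(() -> annotationHint.orElseGet(defaultExtraction));
    }
}
```

The default extraction is passed as a supplier so that it only runs when neither of the higher levels provides a type.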
We suggest adding a `getTypeInference()` method to `FunctionDefinition`. It gives access to the full type inference universe by having the same functionality as built-in functions if needed. All user-defined functions such as `ScalarFunction`, `TableFunction`, `AggregateFunction` implement this method and provide the logic for the new (possibly annotated) extraction by default.
Even though this is an API breaking change, we aim for backwards compatibility. The new extraction is designed to support most of the old features and enables new features. Some slight adaptation of existing UDFs might be necessary. The new UDF design will only be supported in the newly introduced unified method defined in FLIP-64:
Deprecated with old type inference | New type inference |
registerScalarFunction/ registerAggregateFunction/ registerTableFunction | createTemporaryFunction |
It will enable all kinds of functions in the new `org.apache.flink.table.api.TableEnvironment`.
Resolution Process
Regardless of whether the type extraction or a custom type inference is used, the result will be a set of DataTypes for which a corresponding evaluation method can be searched for. This separates the type system from the actual runtime implementation.
For example, the type inference returned an input signature:
[DataType(INT NOT NULL, int.class), DataType(BIGINT, Long.class)]
Please note: Regular Java method invocation rules are applied for finding a suitable evaluation method. This avoids any ambiguity and is clearly defined by the Java Language Specification. Some examples:
void test(int i)
→ can be called via test(new Integer(12)); or test(12);
void test(int i), void test(Integer i)
→ can be called via test(new Integer(12)); or test(12);
So for the data types with input signature `(int, Long)` the following methods are all valid candidates:
eval(Integer, Long)
eval(Integer, long)
eval(int, Long)
eval(int, long)
eval(int, Object)
eval(Object...)
But like in the JVM, eval(int, Long) would be chosen if all methods are available.
This means an evaluation method can be used for various data types if the type inference allows that.
We eagerly check if the result of the (possibly annotated) extraction matches with the actual evaluation methods.
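The invocation rules described in this section can be tried out in plain Java without any Flink classes. The following sketch declares the candidate methods from the list above as static overloads; the class name `OverloadDemo` and the returned marker strings are assumptions for illustration only.

```java
/** Plain-Java illustration of how overload resolution picks an evaluation method. */
final class OverloadDemo {

    // All candidates that are valid for the input signature (int, Long).
    static String eval(Integer a, Long b) { return "eval(Integer, Long)"; }
    static String eval(Integer a, long b) { return "eval(Integer, long)"; }
    static String eval(int a, Long b) { return "eval(int, Long)"; }
    static String eval(int a, long b) { return "eval(int, long)"; }
    static String eval(int a, Object b) { return "eval(int, Object)"; }
    static String eval(Object... args) { return "eval(Object...)"; }

    // The pair of methods from the boxing example.
    static String test(int i) { return "test(int)"; }
    static String test(Integer i) { return "test(Integer)"; }

    /** Calls eval with an int and a Long, matching the example input signature. */
    static String callWithIntAndLong() {
        int a = 12;
        Long b = 42L;
        // The compiler first looks for methods applicable without boxing or unboxing
        // and then selects the most specific one among them.
        return eval(a, b);
    }
}
```

With all overloads present, `callWithIntAndLong()` resolves to `eval(int, Long)` because it is the most specific method that needs neither boxing nor unboxing. If both `test` methods exist, `test(12)` resolves to `test(int)` and `test(Integer.valueOf(12))` to `test(Integer)`.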
New extraction
The biggest difference to the existing extraction stack is that we aim to have a unified extraction for Java and Scala that comes "from within a function" and is independent of the "outside table environment".
The following table lists the classes that are extracted by default:
Class | Data Type |
String | STRING |
Boolean | BOOLEAN (NOT NULL) |
Byte | TINYINT (NOT NULL) |
Short | SMALLINT (NOT NULL) |
Integer | INT (NOT NULL) |
Long | BIGINT (NOT NULL) |
Float | FLOAT (NOT NULL) |
Double | DOUBLE (NOT NULL) |
java.sql.Date | DATE |
java.sql.Time | TIME(0) |
java.sql.Timestamp | TIMESTAMP(9) |
java.time.OffsetDateTime | TIMESTAMP(9) WITH TIME ZONE |
java.time.Instant | TIMESTAMP(9) WITH LOCAL TIME ZONE |
java.time.Duration | INTERVAL SECOND(9) |
java.time.Period | INTERVAL YEAR(4) TO MONTH |
arrays of the above | ARRAY<E> |
Map<K, V> | MAP<K, V> |
POJOs and Case classes | STRUCTURED TYPE |
The list explicitly excludes the following types:
- MULTISET: An annotation is necessary because we cannot distinguish between MAP and MULTISET based on class information.
- ROW: An annotation is necessary because we need information about fields, names, and order of fields.
- DECIMAL: An annotation is necessary because we need information about precision and scale. This is particularly important for financial applications.
- ANY: Users should explicitly state that they want to use an ANY type here. This forces users to think either of a different, more SQL-like data type or to fix a POJO type where a getter is missing or a class has the wrong visibility.
- NULL, SYMBOL, UNRESOLVED: More logical types and therefore no need to expose them for now.
Varargs and argument names are also extracted where applicable.
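To make the default mapping more concrete, the following plain-Java sketch models a small subset of the table as a lookup that fails for classes requiring an annotation. It is not the proposed extractor: the class `DefaultExtractionSketch`, the string representation of data types, and the reading that primitives map to `NOT NULL` types while boxed classes stay nullable (taken from the `DataType(INT NOT NULL, int.class)` example in the resolution section) are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative subset of the default class-to-type mapping (hypothetical names). */
final class DefaultExtractionSketch {

    private static final Map<Class<?>, String> DEFAULTS = new LinkedHashMap<>();

    static {
        DEFAULTS.put(String.class, "STRING");
        // Assumption: boxed classes are nullable, primitives are NOT NULL.
        DEFAULTS.put(Integer.class, "INT");
        DEFAULTS.put(int.class, "INT NOT NULL");
        DEFAULTS.put(Long.class, "BIGINT");
        DEFAULTS.put(long.class, "BIGINT NOT NULL");
        DEFAULTS.put(java.time.Instant.class, "TIMESTAMP(9) WITH LOCAL TIME ZONE");
        DEFAULTS.put(java.time.Duration.class, "INTERVAL SECOND(9)");
    }

    /** Returns the default data type for a class or fails if an annotation is required. */
    static String extract(Class<?> clazz) {
        if (clazz.isArray()) {
            // Arrays of supported classes map to ARRAY<E>.
            return "ARRAY<" + extract(clazz.getComponentType()) + ">";
        }
        String type = DEFAULTS.get(clazz);
        if (type == null) {
            // E.g. java.math.BigDecimal: precision and scale are unknown.
            throw new IllegalArgumentException(
                "Cannot extract a data type from " + clazz.getName() + " without an annotation.");
        }
        return type;
    }
}
```

In this sketch, `extract(java.math.BigDecimal.class)` throws because DECIMAL is excluded from the default extraction, which mirrors the immediate feedback described above.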
Structured Types
In order to have an equivalent representation of TupleTypeInfo, CaseClassTypeInfo, PojoTypeInfo in the SQL type system, we unify those concepts under the SQL standard's "structured type".
Structured types have already been defined as part of FLIP-37. In order to allow type extraction of structured types that are not registered in a catalog, we need to relax the structured type concept to "inline or anonymous structured types" that are not identified by an object identifier in a catalog but by the fully qualified implementation class.
In order to support case classes and immutable types, we relax the constraint of enforcing a default constructor by the alternative of having a constructor that fully assigns all fields (same parameter names and types). Because we are already using code generation, the implementation of creating instances even without a default constructor is relatively easy.
The following classes would be valid structured types:
class TestPojo {
    public int field;
}

class TestPojo {
    private int field;

    public void setField(int field) {
        this.field = field;
    }

    public int getField() {
        return this.field;
    }
}

class TestPojo {
    private final int field;

    public TestPojo(int field) {
        this.field = field;
    }

    public int getField() {
        return this.field;
    }
}
Because the ANY type is disabled by default, users will get immediate feedback in the form of an exception if a given type cannot be interpreted as a structured type.
Because we are able to extract a fully assigning constructor, we can give POJOs a deterministic field order. Otherwise we use alphabetical ordering as before.
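The rules for valid structured types can be approximated with plain reflection. The sketch below is not the proposed extractor: `StructuredTypeSketch` and its sample classes are hypothetical, only `get`/`set` accessors are considered, and constructor parameters are compared by type only because parameter names are not available via reflection unless the class is compiled with `-parameters`.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Simplified check whether a class qualifies as a structured type (hypothetical). */
final class StructuredTypeSketch {

    static class PublicFieldPojo { public int field; }

    static class GetterSetterPojo {
        private int field;
        public void setField(int field) { this.field = field; }
        public int getField() { return this.field; }
    }

    static class ImmutablePojo {
        private final int field;
        public ImmutablePojo(int field) { this.field = field; }
        public int getField() { return this.field; }
    }

    static class HiddenFieldPojo { private int field; } // no getter: not readable

    static boolean isStructuredType(Class<?> clazz) {
        List<Field> fields = new ArrayList<>();
        for (Field field : clazz.getDeclaredFields()) {
            if (!Modifier.isStatic(field.getModifiers()) && !field.isSynthetic()) {
                fields.add(field);
            }
        }
        // Every field must be readable: public or exposed through a getter.
        for (Field field : fields) {
            boolean isPublic = Modifier.isPublic(field.getModifiers());
            if (!isPublic && !hasMethod(clazz, "get" + capitalize(field.getName()))) {
                return false;
            }
        }
        List<String> fieldTypes = new ArrayList<>();
        for (Field field : fields) {
            fieldTypes.add(field.getType().getName());
        }
        Collections.sort(fieldTypes);
        boolean hasDefaultConstructor = false;
        for (Constructor<?> constructor : clazz.getDeclaredConstructors()) {
            if (constructor.getParameterCount() == 0) {
                hasDefaultConstructor = true;
            } else if (sortedTypeNames(constructor.getParameterTypes()).equals(fieldTypes)) {
                return true; // a constructor that assigns all fields is sufficient
            }
        }
        if (!hasDefaultConstructor) {
            return false;
        }
        // With a default constructor, every field must be writable.
        for (Field field : fields) {
            int mods = field.getModifiers();
            boolean publicMutable = Modifier.isPublic(mods) && !Modifier.isFinal(mods);
            String setter = "set" + capitalize(field.getName());
            if (!publicMutable && !hasMethod(clazz, setter, field.getType())) {
                return false;
            }
        }
        return true;
    }

    private static List<String> sortedTypeNames(Class<?>[] types) {
        List<String> names = new ArrayList<>();
        for (Class<?> type : types) {
            names.add(type.getName());
        }
        Collections.sort(names);
        return names;
    }

    private static boolean hasMethod(Class<?> clazz, String name, Class<?>... parameterTypes) {
        try {
            clazz.getMethod(name, parameterTypes);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    private static String capitalize(String name) {
        return Character.toUpperCase(name.charAt(0)) + name.substring(1);
    }
}
```

The three sample classes mirror the valid forms listed above, while `HiddenFieldPojo` is rejected because its field is neither public nor exposed through a getter.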
Limitations & Notes
Due to bytecode language specifics in Scala, we cannot support specialized types such as Scala tuples. Users can use either Java tuples or case classes instead. We can print a helpful error message for that.
Due to bytecode language specifics in Scala, we cannot support Scala primitive types as part of generic parameters. Users can use Java boxed types instead like `java.util.Map<java.lang.Long, java.lang.Long>`. Scala primitive types in case classes/POJOs etc. work as expected.
For aggregate functions with accumulators, MapView and ListViews are skipped during extraction of structured type fields using the same logic as before with POJO types; they are represented as structured types with key/value attributes.
Annotation extraction
Annotations make it possible to influence and enrich the default type extraction.
This functionality is missing in Flink's current TypeExtractor. For example, it allows users to:
- specify data types with precision and scale inline
- in particular define row types inline
- version the extraction which means we can evolve the extractor in the future without breaking backwards compatibility of existing functions
- parameterize which classes/packages should always be treated as ANY type
We propose a simple annotation structure of 2 annotations:
FunctionHint: Hints for functions per class or method.
DataTypeHint: Hints within a FunctionHint, for method parameters, for structured types, or fields of structured types.
FunctionHint
FunctionHint allows annotating a function class such as TableFunction, ScalarFunction, or AggregateFunction. Each argument is optional and falls back to the default extraction if not specified.
FunctionHints of methods have higher precedence than class ones. Class-level FunctionHints apply to all evaluation methods and their FunctionHints.
Multiple function hints for a class or method are possible for defining overloading behavior.
/**
 * A hint for the input, intermediate accumulator, and output types of
 * a {@link UserDefinedFunction}.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@Repeatable(FunctionHints.class)
public @interface FunctionHint {

    /**
     * Adds a hint for the input types of this function.
     */
    DataTypeHint[] input() default @DataTypeHint();

    /**
     * Determines whether the last input data type should
     * be used as a vararg. Can be YES/NO/UNDEFINED.
     */
    HintFlag isVarArgs() default HintFlag.UNDEFINED;

    /**
     * Adds a hint for the accumulator type of this function.
     */
    DataTypeHint accumulator() default @DataTypeHint();

    /**
     * Adds a hint for the output type of this function.
     */
    DataTypeHint output() default @DataTypeHint();
}
Some examples to present the concept:
@FunctionHint(output = @DataTypeHint("ROW<a STRING, b INT>"))
public class TableFunction<Row> {
    public void eval(Integer a, Long b, Long c) {
        //...
    }
}
→ Takes (INT, BIGINT, BIGINT) and returns ROW.
public class TableFunction<String> {
    @FunctionHint(input = {@DataTypeHint("INT"), @DataTypeHint("BOOLEAN")})
    @FunctionHint(input = {@DataTypeHint("STRING")})
    public void eval(Object... args) {
        //...
    }
}
→ Takes (INT, BOOLEAN) or (STRING) and returns a STRING.
@FunctionHint(
    input = @DataTypeHint(takeArbitraryInput = YES),
    isVarArgs = YES,
    output = @DataTypeHint("STRING"))
public class TableFunction<Object> {
    public void eval(Object... args) {
        //...
    }
}
→ Takes arbitrary input parameters and returns a STRING.
@FunctionHint(output = @DataTypeHint("ROW<a INT, b STRING>"))
public class TableFunction<Row> {
    @FunctionHint(input = @DataTypeHint("INT"))
    @FunctionHint(input = @DataTypeHint("BOOLEAN"))
    public void eval(Object... args) {
        //...
    }
}
→ Takes (INT) or (BOOLEAN) and returns ROW. The class-level hint is applied to all method-level hints.
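How repeated method-level hints could be combined with a class-level hint can be sketched with standard Java reflection. The annotations `Hint` and `Hints` below are simplified stand-ins, not the proposed `FunctionHint`/`DataTypeHint`, and `HintMergeSketch.signatures` is a hypothetical helper that models data types as strings.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/** Sketch of merging class-level and method-level hints (stand-in annotations). */
final class HintMergeSketch {

    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @Repeatable(Hints.class)
    @interface Hint {
        String[] input() default {};
        String output() default "";
    }

    /** Container annotation required by @Repeatable. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @interface Hints {
        Hint[] value();
    }

    /** Mirrors the last example: a class-level output and two method-level inputs. */
    @Hint(output = "ROW<a INT, b STRING>")
    static class ExampleFunction {
        @Hint(input = "INT")
        @Hint(input = "BOOLEAN")
        public void eval(Object... args) {
            // evaluation logic omitted
        }
    }

    /** Returns one signature string per method-level hint of every eval method. */
    static List<String> signatures(Class<?> function) {
        Hint[] classHints = function.getAnnotationsByType(Hint.class);
        List<String> result = new ArrayList<>();
        for (Method method : function.getDeclaredMethods()) {
            if (!method.getName().equals("eval")) {
                continue;
            }
            for (Hint methodHint : method.getAnnotationsByType(Hint.class)) {
                // The method-level hint has precedence; the class-level
                // hint only fills in what the method-level hint leaves empty.
                String output = methodHint.output();
                for (Hint classHint : classHints) {
                    if (output.isEmpty()) {
                        output = classHint.output();
                    }
                }
                result.add("(" + String.join(", ", methodHint.input()) + ") -> " + output);
            }
        }
        return result;
    }
}
```

For `ExampleFunction`, the helper yields one signature for (INT) and one for (BOOLEAN), both with the class-level ROW output.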
DataTypeHint
As seen in the examples before, the annotation can be used in FunctionHints. Additionally, it can enrich individual parameters, structured classes, or fields.
A DataTypeHint has three responsibilities. Each of them is optional; the default extraction kicks in otherwise:
- Defining a logical type with default conversion, e.g. `@DataTypeHint("INT")`
- Defining a data type with different conversion, e.g. `@DataTypeHint(value = "TIMESTAMP(3)", bridgedTo = java.sql.Timestamp.class)`
- Just parameterizing the extraction, e.g. `@DataTypeHint(version = 1, allowAnyGlobally = true)`
Within a FunctionHint, an empty DataTypeHint (no logical type) is only allowed as top-level property default.
/**
 * A hint for extracting a {@link DataType}.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
public @interface DataTypeHint {

    /**
     * A string describing a {@link LogicalType}. No value by default.
     *
     * @see LogicalTypeParser
     */
    String value() default "";

    /**
     * Adds a hint that data should be represented using the given class when entering or leaving
     * the table ecosystem.
     *
     * <p>A supported conversion class depends on the logical type and its nullability property.
     *
     * <p>Please see the implementation of {@link LogicalType#supportsInputConversion(Class)},
     * {@link LogicalType#supportsOutputConversion(Class)}, or the documentation for more information
     * about supported conversions.
     */
    Class<?> bridgedTo() default void.class;

    // ....
}
The following options for parameterizing the extraction are exposed through the annotation; we might add more in the future. The list might seem long at first glance, but keep in mind that extraction is not always performed on small, simple POJOs but sometimes on classes with 100+ fields that may have been generated using Avro or Protobuf:
Parameter | Description |
version | Logic version for future backwards compatibility. Current version by default. |
allowAnyGlobally | General flag that defines whether ANY data type should be used for classes that cannot be mapped to any SQL-like type or cause an error. Set to false by default, which means that an exception is thrown for unmapped types. For example, `java.math.BigDecimal` cannot be mapped because the SQL standard defines that decimals have a fixed precision and scale. |
allowAnyPattern | Patterns that enable the usage of an ANY type. A pattern is a prefix or a fully qualified name of `Class#getName()` excluding arrays. The general `allowAnyGlobally` flag must not be enabled for patterns. |
forceAnyPattern | Patterns that force the usage of an ANY type. A pattern is a prefix or a fully qualified name of `Class#getName()` excluding arrays. `allowAnyGlobally` must not be enabled for forcing ANY types. |
defaultDecimalPrecision | Sets a default precision for all decimals that occur. By default, decimals are not extracted. |
defaultDecimalScale | Sets a default scale for all decimals that occur. By default, decimals are not extracted. |
defaultYearPrecision | Sets a default year precision for year-month intervals. If set to 0, a month interval is assumed. |
defaultSecondPrecision | Sets a default second fraction for timestamps and intervals that occur, e.g. because some planners don't support nanoseconds yet. |
takeArbitraryInput | Determines whether arbitrary input should be allowed. If set to true, this has similar behavior as an always passing input type validator. The bridging class must be Object. |
Side note: If the community decides to rename the ANY type, we can rename `takeArbitraryInput` to `takeAnyInput`.
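The pattern semantics of `allowAnyPattern` and `forceAnyPattern` can be illustrated with a few lines of plain Java. This is only a sketch: `AnyPatternSketch.matchesAnyPattern` is a hypothetical helper, and reading "excluding arrays" as "match against the innermost element class of an array" is an assumption.

```java
/** Sketch of prefix-based pattern matching on Class#getName() (hypothetical helper). */
final class AnyPatternSketch {

    /** Returns true if the class name starts with at least one of the given patterns. */
    static boolean matchesAnyPattern(Class<?> clazz, String... patterns) {
        // Assumption: for arrays, the pattern is applied to the element class.
        Class<?> element = clazz;
        while (element.isArray()) {
            element = element.getComponentType();
        }
        String name = element.getName();
        for (String pattern : patterns) {
            // A fully qualified name is a prefix of itself, so startsWith covers both cases.
            if (name.startsWith(pattern)) {
                return true;
            }
        }
        return false;
    }
}
```

For instance, the pattern `"java.time."` matches `java.time.Duration` but not `java.lang.String`.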
Some examples:
public class ScalarFunction {
    public int eval(@DataTypeHint("DECIMAL(5, 2)") BigDecimal dec, int i, boolean b) {
        //...
    }
}
→ Help the extraction with some of the parameters.
public class ScalarFunction {
    public int eval(@DataTypeHint("ANY") org.joda.time.LocalDateTime t) {
        //...
    }
}
→ Help the extraction by explicitly specifying an ANY type.
public class ScalarFunction {
    public int eval(
        @DataTypeHint(version = 1, forceAnyPattern = {"org.joda."}) org.mycompany.User u) {
        //...
    }
}
→ Extract all the 50 fields automatically but leave out the Joda time classes and treat them as ANY type.
public class TableFunction<String> {
    public void eval(String prefix, @DataTypeHint(takeArbitraryInput = YES) Object obj) {
        //...
    }
}
→ Takes a string and an arbitrary input parameter and returns a STRING.
Manual Definition
If the (possibly annotated) extraction cannot solve a certain use case, for example, because literal values of a function call need to be analyzed or the return type depends on the input type, more advanced users can override the `getTypeInference()` method.
As a result of FLIP-51, Flink will provide many utility strategies and examples of how to use them via the built-in functions. We can think about additional type inference builders in the future for creating an instance of `TypeInference` easily.
Compatibility, Deprecation, and Migration Plan
- Implement annotations and new type extractor
- Implement default type inference
- Expose the new default type inference through TableEnvironment.createTemporaryFunction() and deprecate the old registration methods.
Test Plan
We will add mostly unit tests for testing the implementation.
Rejected Alternatives
Not applicable.