

Status

Current state: "accepted"

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-51-Rework-of-the-Expression-Design-td31653.html

JIRA: FLINK-13773

Released:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Remove PlannerExpression

After FLIP-32, we introduced the Java Expression to make the Table API Scala-free.

After FLIP-37, we introduced the new type system to get rid of TypeInformation in the Table API.

But we still rely on Scala's PlannerExpression to infer types, validate inputs, and convert expressions to Calcite RexNodes. We should rework Expression so that it can do these things itself, and remove PlannerExpression to unify flink-planner and blink-planner.

Make Table API and Calcite definitions consistent

Currently, the type inference and input validation of PlannerExpression are very simple and quite different from Calcite's SqlReturnTypeInference and SqlOperandTypeChecker. This leads to inconsistent function definitions in some cases and results in bugs. If we have similar interface capabilities, we can align the Table API with Calcite.

Reduce the complexity of Function development

Adding a function to the Flink Table API has always been complex. We need to add the relevant FunctionDefinition to the Table API, add a SqlFunction to the SqlOperatorTable, and add the specific runtime implementation.

In the future, we will only need to add a FunctionDefinition, which contains type inference and validation, can be automatically converted to a SqlFunction by the framework, and can even include the runtime implementation. (ScalarFunction is already a FunctionDefinition with a runtime implementation.)

Powerful functions for users

Another advantage is that users can implement functions that are as powerful as Flink's built-in ones: with code generation, the expression-based approach, and Blink's binary data formats, they can even reach the same performance. Users can also use type inference to do whatever they need, which gives them more flexibility.

FunctionDefinition

Expressions can be declared as FunctionDefinitions. This could look similar to:

FunctionDefinition.builder()
    .name("sqrt")
    .standardSql("SQRT")
    .kind(SCALAR)
    .outputTypeStrategy(TypeStrategies.DOUBLE_NULLABLE)
    .inputTypeValidator(InputTypeValidators.NUMERIC_NUMERIC)
    //.namedArguments(Arrays.asList("a", "b"))
    //.typedArguments(Arrays.asList(DataTypes.INT, DataTypes.INT))
    //.inputTypeStrategy(InputTypes..)
    //.notDeterministic()
    //.monotonicity(Monotonicity.INCREASING)
    .build()

Flink will provide a set of strategies similar to those that Calcite supports. This allows for both explicit data type declaration as well as declaration of families, nullability, or complex logic.

Depending on the function definition, a runtime implementation can be attached (in the case of UDFs) or not (in the case of built-in functions). In the future, we can also move internal code generation logic or declarative logic (see DeclarativeAggregateFunction) into the function definition.

Another thing we need to address is the conversion between a function definition and Calcite's RexNode. We introduce the standardSql field to map internal built-in functions to the SqlFunctions in Calcite's SqlStdOperatorTable. For the other functions, we can generate a SqlFunction wrapper (a TypeInference wrapper and so on) to avoid introducing SqlFunctions again.

NOTE: we cannot use the wrapper for all functions, because some of Calcite's built-in functions have very special logic. For example, SqlLikeOperator deals with parser-related logic that we do not intend to support. We therefore retain the functions in SqlStdOperatorTable as much as possible and handle them in the standard SQL way.

Let's explain the fields one by one:

name

This should be the same as the catalog name; having this name helps register the function in a catalog. It should also be the same as the name of the Calcite SqlFunction.

standardSql

The standard SQL function name of this function definition; only standard functions should have this field.

It helps us map a Flink function definition to a Calcite SqlFunction.

kind

Something like Calcite's SqlFunctionCategory, this categorizes the semantics of a function definition. FunctionDefinition needs it to decide whether it is a table function, an aggregate function, or something else.

outputTypeStrategy

Strategy for inferring the output data type of a function call. See Calcite's SqlReturnTypeInference and ReturnTypes.

inputTypeValidator

Validator for checking the input data types of a function call. See Calcite's SqlOperandTypeChecker and OperandTypes.

inputTypeStrategy

  • For null value type inference, inputTypeStrategy converts an unknown type to a known type. For example, in func0(null, 5.3), the type of the first argument must be made clear for input type validation or method selection.
  • It is also useful for a user's UDF: through it, the user can specify specific input conversion classes.
  • In the case of multiple parameters, if the function needs all parameters strongly cast to the same type, it can also use InputTypeStrategy; after inference, Flink will add casts of the parameters to whatever the user returns from InputTypeStrategy.
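The null-inference behavior above can be sketched as follows. This is a self-contained toy model, not Flink's actual code: the DataType enum and the inferTypes method are simplified stand-ins for a FIRST_KNOWN-style strategy that resolves unknown argument types from the first known one:

```java
import java.util.Arrays;

public class InputTypeStrategySketch {

    // Simplified stand-in for Flink's DataType; NULL marks an unknown type.
    enum DataType { NULL, INT, DOUBLE, STRING }

    /**
     * Mimics a FIRST_KNOWN-style input type strategy: every argument whose
     * type is still unknown (NULL) is inferred as the first known type.
     */
    static DataType[] inferTypes(DataType[] inputTypes) {
        DataType firstKnown = Arrays.stream(inputTypes)
                .filter(t -> t != DataType.NULL)
                .findFirst()
                .orElse(DataType.NULL);
        DataType[] inferred = inputTypes.clone();
        for (int i = 0; i < inferred.length; i++) {
            if (inferred[i] == DataType.NULL) {
                inferred[i] = firstKnown;
            }
        }
        return inferred;
    }

    public static void main(String[] args) {
        // func0(null, 5.3): the NULL literal is inferred as DOUBLE.
        DataType[] result = inferTypes(new DataType[]{DataType.NULL, DataType.DOUBLE});
        System.out.println(Arrays.toString(result)); // [DOUBLE, DOUBLE]
    }
}
```

After such a strategy runs, every argument has a concrete type, so the validator and method selection can proceed.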

namedArguments

This information is useful for SQL's concept of named arguments using the assignment operator (e.g. FUNC(max => 42)).

Calcite uses namedArguments only for user-defined functions; based on namedArguments, it will reorder the arguments.
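The reordering can be sketched like this. This is a minimal, self-contained toy: the reorder helper and its signature are hypothetical illustrations, not Calcite's actual implementation:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NamedArgumentsSketch {

    /**
     * Reorders a map of named arguments (e.g. FUNC(num_days => 7, start_date => d))
     * into the positional order declared by the function definition.
     * Throws if a declared argument is missing from the call.
     */
    static List<Object> reorder(List<String> declaredNames, Map<String, Object> namedArgs) {
        List<Object> positional = new ArrayList<>();
        for (String name : declaredNames) {
            if (!namedArgs.containsKey(name)) {
                throw new IllegalArgumentException("Missing argument: " + name);
            }
            positional.add(namedArgs.get(name));
        }
        return positional;
    }

    public static void main(String[] args) {
        Map<String, Object> call = new HashMap<>();
        call.put("num_days", 7);
        call.put("start_date", "2019-08-01");
        // Declared order (start_date, num_days) determines the positional order.
        System.out.println(reorder(Arrays.asList("start_date", "num_days"), call));
        // [2019-08-01, 7]
    }
}
```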

typedArguments

Along with namedArguments, this information is useful for implicit and safe casting.

deterministic

true if a call to this function is guaranteed to always return the same result given the same operands; true is assumed by default.

monotonicity

Returns whether a call to this operator is monotonic.

Introduce TypeInference

Existing interfaces

The following interfaces describe the type inference (this code already exists; we just list it here):

/**
 * Validator for checking the input data types of a function call.
 *
 * @see InputTypeValidators
 */
public interface InputTypeValidator {

  /**
   * Initial input validation based on the number of arguments.
   */
  ArgumentCount getArgumentCount();

  /**
   * Main logic for validating the input. Returns {@code true} if the arguments are valid for the
   * given call, {@code false} otherwise.
   *
   * @param callContext provides details about the function call
   * @param throwOnFailure whether this function is allowed to throw a {@link ValidationException}
   *                       with a meaningful exception in case the validation is not successful or
   *                       whether it should simply return {@code false}
   */
  boolean validate(CallContext callContext, boolean throwOnFailure);

  /**
   * Returns a summary of the function's expected signatures.
   *
   * @param definition the function definition that defines the function currently being called
   */
  List<Signature> getExpectedSignatures(FunctionDefinition definition);
}
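A minimal implementation of this contract might look as follows. This is a self-contained sketch with a simplified DataType stand-in; it mimics a NUMERIC_NUMERIC-style validator rather than reproducing Flink's actual one:

```java
import java.util.Arrays;
import java.util.List;

public class InputTypeValidatorSketch {

    // Simplified stand-in for Flink's DataType.
    enum DataType { INT, DOUBLE, STRING }

    /**
     * Mimics a NUMERIC_NUMERIC-style validator: exactly two arguments,
     * both of a numeric type. Mirrors the validate(callContext, throwOnFailure)
     * contract: either throw a meaningful exception or simply return false.
     */
    static boolean validate(List<DataType> argTypes, boolean throwOnFailure) {
        boolean valid = argTypes.size() == 2
                && argTypes.stream().allMatch(t -> t == DataType.INT || t == DataType.DOUBLE);
        if (!valid && throwOnFailure) {
            throw new IllegalArgumentException("Expected (NUMERIC, NUMERIC), got " + argTypes);
        }
        return valid;
    }

    public static void main(String[] args) {
        System.out.println(validate(Arrays.asList(DataType.INT, DataType.DOUBLE), false));   // true
        System.out.println(validate(Arrays.asList(DataType.STRING, DataType.DOUBLE), false)); // false
    }
}
```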

/**
 * Strategy for inferring the data type of a function call. The inferred type might describe the
 * final result or an intermediate result (accumulation type) of a function.
 *
 * @see TypeStrategies
 */
@PublicEvolving
public interface TypeStrategy {

  /**
   * Infers a type from the given function call.
   */
  Optional<DataType> inferType(CallContext callContext);
}

/**
 * Provides details about the function call for which type inference is performed.
 */
@PublicEvolving
public interface CallContextBase {

  /**
   * Returns the function definition that defines the function currently being called.
   */
  FunctionDefinition getFunctionDefinition();

  /**
   * Returns whether the argument at the given position is a value literal.
   */
  boolean isArgumentLiteral(int pos);

  /**
   * Returns {@code true} if the argument at the given position is a literal and {@code null},
   * {@code false} otherwise.
   *
   * <p>Use {@link #isArgumentLiteral(int)} before to check if the argument is actually a literal.
   */
  boolean isArgumentNull(int pos);

  /**
   * Returns the literal value of the argument at the given position, given that the argument is a
   * literal, is not null, and can be expressed as an instance of the provided class.
   *
   * <p>Use {@link #isArgumentLiteral(int)} before to check if the argument is actually a literal.
   */
  <T> Optional<T> getArgumentValue(int pos, Class<T> clazz);

  /**
   * Returns the set of usages in this context.
   */
  default Set<ContextUsage> getUsages() {
    return Collections.emptySet();
  }

  /**
   * Returns the function's name, usually referencing the function in a catalog.
   *
   * <p>Note: The name is meant for debugging purposes only.
   */
  String getName();
}

/**
 * Provides details about the function call for {@link InputTypeValidator} and {@link TypeStrategy}.
 */
@PublicEvolving
public interface CallContext extends CallContextBase {

  /**
   * Returns a resolved list of the call's argument types. It also includes a type for every
   * argument in a vararg function call.
   */
  List<DataType> getArgumentDataTypes();
}
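To illustrate how these interfaces fit together, here is a self-contained sketch of a DOUBLE_NULLABLE-style output type strategy. The DataType, CallContext, and TypeStrategy classes below are simplified stand-ins, not the actual Flink classes:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class TypeStrategySketch {

    // Simplified stand-in for Flink's DataType: a type root plus nullability.
    static class DataType {
        final String root;
        final boolean nullable;
        DataType(String root, boolean nullable) { this.root = root; this.nullable = nullable; }
        @Override public String toString() { return root + (nullable ? " NULL" : " NOT NULL"); }
    }

    // Simplified stand-in for CallContext: only the argument types matter here.
    interface CallContext {
        List<DataType> getArgumentDataTypes();
    }

    // Simplified stand-in for TypeStrategy.
    interface TypeStrategy {
        Optional<DataType> inferType(CallContext ctx);
    }

    // A DOUBLE_NULLABLE-style strategy: the result is always DOUBLE, and it is
    // nullable if any input argument is nullable (cf. Calcite's TO_NULLABLE).
    static final TypeStrategy DOUBLE_NULLABLE = ctx -> {
        boolean nullable = ctx.getArgumentDataTypes().stream().anyMatch(t -> t.nullable);
        return Optional.of(new DataType("DOUBLE", nullable));
    };

    public static void main(String[] args) {
        CallContext ctx = () -> Arrays.asList(
                new DataType("INT", true), new DataType("DOUBLE", false));
        System.out.println(DOUBLE_NULLABLE.inferType(ctx).get()); // DOUBLE NULL
    }
}
```

A library of such reusable strategies (the TypeStrategies class mentioned above) lets most built-in functions declare their output type in one line of the builder.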

/**
 * Characteristics that a {@link CallContextBase} uses.
 */
public enum ContextUsage {

  /**
   * Using grouping columns for aggregation; indicates that the grouping size is greater than zero.
   */
  GROUPING_AGGREGATION,

  /**
   * Indicates that the function is an aggregate function with a filter.
   */
  FILTERING_AGGREGATION
}

Introduce InputTypeStrategy

/**
 * Strategy to infer unknown types of the inputs of a function definition.
 */
public interface InputTypeStrategy {

  /**
   * Infers operand types.
   *
   * @param callContext description of the call being analyzed
   * @param outputType the type known or inferred for the result of the call
   * @param inputTypes receives the inferred types for all operands
   */
  void inferTypes(CallContext callContext, DataType outputType, DataType[] inputTypes);
}

Unlike the output type strategy, InputTypeStrategy needs more information: it needs to know the output type, and it returns the inferred input types. The unknown type should be NullType.

The main methods are consistent with Calcite's, which helps us with the conversion.

The invoking order is:

  • infer output type
  • infer input type (use inferred output type)
  • validate input
  • infer output type
  • …...

One notable difference is that Calcite's types are not nullable by default, while Flink's are nullable by default; this affects the naming of the TypeStrategies fields and so on.

Convert FunctionDefinition to RexNode

There are three kinds of conversion:

  1. The function definition can be directly converted to a SqlFunction; for this kind, we can use the standardSql field. (Like AND, OR, NOT...)
  2. Instead of converting directly to a SqlFunction, the definition is converted to a combination of RexNodes, which currently can only be done through ugly if/else code. Later, I think we should solve this with a DeclarativeFunction: just like a ScalarFunction, it contains an implementation, but where a ScalarFunction contains a runtime Java implementation, a DeclarativeFunction contains Expression API code, i.e. a combination of expressions, so we can use it to replace the conversion to a combination of RexNodes. (Like BETWEEN, TRIM...)
  3. New functions, which should be handled by a SqlFunctionWrapper that wraps a FunctionDefinition in a SqlFunction. Here we need to do some type-inference-related wrapping. (Like E, LOG, BIN...)

Directly convert to SqlFunction

If a function definition looks like:

FunctionDefinition.builder()
    .name("sqrt")
    .standardSql("SQRT")
    …

then in ExpressionConverter, we take this standard SQL name and get the real SqlFunction from Calcite's SqlStdOperatorTable:

Field field = SqlStdOperatorTable.class.getField(sqlName);
return relBuilder.call(
    (SqlOperator) field.get(SqlStdOperatorTable.class),
    childrenRexNodes);
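The lookup above relies only on standard Java reflection over public static fields. The following self-contained sketch shows the technique with a hypothetical StdOperatorTable stand-in instead of Calcite's SqlStdOperatorTable:

```java
import java.lang.reflect.Field;

public class OperatorTableLookupSketch {

    // Hypothetical stand-in for Calcite's SqlStdOperatorTable: a class holding
    // public static fields, one per standard SQL operator.
    static class StdOperatorTable {
        public static final String SQRT = "SqlFunction(SQRT)";
        public static final String AND = "SqlOperator(AND)";
    }

    /** Looks up an operator by its standardSql name via reflection. */
    static Object lookup(String sqlName) throws ReflectiveOperationException {
        Field field = StdOperatorTable.class.getField(sqlName);
        return field.get(null); // static field, so no instance is needed
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        System.out.println(lookup("SQRT")); // SqlFunction(SQRT)
    }
}
```

An unknown name fails fast with a NoSuchFieldException, which is a useful property: a standardSql value that does not reference an existing entry in the operator table is caught immediately.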

Convert to a combination of RexNodes

In this case, we can only implement the RexNode combination in the planner through if/else (or something like the current RexNodeConverter).

New Functions

We should introduce a wrapper like this:

public SqlFunctionWrapper(FlinkTypeFactory factory, FunctionDefinition definition) {
  super(definition.getName(),
      SqlKind.OTHER_FUNCTION,
      new SqlReturnTypeInferenceWrapper(factory, definition),
      new SqlOperandTypeInferenceWrapper(factory, definition),
      new SqlOperandTypeCheckerWrapper(definition),
      SqlFunctionCategory.SYSTEM);
  this.definition = definition;
}

Calcite uses SqlFunctionCategory to determine whether a function is a user-defined function, but since we treat UDFs specially and they will not be included in this wrapper, we can use SqlFunctionCategory.SYSTEM directly.

We should also add the wrappers CallContextWrapper, SqlOperandTypeCheckerWrapper, and SqlReturnTypeInferenceWrapper.

(If we want to expose the first_value and listagg aggregate functions in blink-planner, we need to introduce a SqlAggFunctionWrapper too.)

But to achieve this, we need to ensure that Flink's interfaces fully cover Calcite's type inference capabilities.

After that, we can remove the BasicOperatorTable of flink-planner and the FlinkSqlOperatorTable of blink-planner, and let FunctionDefinition be a first-class citizen. (The whole effort needs more work on integrating FunctionCatalog into CatalogManager, which is a different FLIP.) The structure becomes:

Implement TypeInference

List all type inference implementations

Calcite SqlReturnTypeInferences:

  • OrdinalReturnTypeInference
  • ExplicitReturnTypeInference
  • SqlReturnTypeInferenceChain
  • SqlTypeTransformCascade
  • MatchReturnTypeInference
  • …...

Calcite SqlTypeTransforms:

  • TO_NULLABLE
  • TO_VARYING
  • FORCE_NULLABLE
  • LEAST_NULLABLE
  • ……

Calcite SqlOperandTypeChecker:

  • NumericExceptFirstOperandChecker
  • AssignableOperandTypeChecker
  • LiteralOperandTypeChecker
  • SameOperandTypeChecker
  • FamilyOperandTypeChecker
  • CompositeSingleOperandTypeChecker
  • CompositeOperandTypeChecker
  • ......

Calcite SqlOperandTypeInference:

  • FIRST_KNOWN
  • RETURN_TYPE

Function behavior

https://docs.google.com/document/d/1fyVmdGgbO1XmIyQ1BaoG_h5BcNcF3q9UJ1Bj1euO240/edit?usp=sharing

How to test one by one

  • We need more tests in the planner's ExpressionTestBase that cover type inference and validation. A similar class in Calcite is SqlOperatorBaseTest.
  • We need a good testing infrastructure for ensuring that a function definition and the corresponding standard function in Calcite stay the same. We should test that the type inferences are equivalent and that the sqlStdOperator references an existing entry in SqlStdOperator ...

Nice to have

It's not necessary for 1.10. We can support it in the future.

FunctionDefinition: FunctionMetadata

FunctionDefinition.builder()
    .name("sqrt")
    .metadata(Metadata.builder()...)

This is just meta information; we add it to help users and developers understand functions and to help generate documentation.

// meta information
interface FunctionMetadata {

    // name (can be equal to the catalog name but is just metadata)
    // e.g. date_add
    Optional<String> getName();

    // signature (usually the first column in a CLI)
    // the variable `_FUNC_` can and will be replaced in the string
    // e.g. "_FUNC_(start_date, num_days)"
    Optional<String> getSignature();

    // description (usually one sentence for a CLI)
    // e.g. "Returns the date that is num_days after start_date."
    Optional<String> getDescription();

    // more description (usually a more in-depth behavior description)
    // e.g. with examples:
    // "start_date is a string in the format 'yyyy-MM-dd HH:mm:ss'\n
    // Example: _FUNC_(...) Returns: ..."
    Optional<String> getDetailedDescription();
}

CodeGenFunction and DeclarativeFunction

In the future, we could introduce a CodeGenFunction; maybe it should only be exposed in the scala-api module, or maybe we should port the planner's code generation API to the scala-api module.

In the future, we can also introduce a DeclarativeFunction, which I think is more worth introducing. We should do some refactoring of DeclarativeAggregateFunction first.

Consistency field in InputTypeValidator

Consider Calcite's SqlOperandTypeChecker: it has a Consistency field for making the arguments have consistent types. Calcite uses this field to cast inputs to a consistent type. Currently, it is only used in the StdOperatorTable for equals, greaterThan, ...

Since equals/greaterThan will be converted to Calcite standard functions, we do not need to add this now, and no other extended functions need this field at present, so we will not add it for the time being.

Road map

  1. Complete BuiltInFunctionDefinition and FunctionDefinition
    1. add standardSql field
    2. add isDeterministic field
    3. add monotonicity field
    4. add InputTypeStrategy to type inference
    5. CallContext support ContextUsage.GROUPING_AGGREGATION
  2. Introduce wrappers
    1. Introduce SqlOperandTypeCheckerWrapper
    2. Introduce SqlReturnTypeInferenceWrapper
    3. Introduce SqlOperandTypeInferenceWrapper
    4. Introduce SqlFunctionWrapper
  3. Introduce ExpressionConverter (If no type inference implementation for function, still do PlannerExpression way)
    1. Support that standardSql field, directly convert to SqlFunction of SqlStdOperatorTable
    2. Support SqlFunctionWrapper
  4. Use the new Expression in RexNodeToExpressionConverter instead of PlannerExpression (to remove PlannerExpressions).
  5. Start reworking BuiltInFunctionDefinitions, with the check in ResolveCallByArgumentsRule and ExpressionConverter (definition.getTypeInference().getOutputTypeStrategy() != TypeStrategies.MISSING); we can implement FunctionDefinitions one by one and drop PlannerExpressions one by one.
    1. Introduce TypeStrategies: CascadeTypeStrategy, ExplicitTypeStrategy, OrdinalTypeStrategy, ChainTypeStrategy
    2. Introduce TypeTransformations: ToNullableTransformation, FORCE_NULLABLE, TO_VARYING
    3. Introduce InputTypeStrategies: FIRST_KNOWN, OUTPUT_TYPE, BOOLEAN
    4. Introduce InputTypeValidators: CompositeTypeValidator, AnyTypeValidator, PassingTypeValidator, TypeFamilyTypeValidator, TypeRootTypeValidator, SameTypeValidator.
    5. Implement type inference for logic functions. And introduce a testing infrastructure to verify flink type inference and calcite type inference. 
    6. Implement type inference for string functions.
    7. Implement type inference for math functions.
    8. Implement type inference for time functions.
    9. Implement type inference for other functions.
    10. Drop PlannerExpressions one by one.
    11. Apply things in blink-planner and add blink-planner extended functions

Prototype code

https://github.com/JingsongLi/flink/tree/exprRework

Document

https://docs.google.com/document/d/1yFDyquMo_-VZ59vyhaMshpPtg7p87b9IYdAtMXv5XmM/edit?usp=sharing

https://docs.google.com/document/d/1fyVmdGgbO1XmIyQ1BaoG_h5BcNcF3q9UJ1Bj1euO240/edit?usp=sharing

