Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This FLIP is mostly a follow-up to a number of other FLIPs.

It is another step towards making the flink-table-api module completely self-sufficient up to the point of query planning and submission.

It also lets us store Table API queries in a catalog, since they would become serializable to SQL. The need for this has been illustrated e.g. by https://cwiki.apache.org/confluence/x/-wurBw, which already serializes some expressions as SQL.

Proposed Changes

Serialize QueryOperations as SQL

This FLIP proposes to use SQL as the serialization format for QueryOperations and ResolvedExpressions. The necessary interfaces are already in place, but they have not been implemented yet:

ResolvedExpression:

public interface ResolvedExpression extends Expression {

    /**
     * Returns a string that fully serializes this instance. The serialized string can be used for
     * storing the query in, for example, a {@link org.apache.flink.table.catalog.Catalog} as a
     * view.
     *
     * @return detailed string for persisting in a catalog
     */
    default String asSerializableString() {
        throw new TableException(
                String.format(
                        "Expression '%s' is not string serializable. Currently, only expressions that "
                                + "originated from a SQL expression have a well-defined string representation.",
                        asSummaryString()));
    }
    
    // ...
}
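
To illustrate the intended behavior, a minimal sketch of how a field reference expression could implement the method (fieldName is an assumed field here; the real class carries more state):

// A simplified, hypothetical implementation for a field reference:
@Override
public String asSerializableString() {
    // Quote and escape the identifier so that reserved keywords and
    // special characters survive the round trip through SQL.
    return "`" + fieldName.replace("`", "``") + "`";
}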


QueryOperation:

public interface QueryOperation extends Operation {

    /**
     * Returns a string that fully serializes this instance. The serialized string can be used for
     * storing the query in e.g. a {@link org.apache.flink.table.catalog.Catalog} as a view.
     *
     * @return detailed string for persisting in a catalog
     * @see Operation#asSummaryString()
     */
    default String asSerializableString() {
        throw new UnsupportedOperationException(
                "QueryOperations are not string serializable for now.");
    }
    // ...
}
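
The serialization is naturally recursive: each operation renders itself around the serialized form of its children. A simplified sketch for a filter operation (the actual FilterQueryOperation carries more state):

// Simplified sketch: a filter renders its child as a sub-query and its
// condition via ResolvedExpression#asSerializableString().
@Override
public String asSerializableString() {
    return String.format(
            "SELECT * FROM (%s) WHERE %s",
            child.asSerializableString(),
            condition.asSerializableString());
}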

There is a very narrow subset of expressions that already follow this guideline. If an expression comes from the Planner and is backed by a `RexNode`, it can be serialized to its SQL representation: https://github.com/apache/flink/blob/fa0dd3559e9697e21795a2634d8d99a0b7efdcf3/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java#L118
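
For illustration, this round trip already works for expressions that were parsed from SQL (e.g. computed columns). A rough sketch, assuming access to the planner's Parser:

// An expression parsed from SQL is backed by a RexNode and can already
// be unparsed back to its SQL representation.
ResolvedExpression expr =
        parser.parseSqlExpression("UPPER(`name`)", inputRowType, null);
String serialized = expr.asSerializableString(); // e.g. "UPPER(`name`)"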


Example:

env.from("customers")
                        .filter($("gender").isNotNull())
                        .filter($("has_newsletter").isEqual(true))
                        .filter($("date_of_birth").isGreaterOrEqual(LocalDate.parse("1980-01-01")))
                        .select(
                                $("name").upperCase(),
                                $("date_of_birth"))

results in:

Project: (projections: [upper(name), date_of_birth])
    Filter: (condition: [greaterThanOrEqual(date_of_birth, 1980-01-01)])
        Filter: (condition: [equals(has_newsletter, true)])
            Filter: (condition: [isNotNull(gender)])
                CatalogTable: (identifier: [default_catalog.default_database.customers], fields: [name, date_of_birth, street, zip_code, city, gender, has_newsletter])

which would get translated into the following SQL:

SELECT upper(name), date_of_birth FROM (
    SELECT * FROM (
        SELECT * FROM (
            SELECT * FROM (
                SELECT name, date_of_birth, street, zip_code, city, gender, has_newsletter FROM `default_catalog`.`default_database`.`customers`
            ) WHERE gender IS NOT NULL
        ) WHERE has_newsletter = true
    ) WHERE date_of_birth >= DATE '1980-01-01'
)
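
With that in place, persisting a Table API query as a view becomes straightforward. A sketch of the intended usage, based on the existing Table#getQueryOperation and CatalogView.of APIs:

Table table =
        env.from("customers")
                .filter($("gender").isNotNull())
                .select($("name").upperCase(), $("date_of_birth"));

// Serialize the underlying QueryOperation to SQL ...
String sql = table.getQueryOperation().asSerializableString();

// ... and store it as both the original and expanded query of a view.
CatalogView view =
        CatalogView.of(
                Schema.newBuilder().fromResolvedSchema(table.getResolvedSchema()).build(),
                null,
                sql,
                sql,
                Collections.emptyMap());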


Functions requiring special syntax

There are a number of built-in functions that do not follow the standard function call syntax:

udf(arg0, arg1, arg2, ...)

Those functions include:

  • operators

    • arithmetic: +, -, /, *, %, …

    • comparison: LIKE, IN, BETWEEN, IS NULL, …

    • logical: AND, OR, …

  • prefix operators

    • -f0

  • special syntax:

    • POSITION(string1 IN string2)

    • OVERLAY(string1 PLACING string2 FROM integer1 [ FOR integer2 ])

    • JSON_OBJECT([[KEY] key VALUE value]* [ { NULL | ABSENT } ON NULL ])

In order to support serializing those functions into their SQL alternatives, we suggest adding an interface (inspired by https://github.com/apache/calcite/blob/main/core/src/main/java/org/apache/calcite/sql/SqlSyntax.java):

public interface CallSyntax {

    /** Converts a call to the given function into its SQL string representation. */
    String unparse(BuiltInFunctionDefinition functionDefinition, List<String> operands);
}
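
Constants such as BINARY_OP referenced in the builder example below could be simple implementations of this interface. A sketch, where getSqlName() is a hypothetical accessor for the SQL name registered via the builder:

// Sketch of possible CallSyntax constants; getSqlName() would return the
// SQL name passed to the builder ("+" for PLUS).
CallSyntax BINARY_OP =
        (definition, operands) ->
                operands.get(0) + " " + definition.getSqlName() + " " + operands.get(1);

// Special syntax such as POSITION(string1 IN string2) can be expressed
// the same way.
CallSyntax POSITION =
        (definition, operands) ->
                String.format("POSITION(%s IN %s)", operands.get(0), operands.get(1));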


Example usage in BuiltInFunctionDefinitions

    public static final BuiltInFunctionDefinition PLUS =
            BuiltInFunctionDefinition.newBuilder()
                    .name("plus")
                    .kind(SCALAR)
                    .callSyntax("+", CallSyntax.BINARY_OP)
                    // ...

Alternative: Introduce internal functions for all special syntax functions

Instead of introducing CallSyntax, we could introduce internal versions of all functions requiring special syntax, e.g.:

  • for JSON_OBJECT we may introduce $JSON_OBJECT$(key1, value1, key2, value2, ..., true /* NULL ON NULL if true, ABSENT ON NULL otherwise */)

  • for POSITION we’d have $POSITION_INTERNAL$(string1, string2)

The downsides of this approach are that:

  • the SQL is not really user-readable, because users must be aware of the existence of the internal alternatives

  • parsing back from SQL goes through a different code path, which might result in subtle bugs