Motivation

This FLIP further extends the range of operations and expressions that are SQL serializable. In the past, a number of FLIPs have made certain parts of the Table API expressible as SQL strings.

There are still a few expressions that are not SQL serializable because they require customization that is often platform-specific, e.g. inline function calls:

env.fromValues(1).select(call(ConvertToEmailUDF.class, "ABC"));

This would be the next step toward a north star vision of being able to express all Table API constructs in SQL, which would let us get rid of the direct Table API <> RelNode/RexNode conversions.

Public Interfaces

@PublicEvolving
public interface QueryOperation extends Operation {
    // default method; body omitted in this listing
    default String asSerializableString(SqlSerializationContext context);
}

package org.apache.flink.table.operations;

@PublicEvolving
public interface SqlSerializationContext {
    String serializeInlineFunction(FunctionDefinition functionDefinition);

    // this can be further extended in the future
}


We also introduce a separate context for serializing ResolvedExpression so that it can be modified independently. Most likely, expressions.SqlSerializationContext will always be a subset of operations.SqlSerializationContext.


@PublicEvolving
public interface ResolvedExpression extends Expression {
    // default method; body omitted in this listing
    default String asSerializableString(SqlSerializationContext context);
}

package org.apache.flink.table.expressions;

public interface SqlSerializationContext {
    String serializeInlineFunction(FunctionDefinition functionDefinition);

    // this can be further extended in the future
}
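To illustrate how an expression could delegate to the context, here is a minimal sketch. It is not the actual Flink implementation: FunctionDefinition and SqlSerializationContext are simplified stand-ins for the interfaces above, and InlineCallSketch and the function name used in the usage note are hypothetical.

```java
import java.util.List;

// Simplified stand-in for Flink's FunctionDefinition (assumption, not the real interface).
interface FunctionDefinition {}

// Stand-in mirroring the proposed expressions.SqlSerializationContext.
interface SqlSerializationContext {
    String serializeInlineFunction(FunctionDefinition functionDefinition);
}

// Hypothetical, simplified model of a resolved call to an inline function.
final class InlineCallSketch {
    private final FunctionDefinition function;
    private final List<String> serializedArguments;

    InlineCallSketch(FunctionDefinition function, List<String> serializedArguments) {
        this.function = function;
        this.serializedArguments = serializedArguments;
    }

    // The expression does not know how to name the inline function itself;
    // it asks the context for an identifier and appends the argument list.
    String asSerializableString(SqlSerializationContext context) {
        String name = context.serializeInlineFunction(function);
        return name + "(" + String.join(", ", serializedArguments) + ")";
    }
}
```

With a context that always returns the hypothetical identifier `convert_to_email` (including the backticks), a call with the single argument 'ABC' would serialize to `convert_to_email`('ABC').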

We need to update objects that serialize expressions as part of computed columns:

@PublicEvolving
public final class ResolvedCatalogTable
        implements ResolvedCatalogBaseTable<CatalogTable>, CatalogTable {
    public Map<String, String> toProperties(SqlSerializationContext context);
}

@PublicEvolving
public interface ResolvedCatalogModel extends CatalogModel {
    Map<String, String> toProperties(SqlSerializationContext context);
}

Lastly, we need a way to pass a custom implementation of SqlSerializationContext. Since the class is not string serializable and thus cannot be put into ReadableConfig, I suggest putting it into EnvironmentSettings, from which it can later be passed to CatalogManager.

    /** A builder for {@link EnvironmentSettings}. */
    @PublicEvolving
    public static class Builder {
        ....
    
        public Builder withSqlSerializationContext(SqlSerializationContext sqlSerializationContext) {
            this.sqlSerializationContext = sqlSerializationContext;
            return this;
        }
    }

Proposed Changes

With the changes to the public interfaces listed above, one can implement custom logic for dealing with inline functions. For example, a platform can register each inline function under a unique, automatically generated identifier and use that identifier in the generated SQL string.

Example usage:

TableEnvironment env = TableEnvironment.create(
    EnvironmentSettings.newInstance()
        .inStreamingMode()
        .withSqlSerializationContext(new SqlSerializationContext() {
            @Override
            public String serializeInlineFunction(FunctionDefinition functionDefinition) {
                // register and generate an identifier
                ...
            }
        })
        .build());


// MyUDF can now be handled by the platform and serialized to a SQL string
env.from("t").select(call(MyUDF.class, $("f0"))).execute().print();
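The registration strategy described above could look roughly like the following sketch. It is only an illustration under stated assumptions: FunctionDefinition and SqlSerializationContext are simplified stand-ins for the real interfaces, and RegisteringSerializationContext, the `__inline_fn_N` naming scheme, and registeredFunctions() are hypothetical names that are not part of this proposal.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for Flink's FunctionDefinition (assumption, not the real interface).
interface FunctionDefinition {}

// Stand-in mirroring the proposed SqlSerializationContext.
interface SqlSerializationContext {
    String serializeInlineFunction(FunctionDefinition functionDefinition);
}

// Hypothetical platform-side context: registers each inline function once
// and returns a generated, backtick-quoted identifier for use in SQL.
final class RegisteringSerializationContext implements SqlSerializationContext {
    private final Map<FunctionDefinition, String> identifiers = new HashMap<>();
    private int nextId = 0;

    @Override
    public String serializeInlineFunction(FunctionDefinition functionDefinition) {
        // Reuse the identifier if this function was already registered.
        String existing = identifiers.get(functionDefinition);
        if (existing != null) {
            return existing;
        }
        // Otherwise generate a new identifier and remember the mapping.
        String identifier = "`__inline_fn_" + (nextId++) + "`";
        identifiers.put(functionDefinition, identifier);
        return identifier;
    }

    // Read-only view so the platform can later register the functions in a catalog.
    Map<FunctionDefinition, String> registeredFunctions() {
        return Collections.unmodifiableMap(identifiers);
    }
}
```

Keeping the mapping inside the context means the same function serializes to the same identifier every time it appears in a query, and the platform can inspect registeredFunctions() afterwards to make those identifiers resolvable.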

Compatibility, Deprecation, and Migration Plan

Code paths that deal with deprecated classes won’t be updated, e.g. paths that still use TableSchema. Those paths won’t support the customization and will behave the same way as they do now.

Test Plan

Unit tests on the affected classes, plus possibly one ITCase, should be sufficient.

Rejected Alternatives

None