Discussion thread: https://lists.apache.org/thread/hlmm9l6qr3pdhs8oc8ltn4pf0s6tg3x4
Vote thread: https://lists.apache.org/thread/by9mpnmk22nws1sk54z7t69802ws2z55
JIRA: FLINK-38104
Current Status: Accepted
Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This FLIP introduces Table API support for the ML_PREDICT and ML_EVALUATE functions, which were introduced in FLIP-437: Support ML Models in Flink SQL. Related FLIPs are:

Public Interfaces

Add new methods to the existing TableEnvironment

ModelDescriptor is introduced in FLIP-507: Add Model DDL methods in TABLE API.

interface TableEnvironment {
    /** Gets a model from a catalog path. */
    Model fromModelPath(String path);

    /** Constructs a model from a model descriptor. */
    Model from(ModelDescriptor descriptor);
}
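fromModelPath presumably resolves a partial path the same way TableEnvironment#from(String) resolves table paths: missing catalog and database parts are filled in from the current catalog and database. The following is a minimal, simplified sketch of that expansion, assuming those rules also apply to models. ModelPathResolver is a hypothetical name, and backtick-quoted identifiers are not handled.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch: expands a partial model path into a three-part
 * catalog.database.model identifier, assuming the same rules that
 * TableEnvironment#from(String) applies to table paths.
 * Backtick-quoted identifiers are not handled.
 */
class ModelPathResolver {
    private final String currentCatalog;
    private final String currentDatabase;

    ModelPathResolver(String currentCatalog, String currentDatabase) {
        this.currentCatalog = currentCatalog;
        this.currentDatabase = currentDatabase;
    }

    /** Expands "model", "db.model", or "catalog.db.model" into [catalog, db, model]. */
    List<String> resolve(String path) {
        // Limit -1 keeps empty segments so that paths like "a..b" are rejected.
        List<String> parts = Arrays.asList(path.split("\\.", -1));
        if (parts.size() > 3 || parts.contains("")) {
            throw new IllegalArgumentException("Invalid model path: " + path);
        }
        switch (parts.size()) {
            case 1:
                return List.of(currentCatalog, currentDatabase, parts.get(0));
            case 2:
                return List.of(currentCatalog, parts.get(0), parts.get(1));
            default:
                return List.copyOf(parts);
        }
    }

    public static void main(String[] args) {
        ModelPathResolver resolver = new ModelPathResolver("default_catalog", "default_database");
        System.out.println(resolver.resolve("my_model"));          // [default_catalog, default_database, my_model]
        System.out.println(resolver.resolve("other_db.my_model")); // [default_catalog, other_db, my_model]
    }
}
```

The names default_catalog and default_database match Flink's defaults for a fresh TableEnvironment; any other current catalog or database would be passed in instead.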

Model interface for Table API

/**
 * A Model object describes a model resource definition and provides the key functions
 * for applying the model to tables.
 */
interface Model {
    /** Returns the resolved input schema of the model. */
    ResolvedSchema getResolvedInputSchema();

    /** Returns the resolved output schema of the model. */
    ResolvedSchema getResolvedOutputSchema();

    /** Returns the underlying logical representation of this model. */
    QueryOperation getQueryOperation();

    /**
     * Predicts without runtime options.
     *
     * <p>Converts the table and the input columns into expressions that serve as
     * arguments of the underlying ML_PREDICT call expression.
     */
    Table predict(Table table, ColumnList inputColumns);

    /**
     * Predicts with runtime options. Options can include whether to run asynchronously,
     * the batch size, the timeout, and so on.
     */
    Table predict(Table table, ColumnList inputColumns, Map<String, String> options);

    /** Evaluates the model against the label column, without runtime options. */
    Table evaluate(Table table, ColumnList labelColumn, ColumnList inputColumns);

    /** Evaluates the model against the label column, with runtime options. */
    Table evaluate(Table table, ColumnList labelColumn, ColumnList inputColumns, Map<String, String> options);

    /** Creates a ModelReferenceExpression so that the model can be used as a call argument. */
    ApiExpression asArgument(String name);
}

/**
 * Holds a ModelSourceQueryOperation. Similar to TableReferenceExpression.
 */
public final class ModelReferenceExpression implements ResolvedExpression {
    private final String name;
    private final QueryOperation queryOperation;

    // Constructor and ResolvedExpression methods omitted.
}

/**
 * Holds a resolved model. Similar to SourceQueryOperation.
 */
public class ModelSourceQueryOperation implements QueryOperation {
    private final ContextResolvedModel contextResolvedModel;

    // Constructor and QueryOperation methods omitted.
}
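The relationship between these two classes and Model#asArgument can be illustrated with simplified stand-ins. This sketch is not the proposed implementation: ResolvedModel replaces ContextResolvedModel and stores only the fully qualified identifier, and the argument name "model" is a hypothetical example.

```java
import java.util.List;

/**
 * Hypothetical, simplified stand-ins for the proposed classes: a model reference
 * expression wraps a model source operation, which wraps the resolved model.
 */
class ModelReferenceSketch {
    /** Stand-in for ContextResolvedModel: only the fully qualified identifier. */
    record ResolvedModel(List<String> identifier) {}

    /** Stand-in for ModelSourceQueryOperation. */
    record ModelSourceOperation(ResolvedModel model) {
        String asSummaryString() {
            return "ModelSource: " + String.join(".", model.identifier());
        }
    }

    /** Stand-in for ModelReferenceExpression: a named argument backed by an operation. */
    record ModelReference(String name, ModelSourceOperation operation) {}

    /** Stand-in for Model#asArgument(name). */
    static ModelReference asArgument(ModelSourceOperation operation, String name) {
        return new ModelReference(name, operation);
    }

    public static void main(String[] args) {
        ModelSourceOperation source = new ModelSourceOperation(
                new ResolvedModel(List.of("default_catalog", "default_database", "my_model")));
        // "model" is a hypothetical argument name.
        ModelReference ref = asArgument(source, "model");
        System.out.println(ref.name() + " -> " + ref.operation().asSummaryString());
    }
}
```

Keeping the operation inside the expression mirrors how TableReferenceExpression carries a table's QueryOperation, so a planner can reach the resolved model from a call argument.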

Add a new method to the existing QueryOperationVisitor

public interface QueryOperationVisitor<T> {
    // Existing visit methods omitted.

    T visit(BuiltinFunctionQueryOperation function);
}
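Adding an operation type to a visitor interface means every visitor gains a visit overload for it, and the new operation's accept method dispatches to that overload. The sketch below shows the pattern with hypothetical, simplified types (Operation, SourceOperation, BuiltinFunctionOperation, SummaryVisitor); it is not Flink's QueryOperation hierarchy.

```java
import java.util.List;
import java.util.stream.Collectors;

/**
 * Hypothetical miniature of the visitor extension: a new operation type gets
 * its own visit overload, and its accept method dispatches to that overload.
 */
class VisitorSketch {
    interface Operation {
        <T> T accept(Visitor<T> visitor);
    }

    interface Visitor<T> {
        T visit(SourceOperation source);

        // Newly added overload, analogous to visit(BuiltinFunctionQueryOperation).
        T visit(BuiltinFunctionOperation function);
    }

    record SourceOperation(String name) implements Operation {
        public <T> T accept(Visitor<T> visitor) {
            return visitor.visit(this);
        }
    }

    record BuiltinFunctionOperation(String functionName, List<Operation> inputs) implements Operation {
        public <T> T accept(Visitor<T> visitor) {
            return visitor.visit(this);
        }
    }

    /** Renders an operation tree as a compact string, visiting inputs recursively. */
    static class SummaryVisitor implements Visitor<String> {
        public String visit(SourceOperation source) {
            return source.name();
        }

        public String visit(BuiltinFunctionOperation function) {
            return function.functionName()
                    + function.inputs().stream()
                            .map(input -> input.accept(this))
                            .collect(Collectors.joining(", ", "(", ")"));
        }
    }

    public static void main(String[] args) {
        Operation op = new BuiltinFunctionOperation(
                "ML_PREDICT", List.of(new SourceOperation("input_table"), new SourceOperation("my_model")));
        System.out.println(op.accept(new SummaryVisitor())); // ML_PREDICT(input_table, my_model)
    }
}
```

Because the new overload is abstract, every existing visitor implementation must handle the new operation explicitly, which is one reason the proposal lists QueryOperationConverter as a place to update.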

Example

To make ColumnList construction easier, we can also introduce a variable-arguments factory method for it:

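import java.util.Arrays;
import org.apache.flink.types.ColumnList;

class ColumnLists {
    /** Creates a ColumnList from the given column names. */
    public static ColumnList of(String... names) {
        return ColumnList.of(Arrays.asList(names));
    }
}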
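If the varargs factory is instead placed in the same class as the List-based factory, calling of(Arrays.asList(names)) does not recurse: Java binds the call to the List overload because that overload is applicable without varargs expansion. A self-contained sketch of that variant, using a hypothetical SimpleColumnList stand-in that stores only column names:

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for ColumnList that stores only column names. */
final class SimpleColumnList {
    private final List<String> names;

    private SimpleColumnList(List<String> names) {
        this.names = List.copyOf(names);
    }

    static SimpleColumnList of(List<String> names) {
        return new SimpleColumnList(names);
    }

    /**
     * Varargs convenience factory. The call below binds to of(List) because that
     * overload is applicable without varargs expansion, so this method does not
     * call itself recursively.
     */
    static SimpleColumnList of(String... names) {
        return of(Arrays.asList(names));
    }

    List<String> getNames() {
        return names;
    }

    public static void main(String[] args) {
        System.out.println(SimpleColumnList.of("input_col1", "input_col2").getNames()); // [input_col1, input_col2]
    }
}
```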


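import org.apache.flink.table.api.*;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

// Must run inside a method that declares "throws Exception",
// because closing a CloseableIterator may throw.
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

Model myModel = tEnv.fromModelPath("my_model");
Table inputTable = tEnv.from("input_table");
Table evalTable = tEnv.from("evaluate_table");

// Predict using two input columns and print every result row.
Table prediction = myModel.predict(inputTable, ColumnLists.of("input_col1", "input_col2"));
try (CloseableIterator<Row> results = prediction.execute().collect()) {
    results.forEachRemaining(System.out::println);
}

// Evaluate the model against a label column and print the evaluation results.
Table evaluation = myModel.evaluate(evalTable, ColumnLists.of("label_col"), ColumnLists.of("input_col1", "input_col2"));
try (CloseableIterator<Row> results = evaluation.execute().collect()) {
    results.forEachRemaining(System.out::println);
}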
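The Table API calls in the example are intended to mirror the SQL functions from FLIP-437. As a rough illustration, the following sketch renders a predict call, with and without runtime options, as ML_PREDICT SQL text. It assumes the FLIP-437 argument order (TABLE, MODEL, DESCRIPTOR of input columns, optional MAP of options); PredictSqlRenderer is a hypothetical helper, the "async" option is only an example key, and identifiers are neither quoted nor escaped.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch: renders a Model#predict call as the equivalent FLIP-437
 * SQL text. Identifiers are not quoted or escaped.
 */
class PredictSqlRenderer {

    /** Mirrors predict(table, inputColumns): delegates with empty runtime options. */
    static String predict(String table, String model, List<String> inputColumns) {
        return predict(table, model, inputColumns, Map.of());
    }

    /** Mirrors predict(table, inputColumns, options). */
    static String predict(String table, String model, List<String> inputColumns, Map<String, String> options) {
        StringBuilder sql = new StringBuilder("SELECT * FROM ML_PREDICT(TABLE ")
                .append(table)
                .append(", MODEL ")
                .append(model)
                .append(", DESCRIPTOR(")
                .append(String.join(", ", inputColumns))
                .append(")");
        if (!options.isEmpty()) {
            // Sort keys so that the rendered MAP literal is deterministic.
            String entries = new TreeMap<>(options).entrySet().stream()
                    .map(e -> "'" + e.getKey() + "', '" + e.getValue() + "'")
                    .collect(Collectors.joining(", "));
            sql.append(", MAP[").append(entries).append("]");
        }
        return sql.append(")").toString();
    }

    public static void main(String[] args) {
        System.out.println(predict("input_table", "my_model", List.of("input_col1", "input_col2")));
        System.out.println(predict("input_table", "my_model", List.of("input_col1"), Map.of("async", "true")));
    }
}
```

The overload without options simply delegates with an empty map, which keeps a single code path for both predict variants; evaluate would presumably map to ML_EVALUATE in the same way, with the label column list as an additional DESCRIPTOR.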

Proposed Changes

  • Introduce a RESOLVE_BUILTIN_CALL_BY_ARGUMENT rule that resolves calls to supported built-in functions, such as ml_predict, ml_evaluate, tumble, session, cumulate, and hop, to BuildInCallExpression.

  • Create a BuiltinFunctionQueryOperation for these built-in functions.

  • Handle BuiltinFunctionQueryOperation in QueryOperationVisitor and QueryOperationConverter, converting it to a LogicalTableFunctionScan RelNode.
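The first bullet can be pictured as a lookup that rewrites only calls to the listed functions and leaves every other call to later resolver rules. This is a minimal sketch under that assumption: only the function names come from the proposal, all types are hypothetical stand-ins, and although the rule's name suggests the real implementation also inspects call arguments, that is not modeled here.

```java
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;

/**
 * Hypothetical sketch of a name-based resolution step. Only the function names
 * come from the proposal; all types here are simplified stand-ins.
 */
class BuiltinCallResolutionSketch {
    /** Function names the proposal lists as handled by the new rule. */
    static final Set<String> SUPPORTED =
            Set.of("ml_predict", "ml_evaluate", "tumble", "session", "cumulate", "hop");

    record UnresolvedCall(String name, List<String> arguments) {}

    record ResolvedBuiltinCall(String functionName, List<String> arguments) {}

    /**
     * Returns a resolved call for supported functions, or an empty Optional so that
     * other resolver rules can handle the call. Case-insensitive matching is an
     * assumption of this sketch.
     */
    static Optional<ResolvedBuiltinCall> resolve(UnresolvedCall call) {
        String normalized = call.name().toLowerCase(Locale.ROOT);
        if (!SUPPORTED.contains(normalized)) {
            return Optional.empty();
        }
        return Optional.of(new ResolvedBuiltinCall(normalized.toUpperCase(Locale.ROOT), call.arguments()));
    }

    public static void main(String[] args) {
        System.out.println(resolve(new UnresolvedCall("ML_PREDICT", List.of("input_table", "my_model"))));
        System.out.println(resolve(new UnresolvedCall("my_udf", List.of("x"))));
    }
}
```

Returning an empty Optional rather than failing keeps the rule composable: calls it does not recognize pass through unchanged to the remaining resolution rules.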

Compatibility, Deprecation, and Migration Plan

There's no compatibility issue as these are new APIs.

Test Plan

  • All existing tests pass

  • Add new unit tests and integration tests for any new code changes

Rejected Alternatives

NA