Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This FLIP introduces Table API support for the ML_PREDICT and ML_EVALUATE functions introduced in FLIP-437: Support ML Models in Flink SQL. Related FLIPs are:
- FLIP-437: Support ML Models in Flink SQL
- FLIP-507: Add Model DDL methods in TABLE API
- FLIP-525: Model ML_PREDICT, ML_EVALUATE Implementation Design
Public Interfaces
Add new methods to the existing TableEnvironment
ModelDescriptor is introduced in FLIP-507: Add Model DDL methods in TABLE API.
interface TableEnvironment {
    /** Gets a model from a catalog path. */
    Model fromModelPath(String path);

    /** Constructs a model from a model descriptor. */
    Model from(ModelDescriptor descriptor);
}
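For tables, Flink qualifies a path that is not fully specified against the session's current catalog and database. The sketch below assumes that fromModelPath resolves model paths the same way and shows that qualification step. The ModelPathSketch class, the QualifiedPath record, and the qualify method are hypothetical. The sketch also does not handle backtick-quoted identifiers that contain dots.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: qualifies a dot-separated model path against session defaults. */
public class ModelPathSketch {

    /** Simplified stand-in for a fully qualified catalog object identifier. */
    record QualifiedPath(String catalog, String database, String object) {}

    static QualifiedPath qualify(String path, String currentCatalog, String currentDatabase) {
        // Simplification (assumption): split on '.', ignoring backtick-quoted identifiers.
        List<String> parts = Arrays.asList(path.split("\\."));
        switch (parts.size()) {
            case 1:
                // "my_model" -> current catalog and current database
                return new QualifiedPath(currentCatalog, currentDatabase, parts.get(0));
            case 2:
                // "db.my_model" -> current catalog
                return new QualifiedPath(currentCatalog, parts.get(0), parts.get(1));
            case 3:
                // "cat.db.my_model" -> already fully qualified
                return new QualifiedPath(parts.get(0), parts.get(1), parts.get(2));
            default:
                throw new IllegalArgumentException("Invalid model path: " + path);
        }
    }

    public static void main(String[] args) {
        System.out.println(qualify("my_model", "default_catalog", "default_database"));
        System.out.println(qualify("db1.my_model", "default_catalog", "default_database"));
    }
}
```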
Model interface for Table API
/**
 * A Model object describes a model resource definition and provides the key functions for
 * operating on the model and tables.
 */
interface Model {
    /** Returns the resolved model input schema. */
    ResolvedSchema getResolvedInputSchema();

    /** Returns the resolved model output schema. */
    ResolvedSchema getResolvedOutputSchema();

    /** Returns the underlying logical representation of this model. */
    QueryOperation getQueryOperation();

    /**
     * Predict function without runtime options.
     *
     * <p>Converts the table and the input columns into expressions for the underlying call
     * expression.
     */
    Table predict(Table table, ColumnList inputColumns);

    /**
     * Predict function with runtime options. Options may include whether to run
     * asynchronously, the batch size, the timeout, etc.
     */
    Table predict(Table table, ColumnList inputColumns, Map<String, String> options);

    /** Evaluate function without runtime options. */
    Table evaluate(Table table, ColumnList labelColumn, ColumnList inputColumns);

    /** Evaluate function with runtime options. */
    Table evaluate(
            Table table, ColumnList labelColumn, ColumnList inputColumns, Map<String, String> options);

    /** Creates a ModelReferenceExpression so that this model can be used as a call argument. */
    ApiExpression asArgument(String name);
}
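The in-memory toy below is a rough sketch of what predict and evaluate compute. It does not use Flink and makes the following assumptions:

- Following the ML_PREDICT semantics in FLIP-437, prediction keeps every input column and appends the model output.
- Classification accuracy stands in as the evaluation metric. The actual metrics depend on the model task.
- Rows are plain maps and the model is a plain function.

All names (ToyModelSemantics, predict, evaluateAccuracy) are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical in-memory toy illustrating predict/evaluate semantics; not Flink code. */
public class ToyModelSemantics {

    /** Predict: keeps every input column and appends the model's output column. */
    static List<Map<String, Object>> predict(
            List<Map<String, Object>> table,
            List<String> inputColumns,
            Function<List<Object>, Object> model,
            String outputColumn) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (Map<String, Object> row : table) {
            // Only the declared input columns are passed to the model.
            List<Object> features = new ArrayList<>();
            for (String col : inputColumns) {
                features.add(row.get(col));
            }
            Map<String, Object> out = new LinkedHashMap<>(row);
            out.put(outputColumn, model.apply(features));
            result.add(out);
        }
        return result;
    }

    /** Evaluate (assumed metric): fraction of rows whose prediction equals the label column. */
    static double evaluateAccuracy(
            List<Map<String, Object>> table,
            String labelColumn,
            List<String> inputColumns,
            Function<List<Object>, Object> model) {
        List<Map<String, Object>> predicted = predict(table, inputColumns, model, "__prediction");
        long correct = predicted.stream()
                .filter(r -> r.get(labelColumn).equals(r.get("__prediction")))
                .count();
        return predicted.isEmpty() ? 0.0 : (double) correct / predicted.size();
    }

    public static void main(String[] args) {
        // Hypothetical model: "big" when x + y > 10, otherwise "small".
        Function<List<Object>, Object> model =
                f -> ((Integer) f.get(0) + (Integer) f.get(1)) > 10 ? "big" : "small";
        List<Map<String, Object>> table = List.of(
                Map.of("id", 1, "x", 3, "y", 4, "label", "small"),
                Map.of("id", 2, "x", 8, "y", 9, "label", "big"),
                Map.of("id", 3, "x", 6, "y", 6, "label", "small"));
        System.out.println(predict(table, List.of("x", "y"), model, "prediction"));
        System.out.println(evaluateAccuracy(table, "label", List.of("x", "y"), model));
    }
}
```

Two of the three rows are predicted correctly, so the toy accuracy is 2/3.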
/**
 * Holds a ModelSourceQueryOperation. Similar to TableReferenceExpression.
 */
public final class ModelReferenceExpression implements ResolvedExpression {
    private final String name;
    private final QueryOperation queryOperation;
    // constructor and accessors omitted
}

/**
 * Holds a resolved model. Similar to SourceQueryOperation.
 */
public class ModelSourceQueryOperation implements QueryOperation {
    private final ContextResolvedModel contextResolvedModel;
    // constructor and accessors omitted
}
Add a new method to the existing QueryOperationVisitor
public interface QueryOperationVisitor<T> {
    // ... existing visit methods ...
    T visit(BuiltinFunctionQueryOperation function);
}
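The simplified sketch below shows how the new visit overload is dispatched through a classic visitor. Each operation's accept method calls back into the matching overload, so a converter only has to add one method to handle the new operation.

All types here are hypothetical stand-ins, not Flink's actual classes. The Printer renders a string where the real QueryOperationConverter would build a RelNode.

```java
import java.util.List;

/** Hypothetical, simplified model of the visitor extension; not Flink's real classes. */
public class VisitorSketch {

    interface QueryOperation {
        <T> T accept(QueryOperationVisitor<T> visitor);
    }

    interface QueryOperationVisitor<T> {
        T visit(SourceQueryOperation source);

        // The new overload proposed by this FLIP.
        T visit(BuiltinFunctionQueryOperation function);
    }

    record SourceQueryOperation(String tableName) implements QueryOperation {
        public <T> T accept(QueryOperationVisitor<T> visitor) {
            return visitor.visit(this); // resolves to visit(SourceQueryOperation)
        }
    }

    /** Stand-in for a built-in function call (e.g. ML_PREDICT) over input operations. */
    record BuiltinFunctionQueryOperation(String functionName, List<QueryOperation> inputs)
            implements QueryOperation {
        public <T> T accept(QueryOperationVisitor<T> visitor) {
            return visitor.visit(this); // resolves to visit(BuiltinFunctionQueryOperation)
        }
    }

    /** Toy converter that renders the operation tree as a string instead of a RelNode. */
    static class Printer implements QueryOperationVisitor<String> {
        public String visit(SourceQueryOperation source) {
            return "Scan(" + source.tableName() + ")";
        }

        public String visit(BuiltinFunctionQueryOperation function) {
            StringBuilder sb = new StringBuilder("TableFunctionScan(" + function.functionName());
            for (QueryOperation input : function.inputs()) {
                sb.append(", ").append(input.accept(this));
            }
            return sb.append(")").toString();
        }
    }

    public static void main(String[] args) {
        QueryOperation op = new BuiltinFunctionQueryOperation(
                "ML_PREDICT", List.of(new SourceQueryOperation("input_table")));
        // prints TableFunctionScan(ML_PREDICT, Scan(input_table))
        System.out.println(op.accept(new Printer()));
    }
}
```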
Example
To make ColumnList construction easier, we can also introduce a varargs factory method:
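import java.util.Arrays;
import org.apache.flink.types.ColumnList;

/** Varargs convenience factory for ColumnList. */
public final class ColumnLists {
    public static ColumnList of(String... names) {
        return ColumnList.of(Arrays.asList(names));
    }
}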
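import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
Model myModel = tableEnv.fromModelPath("my_model");
Table inputTable = tableEnv.from("input_table");
Table evalTable = tableEnv.from("evaluate_table");
Table prediction = myModel.predict(inputTable, ColumnLists.of("input_col1", "input_col2"));
// TableResult.collect() returns a CloseableIterator, which is not Iterable and must be closed.
try (CloseableIterator<Row> results = prediction.execute().collect()) {
    results.forEachRemaining(System.out::println);
}
Table evaluation = myModel.evaluate(
        evalTable, ColumnLists.of("label_col"), ColumnLists.of("input_col1", "input_col2"));
try (CloseableIterator<Row> results = evaluation.execute().collect()) {
    results.forEachRemaining(System.out::println);
}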
Proposed Changes
- Introduce a RESOLVE_BUILTIN_CALL_BY_ARGUMENT rule that resolves supported built-in functions, such as ml_predict, ml_evaluate, tumble, session, cumulate, and hop, to a built-in CallExpression.
- Create a BuiltinFunctionQueryOperation for built-in functions.
- Handle BuiltinFunctionQueryOperation in QueryOperationVisitor and QueryOperationConverter, converting it to a LogicalTableFunctionScan RelNode.
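The sketch below covers only the first step and assumes resolution is keyed on the function name. Calls to the supported built-in functions become resolved calls, and every other call is left for other rules. It does not model the argument-based checks that the rule's name suggests. The records and the resolve method are hypothetical and are not part of Flink's resolver API.

```java
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;

/** Hypothetical, simplified resolution step; not Flink's resolver API. */
public class BuiltinCallResolutionSketch {

    record UnresolvedCall(String name, List<String> args) {}

    record ResolvedBuiltinCall(String functionName, List<String> args) {}

    // Built-in functions that this FLIP names as targets of the new rule.
    static final Set<String> SUPPORTED =
            Set.of("ML_PREDICT", "ML_EVALUATE", "TUMBLE", "SESSION", "CUMULATE", "HOP");

    /** Resolves calls to supported built-ins (case-insensitively); others stay unresolved. */
    static Optional<ResolvedBuiltinCall> resolve(UnresolvedCall call) {
        String normalized = call.name().toUpperCase(Locale.ROOT);
        if (!SUPPORTED.contains(normalized)) {
            return Optional.empty();
        }
        return Optional.of(new ResolvedBuiltinCall(normalized, call.args()));
    }

    public static void main(String[] args) {
        System.out.println(resolve(new UnresolvedCall("ml_predict", List.of("input_table", "my_model"))));
        System.out.println(resolve(new UnresolvedCall("my_udf", List.of("x"))));
    }
}
```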
Compatibility, Deprecation, and Migration Plan
There's no compatibility issue as these are new APIs.
Test Plan
All existing tests pass
Add new unit tests and integration tests for any new code changes
Rejected Alternatives
NA