Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, Apache Flink's ML model functionality is accessible only through Flink SQL syntax, so users must switch between APIs even when they work primarily with the Table API. This limitation creates an inconsistent development experience and adds unnecessary complexity to ML pipeline implementations.
This proposal aims to extend model DDL support to the Table API, enabling a seamless development experience where users can define, manage, and utilize ML models entirely within the Table API ecosystem. This enhancement will streamline ML workflow development and maintain consistency with Flink's commitment to developer-friendly interfaces.
Public Interfaces
- TableEnvironment
- ModelDescriptor
Proposed Changes
We propose to introduce a new ModelDescriptor class in the org.apache.flink.table.api package, alongside the existing TableDescriptor class, and to add new methods to the TableEnvironment interface to support model DDL in the Table API.
ModelDescriptor class:
/**
* Describes a CatalogModel representing a model.
*
* A ModelDescriptor is a template for creating a CatalogModel instance. It
* closely resembles the "CREATE MODEL" SQL DDL statement, containing input schema, output schema, and
* other characteristics.
*
* This can be used to register a Model in the Table API.
*/
@PublicEvolving
public class ModelDescriptor {
private final @Nullable Schema inputSchema;
private final @Nullable Schema outputSchema;
private final Map<String, String> modelOptions;
private final @Nullable String comment;
/**
* Returns a map of string-based model options.
*
* @return options of the model.
*/
Map<String, String> getOptions();
/**
* Get the unresolved input schema of the model.
*
* @return unresolved input schema of the model.
*/
Optional<Schema> getInputSchema();
/**
* Get the unresolved output schema of the model.
*
* @return unresolved output schema of the model.
*/
Optional<Schema> getOutputSchema();
/**
* Get comment of the model.
*
* @return comment of the model.
*/
Optional<String> getComment();
/** Creates a new {@link Builder} for the model with the given provider option. */
static Builder forProvider(String provider);
/** Converts this {@link ModelDescriptor} immutable instance into a mutable {@link Builder}. */
Builder toBuilder();
/** Builder for {@link ModelDescriptor}. */
@PublicEvolving
class Builder<SELF> {
/** Define the inputSchema of {@link ModelDescriptor}. */
SELF inputSchema(@Nullable Schema inputSchema);
/** Define the outputSchema of {@link ModelDescriptor}. */
SELF outputSchema(@Nullable Schema outputSchema);
/** Define the given option of {@link ModelDescriptor}. */
SELF option(String key, String value);
/** Define the given option of {@link ModelDescriptor} as configOption. */
<T> SELF option(ConfigOption<T> configOption, T value);
/** Define the comment of {@link ModelDescriptor}. */
SELF comment(@Nullable String comment);
/** Returns an immutable instance of {@link ModelDescriptor}. */
ModelDescriptor build();
}
}
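The builder pattern described above can be illustrated with a minimal, self-contained sketch. It is not the proposed implementation: `ModelDescriptorSketch` is a hypothetical stand-in, schemas are represented as plain strings instead of Flink `Schema` objects, and storing the provider under the option key `"provider"` is an assumption made for illustration.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for ModelDescriptor; schemas are plain strings, not Flink Schema objects. */
final class ModelDescriptorSketch {
    private final String inputSchema;  // may be null
    private final String outputSchema; // may be null
    private final Map<String, String> options;
    private final String comment;      // may be null

    private ModelDescriptorSketch(Builder b) {
        this.inputSchema = b.inputSchema;
        this.outputSchema = b.outputSchema;
        // Defensive copy: later changes to the builder cannot leak into this instance.
        this.options = Collections.unmodifiableMap(new LinkedHashMap<>(b.options));
        this.comment = b.comment;
    }

    /** Assumption: the provider is kept as an ordinary option under the key "provider". */
    static Builder forProvider(String provider) {
        return new Builder().option("provider", provider);
    }

    /** Copies this immutable descriptor into a fresh, mutable builder. */
    Builder toBuilder() {
        Builder b = new Builder();
        b.inputSchema = inputSchema;
        b.outputSchema = outputSchema;
        b.options.putAll(options);
        b.comment = comment;
        return b;
    }

    Map<String, String> getOptions() { return options; }
    Optional<String> getInputSchema() { return Optional.ofNullable(inputSchema); }
    Optional<String> getOutputSchema() { return Optional.ofNullable(outputSchema); }
    Optional<String> getComment() { return Optional.ofNullable(comment); }

    /** Mutable builder; each setter returns the builder so calls can be chained. */
    static final class Builder {
        private String inputSchema;
        private String outputSchema;
        private final Map<String, String> options = new LinkedHashMap<>();
        private String comment;

        Builder inputSchema(String inputSchema) { this.inputSchema = inputSchema; return this; }
        Builder outputSchema(String outputSchema) { this.outputSchema = outputSchema; return this; }
        Builder option(String key, String value) { options.put(key, value); return this; }
        Builder comment(String comment) { this.comment = comment; return this; }
        ModelDescriptorSketch build() { return new ModelDescriptorSketch(this); }
    }
}
```

In this sketch, `toBuilder()` returns a copy, so deriving a modified descriptor leaves the original untouched, which matches the "immutable instance into a mutable Builder" wording in the Javadoc.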
Public APIs for model DDL in TableEnvironment.
TableEnvironment interface:
/**
* Registers a Model API object as a model similar to SQL models.
*
* <p>Temporary objects can shadow permanent ones. If a temporary object in a given path exists,
* the permanent one will be inaccessible in the current session. To make the permanent object
* available again one can drop the corresponding temporary object.
*
* @param path The path under which the model will be registered. See also the {@link
* TableEnvironment} class description for the format of the path.
* @param descriptor The descriptor of the model to register.
*/
void createModel(String path, ModelDescriptor descriptor);
/**
* Registers a Model API object as a model similar to SQL models.
*
* <p>Temporary objects can shadow permanent ones. If a temporary object in a given path exists,
* the permanent one will be inaccessible in the current session. To make the permanent object
* available again one can drop the corresponding temporary object.
*
* @param path The path under which the model will be registered. See also the {@link
* TableEnvironment} class description for the format of the path.
* @param descriptor The descriptor of the model to register.
* @param ignoreIfExists If a model exists and the given flag is set, no operation is executed.
* An exception is thrown otherwise.
*/
void createModel(String path, ModelDescriptor descriptor, boolean ignoreIfExists);
/**
* Registers a Model API object as a temporary model similar to SQL temporary models.
*
* <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
* it will be inaccessible in the current session. To make the permanent object available again
* one can drop the corresponding temporary object.
*
* @param path The path under which the model will be registered. See also the {@link
* TableEnvironment} class description for the format of the path.
* @param descriptor The descriptor of the model to register.
*/
void createTemporaryModel(String path, ModelDescriptor descriptor);
/**
* Registers a Model API object as a temporary model similar to SQL temporary models.
*
* <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
* it will be inaccessible in the current session. To make the permanent object available again
* one can drop the corresponding temporary object.
*
* @param path The path under which the model will be registered. See also the {@link
* TableEnvironment} class description for the format of the path.
* @param descriptor The descriptor of the model to register.
* @param ignoreIfExists If a model exists and the given flag is set, no operation is executed.
* An exception is thrown otherwise.
*/
void createTemporaryModel(String path, ModelDescriptor descriptor, boolean ignoreIfExists);
/**
* Gets the names of all models available in the current namespace (the current database of the
* current catalog). It returns both temporary and permanent models.
*
* @return A list of the names of all registered models in the current database of the current
* catalog.
* @see #listTemporaryModels()
*/
String[] listModels();
/**
* Gets the names of all models available in the given namespace (for the provided database of
* the provided catalog). It returns both temporary and permanent models.
*
* @return A list of the names of all registered models in the given database of the given
* catalog.
*/
String[] listModels(String catalogName, String databaseName);
/**
* Gets the names of all temporary models available in the current namespace (the current
* database of the current catalog).
*
* @return A list of the names of all registered temporary models in the current database of the
* current catalog.
* @see #listModels()
*/
String[] listTemporaryModels();
/**
* Drops a model registered in the given path.
*
* <p>This method can only drop permanent objects. Temporary objects can shadow permanent ones.
* If a temporary object exists in a given path, make sure to drop the temporary object first
* using {@link #dropTemporaryModel}.
*
* @param path The given path under which the model will be dropped. See also the {@link
* TableEnvironment} class description for the format of the path.
* @return true if a model existed in the given path and was dropped
*/
boolean dropModel(String path);
/**
* Drops a model registered in the given path.
*
* <p>This method can only drop permanent objects. Temporary objects can shadow permanent ones.
* If a temporary object exists in a given path, make sure to drop the temporary object first
* using {@link #dropTemporaryModel}.
*
* @param path The given path under which the model will be dropped. See also the {@link
* TableEnvironment} class description for the format of the path.
* @param ignoreIfNotExists If false, an exception will be thrown if the model to drop does not
* exist.
* @return true if a model existed in the given path and was dropped
*/
boolean dropModel(String path, boolean ignoreIfNotExists);
/**
* Drops a temporary model registered in the given path.
*
* <p>If a permanent model with a given path exists, it will be used from now on for any queries
* that reference this path.
*
* @param path The given path under which the temporary model will be dropped. See also the
* {@link TableEnvironment} class description for the format of the path.
* @return true if a model existed in the given path and was removed
*/
boolean dropTemporaryModel(String path);
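The shadowing rules in the Javadoc above can be sketched with a small in-memory registry. This is only an illustration, not Flink's catalog implementation: `ModelRegistrySketch` and its `resolve` method are hypothetical, descriptors are plain strings, and throwing when `dropModel` hits a path that is still shadowed by a temporary model is an assumption derived from the "drop the temporary object first" wording.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory model of the registration semantics; not Flink's catalog code. */
final class ModelRegistrySketch {
    private final Map<String, String> permanent = new LinkedHashMap<>();
    private final Map<String, String> temporary = new LinkedHashMap<>();

    void createModel(String path, String descriptor, boolean ignoreIfExists) {
        if (permanent.containsKey(path)) {
            if (ignoreIfExists) {
                return; // existing model is kept, nothing happens
            }
            throw new IllegalStateException("Model already exists: " + path);
        }
        permanent.put(path, descriptor);
    }

    void createTemporaryModel(String path, String descriptor, boolean ignoreIfExists) {
        if (temporary.containsKey(path)) {
            if (ignoreIfExists) {
                return;
            }
            throw new IllegalStateException("Temporary model already exists: " + path);
        }
        temporary.put(path, descriptor);
    }

    /** Lookup prefers the temporary model, so it shadows a permanent one with the same path. */
    String resolve(String path) {
        return temporary.containsKey(path) ? temporary.get(path) : permanent.get(path);
    }

    boolean dropModel(String path, boolean ignoreIfNotExists) {
        // Assumption: a shadowing temporary model has to be dropped first.
        if (temporary.containsKey(path)) {
            throw new IllegalStateException("Drop the temporary model first: " + path);
        }
        if (permanent.containsKey(path)) {
            permanent.remove(path);
            return true;
        }
        if (ignoreIfNotExists) {
            return false;
        }
        throw new IllegalStateException("Model does not exist: " + path);
    }

    boolean dropTemporaryModel(String path) {
        if (!temporary.containsKey(path)) {
            return false;
        }
        temporary.remove(path);
        return true;
    }

    /** Returns both temporary and permanent model names, without duplicates. */
    Set<String> listModels() {
        Set<String> names = new LinkedHashSet<>(permanent.keySet());
        names.addAll(temporary.keySet());
        return names;
    }

    Set<String> listTemporaryModels() {
        return new LinkedHashSet<>(temporary.keySet());
    }
}
```

With this sketch, registering a temporary model under an existing path makes `resolve` return the temporary descriptor until `dropTemporaryModel` is called, after which the permanent descriptor is visible again.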
Example Usages for Public APIs:
// Create Model
tableEnv.createModel("mycatalog.mydb.mymodel", ModelDescriptor.forProvider("providerName")
    .inputSchema(schema)
    // …
    .build());
tableEnv.createModel("mycatalog.mydb.mymodel", ModelDescriptor.forProvider("providerName")
    .inputSchema(schema)
    // …
    .build(), false);
tableEnv.createTemporaryModel("mycatalog.mydb.mymodel", ModelDescriptor.forProvider("providerName")
    .inputSchema(schema)
    // …
    .build());
// Drop Model
tableEnv.dropModel("mycatalog.mydb.mymodel");
tableEnv.dropModel("mycatalog.mydb.mymodel", true);
tableEnv.dropTemporaryModel("mycatalog.mydb.mymodel");
// List Model
tableEnv.listModels();
tableEnv.listModels("mycatalog", "mydb");
tableEnv.listTemporaryModels();
As a next step, we will extend the Table API to support ML functions such as ml_predict() and federated_search().
Compatibility, Deprecation, and Migration Plan
- No migration required.
Test Plan
- Unit tests and integration tests (IT tests)
Rejected Alternatives
This is a completely new feature, so there are no other alternatives.