Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

ML developers spend significant time on data cleaning, preprocessing, and ingestion for ML training and inference, typically using two sets of frameworks (e.g., Spark and Flink for data tasks; TensorFlow and PyTorch for ML tasks). These frameworks are usually deployed on separate platforms, so developers have to rely on external orchestration systems and storage to stitch them into a cohesive workflow. Separating data processing tasks from ML tasks also adds complexity to change management, data governance, lineage tracking, etc.

The rapid evolution of AI and GenAI is significantly influencing the data industry, steering almost all market players towards a unified streaming data platform architecture. In fact, ML is essentially another way of extracting insights from data; logically it is no different from traditional data processing and analytics, just with more intensive computation requirements. Ideally, a single set of APIs should describe both data processing and ML tasks for a more cohesive user experience. Since declarative APIs (SQL) are the common tongue for data processing and analytics, the natural evolution is to add SQL support for ML tasks.

Public Interfaces

Public interface changes include the new SQL syntax proposed below for model operations, as well as a new catalog model abstraction and catalog changes to operate on models.

Catalog Model (New)

/** Interface for a model in a catalog. */
@PublicEvolving
public interface CatalogModel {

    /**
     * Get the unresolved input schema of the model.
     *
     * @return unresolved input schema of the model.
     */
    Schema getInputSchema();

    /**
     * Get the unresolved output schema of the model.
     *
     * @return unresolved output schema of the model.
     */
    Schema getOutputSchema();

    /**
     * Get comment of the model.
     *
     * @return comment of the model.
     */
    String getComment();

    /**
     * Get a map of string-based model options.
     *
     * @return options of the model.
     */
    Map<String, String> getOptions();

    /**
     * Get a deep copy of the CatalogModel instance.
     *
     * @return a copy of the CatalogModel instance
     */
    CatalogModel copy();

    /**
     * Copy the CatalogModel instance with the given model options replacing the existing ones.
     *
     * @param modelOptions the new model options to use in the copy
     * @return a copy of the CatalogModel instance with new model options.
     */
    CatalogModel copy(Map<String, String> modelOptions);
}
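
The copy semantics the interface prescribes (a deep copy, and a copy with replaced options) can be sketched stand-alone. The SimpleModel class below is hypothetical: it uses plain Strings in place of Flink's Schema type, whereas a real implementation would implement CatalogModel directly.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a CatalogModel implementation; schemas are
// simplified to plain Strings instead of Flink's Schema type.
class SimpleModel {
    private final String inputSchema;
    private final String outputSchema;
    private final String comment;
    private final Map<String, String> options;

    SimpleModel(String inputSchema, String outputSchema,
                String comment, Map<String, String> options) {
        this.inputSchema = inputSchema;
        this.outputSchema = outputSchema;
        this.comment = comment;
        this.options = new HashMap<>(options); // defensive copy
    }

    Map<String, String> getOptions() {
        return new HashMap<>(options);
    }

    // Deep copy, mirroring CatalogModel#copy().
    SimpleModel copy() {
        return new SimpleModel(inputSchema, outputSchema, comment, options);
    }

    // Copy with replaced options, mirroring CatalogModel#copy(Map).
    SimpleModel copy(Map<String, String> modelOptions) {
        return new SimpleModel(inputSchema, outputSchema, comment, modelOptions);
    }
}
```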

Resolved Catalog Model (New)

@PublicEvolving
public interface ResolvedCatalogModel extends CatalogModel {
    /**
     * Returns the original, unresolved metadata object from the {@link Catalog}.
     *
     * <p>This method might be useful if catalog-specific object instances should be directly
     * forwarded from the catalog to a factory.
     */
    CatalogModel getOrigin();

    /** Returns a fully resolved and validated {@link ResolvedSchema} inputSchema. */
    ResolvedSchema getResolvedInputSchema();

    /** Returns a fully resolved and validated {@link ResolvedSchema} outputSchema. */
    ResolvedSchema getResolvedOutputSchema();

    /**
     * Serializes this instance into a map of string-based properties.
     *
     * <p>Compared to the pure model options in {@link #getOptions()}, the map includes input
     * schema, output schema, kind, task, comment and options.
     */
    Map<String, String> toProperties();

    /**
     * Creates an instance of {@link CatalogModel} from a map of string properties that were
     * previously created with {@link ResolvedCatalogModel#toProperties()}.
     *
     * @param properties serialized version of a {@link ResolvedCatalogModel} that includes input
     *     schema, output schema, kind, task, comment and options.
     */
    static CatalogModel fromProperties(Map<String, String> properties) {
        return CatalogPropertiesUtil.deserializeCatalogModel(properties);
    }
}

Catalog (Change)

public interface Catalog {
    /**
     * Get names of all models under this database. An empty list is returned if no model exists.
     *
     * @return a list of the names of all models in this database
     * @throws DatabaseNotExistException if the database does not exist
     * @throws CatalogException in case of any runtime exception
     */
    default List<String> listModels(String databaseName)
            throws DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException(
                String.format("listModels(String) is not implemented for %s.", this.getClass()));
    }

    /**
     * Returns a {@link CatalogModel} identified by the given {@link ObjectPath}.
     *
     * @param modelPath Path of the model
     * @return The requested model
     * @throws ModelNotExistException if the target does not exist
     * @throws CatalogException in case of any runtime exception
     */
    default CatalogModel getModel(ObjectPath modelPath)
            throws ModelNotExistException, CatalogException {
        throw new UnsupportedOperationException(
                String.format("getModel(ObjectPath) is not implemented for %s.", this.getClass()));
    }

    /**
     * Check if a model exists in this catalog.
     *
     * @param modelPath Path of the model
     * @return true if the given model exists in the catalog, false otherwise
     * @throws CatalogException in case of any runtime exception
     */
    default boolean modelExists(ObjectPath modelPath) throws CatalogException {
        throw new UnsupportedOperationException(
                String.format(
                        "modelExists(ObjectPath) is not implemented for %s.", this.getClass()));
    }

    /**
     * Drop a model.
     *
     * @param modelPath Path of the model to be dropped
     * @param ignoreIfNotExists Flag to specify behavior when the model does not exist: if set to
     *     false, throw an exception, if set to true, do nothing.
     * @throws ModelNotExistException if the model does not exist
     * @throws CatalogException in case of any runtime exception
     */
    default void dropModel(ObjectPath modelPath, boolean ignoreIfNotExists)
            throws ModelNotExistException, CatalogException {
        throw new UnsupportedOperationException(
                String.format(
                        "dropModel(ObjectPath, boolean) is not implemented for %s.",
                        this.getClass()));
    }

    /**
     * Rename an existing model.
     *
     * @param modelPath Path of the model to be renamed
     * @param newModelName the new name of the model
     * @param ignoreIfNotExists Flag to specify behavior when the model does not exist: if set to
     *     false, throw an exception, if set to true, do nothing.
     * @throws ModelNotExistException if the model does not exist
     * @throws ModelAlreadyExistException if a model with the new name already exists
     * @throws CatalogException in case of any runtime exception
     */
    default void renameModel(ObjectPath modelPath, String newModelName, boolean ignoreIfNotExists)
            throws ModelNotExistException, ModelAlreadyExistException, CatalogException {
        throw new UnsupportedOperationException(
                String.format(
                        "renameModel(ObjectPath, String, boolean) is not implemented for %s.",
                        this.getClass()));
    }

    /**
     * Creates a new model.
     *
     * @param modelPath path of the model to be created
     * @param model the CatalogModel definition
     * @param ignoreIfExists flag to specify behavior when a model already exists at the given path:
     *     if set to false, it throws a ModelAlreadyExistException, if set to true, do nothing.
     * @throws ModelAlreadyExistException if model already exists and ignoreIfExists is false
     * @throws DatabaseNotExistException if the database in modelPath doesn't exist
     * @throws CatalogException in case of any runtime exception
     */
    default void createModel(ObjectPath modelPath, CatalogModel model, boolean ignoreIfExists)
            throws ModelAlreadyExistException, DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException(
                String.format(
                        "createModel(ObjectPath, CatalogModel, boolean) is not implemented for %s.",
                        this.getClass()));
    }

    /**
     * Modifies an existing model. Note that the new and old {@link CatalogModel} must be of the
     * same kind. For example, this doesn't allow altering a remote model to import model or native
     * model, and vice versa.
     *
     * @param modelPath path of the model to be modified
     * @param newModel the new CatalogModel definition
     * @param ignoreIfNotExists flag to specify behavior when the model does not exist: if set to
     *     false, throw an exception, if set to true, do nothing.
     * @throws ModelNotExistException if the model does not exist
     * @throws CatalogException in case of any runtime exception
     */
    default void alterModel(ObjectPath modelPath, CatalogModel newModel, boolean ignoreIfNotExists)
            throws ModelNotExistException, CatalogException {
        throw new UnsupportedOperationException(
                String.format(
                        "alterModel(ObjectPath, CatalogModel, boolean) is not implemented for %s.",
                        this.getClass()));
    }
}
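
The lifecycle semantics above (create / exists / drop with the ignore flags) can be sketched with a hypothetical in-memory store. ModelStore and its String keys are illustrative stand-ins for a real Catalog implementation keyed by ObjectPath:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model store; String paths stand in for ObjectPath
// and String values stand in for CatalogModel definitions.
class ModelStore {
    private final Map<String, String> models = new HashMap<>();

    void createModel(String path, String model, boolean ignoreIfExists) {
        if (models.containsKey(path)) {
            if (!ignoreIfExists) {
                throw new IllegalStateException("Model already exists: " + path);
            }
            return; // ignoreIfExists = true: do nothing
        }
        models.put(path, model);
    }

    boolean modelExists(String path) {
        return models.containsKey(path);
    }

    void dropModel(String path, boolean ignoreIfNotExists) {
        if (!models.containsKey(path)) {
            if (!ignoreIfNotExists) {
                throw new IllegalStateException("Model does not exist: " + path);
            }
            return; // ignoreIfNotExists = true: do nothing
        }
        models.remove(path);
    }
}
```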



Proposed Changes

Model Resource

We propose to introduce ML models as first-class citizens in Flink SQL and the corresponding catalog, similar to tables, following the same resource hierarchy (catalog / database / model). A model resource can be described by the CatalogModel class with the following properties:

  • ModelKind (Enum): internal or external model

  • ModelTask (Enum): regression, classification, clustering, generation, etc.

  • ModelOptions (Map<String, String>): string-based model-specific options, including algorithm hyperparameters, inference runtime parameters, etc. Mandatory options include:

    • 'task': the kind of task the model performs, such as classification, text generation, etc.

    • 'provider': the provider of external models, such as openai, etc.

  • InputSchema (Schema): expected model input column schemas.

  • OutputSchema (Schema): expected model output column schemas.

  • Comment (String): comment of the model.

A CatalogModel object can be resolved into a ResolvedCatalogModel object with resolved input and output schemas. A serializer (#toProperties) and deserializer (#fromProperties) should also be implemented for ResolvedCatalogModel for Flink jobs that invoke models.
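
To illustrate, a resolved model might flatten into a string property map along the following lines. The key names below are assumptions for illustration only; the actual layout is owned by CatalogPropertiesUtil.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative flattening of a resolved model into string properties.
// Key names are hypothetical; the real layout is owned by CatalogPropertiesUtil.
class ModelProperties {
    static Map<String, String> toProperties() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("input-schema.0.name", "f1");
        props.put("input-schema.0.data-type", "INT");
        props.put("output-schema.0.name", "label");
        props.put("output-schema.0.data-type", "FLOAT");
        props.put("kind", "EXTERNAL");            // ModelKind
        props.put("task", "regression");          // ModelTask
        props.put("comment", "example model");
        props.put("options.provider", "openai");  // model options, prefixed
        return props;
    }
}
```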

SQL Syntax

We also propose to support the corresponding DDL and other syntax in Flink SQL for model lifecycle management (create / update / delete / get / list), as follows:

# Create Model DDL Syntax

CREATE MODEL [IF NOT EXISTS] [catalog_name.][db_name.]model_name
[INPUT (input_column_list)]
[OUTPUT (output_column_list)]
[COMMENT model_comment]
WITH(model_option_list)
[AS query_statement]

# Example I: import a model from customer storage.

CREATE MODEL `my_import_model`
INPUT (f1 INT, f2 STRING)
OUTPUT (label FLOAT)
WITH(
  'task' = 'regression',
  'type' = 'import',
  'format' = 'ONNX',
  'ONNX.path' = 'http://storage.googleapis.com/storage/t.onnx'
)

# Example II: reference a remote model from a customer endpoint.

CREATE MODEL `my_remote_model`
INPUT (f1 INT, f2 STRING)
OUTPUT (label STRING, probs ARRAY<FLOAT>)
WITH(
  'task' = 'classification',
  'type' = 'remote',
  'provider' = 'openai',
  'openai.endpoint' = 'https://api.openai.com/v1/llm/v1/chat',
  'openai.api_key' = 'abcdefg'
)

# Example III: train a k-means clustering model in Flink.

CREATE MODEL `my_native_model`
WITH(
  'task' = 'clustering',
  'type' = 'training',
  'algorithm' = 'kmeans',
  'kmeans.num_clusters' = '3'
) AS SELECT F1, F2 FROM `my_data`

  • INPUT and OUTPUT lists specify the model signature (input and output schemas) if the model is not self-describing.

  • CREATE TEMPORARY MODEL is also supported.

  • model_option_list is a list of key-value pairs specifying model training, inference, and other information, including the model task, algorithm, training & tuning parameters, runtime optimization parameters, and resource management options (version, tag, label, description), etc.

  • The AS query_statement is used to provide the training data.


# Alter Model DDL Syntax

# rename model syntax
ALTER MODEL [IF EXISTS] [catalog_name.][database_name.]model_name
RENAME TO [catalog_name.][database_name.]new_model_name

# Alter model options syntax
ALTER MODEL [IF EXISTS] [catalog_name.][database_name.]model_name
SET (key1=val1[, key2=val2]...)

# Example I: rename model
ALTER MODEL `my_model` RENAME TO `my_new_model`
ALTER MODEL IF EXISTS `my_model` RENAME TO `my_new_model`

# Example II: alter model options
ALTER MODEL `my_model` SET (
   'tag' = 'prod',
   'description' = 'new_description'
)

  • Altering model options only applies to model metadata, not model data.

  • If IF EXISTS is provided and the model doesn’t exist, nothing happens.

  • If IF EXISTS is provided and the model version doesn’t exist, nothing happens.


# Drop Model DDL Syntax

DROP MODEL [IF EXISTS] [catalog_name.][db_name.]model_name

#Example I: drop model

DROP MODEL `my_model`
DROP MODEL IF EXISTS `my_model`

  • If IF EXISTS is provided and the model or version doesn’t exist, nothing happens.


# Show Models Syntax

SHOW MODELS [ ( FROM | IN ) [catalog_name.]database_name ]
[ [NOT] LIKE <sql_like_pattern> ]

  • Show all models in a catalog / database.

  • A pattern can also be provided to filter models.
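
As an illustration of how such a LIKE filter could be evaluated, the SQL wildcards % and _ can be translated into an anchored regular expression. This sketch is for illustration only and is not Flink's actual implementation:

```java
import java.util.regex.Pattern;

// Illustrative translation of a SQL LIKE pattern into an anchored Java regex;
// not Flink's actual implementation.
class LikeFilter {
    static boolean matches(String name, String sqlLikePattern) {
        StringBuilder regex = new StringBuilder();
        for (char c : sqlLikePattern.toCharArray()) {
            switch (c) {
                case '%': regex.append(".*"); break;  // any sequence of characters
                case '_': regex.append('.'); break;   // any single character
                default:  regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.matches(regex.toString(), name);
    }
}
```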


# Describe Model Syntax

{ DESCRIBE | DESC } MODEL [catalog_name.][database_name.]model_name

  • Describe a model to show its input/output schemas.


# Show Create Model Syntax

SHOW CREATE MODEL [catalog_name.][database_name.]model_name

  • Show all model information including properties, comments, etc.

Model Functions

Once an ML model is created in the Flink SQL catalog, we can run streaming or batch prediction or evaluation against new data with the model. We propose a table-valued function and a table aggregation function to support model predictions and evaluations, respectively, in Flink SQL jobs. Here are the query examples:


# Model Prediction

SELECT f1, f2, label FROM ML_PREDICT(`my_data`, `classifier_model`, DESCRIPTOR(f1, f2))

# Prediction with named arguments

SELECT f1, f2, label FROM 
ML_PREDICT(
  input => `my_data`, 
  model => `classifier_model`, 
  args => DESCRIPTOR(f1, f2)
)

# Model Evaluation

SELECT * FROM ML_EVALUATE(`eval_data`, `classifier_model`, DESCRIPTOR(f1, f2))

# Evaluation with named arguments

SELECT * FROM ML_EVALUATE(
  input => `eval_data`, 
  model => `classifier_model`, 
  args => DESCRIPTOR(f1, f2)
)

We use polymorphic table functions, which take a whole table and a model as input, even though support for user-defined polymorphic table functions is not yet available in Flink.

Note that this FLIP defines how Polymorphic Table Function (PTF) syntax should be defined for Flink SQL going forward. Even before FLIP-440 is implemented, we introduce the concise Oracle-inspired syntax for built-in functions: leveraging the newest additions to Calcite, we drop the TABLE(...) wrapper and use an (expanded) SqlIdentifier for declaring tables and models.

Meanwhile, unlike other table or table aggregation functions, the function signature is determined by the underlying model invoked, which means the functions cannot be resolved or constructed based on the function name alone (which is the default way in Flink). One way to address this is to introduce a specialized user-defined function interface (called ModelFunction) that holds the catalog model information during the job lifecycle.
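
The idea can be sketched as follows. ModelFunction here is a hypothetical simplification: its call signature is derived from the bound model's resolved schemas rather than from the function name; the FLIP leaves the exact interface open.

```java
import java.util.List;

// Hypothetical sketch: a function bound to one catalog model for the job's
// lifetime, whose signature is derived from the model rather than the name.
class ModelFunction {
    private final List<String> inputColumns;
    private final List<String> outputColumns;

    ModelFunction(List<String> inputColumns, List<String> outputColumns) {
        this.inputColumns = inputColumns;
        this.outputColumns = outputColumns;
    }

    // Argument list comes from the model's resolved input schema.
    List<String> expectedArguments() { return inputColumns; }

    // Result columns come from the model's resolved output schema.
    List<String> producedColumns() { return outputColumns; }
}
```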

Built-in ML Provider Support

For remote model providers, we can start with popular vendors such as OpenAI, Azure ML, and AWS SageMaker.


Compatibility, Deprecation, and Migration Plan

  • The new PTF syntax is only an additional alternative; the old PTF syntax remains officially supported as an "extended syntax" and is not deprecated.

Test Plan

  • All existing tests pass
  • Add new unit tests and integration tests for any new code changes

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.