Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation and Use-cases
The existing Flink ML library allows users to compose an Estimator/Transformer from a pipeline (i.e. a linear sequence) of Estimators/Transformers, where each Estimator/Transformer has one input and one output.
The following use-cases are not yet supported, and we would like to address them with the changes proposed in this FLIP.
1) Express an Estimator/Transformer that has multiple inputs/outputs.
For example, some graph embedding algorithms (e.g., MetaPath2Vec) need to take two tables as inputs. These two tables represent the node labels and the edges of the graph, respectively. This logic can be expressed as an Estimator with 2 input tables.
And some workflows may need to split one table into two tables, and use these tables for training and validation, respectively. This logic can be expressed by a Transformer with 1 input table and 2 output tables.
2) Express a generic machine learning computation logic that does not have the "transformation" semantic.
We believe most machine learning engineers associate the name "Transformer" with the "transformation" semantic, where a record in the output typically corresponds to one record in the input. Thus it is counter-intuitive to use Transformer to encode aggregation logic, where a record in the output corresponds to an arbitrary number of records in the input.
Therefore we need to have a class with a name different from "Transformer" to encode generic multi-input multi-output computation logic.
3) Online learning where a long-running Model instance needs to be continuously updated by the latest model data generated by another long-running Estimator instance.
In this scenario, we need to allow the Estimator to run on a different machine than the Model, so that the Estimator could consume sufficient computation resources in a cluster while the Model could be deployed on edge devices.
4) Provide APIs to allow Estimator/Model to be efficiently saved/loaded even if the state (e.g. model data) of the Estimator/Model amounts to tens of gigabytes.
The existing PipelineStage::toJson basically requires the developer of an Estimator/Model to serialize all model data into an in-memory string, which could be very inefficient (or practically impossible) if the model data is very large (e.g. tens of gigabytes).
In addition to addressing the above use-cases, this FLIP also proposes a few more changes to simplify the class hierarchy and improve API usability. The existing Flink ML library has the following usability issues:
5) The fit/transform API requires users to explicitly provide the TableEnvironment, even though the TableEnvironment could be retrieved from the Table instances given to fit/transform.
6) A Pipeline is currently both a Transformer and an Estimator. The experience of using Pipeline is inconsistent with the experience of using Estimator (with the needFit API).
This FLIP proposes quite a few changes and additions to the existing Flink ML APIs. We first describe the proposed API additions and changes, followed by the API code of interfaces and classes after making the proposed changes.
API additions and changes
Here we list the additions and the changes to the Flink ML API.
The following changes are the most important changes proposed by this doc:
1) Added the AlgoOperator class. The AlgoOperator class has the same interface as the existing Transformer (i.e. it provides the transform(...) API).
This change addresses the need to encode generic multi-input multi-output machine learning computation logic.
2) Updated the fit/transform methods to take a list of tables as input and return a list of tables as output.
This change addresses the use-cases described in the motivation section, e.g. a graph embedding Estimator needs to take 2 tables as inputs.
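As an illustration, a multi-input Estimator and a multi-output operator could then be invoked as sketched below (MetaPath2Vec and RandomSplitter are hypothetical implementation names used only for illustration):

```
// An Estimator with 2 input tables, e.g. a graph embedding algorithm.
Table nodeLabels = ...;   // table of node labels
Table edges = ...;        // table of edges
MetaPath2VecModel model = new MetaPath2Vec().fit(nodeLabels, edges);

// An operator with 1 input table and 2 output tables, e.g. a train/validation split.
Table[] splits = new RandomSplitter().transform(dataset);
Table trainSet = splits[0];
Table validationSet = splits[1];
```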
3) Added setModelData and getModelData to the Model interface.
This change addresses the use-cases described in the motivation section, where a long-running Model instance needs to ingest the model state streams emitted by an Estimator, which could be running on a different machine.
4) Removed the methods PipelineStage::toJson and PipelineStage::loadJson. Added methods save(...) and load(...) to the Stage interface.
This change addresses the need to efficiently save/load a Model instance even if its model data is very large.
The following changes are relatively minor:
5) Removed TableEnvironment from the parameter list of fit/transform APIs.
This change simplifies the usage of fit/transform APIs.
6) Added PipelineModel and made Pipeline implement only Estimator. Pipeline is no longer a Transformer.
This change makes the experience of using Pipeline consistent with the experience of using Estimator.
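With this change, fitting a Pipeline looks exactly like fitting any other Estimator; a sketch (the stage variable names are hypothetical):

```
// Pipeline implements only Estimator; fit() returns a PipelineModel,
// which is the Transformer counterpart of the fitted pipeline.
Pipeline pipeline = new Pipeline(Arrays.asList(transformerA, estimatorB));
PipelineModel pipelineModel = pipeline.fit(trainingData);
Table predictions = pipelineModel.transform(inputData)[0];
```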
7) Removed Pipeline::appendStage from the Pipeline class.
8) Renamed PipelineStage to Stage and added the PublicEvolving tag to the Stage interface.
Interfaces and classes after the proposed API changes
The following code block shows the interfaces of Stage, Estimator, Model, Transformer, and AlgoOperator, after making the changes listed above.
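Below is a simplified, self-contained sketch of these interfaces. It is illustrative rather than normative: the real API would use org.apache.flink.table.api.Table (here replaced by a placeholder so the sketch compiles on its own), and each Stage implementation would additionally provide a static load(...) counterpart of save(...).

```java
import java.io.IOException;

/** Placeholder for org.apache.flink.table.api.Table. */
interface Table {}

/** Base interface for a node in a Pipeline. Supports efficient save/load. */
interface Stage<T extends Stage<T>> {
    /** Saves this stage's metadata and (possibly very large) state to the given path. */
    void save(String path) throws IOException;
    // Each Stage implementation is also expected to provide a static
    // load(path) method as the counterpart of save(path).
}

/** Encodes generic multi-input multi-output computation logic. */
interface AlgoOperator<T extends AlgoOperator<T>> extends Stage<T> {
    /** Applies this operator to the given tables and returns the result tables. */
    Table[] transform(Table... inputs);
}

/**
 * An AlgoOperator with "transformation" semantics: a record in the output
 * typically corresponds to one record in the input.
 */
interface Transformer<T extends Transformer<T>> extends AlgoOperator<T> {}

/** A Transformer whose model data can be streamed in and out as tables. */
interface Model<T extends Model<T>> extends Transformer<T> {
    /** Continuously updates this model's parameters from the given tables. */
    default void setModelData(Table... inputs) {
        throw new UnsupportedOperationException();
    }
    /** Returns the tables previously provided via setModelData(...). */
    default Table[] getModelData() {
        throw new UnsupportedOperationException();
    }
}

/** Fits a Model from a list of input tables. */
interface Estimator<E extends Estimator<E, M>, M extends Model<M>> extends Stage<E> {
    M fit(Table... inputs);
}
```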
In this section we provide example code snippets to demonstrate how we can use the APIs proposed in this FLIP to address the use-cases in the motivation section.
Online learning by running Transformer and Estimator concurrently on different machines
Here is an online learning scenario:
- We have an unbounded stream of tagged data that can be used for training.
- We have an algorithm that can be trained using this unbounded stream of data. This algorithm (with its latest states/parameters) can be used to do inference. And the accuracy of the algorithm increases with the increasing amount of training data it has seen.
- We would like to train this algorithm using the unbounded data stream on clusterA, and use this algorithm with the up-to-date states/parameters to do inference on 10 different web servers.
In order to address this use-case, we can write the training and inference logic of this algorithm into an EstimatorA class and a ModelA class with the following API behaviors:
- EstimatorA::fit takes a table as input and returns an instance of ModelA. Before fit() returns this ModelA, it calls ModelA.setModelData(model_data), where model_data represents the stream of algorithm parameter changes emitted by EstimatorA.
- ModelA::setModelData(...) takes a table as input. Its implementation reads the data from this table to continuously update its algorithm parameters.
- ModelA::getModelData(...) returns the same table instance that has been provided via ModelA::setModelData(...).
- ModelA::transform takes a table as input and returns a table. The returned table represents the inference results.
Here are the code snippets that address this use-case by using the proposed APIs.
First run the following code on clusterA:
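A pseudocode sketch of the clusterA side follows. The environment setup, the EstimatorA/ModelA implementations, and all paths and sink names are assumptions for illustration:

```
// Create the unbounded stream of tagged training data.
Table trainingData = tEnv.from("training_source");

// Fit the estimator. Before fit() returns, it calls
// modelA.setModelData(...) with the stream of parameter changes.
EstimatorA estimatorA = new EstimatorA();
ModelA modelA = estimatorA.fit(trainingData);

// Persist the model metadata so that web servers can load it.
modelA.save("hdfs://.../modelA");

// Continuously write the model data (parameter changes) to an external
// queue, so that remote ModelA instances can ingest it.
Table modelData = modelA.getModelData()[0];
modelData.executeInsert("model_data_sink");
```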
Then run the following code on each web server:
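A pseudocode sketch of the web server side follows; paths and source/sink names are again assumptions:

```
// Load the model metadata saved by clusterA.
ModelA modelA = ModelA.load("hdfs://.../modelA");

// Ingest the stream of parameter changes emitted by the Estimator on
// clusterA, so that this model instance is continuously updated.
Table modelData = tEnv.from("model_data_source");
modelA.setModelData(modelData);

// Use the continuously-updated model for inference.
Table inputData = tEnv.from("inference_requests");
Table outputData = modelA.transform(inputData)[0];
outputData.executeInsert("inference_results");
```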
Compatibility, Deprecation, and Migration Plan
The changes proposed in this FLIP are backward incompatible with the existing APIs. We propose to change the APIs directly without a deprecation period, and we will help some existing open source projects in the Flink ecosystem migrate to the proposed APIs.
Note that there is no implementation of Estimator/Transformer (excluding test-only implementations) in the existing Flink codebase. So no work is needed to migrate the existing Flink codebase.
To the best of our knowledge, the only open source project that uses the Flink ML API is https://github.com/alibaba/Alink. We will work together with the Alink developers to migrate the existing code to the proposed API. Furthermore, we will migrate Alink's Estimator/Transformer implementations to the Flink ML library codebase as much as possible.
We will provide unit tests to validate the proposed changes.
There are no rejected alternatives to list here yet.