...
There are two kinds of user-defined functions: scalar functions and table functions. Their interfaces are as follows.
UserDefinedFunction
from abc import ABC, abstractmethod


class UserDefinedFunction(ABC):
    """
    Base interface for Python user-defined functions.
    """

    def open(self, function_context):
        pass

    def close(self):
        pass

    def is_deterministic(self):
        return True
FunctionContext
class FunctionContext(ABC):
    @abstractmethod
    def get_metric_group(self):
        """Returns the MetricGroup used to register the metrics of this function."""
        pass

    @abstractmethod
    def get_job_parameter(self, key, default_value):
        pass
ScalarFunction
class ScalarFunction(UserDefinedFunction):
    @abstractmethod
    def eval(self, *args):
        """Please define your implementation"""
        pass
TableFunction
class TableFunction(UserDefinedFunction):
    @abstractmethod
    def eval(self, *args):
        """Please define your implementation"""
        pass
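A minimal sketch of the call sequence these interfaces imply, using local stand-in base classes rather than the real PyFlink modules. The driver `run_udf` and the example functions `AddOne` and `Repeat` are hypothetical; the sketch assumes the framework calls `open` once, `eval` once per input row, and `close` once, and that a table function's `eval` is a generator yielding zero or more results per row.

```python
from abc import ABC, abstractmethod


class UserDefinedFunction(ABC):
    """Local stand-in mirroring the proposed base interface."""

    def open(self, function_context):
        pass

    def close(self):
        pass

    def is_deterministic(self):
        return True


class ScalarFunction(UserDefinedFunction):
    @abstractmethod
    def eval(self, *args):
        pass


class TableFunction(UserDefinedFunction):
    @abstractmethod
    def eval(self, *args):
        pass


def run_udf(func, rows, function_context=None):
    """Hypothetical driver: open once, eval per row, close once."""
    func.open(function_context)
    try:
        results = []
        for row in rows:
            out = func.eval(*row)
            if isinstance(func, TableFunction):
                # A table function produces zero or more results per input row.
                results.append(list(out))
            else:
                # A scalar function produces exactly one result per input row.
                results.append(out)
        return results
    finally:
        func.close()


class AddOne(ScalarFunction):
    def eval(self, i):
        return i + 1


class Repeat(TableFunction):
    def eval(self, s, n):
        for _ in range(n):
            yield s


print(run_udf(AddOne(), [(1,), (41,)]))         # [2, 42]
print(run_udf(Repeat(), [("a", 2), ("b", 0)]))  # [['a', 'a'], []]
```

Note that `close` runs in a `finally` block, so resources acquired in `open` are released even if `eval` raises.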
Metrics
User-defined functions may need to report metrics during execution. The following interfaces are proposed to support this.
class MetricGroup(object):
    def __init__(self, group_name):
        self.group_name = group_name

    def counter(self, name):
        return Counter(MetricName(self.group_name, name))

    def gauge(self, name, value):
        gauge = Gauge(MetricName(self.group_name, name))
        gauge.set(value)
        return gauge

    def histogram(self, name):
        return Histogram(MetricName(self.group_name, name))

    def meter(self, name):
        return Meter(MetricName(self.group_name, name))


class MetricName(object):
    def __init__(self, group_name, name):
        self.group_name = group_name
        self.name = name


class Counter(object):
    def __init__(self, metric_name):
        self.metric_name = metric_name
        self.value = 0

    def inc(self, n=1):
        self.value += n

    def dec(self, n=1):
        self.value -= n


class Gauge(object):
    def __init__(self, metric_name):
        self.metric_name = metric_name
        self.value = None

    def set(self, value):
        self.value = value


class Histogram(object):
    def __init__(self, metric_name):
        self.metric_name = metric_name
        self.value = None

    def update(self, value):
        self.value = value


class Meter(object):
    def __init__(self, metric_name):
        self.metric_name = metric_name
        self.value = 0

    def mark_event(self, n=1):
        self.value += n
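To show how a function might use these interfaces, here is a sketch in which a UDF obtains a counter in `open` and increments it in `eval`. It re-declares trimmed copies of `MetricName`, `Counter` and `MetricGroup` so it runs on its own; `SimpleFunctionContext` and `CountingUpper` are hypothetical names used only for this illustration, and real metric reporting back to Flink is not modelled.

```python
class MetricName(object):
    def __init__(self, group_name, name):
        self.group_name = group_name
        self.name = name


class Counter(object):
    def __init__(self, metric_name):
        self.metric_name = metric_name
        self.value = 0

    def inc(self, n=1):
        self.value += n


class MetricGroup(object):
    def __init__(self, group_name):
        self.group_name = group_name

    def counter(self, name):
        return Counter(MetricName(self.group_name, name))


class SimpleFunctionContext(object):
    """Hypothetical FunctionContext that hands out a single metric group."""

    def __init__(self, group_name):
        self._group = MetricGroup(group_name)

    def get_metric_group(self):
        return self._group


class CountingUpper(object):
    """Scalar-function-like class that counts the rows it processes."""

    def open(self, function_context):
        # Register the metric once, when the function is opened.
        self.rows = function_context.get_metric_group().counter("rows")

    def eval(self, s):
        self.rows.inc()
        return s.upper()


func = CountingUpper()
func.open(SimpleFunctionContext("my_udf"))
print([func.eval(s) for s in ["a", "b", "c"]])  # ['A', 'B', 'C']
print(func.rows.metric_name.name, func.rows.value)  # rows 3
```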
Decorators
We will add the following decorators to declare the input types and the result types of user-defined functions:
...
Decorator arguments:
- input_types: used in ScalarFunction and TableFunction; indicates the input types of the user-defined function.
- result_type: used in ScalarFunction; indicates the result type of the user-defined scalar function.
- result_types: used in TableFunction; indicates the result types of the user-defined table function.
Examples
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT())
class Multiply(ScalarFunction):
    def eval(self, x, y):
        return x * y

    def is_deterministic(self):
        return False


@udtf(input_types=[DataTypes.STRING()],
      result_types=DataTypes.STRING())
class Split(TableFunction):
    def eval(self, x):
        for e in x.split("#"):
            yield e
Decorators can also be used to declare a plain Python function as a UDF or UDTF:
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT(), deterministic=False)
def multiply(x, y):
    return x * y


@udtf(input_types=[DataTypes.STRING()],
      result_types=DataTypes.STRING())
def split(x):
    for e in x.split("#"):
        yield e
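One way such decorators could work is sketched below: the decorator factory records the declared types on the resulting object, instantiates a decorated class, and wraps a plain function so it exposes `eval`. This is an illustrative sketch, not PyFlink's implementation; the helper names (`_FunctionWrapper`, `_instantiate`) are hypothetical, type names are plain strings instead of `DataTypes` objects, decorated classes are assumed to have a no-argument constructor, and the positional `udf(f, input_types, result_type)` form used in the registration examples is not covered.

```python
import inspect


class UserDefinedFunction(object):
    def is_deterministic(self):
        return True


class ScalarFunction(UserDefinedFunction):
    pass


class TableFunction(UserDefinedFunction):
    pass


class _FunctionWrapper(UserDefinedFunction):
    """Adapts a plain Python function to the eval() interface."""

    def __init__(self, f, deterministic):
        self._f = f
        self._deterministic = deterministic

    def eval(self, *args):
        return self._f(*args)

    def is_deterministic(self):
        return self._deterministic


class _ScalarWrapper(_FunctionWrapper, ScalarFunction):
    pass


class _TableWrapper(_FunctionWrapper, TableFunction):
    pass


def _instantiate(f, wrapper_cls, deterministic):
    # Decorated classes are instantiated (assuming a no-argument constructor);
    # plain functions are wrapped so that they expose eval().
    if inspect.isclass(f):
        return f()
    return wrapper_cls(f, deterministic)


def udf(input_types, result_type, deterministic=True):
    def decorator(f):
        func = _instantiate(f, _ScalarWrapper, deterministic)
        func.input_types = list(input_types)
        func.result_type = result_type
        return func
    return decorator


def udtf(input_types, result_types, deterministic=True):
    def decorator(f):
        func = _instantiate(f, _TableWrapper, deterministic)
        func.input_types = list(input_types)
        func.result_types = result_types
        return func
    return decorator


# Type names are plain strings here instead of DataTypes objects.
@udf(input_types=["BIGINT", "BIGINT"], result_type="BIGINT", deterministic=False)
def multiply(x, y):
    return x * y


@udtf(input_types=["STRING"], result_types="STRING")
def split(x):
    for e in x.split("#"):
        yield e


print(multiply.eval(6, 7), multiply.is_deterministic())  # 42 False
print(list(split.eval("a#b#c")))                         # ['a', 'b', 'c']
```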
Proposed Design
Registration of Python User-Defined Functions
First, Python user-defined functions need to be registrable. The following interface will be added to the Python TableEnvironment:
class TableEnvironment(object):
    ...

    def register_function(self, name, function):
        pass
Under the hood, a corresponding Java ScalarFunction or TableFunction instance will be created and registered in the TableEnvironment. To distinguish Python user-defined functions from Java user-defined functions, a method getLanguage will be introduced to the Java interface FunctionDefinition:
public interface FunctionDefinition {
    // …
    default FunctionLanguage getLanguage() {
        return FunctionLanguage.JVM;
    }
}

public enum FunctionLanguage {
    JVM,
    PYTHON
}
Examples
class SubtractOne(ScalarFunction):
    def eval(self, i):
        return i - 1


@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j


t_env.register_function("add_one", udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT()))
t_env.register_function("subtract_one", udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT()))
t_env.register_function("add", add)
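The language tag is what lets the planner tell the two kinds of functions apart after registration. The sketch below models that idea with a hypothetical in-memory `FunctionCatalog`; it is not the TableEnvironment implementation. `FunctionLanguage` mirrors the proposed Java enum, and the Java class name `com.example.Upper` is made up for the illustration.

```python
from enum import Enum


class FunctionLanguage(Enum):
    # Mirrors the proposed Java enum.
    JVM = "JVM"
    PYTHON = "PYTHON"


class FunctionCatalog(object):
    """Hypothetical in-memory catalog keyed by function name."""

    def __init__(self):
        self._functions = {}

    def register_function(self, name, function, language=FunctionLanguage.PYTHON):
        if name in self._functions:
            raise ValueError("function %r is already registered" % name)
        self._functions[name] = (language, function)

    def get_language(self, name):
        return self._functions[name][0]

    def python_functions(self):
        # Names of the functions that must be executed by a Python worker.
        return sorted(name for name, (lang, _) in self._functions.items()
                      if lang is FunctionLanguage.PYTHON)


catalog = FunctionCatalog()
catalog.register_function("add_one", lambda i: i + 1)
catalog.register_function("java_upper", "com.example.Upper", FunctionLanguage.JVM)
print(catalog.get_language("add_one"))  # FunctionLanguage.PYTHON
print(catalog.python_functions())       # ['add_one']
```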
High-Level Execution Mode
...
Consider the following job:
(table.scan("source")
    .alias("a, b, c, d")
    .select("a, py_udf1(b), py_udf2(java_udf1(c)), java_udf2(py_udf3(d))"))
The physical plan will be adjusted as follows:
...
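As a rough illustration only, the ordering constraint imposed by the nested calls in the example job can be computed as below: in `py_udf2(java_udf1(c))` the Java function must run before the Python one, and in `java_udf2(py_udf3(d))` the reverse. Expressions are modelled as nested tuples, the language is inferred from the `py_`/`java_` name prefix, consecutive same-language calls are merged into one stage, and for multi-argument calls only the longest argument chain is kept. These are all simplifying assumptions, not the planner's actual rule.

```python
def language_of(function_name):
    # Simplifying assumption: the language is encoded in the name prefix,
    # as in the example job (py_udf1, java_udf1, ...).
    return "PYTHON" if function_name.startswith("py_") else "JVM"


def stages(expr):
    """Languages an expression must pass through, innermost first.

    An expression is either a column name (str) or a tuple
    (function_name, arg1, arg2, ...).
    """
    if isinstance(expr, str):
        return []
    name, *args = expr
    # Keep the longest chain among the arguments (simplification).
    inner = max((stages(arg) for arg in args), key=len, default=[])
    lang = language_of(name)
    if inner and inner[-1] == lang:
        return inner  # same language as the previous stage: merge
    return inner + [lang]


select_list = [
    "a",
    ("py_udf1", "b"),
    ("py_udf2", ("java_udf1", "c")),
    ("java_udf2", ("py_udf3", "d")),
]
for expr in select_list:
    print(expr, "->", stages(expr))
```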
Consider the following job:
(table.scan("source")
    .alias("a, b")
    .join_lateral("py_udtf1(a) as (x, y)")
    .join_lateral("py_udtf2(b, x) as (m, n)"))
For the job above, the table function of the merged Correlate is as follows:
class MergedTableFunction(TableFunction):
    def eval(self, a, b):
        for x, y in py_udtf1(a):
            for m, n in py_udtf2(b, x):
                yield x, y, m, n
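The merged function can be checked against evaluating the two lateral joins one after the other. In the runnable sketch below, `py_udtf1` and `py_udtf2` are hypothetical generator-based table functions invented for the illustration, and `MergedTableFunction` drops its `TableFunction` base class to stay self-contained.

```python
def py_udtf1(a):
    # Hypothetical UDTF: one (x, y) pair per character of a.
    for i, ch in enumerate(a):
        yield ch, i


def py_udtf2(b, x):
    # Hypothetical UDTF: b rows (m, n) for each x; zero rows when b == 0.
    for k in range(b):
        yield x * (k + 1), k


class MergedTableFunction(object):
    def eval(self, a, b):
        for x, y in py_udtf1(a):
            for m, n in py_udtf2(b, x):
                yield x, y, m, n


def two_step_join(a, b):
    # Unmerged evaluation: first lateral join, then the second one.
    first = [(x, y) for x, y in py_udtf1(a)]
    return [(x, y, m, n) for x, y in first for m, n in py_udtf2(b, x)]


print(list(MergedTableFunction().eval("ab", 2)))
# [('a', 0, 'a', 0), ('a', 0, 'aa', 1), ('b', 1, 'b', 0), ('b', 1, 'bb', 1)]
```

Because the nested generators are consumed lazily, the merged form needs no intermediate list between the two joins, while producing the same rows in the same order.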
User-Defined Function Execution
...
The Beam interfaces JobBundleFactory and StageBundleFactory can be used to prepare the execution environment for user-defined functions. The interfaces are as follows:
/**
 * A factory that has all job-scoped information, and can be combined with stage-scoped information
 * to create a {@link StageBundleFactory}.
 *
 * <p>Releases all job-scoped resources when closed.
 */
public interface JobBundleFactory extends AutoCloseable {
  StageBundleFactory forStage(ExecutableStage executableStage);
}

/**
 * A bundle factory scoped to a particular {@link
 * org.apache.beam.runners.core.construction.graph.ExecutableStage}, which has all of the resources
 * it needs to provide new {@link RemoteBundle RemoteBundles}.
 *
 * <p>Closing a StageBundleFactory signals that the stage has completed and any resources bound to
 * its lifetime can be cleaned up.
 */
public interface StageBundleFactory extends AutoCloseable {
  /** Get a new {@link RemoteBundle bundle} for processing the data in an executable stage. */
  RemoteBundle getBundle(
      OutputReceiverFactory outputReceiverFactory,
      StateRequestHandler stateRequestHandler,
      BundleProgressHandler progressHandler)
      throws Exception;

  ProcessBundleDescriptors.ExecutableProcessBundleDescriptor getProcessBundleDescriptor();
}
Note: Beam's default implementations, DefaultJobBundleFactory and SimpleStageBundleFactory, are good references for implementing JobBundleFactory and StageBundleFactory.
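To illustrate the two lifetimes described in the Javadoc (job-scoped versus stage-scoped resources), here is a simplified Python analogue. It is not Beam's API, which is Java; the class and method names are hypothetical, bundles are plain strings, and the choice that closing the job factory also closes every stage factory it created is an assumption made for this illustration.

```python
class StageBundleFactory(object):
    """Simplified analogue of a stage-scoped bundle factory."""

    def __init__(self, stage_name):
        self.stage_name = stage_name
        self.bundles_created = 0
        self.closed = False

    def get_bundle(self):
        if self.closed:
            raise RuntimeError("stage %s is closed" % self.stage_name)
        self.bundles_created += 1
        return "%s-bundle-%d" % (self.stage_name, self.bundles_created)

    def close(self):
        # Stage-scoped resources would be released here.
        self.closed = True


class JobBundleFactory(object):
    """Simplified analogue of a job-scoped bundle factory."""

    def __init__(self):
        self._stages = []

    def for_stage(self, stage_name):
        stage = StageBundleFactory(stage_name)
        self._stages.append(stage)
        return stage

    def close(self):
        # In this analogue, closing the job also closes all of its stages.
        for stage in self._stages:
            stage.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()


with JobBundleFactory() as job:
    stage = job.for_stage("python-calc")
    print(stage.get_bundle())  # python-calc-bundle-1
    print(stage.get_bundle())  # python-calc-bundle-2
print(stage.closed)            # True
```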
...
- Solution 1: The Python worker packs all results for one input row into a list and sends the list to PythonUserDefinedFunctionRunner in one piece. If no results are generated for an input row, an empty list is sent.
- Solution 2: A header is added to each result: (header, result). The header indicates whether all results for the current input row have been emitted. The Python worker sends the composed results to PythonUserDefinedFunctionRunner, so there is no need to pack the results.
| | Pros | Cons |
|---|---|---|
| Solution 1 | Implementation is simple | OOM is possible if the number of results for one input row is very large |
| Solution 2 | No need to worry about the number of results for one input row | An extra header must be attached to every result |
Solution 1 is preferred for now because no extra header is needed. If a table function produces so many results for one input row that an OOM occurs, users should increase the heap memory or optimize the table function.
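The difference between the two encodings can be sketched as follows. Solution 1 buffers each row's results into one list. For Solution 2, this sketch assumes the header is a boolean "finished" flag and that each row ends with a separate `(True, None)` marker, so rows with zero results stay unambiguous. The exact header format is not specified above, so that detail is an assumption.

```python
def encode_solution1(results_per_row):
    """Solution 1: one list per input row (empty if the row produced nothing)."""
    return [list(results) for results in results_per_row]


def encode_solution2(results_per_row):
    """Solution 2: a flat stream of (header, result) pairs, no packing."""
    stream = []
    for results in results_per_row:
        for r in results:
            stream.append((False, r))  # more results may follow for this row
        stream.append((True, None))    # end-of-row marker
    return stream


def decode_solution2(stream):
    """Regroup the flat stream into per-row result lists."""
    rows, current = [], []
    for finished, result in stream:
        if finished:
            rows.append(current)
            current = []
        else:
            current.append(result)
    return rows


rows = [["a", "b"], [], ["c"]]
print(encode_solution1(rows))  # [['a', 'b'], [], ['c']]
print(encode_solution2(rows))
# [(False, 'a'), (False, 'b'), (True, None), (True, None), (False, 'c'), (True, None)]
```

With Solution 2 each result can be forwarded as soon as it is produced, which is why the number of results per row no longer bounds memory; the cost is the per-result header.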
...
The StageBundleFactory interface shows that a BundleProgressHandler must be provided (as an argument of getBundle) when initializing the execution environment.
public interface BundleProgressHandler {
  /** Handles a progress report from the bundle while it is executing. */
  void onProgress(ProcessBundleProgressResponse progress);

  /** Handles the bundle's completion report. */
  void onCompleted(ProcessBundleResponse response);

  /** Returns a handler that ignores metrics. */
  static BundleProgressHandler ignored() {
    return new BundleProgressHandler() {
      @Override
      public void onProgress(ProcessBundleProgressResponse progress) {}

      @Override
      public void onCompleted(ProcessBundleResponse response) {}
    };
  }
}
BundleProgressHandler defines the callbacks for bundle execution progress. The data structures ProcessBundleProgressResponse and ProcessBundleResponse contain the metrics collected in the remote environment.
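A Python analogue of a metrics-collecting progress handler is sketched below. The real responses are Beam protobuf messages handled in Java; here each report is assumed, purely for illustration, to be a dict mapping a metric name to its cumulative value within the current bundle. Under that assumption the latest progress report supersedes earlier ones, and only the completion report is added to the running totals. `MetricsCollectingHandler` is a hypothetical name.

```python
class MetricsCollectingHandler(object):
    """Hypothetical analogue of a BundleProgressHandler that aggregates metrics."""

    def __init__(self):
        self.in_flight = {}  # latest cumulative values of the running bundle
        self.totals = {}     # final values summed over completed bundles

    def on_progress(self, report):
        # Intermediate values are cumulative, so overwrite rather than add.
        self.in_flight.update(report)

    def on_completed(self, report):
        # The completion report carries the bundle's final values.
        for name, value in report.items():
            self.totals[name] = self.totals.get(name, 0) + value
        self.in_flight = {}


handler = MetricsCollectingHandler()
# Bundle 1: two progress reports, then the completion report.
handler.on_progress({"rows": 10})
handler.on_progress({"rows": 25})
handler.on_completed({"rows": 30})
# Bundle 2: completes without an intermediate progress report.
handler.on_completed({"rows": 5})
print(handler.totals)  # {'rows': 35}
```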
...