...

There are two kinds of user-defined functions: scalar functions and table functions. The interfaces are as follows.

UserDefinedFunction

from abc import ABCMeta, abstractmethod

class UserDefinedFunction(object):
    """
    Base interface for Python user-defined function.
    """
    __metaclass__ = ABCMeta

    def open(self, function_context):
        pass

    def close(self):
        pass

    def is_deterministic(self):
        return True

FunctionContext

class FunctionContext(object):

    def get_metric_group(self):
        return MetricGroup()

    @abstractmethod
    def get_job_parameter(self, key, default_value):
        pass

ScalarFunction

class ScalarFunction(UserDefinedFunction):

    @abstractmethod
    def eval(self, *args):
        """Please define your implementation"""
        pass

TableFunction

class TableFunction(UserDefinedFunction):

    @abstractmethod
    def eval(self, *args):
        """Please define your implementation"""
        pass

Metrics

User-defined functions may need to report metrics during execution. The following interfaces will be proposed to support it. 

class MetricGroup(object):

    def __init__(self, group_name=""):
        self.group_name = group_name

    def counter(self, name):
        return Counter(MetricName(self.group_name, name))

    def gauge(self, name, value):
        gauge = Gauge(MetricName(self.group_name, name))
        gauge.set(value)
        return gauge

    def histogram(self, name):
        return Histogram(MetricName(self.group_name, name))

    def meter(self, name):
        return Meter(MetricName(self.group_name, name))

class MetricName(object):

    def __init__(self, group_name, name):
        self.group_name = group_name
        self.name = name

class Counter(object):

    def __init__(self, metric_name):
        self.metric_name = metric_name
        self.value = 0

    def inc(self, n=1):
        self.value += n

    def dec(self, n=1):
        self.value -= n

class Gauge(object):

    def __init__(self, metric_name):
        self.metric_name = metric_name
        self.value = None

    def set(self, value):
        self.value = value

class Histogram(object):

    def __init__(self, metric_name):
        self.metric_name = metric_name

    def update(self, value):
        self.value = value

class Meter(object):

    def __init__(self, metric_name):
        self.metric_name = metric_name
        self.value = 0

    def mark_event(self, n=1):
        self.value += n
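
As a hedged usage sketch (the function and the metric name "rows_processed" are illustrative only, not part of the proposal), a scalar function could obtain a counter from the function context in open() and update it in eval():

class CountingScalarFunction(ScalarFunction):
    """Illustrative example: reports how many rows it has processed."""

    def open(self, function_context):
        # Obtain a counter from the metric group exposed by the runtime.
        self.row_counter = function_context.get_metric_group().counter("rows_processed")

    def eval(self, x):
        self.row_counter.inc()
        return x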

Decorators

We will add the following decorators to declare the input types and the result types of user-defined functions:

...

Decorator arguments:

  • input_types: used in ScalarFunction and TableFunction, indicates the input types of the user-defined function.
  • result_type: used in ScalarFunction, indicates the result type of the user-defined scalar function.
  • result_types: used in TableFunction, indicates the result types of the user-defined table function.

Examples

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT())
class Multiply(ScalarFunction):

    def eval(self, x, y):
        return x * y

    def is_deterministic(self):
        return False


@udtf(input_types=[DataTypes.STRING()],
      result_types=DataTypes.STRING())
class Split(TableFunction):

    def eval(self, x):
        for e in x.split("#"):
            yield e



Besides, decorators can also be used to declare a plain Python function as a UDF or UDTF:

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT(), deterministic=False)
def multiply(x, y):
    return x * y

@udtf(input_types=[DataTypes.STRING()],
      result_types=DataTypes.STRING())
def split(x):
    for e in x.split("#"):
        yield e


Proposed Design

Registration of Python User-Defined Functions

Firstly, we need to allow Python user-defined functions to be registered. The following interface will be added to the Python TableEnvironment:

class TableEnvironment(object):

    ...

    def register_function(self, name, function):
        pass


Under the hood, a corresponding Java instance of ScalarFunction or TableFunction will be created and registered in the TableEnvironment. To distinguish Python user-defined functions from Java user-defined functions, the method getLanguage will be introduced to the Java interface FunctionDefinition:

public interface FunctionDefinition {

  ...

  default FunctionLanguage getLanguage() {
    return FunctionLanguage.JVM;
  }
}

public enum FunctionLanguage {
  JVM,
  PYTHON
}

Examples

class SubtractOne(ScalarFunction):

    def eval(self, i):
        return i - 1


@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j


t_env.register_function("add_one",
                        udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT()))

t_env.register_function("subtract_one",
                        udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT()))

t_env.register_function("add", add)

High-Level Execution Mode

...

Consider the following job:

table.scan("source")

        .alias("a, b, c, d")

        .select("a, py_udf1(b), py_udf2(java_udf1(c)), java_udf2(py_udf3(d))")


The physical plan will be adjusted as follows:

...
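
For illustration only, the following is a hypothetical sketch of the general idea (not the adjusted plan elided above): Java UDF calls and Python UDF calls are separated into consecutive projections so that all Python UDF calls can be executed together in a single Python operator. The intermediate column names c1, b1, c2 and d1 are made up:

# Hypothetical rewrite of the query above, splitting Java and Python calls
# into separate projections so the Python calls can be chained together.
table.scan("source") \
     .alias("a, b, c, d") \
     .select("a, b, java_udf1(c) as c1, d") \
     .select("a, py_udf1(b) as b1, py_udf2(c1) as c2, py_udf3(d) as d1") \
     .select("a, b1, c2, java_udf2(d1)")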

Suppose the following job:

table.scan("source")

     .alias("a, b")

     .join_lateral("py_udtf1(a) as (x, y)")

     .join_lateral("py_udtf2(b, x) as (m, n)")


The table function of the merged Correlate of the above job is as follows:

class MergedTableFunction(TableFunction):

    def eval(self, a, b):
        for x, y in py_udtf1(a):
            for m, n in py_udtf2(b, x):
                yield x, y, m, n

User-Defined Function Execution

...

The interfaces JobBundleFactory and StageBundleFactory provided by Beam can be used to prepare the user-defined function execution environment. They are as follows:

/**
 * A factory that has all job-scoped information, and can be combined with stage-scoped information
 * to create a {@link StageBundleFactory}.
 *
 * <p>Releases all job-scoped resources when closed.
 */
public interface JobBundleFactory extends AutoCloseable {
  StageBundleFactory forStage(ExecutableStage executableStage);
}

/**
 * A bundle factory scoped to a particular {@link
 * org.apache.beam.runners.core.construction.graph.ExecutableStage}, which has all of the resources
 * it needs to provide new {@link RemoteBundle RemoteBundles}.
 *
 * <p>Closing a StageBundleFactory signals that the stage has completed and any resources bound to
 * its lifetime can be cleaned up.
 */
public interface StageBundleFactory extends AutoCloseable {

  /** Get a new {@link RemoteBundle bundle} for processing the data in an executable stage. */
  RemoteBundle getBundle(
      OutputReceiverFactory outputReceiverFactory,
      StateRequestHandler stateRequestHandler,
      BundleProgressHandler progressHandler)
      throws Exception;

  ProcessBundleDescriptors.ExecutableProcessBundleDescriptor getProcessBundleDescriptor();
}


Note: The default implementations DefaultJobBundleFactory and SimpleStageBundleFactory could serve as good references for how to implement JobBundleFactory and StageBundleFactory.

...

  1. The Python worker packs the collection of results corresponding to one input row into a list and sends them together to the PythonUserDefinedFunctionRunner. If no results are generated for one input row, an empty list is returned.
  2. Add a header to the original result: (header, result). The header indicates whether the results corresponding to the current input row are finished. The Python worker sends the composed results to the PythonUserDefinedFunctionRunner directly and there is no need to pack the results.



  • Solution 1: Pros: the implementation is simple. Cons: OOM is possible if the number of results for one input is very big.
  • Solution 2: Pros: there is no need to worry about the number of results for one input. Cons: an extra header is needed.


Solution 1 is preferred for now as no extra header is needed. If the number of table function results for one input is so big that an OOM occurs, users should increase the heap memory or optimize the table function.
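
For illustration, a minimal sketch of Solution 1 on the Python worker side (the helper name invoke_table_function is hypothetical and not part of the actual implementation):

def invoke_table_function(table_function, input_row):
    # Solution 1: collect all results produced for one input row into a list
    # and send them to the runner in one shot; an input row that produces
    # no results yields an empty list.
    return list(table_function.eval(*input_row))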

...

We can see from the interface StageBundleFactory that a BundleProgressHandler needs to be provided during the execution environment initialization.

public interface BundleProgressHandler {

  /** Handles a progress report from the bundle while it is executing. */
  void onProgress(ProcessBundleProgressResponse progress);

  /** Handles the bundle's completion report. */
  void onCompleted(ProcessBundleResponse response);

  /** Returns a handler that ignores metrics. */
  static BundleProgressHandler ignored() {
    return new BundleProgressHandler() {
      @Override
      public void onProgress(ProcessBundleProgressResponse progress) {}

      @Override
      public void onCompleted(ProcessBundleResponse response) {}
    };
  }
}


The BundleProgressHandler defines the callbacks for job execution progress. The data structures ProcessBundleProgressResponse and ProcessBundleResponse contain the metrics collected in the remote execution environment.

...