Status

Discussion thread
Vote thread
JIRA: FLINK-16670

Release: 1.11


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

FLIP-58 added support for Python UDFs, but user-defined metrics are not yet supported. With metrics, users can report and monitor the status of a UDF and gain a deeper understanding of its execution. This FLIP proposes support for metrics in Python UDFs.

Goal

  • Support user-defined metrics, including Counters, Gauges, Meters, and Distributions, in Python UDFs. (Note: Histograms are not supported in this FLIP. Instead, Distributions are supported to report statistics about the distribution of values. See the Distribution section for details.)
  • Support defining user scopes.
  • Support defining user variables.

Architecture

The high-level workflow could be summarized as follows:

  1. During initialization (in the open method), the base metric group information is sent to the Python operator. This information includes the metric group variables, the scope components, and the delimiter used to construct metric identifiers.
  2. Based on the metric group information, we can reconstruct the base metric group and do metric registrations on the Python side.
  3. Process input elements.
  4. Update metrics in the Python UDFs.
  5. Transmit metrics from Python to Java in finishBundle(). Each bundle already has a time limit and a size limit. Because the time limit is usually small (1s by default), the bundle boundary can be reused to transmit metrics from Python to Java. This also improves performance, since metrics do not have to be reported for every record.
  6. Update the metric on the Java side.
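
Steps 3 to 6 can be illustrated with a small, self-contained sketch. It is not the PyFlink runtime: `BundleMetricsBuffer`, its `finish_bundle` method, and the plain dict standing in for the Java side are hypothetical names chosen for this example, and only the bundle size limit is modeled (the time limit is left out). It merely shows why flushing once per bundle is cheaper than reporting for every record.

```python
from collections import defaultdict


class BundleMetricsBuffer:
    """Hypothetical buffer that collects counter deltas until the bundle ends."""

    def __init__(self, max_bundle_size):
        self.max_bundle_size = max_bundle_size
        self._deltas = defaultdict(int)  # metric identifier -> pending delta
        self._records_in_bundle = 0
        self.flush_count = 0  # number of transmissions so far

    def inc(self, metric_id, n=1):
        # Step 4: the UDF updates a metric; nothing is transmitted yet.
        self._deltas[metric_id] += n

    def process(self, record, udf, java_side):
        # Step 3: process one input element.
        result = udf(record, self)
        self._records_in_bundle += 1
        if self._records_in_bundle >= self.max_bundle_size:
            self.finish_bundle(java_side)
        return result

    def finish_bundle(self, java_side):
        # Step 5: transmit all pending deltas at once.
        for metric_id, delta in self._deltas.items():
            # Step 6: the receiving side applies the delta to its metric.
            java_side[metric_id] = java_side.get(metric_id, 0) + delta
        self._deltas.clear()
        self._records_in_bundle = 0
        self.flush_count += 1


def my_udf(i, metrics):
    metrics.inc("my_counter", 3)
    return i - 1


java_side = {}  # stand-in for the metrics held by the Java operator
buffer = BundleMetricsBuffer(max_bundle_size=4)
results = [buffer.process(i, my_udf, java_side) for i in range(8)]
```

With a bundle size of 4, eight records cause two transmissions instead of eight.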

Public Interfaces

Registering metrics

Any Python UDF that extends `pyflink.table.udf.UserDefinedFunction` can access the metric system in its open method by calling function_context.get_metric_group(). This method returns a MetricGroup object on which you can create and register new metrics.

def open(self, function_context):
    super().open(function_context)
    self.counter = function_context.get_metric_group().counter("my_counter")

Metric Types

In this FLIP, we are going to support Counters, Gauges, Meters and Distributions. More details below.

Counter

A Counter is used to count something. The current value can be incremented or decremented using inc()/inc(n: int) or dec()/dec(n: int). You can create and register a Counter by calling counter(name: str) on a MetricGroup.

class Counter(Metric):
    """Counter metric interface. Allows a count to be incremented/decremented
    during pipeline execution."""

    def __init__(self, inner_counter):
        self._inner_counter = inner_counter

    def inc(self, n=1):
        self._inner_counter.inc(n)

    def dec(self, n=1):
        self.inc(-n)


class MyUDF(ScalarFunction):

    def __init__(self):
        self.counter = None

    def open(self, function_context):
        super().open(function_context)
        self.counter = function_context.get_metric_group().counter("my_counter")

    def eval(self, i):
        self.counter.inc(3)
        return i - 1

Gauge

A Gauge reports the latest of the reported values. You can register a gauge by calling gauge(name: str, obj: Callable[[], int]) on a MetricGroup. The Callable is invoked to obtain the value to report. Gauge metrics are restricted to integer values.

class MyUDF(ScalarFunction):

    def __init__(self):
        self.length = 0

    def open(self, function_context):
        super().open(function_context)
        function_context.get_metric_group().gauge("my_gauge", lambda: self.length)

    def eval(self, i):
        self.length = i
        return i - 1
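
The following sketch shows why the gauge is registered with a Callable rather than a value: the callable is evaluated only when the metric is reported, so it always yields the latest state. `GaugeRegistry`, its `report` method, and `LengthTracker` are hypothetical stand-ins, not part of PyFlink.

```python
class GaugeRegistry:
    """Hypothetical registry that stores gauge callables by name."""

    def __init__(self):
        self._gauges = {}

    def gauge(self, name, obj):
        # Only the callable is stored; it is not invoked at registration time.
        self._gauges[name] = obj

    def report(self):
        # Evaluate every callable at report time to get the current value.
        return {name: int(obj()) for name, obj in self._gauges.items()}


class LengthTracker:
    """Stand-in for a UDF that exposes its last input through a gauge."""

    def __init__(self, registry):
        self.length = 0
        registry.gauge("my_gauge", lambda: self.length)

    def eval(self, i):
        self.length = i
        return i - 1


registry = GaugeRegistry()
tracker = LengthTracker(registry)
before = registry.report()
tracker.eval(5)
tracker.eval(9)
after = registry.report()
```

Here `before` holds the initial value and `after` holds the value set by the most recent call to `eval`.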

Meter

A Meter measures average throughput. A single event can be registered with mark_event(), and multiple events occurring at the same time can be registered with mark_event(n: int). You can register a meter by calling meter(name: str, time_span_in_seconds: int) on a MetricGroup. The default value of time_span_in_seconds is 60.

class Meter(Metric):
    """Meter Metric interface.
    Metric for measuring throughput."""

    def __init__(self, inner_counter, time_span_in_seconds=60):
        self._inner_counter = inner_counter
        self._time_span_in_seconds = time_span_in_seconds

    def mark_event(self, value=1):
        self._inner_counter.inc(value)

    def get_time_span_in_seconds(self):
        return self._time_span_in_seconds


class MyUDF(ScalarFunction):

    def __init__(self):
        self.meter = None

    def open(self, function_context):
        super().open(function_context)
        # an average rate of events per second over 120s, default is 60s.
        self.meter = function_context.get_metric_group().meter("my_meter", time_span_in_seconds=120)

    def eval(self, i):
        self.meter.mark_event(1)
        return i - 1
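
The Meter interface above only wraps a counter and a time span; this section does not specify how the rate itself is derived. One plausible reading, assumed here purely for illustration, is the number of events seen within the time span divided by the span length. `SlidingRate` is a hypothetical helper, and timestamps are passed in explicitly so the example is deterministic; the actual computation on the Java side may differ.

```python
from collections import deque


class SlidingRate:
    """Hypothetical helper: events per second over the last time span."""

    def __init__(self, time_span_in_seconds=60):
        self.time_span_in_seconds = time_span_in_seconds
        self._events = deque()  # (timestamp in seconds, number of events)

    def mark_event(self, now, value=1):
        self._events.append((now, value))

    def get_rate(self, now):
        # Drop events older than the time span, then average the remainder.
        cutoff = now - self.time_span_in_seconds
        while self._events and self._events[0][0] < cutoff:
            self._events.popleft()
        return sum(n for _, n in self._events) / self.time_span_in_seconds


rate = SlidingRate(time_span_in_seconds=120)
for second in range(240):
    rate.mark_event(now=second, value=2)  # two events every second

rate_at_240 = rate.get_rate(now=240)  # window still full of events
rate_at_300 = rate.get_rate(now=300)  # no events for the last 60 seconds
```

A longer time span smooths out short bursts, at the cost of reacting more slowly when the throughput changes.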


Distribution

A Distribution reports statistics (sum/count/min/max/mean) about the distribution of reported values. The value can be updated using update(n: int). You can register a distribution by calling distribution(name: str) on a MetricGroup. Distribution metrics are restricted to integer values. Internally, the distribution is implemented on top of the Flink Gauge, since a Gauge can report a value of any type.

class Distribution(Metric):
    """Distribution Metric interface.
    Allows statistics about the distribution of a variable to be collected during
    pipeline execution."""

    def __init__(self, inner_distribution):
        self._inner_distribution = inner_distribution

    def update(self, value):
        self._inner_distribution.update(value)


class MyUDF(ScalarFunction):

    def __init__(self):
        self.distribution = None

    def open(self, function_context):
        super().open(function_context)
        self.distribution = function_context.get_metric_group().distribution("my_distribution")

    def eval(self, i):
        self.distribution.update(i)
        return i - 1
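
To make the reported statistics concrete, here is a minimal sketch of an accumulator that tracks sum, count, min, max and mean for integer updates. `DistributionData` and `as_dict` are hypothetical names, and returning the mean as a float is an assumption of this example; the object wrapped by `Distribution` above may store and expose these values differently.

```python
class DistributionData:
    """Hypothetical accumulator for integer-only distribution statistics."""

    def __init__(self):
        self.sum = 0
        self.count = 0
        self.min = None
        self.max = None

    def update(self, value):
        # Distribution metrics are restricted to integer values.
        if not isinstance(value, int):
            raise TypeError("distribution values must be integers")
        self.sum += value
        self.count += 1
        self.min = value if self.min is None else min(self.min, value)
        self.max = value if self.max is None else max(self.max, value)

    @property
    def mean(self):
        # Assumption: an empty distribution reports a mean of 0.0.
        return self.sum / self.count if self.count else 0.0

    def as_dict(self):
        return {"sum": self.sum, "count": self.count,
                "min": self.min, "max": self.max, "mean": self.mean}


data = DistributionData()
for value in [4, 8, 1, 7]:
    data.update(value)
snapshot = data.as_dict()
```

Because all five statistics fit into one small value, a snapshot like this could be exposed through a single gauge, which matches the note above that the distribution is built on the Flink Gauge.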


Scope

Scopes are defined the same way as in Java; see the scope section of the Flink metrics documentation for details. The API for defining a user scope is as follows:

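# Register a counter under a plain sub-group named "my_metrics".
counter = (function_context
           .get_metric_group()
           .add_group("my_metrics")
           .counter("my_counter"))

# Register a counter under a key-value pair of sub-groups.
counter = (function_context
           .get_metric_group()
           .add_group("my_metrics_key", "my_metrics_value")
           .counter("my_counter"))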


User Variables

You can define a user variable by calling MetricGroup.add_group(key: str, value: str). This method affects what MetricGroup.get_metric_identifier(), MetricGroup.get_scope_components() and MetricGroup.get_all_variables() return.

# The parentheses allow the chained call to span several lines.
counter = (function_context
           .get_metric_group()
           .add_group("my_metrics_key", "my_metrics_value")
           .counter("my_counter"))
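
The effect of the two add_group forms on the scope, the variables and the metric identifier can be sketched with a small in-memory group. `SimpleMetricGroup` and its constructor arguments are hypothetical and exist only for this illustration; the base scope and variables are sample values, and the "." delimiter is an assumption.

```python
class SimpleMetricGroup:
    """Hypothetical in-memory metric group, not the PyFlink implementation."""

    def __init__(self, scope_components=None, variables=None, delimiter="."):
        self._scope_components = list(scope_components or [])
        self._variables = dict(variables or {})
        self._delimiter = delimiter

    def add_group(self, name, extra=None):
        if extra is None:
            # Plain sub-group: only the scope grows.
            return SimpleMetricGroup(
                self._scope_components + [name], self._variables, self._delimiter)
        # Key-value pair: the scope grows by two and a variable is added.
        variables = dict(self._variables)
        variables[f"<{name}>"] = extra
        return SimpleMetricGroup(
            self._scope_components + [name, extra], variables, self._delimiter)

    def get_scope_components(self):
        return list(self._scope_components)

    def get_all_variables(self):
        return dict(self._variables)

    def get_metric_identifier(self, metric_name):
        return self._delimiter.join(self._scope_components + [metric_name])


base = SimpleMetricGroup(
    ["host-7", "taskmanager-2"],
    {"<host>": "host-7", "<tm_id>": "taskmanager-2"})
plain = base.add_group("my_metrics")
keyed = base.add_group("my_metrics_key", "my_metrics_value")

plain_id = plain.get_metric_identifier("my_counter")
keyed_id = keyed.get_metric_identifier("my_counter")
keyed_vars = keyed.get_all_variables()
```

In this sketch both forms extend the identifier, but only the key-value form adds an entry to the variables.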

MetricGroup

A MetricGroup is a named container for Metrics and further metric subgroups. Instances of this class can be used to register new metrics with Flink and to create a nested hierarchy based on the group names. A MetricGroup is uniquely identified by its place in the hierarchy and its name.


import abc
from typing import Callable, Dict, List


class MetricGroup(abc.ABC):

    def counter(self, name: str) -> 'Counter':
        """
        Registers a new `Counter` with Flink.
        """
        pass

    def gauge(self, name: str, method: Callable[[], int]) -> None:
        """
        Registers a new `Gauge` with Flink.
        """
        pass

    def meter(self, name: str, time_span_in_seconds: int = 60) -> 'Meter':
        """
        Registers a new `Meter` with Flink.
        """
        pass

    def distribution(self, name: str) -> 'Distribution':
        """
        Registers a new `Distribution` with Flink.
        """
        pass

    def add_group(self, name: str, extra: str = None) -> 'MetricGroup':
        """
        Creates a new MetricGroup and adds it to this group's sub-groups.

        If extra is not None, creates a new key-value MetricGroup pair. The key group
        is added to this group's sub-groups, while the value group is added to the key
        group's sub-groups. This method returns the value group.

        The only difference between calling this method and
        `group.add_group(key).add_group(value)` is that get_all_variables()
        of the value group returns an additional `"<key>": "value"` pair.
        """
        pass

    def get_scope_components(self) -> List[str]:
        """
        Gets the scope as a list of the scope components, for example
        `["host-7", "taskmanager-2", "window_word_count", "my-mapper"]`
        """
        pass

    def get_all_variables(self) -> Dict[str, str]:
        """
        Returns a map of all variables and their associated values, for example
        `{"<host>": "host-7", "<tm_id>": "taskmanager-2"}`
        """
        pass

    def get_metric_identifier(self, metric_name: str) -> str:
        """
        Returns the fully qualified metric name, for example
        `host-7.taskmanager-2.window_word_count.my-mapper.metricName`
        """
        pass

FunctionContext

A `get_metric_group` method will be added to the FunctionContext class.

class FunctionContext(object):
    """
    Used to obtain global runtime information about the context in which the
    user-defined function is executed. The information includes the metric group,
    global job parameters, etc.
    """

    def get_metric_group(self) -> MetricGroup:
        pass

Implementation Plan

  1. Transmit metric group information from Java to Python
  2. Support defining scopes and variables on the Python metric group
  3. Support Counter/Gauge/Distribution metric type for Python UDF
  4. Support Meter metric type for Python UDF
  5. Support metrics for UDTFs