Status

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Flink exposes a generic MetricGroup to UDFs through RuntimeContext, so that users can implement their own custom metrics. However, without some kind of standardization, these custom metrics are unnecessarily hard to monitor. Metrics for sources and sinks are especially valuable, which is why the effort to standardize their names was started in FLIP-33.

This FLIP draws upon the ideas of FLIP-33 but extends them to all UDFs. In the process, some changes to FLIP-33 are necessary to better align with this generalization.

Another motivation for this FLIP lies in the way the metric system works: it does not allow fetching a pre-created metric by name; a metric must either be created lazily by the user or exposed through a getter. Thus, we need to add a collection of interfaces that properly expose the standardized metrics. Note that this FLIP covers only those metrics (coming from FLIP-33) that cannot be calculated automatically, as only these metrics need to be exposed.
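
For illustration, the status quo with custom metrics looks roughly like the following sketch (the function and the metric name are made up for this example): the UDF has to create its metrics eagerly in open() and keep references to them.

public class MyMapper extends RichMapFunction<String, String> {

    private transient Counter numInvalidRecords; // user-defined, non-standardized metric

    @Override
    public void open(Configuration parameters) {
        // Metrics have to be created up front and stored in a field; there is no way
        // to look up an already registered metric by its name later on.
        numInvalidRecords = getRuntimeContext().getMetricGroup().counter("numInvalidRecords");
    }

    @Override
    public String map(String value) {
        if (value == null || value.isEmpty()) {
            numInvalidRecords.inc();
        }
        return value;
    }
}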

Public Interfaces

The idea is to piggyback on the existing RuntimeContext#getMetricGroup, SourceReaderContext#getMetricGroup, and Sink$InitContext#getMetricGroup by returning specialized MetricGroup subinterfaces that expose all standardized metrics (and that we can extend in the future).

Proposed Changes

First, we adjust RuntimeContext#getMetricGroup to return a specialized subinterface. Because it is a subinterface of MetricGroup, all existing user code compiles without changes (but needs to be recompiled because the signature changed). Note that the method is already PublicEvolving, so users should expect minor changes.

@Public
public interface RuntimeContext {
    ...
    /**
     * Returns the metric group for this parallel subtask.
     *
     * @return The metric group for this parallel subtask.
     */
    @PublicEvolving
    OperatorMetricGroup getMetricGroup(); // more specific return type
    ...
}

The OperatorMetricGroup currently contains only the I/O-related metrics in the OperatorIOMetricGroup. In the future, this interface allows us to expose more information.

/**
 * Special {@link org.apache.flink.metrics.MetricGroup} representing an Operator.
 *
 * <p>All metrics can only be accessed in the main operator thread.
 */
@NotThreadSafe
public interface OperatorMetricGroup extends MetricGroup {
    OperatorIOMetricGroup getIOMetricGroup();
}

The new OperatorIOMetricGroup collects all I/O-related counters. This is in line with FLIP-33, only expressed as an interface so that we can expose it to the user.

/**
 * Metric group that contains shareable pre-defined IO-related metrics for operators.
 *
 * <p>All metrics can only be accessed in the main operator thread.
 */
@NotThreadSafe
public interface OperatorIOMetricGroup extends MetricGroup {
    /**
     * The total number of input records since the operator started. Will also populate the
     * numRecordsInPerSecond meter.
     */
    Counter getNumRecordsInCounter();

    /**
     * The total number of output records since the operator started. Will also populate the
     * numRecordsOutPerSecond meter.
     */
    Counter getNumRecordsOutCounter();

    /**
     * The total number of input bytes since the task started. Will also populate the
     * numBytesInPerSecond meter.
     */
    Counter getNumBytesInCounter();

    /**
     * The total number of output bytes since the task started. Will also populate the
     * numBytesOutPerSecond meter.
     */
    Counter getNumBytesOutCounter();
}
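
For illustration, a connector UDF could then report into the standardized counters directly (a minimal sketch, assuming the narrowed return type of RuntimeContext#getMetricGroup described above; the sink function itself is made up):

public class ByteCountingSinkFunction extends RichSinkFunction<String> {

    private transient Counter numBytesOut;

    @Override
    public void open(Configuration parameters) {
        // getMetricGroup() now returns an OperatorMetricGroup, so the connector can
        // report into the standardized counter instead of inventing its own metric name.
        numBytesOut = getRuntimeContext()
                .getMetricGroup()
                .getIOMetricGroup()
                .getNumBytesOutCounter();
    }

    @Override
    public void invoke(String value, Context context) {
        byte[] payload = value.getBytes(StandardCharsets.UTF_8);
        // writing the payload to the external system is omitted here
        numBytesOut.inc(payload.length);
    }
}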

Further, we want to add the same abstraction for coordinators (albeit currently empty).

/**
 * Special {@link MetricGroup} representing an Operator coordinator.
 *
 * <p>All metrics can only be accessed in the main coordinator thread.
 */
@NotThreadSafe
public interface OperatorCoordinatorMetricGroup extends MetricGroup {}

All interfaces reside in flink-metrics-core in the org.apache.flink.metrics.groups package.

Source Readers

For sources, Flink provides a specialization of the OperatorMetricGroup. It exposes those FLIP-33 metrics that Flink cannot calculate automatically.

/**
 * Pre-defined metrics for sources.
 *
 * <p>All metrics can only be accessed in the main operator thread.
 */
@NotThreadSafe
public interface SourceReaderMetricGroup extends OperatorMetricGroup {
    /** The total number of records that failed to be consumed, processed, or emitted. */
    Counter getNumRecordsInErrorsCounter();

    /**
     * Sets an optional gauge for the number of bytes that have not been fetched by the source,
     * e.g. the remaining bytes in a file after the current reading position of the file descriptor.
     *
     * <p>Note that not every source can report this metric in a plausible and efficient way.
     *
     * @see SettableGauge SettableGauge to continuously update the value.
     */
    <G extends Gauge<Long>> G setPendingBytesGauge(G pendingBytesGauge);

    /**
     * Sets an optional gauge for the number of records that have not been fetched by the source,
     * e.g. the available records after the consumer offset in a Kafka partition.
     *
     * <p>Note that not every source can report this metric in a plausible and efficient way.
     *
     * @see SettableGauge SettableGauge to continuously update the value.
     */
    <G extends Gauge<Long>> G setPendingRecordsGauge(G pendingRecordsGauge);
}

To access it, we again simply narrow the return type of the existing getter.

@PublicEvolving
public interface SourceReaderContext {

    /** @return The metric group of this source reader. */
    SourceReaderMetricGroup metricGroup(); // more specific return type
    ...
}
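
For illustration, a reader could register the optional gauges once when it is created (a sketch; FileSplit and the remainingBytes bookkeeping are hypothetical, and the split reader is assumed to keep the field up to date):

public class FileSourceReaderExample implements SourceReader<String, FileSplit> {

    private final SourceReaderContext context;

    // Updated by the fetcher thread whenever it learns about the remaining backlog.
    private volatile long remainingBytes;

    public FileSourceReaderExample(SourceReaderContext context) {
        this.context = context;
        // Register the optional gauge once; it is read lazily whenever metrics are reported.
        Gauge<Long> pendingBytes = () -> remainingBytes;
        context.metricGroup().setPendingBytesGauge(pendingBytes);
    }
    ...
}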

Split Enumerator

Additionally, we add metrics for split enumerators.

/**
 * Pre-defined metrics for {@code SplitEnumerator}.
 *
 * <p>All metrics can only be accessed in the main coordinator thread.
 */
@NotThreadSafe
public interface SplitEnumeratorMetricGroup extends OperatorCoordinatorMetricGroup {
    /**
     * Sets an optional gauge for the number of splits that have been enumerated but not yet
     * assigned. For example, this would be the number of files that are in the backlog.
     *
     * <p>Note that not every source can report this metric in a plausible and efficient way.
     *
     * @return the supplied gauge
     * @see SettableGauge SettableGauge to continuously update the value.
     */
    <G extends Gauge<Long>> G setUnassignedSplitsGauge(G unassignedSplitsGauge);
}

The metrics are accessible through the context object.

/**
 * A context class for the {@link SplitEnumerator}. This class serves the following purposes: 1.
 * Host information necessary for the SplitEnumerator to make split assignment decisions. 2. Accept
 * and track the split assignment from the enumerator. 3. Provide a managed threading model so the
 * split enumerators do not need to create their own internal threads.
 *
 * @param <SplitT> the type of the splits.
 */
@PublicEvolving
public interface SplitEnumeratorContext<SplitT extends SourceSplit> {

    /** @return The metric group of the split enumerator. */
    SplitEnumeratorMetricGroup metricGroup(); // more specific return type
    ...
}
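
For illustration, an enumerator could report its backlog as follows (a sketch; FileSplit and the backlog bookkeeping are hypothetical, and the queue is only mutated in the coordinator thread):

public class DirectoryEnumeratorExample implements SplitEnumerator<FileSplit, Collection<FileSplit>> {

    private final SplitEnumeratorContext<FileSplit> context;
    private final Queue<FileSplit> unassignedSplits = new ArrayDeque<>();

    public DirectoryEnumeratorExample(SplitEnumeratorContext<FileSplit> context) {
        this.context = context;
        // The gauge is evaluated lazily, so it always reflects the current backlog size.
        Gauge<Long> unassigned = () -> (long) unassignedSplits.size();
        context.metricGroup().setUnassignedSplitsGauge(unassigned);
    }
    ...
}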

Sink

Similarly, we provide a SinkMetricGroup.

/**
 * Pre-defined metrics for sinks.
 *
 * <p>All metrics can only be accessed in the main operator thread.
 */
@NotThreadSafe
public interface SinkMetricGroup extends OperatorMetricGroup {
    /** The total number of records that failed to be sent. */
    Counter getNumRecordsOutErrorsCounter();

    /**
     * Sets an optional gauge for the time it takes to send the last record.
     *
     * <p>This metric is an instantaneous value recorded for the last processed record.
     *
     * <p>If this metric is eagerly calculated, it should NOT be updated for each record.
     * Instead, update it for each batch of records or sample it every X records.
     *
     * <p>Note that for asynchronous sinks, the time must be accessible from the main operator thread.
     * For example, a `volatile` field could be set in the async thread and lazily read through the
     * gauge.
     *
     * @see SettableGauge SettableGauge to continuously update the value.
     */
    <G extends Gauge<Long>> G setCurrentSendTimeGauge(G currentSendTime);
}

We make it accessible in InitContext by changing the return type of metricGroup.

@Experimental
public interface Sink<InputT, CommT, WriterStateT, GlobalCommT> extends Serializable {
    ...
    interface InitContext {
        ...

        /** @return The metric group this writer belongs to. */
        SinkMetricGroup metricGroup(); // more specific return type
    }
}
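
For illustration, an asynchronous writer could follow the volatile-field pattern described in the Javadoc above (a sketch; HypotheticalAsyncClient and its send() callback stand in for whatever client the connector actually uses):

public class AsyncSinkWriterExample implements SinkWriter<String, Void, Void> {

    private final HypotheticalAsyncClient client; // placeholder for the external system's client

    // Written from the client's callback thread, read lazily from the main operator
    // thread through the gauge, hence volatile.
    private volatile long lastSendTime;

    public AsyncSinkWriterExample(Sink.InitContext context, HypotheticalAsyncClient client) {
        this.client = client;
        Gauge<Long> currentSendTime = () -> lastSendTime;
        context.metricGroup().setCurrentSendTimeGauge(currentSendTime);
    }

    @Override
    public void write(String element, Context context) {
        long start = System.currentTimeMillis();
        client.send(element, () -> lastSendTime = System.currentTimeMillis() - start);
    }
    ...
}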


Compatibility, Deprecation, and Migration Plan

  • All user code will still compile, but it needs to be recompiled if any of the metric group getters of the contexts have been used. All of these methods are marked as at least PublicEvolving, so no user should be surprised.

Test Plan

Most tests will be unit tests that cover the logic of the default implementations. We will also add an integration test with a small, simple source and sink.

Rejected Alternatives

Constants

One easy option would be to just provide a collection of name constants that can be used to emit the standardized metrics. However, that would not standardize the metric types as well and may make it much harder for implementers to use the metrics consistently. In the worst case, semantics would differ significantly across implementations.

Adding metrics directly to the contexts

We could also embed all metrics directly as top-level objects in the contexts, as proposed in FLIP-33. However, logically grouping them helps both Flink developers and connector implementers by adding clarity and easing testing.