Motivation

User-defined functions (UDFs) are widely used in Flink jobs to implement custom business logic. However, they pose significant challenges for debugging and performance optimization because they often act as black boxes. When issues arise (e.g., poor performance, high latency, frequent exceptions), it's hard for users and operators to trace the root causes inside UDFs. Currently, Flink does not provide dedicated built-in metrics to quantify critical aspects such as per-record processing time or exception count within UDFs. This lack of observability makes it harder to:

  • Debug production issues.

  • Tune performance and resource allocation.

  • Feed autoscaling systems (e.g., autosizer) with reliable UDF-level signals.

Providing standard, opt-in UDF-level metrics will help ensure the observability and health of the entire platform.

Public Interfaces

A new configuration option to enable or disable UDF metrics collection.

Config Option | Default | Description
--- | --- | ---
table.exec.udf-metric-enabled | false | Enables collection of per-record UDF metrics.
table.exec.udf-metric.sample-interval | 100 | When metrics are enabled, measures the processing time of every Nth UDF invocation (exception counting is not sampled). Higher values reduce overhead; lower values increase accuracy. Set to 1 to measure every invocation (not recommended for production).
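The hypothetical `UdfMetricSettings` class below is a minimal sketch of how the two options and their defaults fit together. It resolves them from a plain string map. In Flink the options would presumably be declared as `ConfigOption`s. The map and the rule that the interval must be at least 1 are assumptions made here to keep the example self-contained.

```java
import java.util.Map;

/**
 * Hypothetical holder for the two proposed options. Keys and defaults follow the table above;
 * rejecting intervals below 1 is an assumption of this sketch.
 */
final class UdfMetricSettings {
    static final String ENABLED_KEY = "table.exec.udf-metric-enabled";
    static final String SAMPLE_INTERVAL_KEY = "table.exec.udf-metric.sample-interval";

    final boolean enabled;
    final int sampleInterval;

    private UdfMetricSettings(boolean enabled, int sampleInterval) {
        this.enabled = enabled;
        this.sampleInterval = sampleInterval;
    }

    /** Resolves both options from raw string values, falling back to the documented defaults. */
    static UdfMetricSettings fromMap(Map<String, String> conf) {
        boolean enabled = Boolean.parseBoolean(conf.getOrDefault(ENABLED_KEY, "false"));
        int interval = Integer.parseInt(conf.getOrDefault(SAMPLE_INTERVAL_KEY, "100"));
        if (interval < 1) {
            throw new IllegalArgumentException(SAMPLE_INTERVAL_KEY + " must be >= 1, got " + interval);
        }
        return new UdfMetricSettings(enabled, interval);
    }
}
```

With an empty map, the feature stays off and the interval resolves to 100. This means existing jobs see no change.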

Two new built-in metrics are registered per operator for each UDF:

Metric Name | Type | Description
--- | --- | ---
udfProcessingTime | Histogram | Sampled processing time of UDF invocations. Reports p50/p75/p95/p99/mean/min/max. The sampling interval is configurable.
udfExceptionCount | Counter | Counts exceptions thrown by UDFs. A non-zero value signals that errors originate in user code (the UDF), which supports autosizer logic and auto-remediation.

Proposed Changes

Config Flag

Since per-invocation metrics can significantly impact performance (see FLINK-16444, where RocksDB per-operation stats caused >10% degradation), we adopt two layers of protection:

  1. Feature gate: table.exec.udf-metric-enabled (default: false) — when disabled, no metrics are registered or recorded, ensuring zero overhead.
  2. Sampling: table.exec.udf-metric.sample-interval (default: 100) — when enabled, only every Nth invocation is measured. This follows the same counter-based sampling pattern established by Flink's state latency tracking (FLINK-21736). The fast path (non-sampled invocations) is a single integer increment with negligible overhead.

Metrics Hierarchy

To expose metrics for each UDF on each TaskManager (TM), we define a UdfMetricGroup with the following hierarchy. All metrics under this group carry the prefix "udf".

TaskManagerMetricGroup
  └── TaskManagerJobMetricGroup
       └── TaskMetricGroup
            └── OperatorMetricGroup
                 └── UdfMetricGroup
                      ├── udfProcessingTime (Histogram)
                      └── udfExceptionCount (Counter)

Metrics will be registered in the MetricRegistry and reported via existing MetricReporters (e.g., Prometheus, JMX).

UDF Metrics Naming Conventions

Based on Flink's metric naming conventions, UDF metrics should follow this pattern:

<operator_name>.<udf_name>.<metric_name>


// Example: For a UDF named "enrichUser" in a MapOperator
mapOperator.enrichUser.udfProcessingTime
mapOperator.enrichUser.udfExceptionCount
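The naming pattern can be illustrated with a hypothetical, simplified group model that only tracks scope components. This is not Flink's `MetricGroup` implementation. It starts at the operator scope and omits the host, TaskManager, job and task components that Flink's configured scope formats would normally prepend.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified metric group that only records its scope components. */
final class ScopedGroup {
    private final List<String> scope;

    ScopedGroup(List<String> scope) {
        this.scope = Collections.unmodifiableList(new ArrayList<>(scope));
    }

    /** Returns a child group whose scope extends this group's scope by one component. */
    ScopedGroup addGroup(String name) {
        List<String> child = new ArrayList<>(scope);
        child.add(name);
        return new ScopedGroup(child);
    }

    /** Joins the scope components and the metric name with '.', following the pattern above. */
    String metricIdentifier(String metricName) {
        List<String> parts = new ArrayList<>(scope);
        parts.add(metricName);
        return String.join(".", parts);
    }
}
```

For example, `new ScopedGroup(List.of("mapOperator")).addGroup("enrichUser").metricIdentifier("udfProcessingTime")` yields `mapOperator.enrichUser.udfProcessingTime`.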

Metric Registration

Metric instantiation / registration happens during operator initialization:

  • The generated operator code will check udfMetricsEnabled:

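operatorOpen() {
   ...
   // Feature gate: nothing is registered unless UDF metrics are enabled
   boolean udfMetricsEnabled = config.getBoolean("table.exec.udf-metric-enabled", false);
   if (udfMetricsEnabled) {
      sampleInterval = config.getInteger("table.exec.udf-metric.sample-interval", 100);
      // UdfMetricGroup, scoped under the operator's metric group
      MetricGroup udfMetrics = operatorMetricGroup.addGroup("udf", udfName);
      // Fields of the generated operator; the histogram keeps a 128-entry window
      udfProcessingTime = udfMetrics.histogram(
            "udfProcessingTime", new DescriptiveStatisticsHistogram(128));
      udfExceptionCount = udfMetrics.counter("udfExceptionCount");
   }
   ...
}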
  • This ensures that UdfMetricGroup is instantiated and registered only when the config is set to true.

Recording Values

Users are expected to extend the UDF base classes, and they usually override execution methods such as open(), map(), or eval(). Any measurement placed in those base-class methods would be lost when they are overridden, so exception counts and processing times cannot be recorded there. Instead, the metrics are recorded in code emitted at CodeGen time around each UDF call site. This keeps them available regardless of how the function is overridden.
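To make the CodeGen approach concrete, the hypothetical helper below returns the Java source that might be emitted at a UDF call site. It is a sketch, not Flink's code generator. The emitted terms `shouldSample()`, `udfProcessingTime` and `udfExceptionCount` are assumed to be members of the generated operator. `update(long)` and `inc()` are the methods of Flink's `Histogram` and `Counter` metric interfaces. When metrics are disabled, only the plain call is emitted. Because the instrumentation wraps the call site rather than living in a base-class method, overriding `eval()` cannot bypass it.

```java
/** Hypothetical call-site template; names are illustrative, not Flink's CodeGen API. */
final class UdfCallSiteTemplate {
    static String generate(String udfTerm, String argTerm, String resultTerm, boolean metricsEnabled) {
        String call = resultTerm + " = " + udfTerm + ".eval(" + argTerm + ");";
        if (!metricsEnabled) {
            // Feature gate: emit the bare call, so no metric code exists at runtime.
            return call;
        }
        // Sampled timing around the call; every exception is counted and rethrown.
        return String.join("\n",
            "try {",
            "  if (shouldSample()) {",
            "    long start = System.nanoTime();",
            "    " + call,
            "    udfProcessingTime.update(System.nanoTime() - start);",
            "  } else {",
            "    " + call,
            "  }",
            "} catch (Throwable t) {",
            "  udfExceptionCount.inc();",
            "  throw t;",
            "}");
    }
}
```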

Sampling Strategy (following FLINK-21736 precedent):

We adopt the same counter-based sampling pattern used by Flink's state latency tracking:

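  private int sampleInterval; // table.exec.udf-metric.sample-interval
  private int invocationCount = 0;

  // Returns true for invocations 1, N+1, 2N+1, ... (N = sampleInterval).
  // Wrapping back to 1 (not 0) keeps sampleInterval = 1 sampling every invocation.
  private boolean shouldSample() {
      invocationCount = (invocationCount < sampleInterval) ? invocationCount + 1 : 1;
      return invocationCount == 1;
  }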
  • Fast path (non-sampled): Single integer increment per invocation — negligible overhead
  • Sampled path: System.nanoTime() around the UDF call, result stored in a DescriptiveStatisticsHistogram with a bounded circular buffer (128 entries)
  • Exception counting: Not sampled — exceptions are rare events, every occurrence is counted
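The bounded circular buffer on the sampled path can be sketched as follows. `BoundedHistogram` is a hypothetical stand-in for `DescriptiveStatisticsHistogram`. It keeps only the most recent samples and computes statistics over that window. It uses a nearest-rank percentile, which may differ from the estimator that Flink's histogram statistics actually use.

```java
import java.util.Arrays;

/** Hypothetical windowed histogram: keeps only the most recent `capacity` samples. */
final class BoundedHistogram {
    private final long[] buffer;
    private int next = 0;         // slot the next sample is written to
    private int size = 0;         // valid samples currently held (<= capacity)
    private long totalCount = 0;  // all samples ever recorded, including overwritten ones

    BoundedHistogram(int capacity) {
        if (capacity < 1) throw new IllegalArgumentException("capacity must be >= 1");
        this.buffer = new long[capacity];
    }

    /** Records a sample; once the buffer is full, the oldest sample is overwritten. */
    void update(long value) {
        buffer[next] = value;
        next = (next + 1) % buffer.length;
        if (size < buffer.length) size++;
        totalCount++;
    }

    long getCount() { return totalCount; }

    /** Sorted copy of the current window; before the first wrap, slots 0..size-1 are valid. */
    private long[] sortedWindow() {
        if (size == 0) throw new IllegalStateException("no samples recorded");
        long[] window = Arrays.copyOf(buffer, size);
        Arrays.sort(window);
        return window;
    }

    long min() { return sortedWindow()[0]; }

    long max() {
        long[] w = sortedWindow();
        return w[w.length - 1];
    }

    double mean() { return Arrays.stream(sortedWindow()).average().getAsDouble(); }

    /** Nearest-rank percentile for q in (0, 1], e.g. 0.5 for p50 or 0.99 for p99. */
    long percentile(double q) {
        if (q <= 0 || q > 1) throw new IllegalArgumentException("q must be in (0, 1]");
        long[] w = sortedWindow();
        int rank = (int) Math.ceil(q * w.length); // 1-based rank
        return w[rank - 1];
    }
}
```

The memory footprint stays fixed no matter how long the job runs. The trade-off is that the statistics describe only the last 128 sampled invocations.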

UDF Metrics:

  • On sampled invocations: measure start and end times around UDF invocations → udfProcessingTime.
  • On all invocations: catch and count exceptions → udfExceptionCount.
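The runtime behavior of an instrumented synchronous call can be sketched as shown below. This sketch makes several simplifications. The UDF is modeled as a `java.util.function.Function`. Sampled durations are collected in a list rather than a Flink `Histogram`. Only `RuntimeException`s are counted. Recording time only for invocations that complete normally is an assumption that the proposal does not settle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical runtime equivalent of an instrumented call site for a synchronous UDF. */
final class InstrumentedUdfCall<I, O> {
    private final Function<I, O> udf;
    private final int sampleInterval;
    private int invocationCount = 0;

    /** Sampled durations in nanoseconds (stand-in for the udfProcessingTime histogram). */
    final List<Long> sampledNanos = new ArrayList<>();
    /** Stand-in for the udfExceptionCount counter. */
    long exceptionCount = 0;

    InstrumentedUdfCall(Function<I, O> udf, int sampleInterval) {
        if (sampleInterval < 1) throw new IllegalArgumentException("sampleInterval must be >= 1");
        this.udf = udf;
        this.sampleInterval = sampleInterval;
    }

    private boolean shouldSample() {
        invocationCount = (invocationCount < sampleInterval) ? invocationCount + 1 : 1;
        return invocationCount == 1;
    }

    O invoke(I input) {
        boolean sampled = shouldSample();              // fast path: one increment and compare
        long start = sampled ? System.nanoTime() : 0L; // clock is read only on sampled calls
        try {
            O result = udf.apply(input);
            if (sampled) {
                // Assumption: only normally completed invocations are timed.
                sampledNanos.add(System.nanoTime() - start);
            }
            return result;
        } catch (RuntimeException e) {
            exceptionCount++; // not sampled: every exception is counted
            throw e;          // rethrow so existing failure handling is unchanged
        }
    }
}
```

With the default interval of 100, 1,000 successful calls produce 10 timing samples, while every failing call increments the exception count.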

The generated operator code instruments both synchronous and asynchronous UDF calls; asynchronous UDFs need additional handling at completion time, as described below.

Implementation Entry Points:

CommonExecMatch.java (pass in Flink context classLoader to MatchCodeGenerator)
  -> MatchCodeGenerator.scala
        ->  ExprCodeGenerator.scala
              -> TableFunctionCallGen.scala
              -> ScalarFunctionCallGen.scala 

Context Access:

  • RuntimeContext access is needed to register UDF metrics on each TaskManager
  • The context will be obtained via reflection, since getMetricGroup() requires a RichFunction
  • Metrics are scoped under OperatorMetricGroup to properly associate UDF metrics with their executing operator

For asynchronous UDFs (ASYNC_SCALAR and ASYNC_TABLE):

  • udfProcessingTime:
    • At CodeGen level: Only measures the time to call invokeAsync
    • End-to-end processing time (until the result completes) should be measured in AsyncWaitOperator, where ResultFuture completions are handled
  • udfExceptionCount:
    • At CodeGen level: Tracks exceptions in invokeAsync call
    • Completion exceptions should be tracked in AsyncWaitOperator via ResultFuture.completeExceptionally
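The split between call-time and completion-time measurement for asynchronous UDFs can be sketched with `CompletableFuture`. As a simplification, the async invocation is modeled as a function that returns a future. `whenComplete` stands in for the completion handling that the proposal places in `AsyncWaitOperator`. Sampling is omitted for brevity. It would presumably apply as in the synchronous case.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/** Hypothetical sketch of async UDF instrumentation at call time and at completion time. */
final class InstrumentedAsyncUdfCall<I, O> {
    private final Function<I, CompletableFuture<O>> invokeAsync;

    /** End-to-end durations (invocation to completion), in nanoseconds. */
    final List<Long> completionNanos = Collections.synchronizedList(new ArrayList<>());
    /** Completions may run on other threads, so the counter is atomic. */
    final AtomicLong exceptionCount = new AtomicLong();

    InstrumentedAsyncUdfCall(Function<I, CompletableFuture<O>> invokeAsync) {
        this.invokeAsync = invokeAsync;
    }

    CompletableFuture<O> invoke(I input) {
        long start = System.nanoTime();
        CompletableFuture<O> future;
        try {
            // CodeGen level: covers only the invokeAsync call itself.
            future = invokeAsync.apply(input);
        } catch (RuntimeException e) {
            exceptionCount.incrementAndGet();
            throw e;
        }
        // Completion level: the returned stage completes only after this bookkeeping runs.
        return future.whenComplete((result, error) -> {
            if (error != null) {
                exceptionCount.incrementAndGet();
            } else {
                completionNanos.add(System.nanoTime() - start);
            }
        });
    }
}
```

Measuring only around `invokeAsync` would report how long it takes to start the request, not how long the UDF takes to produce a result. This is why the completion side needs separate handling.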

Note: FLIP-498 (AsyncTableFunction) has been completed (FLINK-37724, PR #26567). The ASYNC_TABLE metrics implementation will follow the same pattern as ASYNC_SCALAR, instrumenting both the invokeAsync call at CodeGen level and completion handling in AsyncWaitOperator.

Compatibility, Deprecation, and Migration Plan

  • Backward Compatibility:

    • Fully backward compatible.

    • By default (table.exec.udf-metric-enabled = false), Flink behavior is unchanged.

  • Deprecation:

    • No existing APIs or configs are deprecated.

  • Migration:

    • No migration steps required for existing jobs.

    • Users who want these metrics simply enable the config.

Test Plan

  • Unit Tests:

    • Verify metric registration logic with table.exec.udf-metric-enabled = true and false.

    • Validate correct metric hierarchy and naming.

  • Performance Testing:

    • Benchmark the overhead of per-record metric recording.

    • Ensure any performance regression is documented and acceptable.

    • Verify that with default sample-interval (100), overhead is <1% compared to metrics disabled
    • Validate that sample-interval=1 (every invocation) produces accurate results for correctness testing
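A unit test for the sampling rule could compare the number of sampled invocations against the expected count. The sketch below assumes that the first invocation of every group of N is sampled. Under that assumption, `n` invocations at interval `N` yield ceil(n / N) samples, and an interval of 1 samples every call. `CounterSampler` is a hypothetical class written for this test.

```java
/** Hypothetical counter-based sampler, as it might be tested in isolation. */
final class CounterSampler {
    private final int sampleInterval;
    private int invocationCount = 0;

    CounterSampler(int sampleInterval) {
        if (sampleInterval < 1) throw new IllegalArgumentException("sampleInterval must be >= 1");
        this.sampleInterval = sampleInterval;
    }

    /** True for invocations 1, N+1, 2N+1, ... where N is the sample interval. */
    boolean shouldSample() {
        invocationCount = (invocationCount < sampleInterval) ? invocationCount + 1 : 1;
        return invocationCount == 1;
    }

    /** Counts how many of `invocations` consecutive calls a fresh sampler would sample. */
    static int sampledCount(int sampleInterval, int invocations) {
        CounterSampler sampler = new CounterSampler(sampleInterval);
        int sampled = 0;
        for (int i = 0; i < invocations; i++) {
            if (sampler.shouldSample()) sampled++;
        }
        return sampled;
    }
}
```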

Rejected Alternatives

Add Metrics Directly in UDF Base Classes:

  • Pros
    • Straightforward logic.
    • Fast and simple to implement.
  • Cons
    • Maintenance overhead: every new UDF base class would need its own metrics code.
    • Limits users' options, and additional classes would continually need to be added to address user needs.