Motivation
User-defined functions (UDFs) are widely used in Flink jobs to implement custom business logic. However, they pose significant challenges for debugging and performance optimization because they often act as black boxes. When issues arise (e.g., poor performance, high latency, frequent exceptions), it's hard for users and operators to trace the root causes inside UDFs. Currently, Flink does not provide dedicated built-in metrics to quantify critical aspects such as per-record processing time or exception count within UDFs. This lack of observability makes it harder to:
Debug production issues.
Tune performance and resource allocation.
Feed autoscaling systems (e.g., autosizer) with reliable UDF-level signals.
Providing standard, opt-in UDF-level metrics will help ensure the observability and health of the entire platform.
Public Interfaces
Two new configuration options: one to enable or disable UDF metrics collection, and one to set the sampling interval.
| Config Option | Default | Description |
|---|---|---|
| table.exec.udf-metric-enabled | false | Enables collection of per-record UDF metrics. |
| table.exec.udf-metric.sample-interval | 100 | Measure every Nth UDF invocation when metrics are enabled. Higher values reduce overhead; lower values increase accuracy. Set to 1 for every invocation (not recommended for production). |
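As an illustration of how the two options and their defaults fit together, here is a minimal, self-contained sketch. The class `UdfMetricOptions` and its `fromMap` method are hypothetical and not part of Flink; rejecting intervals below 1 is also an assumption, since this proposal does not say how invalid values are handled.

```java
import java.util.Map;

/** Hypothetical holder for the two proposed options; not part of Flink. */
final class UdfMetricOptions {
    static final String ENABLED_KEY = "table.exec.udf-metric-enabled";
    static final String SAMPLE_INTERVAL_KEY = "table.exec.udf-metric.sample-interval";

    final boolean enabled;
    final int sampleInterval;

    private UdfMetricOptions(boolean enabled, int sampleInterval) {
        this.enabled = enabled;
        this.sampleInterval = sampleInterval;
    }

    /** Reads both options from string key/value pairs, applying the defaults from the table. */
    static UdfMetricOptions fromMap(Map<String, String> conf) {
        boolean enabled = Boolean.parseBoolean(conf.getOrDefault(ENABLED_KEY, "false"));
        int interval = Integer.parseInt(conf.getOrDefault(SAMPLE_INTERVAL_KEY, "100"));
        if (interval < 1) {
            // Assumption: invalid intervals are rejected; the proposal does not specify this.
            throw new IllegalArgumentException(SAMPLE_INTERVAL_KEY + " must be >= 1");
        }
        return new UdfMetricOptions(enabled, interval);
    }
}
```

With an empty map this yields `enabled = false` and `sampleInterval = 100`, matching the defaults listed above.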
Two new built-in metrics registered per operator for UDFs:
| Metric Name | Type | Description |
|---|---|---|
| udfProcessingTime | Histogram | Sampled processing time for UDF invocations. Reports p50/p75/p95/p99/mean/min/max. Sampling interval is configurable. |
| udfExceptionCount | Counter | Counts UDF exceptions. When > 0, it clearly signals that errors originate in user code (UDF), supporting autosizer logic and auto-remediation. |
Proposed Changes
Config Flag
Since per-invocation metrics can significantly impact performance (see FLINK-16444, where RocksDB per-operation stats caused >10% degradation), we adopt a two-layer protection:
- Feature gate: table.exec.udf-metric-enabled (default: false) — when disabled, no metrics are registered or recorded, ensuring zero overhead.
- Sampling: table.exec.udf-metric.sample-interval (default: 100) — when enabled, only every Nth invocation is measured. This follows the same counter-based sampling pattern established by Flink's state latency tracking (FLINK-21736). The fast path (non-sampled invocations) is a single integer increment with negligible overhead.
Metrics Hierarchy
To expose metrics for each UDF on each TaskManager, we define UdfMetricGroup with the following hierarchy. All metrics under this group carry the prefix "udf".
TaskManagerMetricGroup
└── TaskManagerJobMetricGroup
    └── TaskMetricGroup
        └── OperatorMetricGroup
            └── UdfMetricGroup
                ├── udfProcessingTime (Histogram)
                └── udfExceptionCount (Counter)
Metrics will be registered in the MetricRegistry and reported via existing MetricReporters (e.g., Prometheus, JMX).
UDF Metrics Naming Conventions
Based on Flink's metric naming conventions, UDF metrics should follow this pattern:
<operator_name>.<udf_name>.<metric_name>
Metric Registration
Metric instantiation / registration happens during operator initialization:
The generated operator code will check udfMetricsEnabled:
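operatorOpen() {
    ...
    // Check if UDF metrics are enabled
    boolean udfMetricsEnabled = config.getBoolean("table.exec.udf-metric-enabled", false);
    if (udfMetricsEnabled) {
        // Create the UDF metric group under the operator's metric group
        MetricGroup udfMetrics = operatorMetricGroup.addGroup("udf", udfName);
        instantiateProcessTime(udfMetrics);    // registers udfProcessingTime (Histogram)
        instantiateExceptionCount(udfMetrics); // registers udfExceptionCount (Counter)
    }
    ...
}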
- This ensures that UdfMetricGroup is instantiated and registered only when the config option is set to true.
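The gating behavior can be sketched without Flink on the classpath. In the example below, `GatedUdfMetricRegistration` is a hypothetical stand-in that only records which metric names would be registered. It is not Flink's MetricGroup API, and the key layout is an assumption chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a metric group; it only records which metrics were registered. */
final class GatedUdfMetricRegistration {
    /** Registered metrics keyed by "<udf_name>.<metric_name>"; values are the metric types. */
    final Map<String, String> registered = new LinkedHashMap<>();

    /** Mirrors the gate in the operator's open step: nothing is registered when disabled. */
    void open(boolean udfMetricsEnabled, String udfName) {
        if (!udfMetricsEnabled) {
            return; // feature gate off: no metric objects are created
        }
        registered.put(udfName + ".udfProcessingTime", "Histogram");
        registered.put(udfName + ".udfExceptionCount", "Counter");
    }
}
```

Calling `open(false, "myUdf")` leaves `registered` empty, while `open(true, "myUdf")` adds exactly the two metrics listed under Public Interfaces.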
Recording Values
We expect users to extend the UDF base classes and, in most cases, to override execution methods such as open(), map(), or eval(). Instrumentation placed inside those base-class methods would therefore be bypassed by the user's override, so exception counting and processing-time measurement cannot live there. Instead, the metrics are recorded in the code that CodeGen emits around each UDF call, so they are always available regardless of which methods the user overrides.
Sampling Strategy (following FLINK-21736 precedent):
We adopt the same counter-based sampling pattern used by Flink's state latency tracking:
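private int invocationCount = 0;

private boolean shouldSample() {
    // Sample when the counter is at 0, then advance it and wrap at sampleInterval.
    // With sampleInterval = 1 the counter stays at 0, so every invocation is sampled.
    boolean sample = invocationCount == 0;
    invocationCount = (invocationCount + 1 < sampleInterval) ? invocationCount + 1 : 0;
    return sample;
}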
- Fast path (non-sampled): Single integer increment per invocation — negligible overhead
- Sampled path: System.nanoTime() around the UDF call, result stored in a DescriptiveStatisticsHistogram with a bounded circular buffer (128 entries)
- Exception counting: Not sampled — exceptions are rare events, every occurrence is counted
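To make the sampling arithmetic concrete, here is a small runnable sketch of a counter-based sampler. The class name `UdfInvocationSampler` is hypothetical. This variant samples whenever the counter sits at 0 before advancing, so an interval of 1 measures every invocation and an interval of N measures one invocation in every N.

```java
/** Hypothetical counter-based sampler: returns true for one call in every sampleInterval calls. */
final class UdfInvocationSampler {
    private final int sampleInterval;
    private int invocationCount = 0;

    UdfInvocationSampler(int sampleInterval) {
        if (sampleInterval < 1) {
            throw new IllegalArgumentException("sampleInterval must be >= 1");
        }
        this.sampleInterval = sampleInterval;
    }

    /** Samples when the counter is at 0, then advances it and wraps at sampleInterval. */
    boolean shouldSample() {
        boolean sample = invocationCount == 0;
        invocationCount = (invocationCount + 1 < sampleInterval) ? invocationCount + 1 : 0;
        return sample;
    }

    public static void main(String[] args) {
        UdfInvocationSampler sampler = new UdfInvocationSampler(100);
        int sampled = 0;
        for (int i = 0; i < 1_000; i++) {
            if (sampler.shouldSample()) {
                sampled++;
            }
        }
        System.out.println(sampled); // prints 10: calls 1, 101, ..., 901 are sampled
    }
}
```

With the default interval of 100, only 1% of invocations pay for the two System.nanoTime() calls; the remaining 99% pay for a comparison and an integer update.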
UDF Metrics:
- On sampled invocations: measure start and end times around UDF invocations → udfProcessingTime.
- On all invocations: catch and count exceptions → udfExceptionCount.
The generated operator code will automatically handle both synchronous and asynchronous UDF execution.
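The recording logic for a synchronous UDF could look roughly like the sketch below. `InstrumentedUdfCall` is a hypothetical stand-in for the generated code: a plain list replaces the bounded histogram, a long field replaces the Counter, and a `java.util.function.Function` replaces the UDF. Rethrowing the exception after counting it, and recording the duration of failed sampled calls, are assumptions that this proposal does not spell out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for the code generated around a synchronous UDF call. */
final class InstrumentedUdfCall<I, O> {
    private final Function<I, O> udf;
    private final int sampleInterval;
    private int invocationCount = 0;

    /** Stand-in for udfProcessingTime: sampled durations in nanoseconds (unbounded here). */
    final List<Long> sampledNanos = new ArrayList<>();
    /** Stand-in for udfExceptionCount. */
    long exceptionCount = 0;

    InstrumentedUdfCall(Function<I, O> udf, int sampleInterval) {
        this.udf = udf;
        this.sampleInterval = sampleInterval;
    }

    O invoke(I input) {
        // Counter-based sampling: measure one call in every sampleInterval calls.
        boolean sampled = invocationCount == 0;
        invocationCount = (invocationCount + 1 < sampleInterval) ? invocationCount + 1 : 0;

        long start = sampled ? System.nanoTime() : 0L;
        try {
            return udf.apply(input);
        } catch (RuntimeException e) {
            exceptionCount++; // counted on every invocation, sampled or not
            throw e;          // assumption: rethrow so failure semantics stay unchanged
        } finally {
            if (sampled) {
                sampledNanos.add(System.nanoTime() - start);
            }
        }
    }
}
```

Because the wrapper surrounds the call site rather than living in the UDF base class, it keeps working when users override eval() or similar methods.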
Implementation Entry Points:
CommonExecMatch.java (pass in Flink context classLoader to MatchCodeGenerator)
-> MatchCodeGenerator.scala
-> ExprCodeGenerator.scala
-> TableFunctionCallGen.scala
-> ScalarFunctionCallGen.scala
Context Access:
- RuntimeContext access is needed to register UDF metrics on each TaskManager
- The context will be obtained via reflection, since getMetricGroup() is only reachable through a RichFunction
- Metrics are scoped under OperatorMetricGroup to properly associate UDF metrics with their executing operator
For asynchronous UDFs (ASYNC_SCALAR and ASYNC_TABLE):
- udfProcessingTime:
  - At CodeGen level: Only measures the time to call invokeAsync
  - Complete processing time should be measured in AsyncWaitOperator where ResultFuture completions are handled
- udfExceptionCount:
  - At CodeGen level: Tracks exceptions in invokeAsync call
  - Completion exceptions should be tracked in AsyncWaitOperator via ResultFuture.completeExceptionally
Note: FLIP-498 (AsyncTableFunction) has been completed (FLINK-37724, PR #26567). The ASYNC_TABLE metrics implementation will follow the same pattern as ASYNC_SCALAR, instrumenting both the invokeAsync call at CodeGen level and completion handling in AsyncWaitOperator.
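The difference between the two measurement points can be illustrated with plain `CompletableFuture`, used here as a stand-in for Flink's ResultFuture. The class `AsyncUdfTiming` and its fields are hypothetical; the sketch only shows why timing the invokeAsync call alone understates the end-to-end latency, and where completion exceptions become visible.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/** Hypothetical sketch: separates invokeAsync call time from end-to-end completion time. */
final class AsyncUdfTiming {
    final AtomicLong invokeNanos = new AtomicLong(-1);     // time spent inside the invokeAsync call
    final AtomicLong completionNanos = new AtomicLong(-1); // time until the future completes
    final AtomicLong invokeExceptions = new AtomicLong();
    final AtomicLong completionExceptions = new AtomicLong();

    <I, O> CompletableFuture<O> invoke(Function<I, CompletableFuture<O>> invokeAsync, I input) {
        long start = System.nanoTime();
        CompletableFuture<O> future;
        try {
            future = invokeAsync.apply(input); // call-site measurement (CodeGen level)
        } catch (RuntimeException e) {
            invokeExceptions.incrementAndGet();
            throw e;
        } finally {
            invokeNanos.set(System.nanoTime() - start);
        }
        // Completion-side measurement (the role AsyncWaitOperator plays in the proposal).
        return future.whenComplete((result, error) -> {
            completionNanos.set(System.nanoTime() - start);
            if (error != null) {
                completionExceptions.incrementAndGet();
            }
        });
    }
}
```

While the returned future is still pending, `invokeNanos` is already set but `completionNanos` is not, which is the gap the completion-side instrumentation is meant to close.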
Compatibility, Deprecation, and Migration Plan
Backward Compatibility:
Fully backward compatible.
By default (table.exec.udf-metric-enabled = false), Flink behavior is unchanged.
Deprecation:
No existing APIs or configs are deprecated.
Migration:
No migration steps required for existing jobs.
Users who want these metrics simply enable the config.
Test Plan
Unit Tests:
Verify metric registration logic with table.exec.udf-metric-enabled set to true and to false.
Validate correct metric hierarchy and naming.
Performance Testing:
Benchmark overhead of per-message metric recording.
Ensure any performance regression is documented and acceptable.
- Verify that with default sample-interval (100), overhead is <1% compared to metrics disabled
- Validate that sample-interval=1 (every invocation) produces accurate results for correctness testing
Rejected Alternatives
Add Metrics Directly in UDF Base Classes:
- Pros
  - Straightforward logic.
  - Fast and simple to implement.
- Cons
  - Maintenance overhead: every newly added UDF base class needs its own metrics instrumentation.
  - Limited flexibility for users: we would continually need to add classes to address new user needs.