Status
Current state: [One of "Under Discussion", "Accepted", "Rejected"]
Discussion thread: https://lists.apache.org/thread/zffx5r6fjkjldjjxr8oqtkwsobtwc6c3
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The Flink Autoscaler currently provides a powerful, opinionated scaling pipeline that collects metrics, evaluates them, and applies parallelism changes automatically. However, the scaling decision logic is entirely internal: there is no extension point that lets users intercept, modify, or reject a computed scaling decision before it is applied.
Real-world production environments often require organization-specific scaling policies that go beyond what the built-in autoscaler logic can express. Examples include:
- Resource-aware scaling gates: Only allow scaling when actual resource utilization, such as average TaskManager JVM CPU usage, memory pressure, or network I/O, exceeds a threshold, preventing unnecessary job restarts when resource headroom is still available.
- Source-aligned parallelism with lag-driven scaling: Enforce that all downstream operators match the source operator's parallelism, ensuring uniform data distribution in pipelines with strict ordering requirements, and only allow scaling decisions that are driven by source lag or catch-up needs. This ensures that scaling is triggered by genuine backpressure from consumer lag rather than transient load spikes, and that the resulting parallelism is consistent across the entire pipeline.
- Cost-aware scaling caps: Integrate with cloud billing APIs or resource quota systems to veto scaling decisions that would exceed a budget, a reserved capacity limit, or an organizational resource quota.
- Multi-job coordination: Consult an external coordination service to ensure that scaling one job doesn't starve shared cluster resources needed by other jobs running on the same Flink cluster or Kubernetes namespace.
FLIP-514 (Custom Evaluator plugin for Flink Autoscaler) addresses extensibility at the metric evaluation layer, allowing users to plug in custom logic for how the existing, already-collected metrics are interpreted and used to compute scaling targets. However, it operates exclusively on the fixed set of ScalingMetric values (such as LOAD, TRUE_PROCESSING_RATE, TARGET_DATA_RATE, GC_PRESSURE, HEAP_MAX_USAGE_RATIO, etc.) that are hardcoded in ScalingMetricCollector and ScalingMetricEvaluator. As a result, FLIP-514 does not provide a mechanism to collect additional Flink metrics (e.g., Status.JVM.CPU.Load) or metrics from external monitoring systems, nor does it offer a way to intercept or modify the final scaling decision after it has been computed.
The current FLIP addresses a complementary need at the scaling decision execution layer, allowing users to plug in custom logic for whether and how a computed scaling decision should be applied, with the ability to query any additional metrics, from the Flink REST API or external systems, at decision time.
Together with FLIP-514 and future work on custom metric collection (discussed in the "Future Work" section), these extension points will enable a fully customizable autoscaling pipeline:
```
Collect (standard + custom metrics)
→ Evaluate (standard + custom evaluators per FLIP-514)
→ Decide (standard scaling summary computation)
→ Gate / modify (ScalingExecutorPlugin - this FLIP)
→ Apply (parallelism overrides)
```
Public Interfaces
New Interface: ScalingExecutorPlugin
A new SPI interface in the flink-autoscaler module:
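```java
package org.apache.flink.autoscaler;

import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;

import java.util.Map;
import java.util.Optional;

/** Intercepts, modifies, or vetoes a computed scaling decision before it is applied. */
public interface ScalingExecutorPlugin<KEY, CTX extends JobAutoScalerContext<KEY>> {

    /** Position in the executor plugin chain; lower values run first. */
    default int priority() {
        return 0;
    }

    /**
     * Returns the summaries unchanged (approve), a new or altered map (modify),
     * or Optional.empty() (veto). An empty map is also treated as a veto.
     */
    Optional<Map<JobVertexID, ScalingSummary>> apply(
            Context<KEY, CTX> context, Map<JobVertexID, ScalingSummary> scalingSummaries);

    /** Inputs available to the plugin for the current scaling cycle. */
    class Context<KEY, CTX extends JobAutoScalerContext<KEY>> {
        private final CTX autoScalerContext;
        private final Configuration configuration;
        private final EvaluatedMetrics evaluatedMetrics;
        private final JobTopology jobTopology;

        public Context(
                CTX autoScalerContext,
                Configuration configuration,
                EvaluatedMetrics evaluatedMetrics,
                JobTopology jobTopology) {
            this.autoScalerContext = autoScalerContext;
            this.configuration = configuration;
            this.evaluatedMetrics = evaluatedMetrics;
            this.jobTopology = jobTopology;
        }

        public CTX getAutoScalerContext() {
            return autoScalerContext;
        }

        public Configuration getConfiguration() {
            return configuration;
        }

        public EvaluatedMetrics getEvaluatedMetrics() {
            return evaluatedMetrics;
        }

        public JobTopology getJobTopology() {
            return jobTopology;
        }
    }
}
```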
Modified Class: ScalingExecutor
The ScalingExecutor class gains:
- A Collection<ScalingExecutorPlugin<KEY, Context>> field to hold discovered executor plugins.
- A private method that runs the executor plugin chain and returns Optional<Map<JobVertexID, ScalingSummary>>, where an empty Optional means the scaling decision was vetoed.
Discovery Mechanisms
- Kubernetes Operator: Executor plugins are discovered via PluginUtils.createPluginManagerFromRootFolder(conf).load(ScalingExecutorPlugin.class) in AutoscalerFactory.
- Standalone Autoscaler: Executor plugins are discovered via ServiceLoader.load(ScalingExecutorPlugin.class) in StandaloneAutoscalerEntrypoint.
No New Configuration Options
This FLIP does not introduce new configuration keys. Executor plugin implementations may define their own configuration options and read them from the Configuration exposed through Context.getConfiguration() in the apply method.
Proposed Changes
Draft PR: https://github.com/apache/flink-kubernetes-operator/pull/1085
1. New SPI: ScalingExecutorPlugin
This FLIP introduces a new generic interface, ScalingExecutorPlugin<KEY, CTX>, in the flink-autoscaler module. This is the single public API surface of this FLIP.
An executor plugin receives the full scaling context (the autoscaler context, the current configuration, the evaluated metrics, the job topology, and the map of computed scaling summaries) and returns one of three outcomes:
- Approve: return the summaries unmodified.
- Modify: return a new or altered map of summaries (e.g., cap parallelism, remove specific vertices).
- Veto: return an empty Optional to reject the scaling operation entirely.
The interface provides a priority() default method (returning 0) that controls the executor plugin's position in the execution chain. Lower values execute first. This allows independent executor plugin implementations to declare a stable ordering without requiring coordination between them.
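The three outcomes and the priority() hook can be illustrated with a small, self-contained sketch. It deliberately replaces the real Flink types with hypothetical stand-ins (String vertex IDs, a Summary record holding only current and proposed parallelism, and an ExecutorPluginSketch interface that omits the Context parameter), so it shows the contract rather than the actual API. The hypothetical ParallelismCapPlugin approves decisions that are already within a cap, modifies those that exceed it, and vetoes when capping leaves nothing to scale.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for ScalingSummary: only current and proposed parallelism.
record Summary(int currentParallelism, int newParallelism) {}

// Hypothetical, simplified contract: the real apply() also receives a Context.
interface ExecutorPluginSketch {
    default int priority() {
        return 0;
    }

    Optional<Map<String, Summary>> apply(Map<String, Summary> summaries);
}

// Hypothetical plugin that caps every vertex at a fixed maximum parallelism.
final class ParallelismCapPlugin implements ExecutorPluginSketch {
    private final int maxParallelism;

    ParallelismCapPlugin(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    @Override
    public int priority() {
        return 10; // fine-grained modifier: run after coarse-grained gates
    }

    @Override
    public Optional<Map<String, Summary>> apply(Map<String, Summary> summaries) {
        Map<String, Summary> capped = new HashMap<>();
        boolean modified = false;
        for (Map.Entry<String, Summary> entry : summaries.entrySet()) {
            Summary s = entry.getValue();
            int target = Math.min(s.newParallelism(), maxParallelism);
            modified |= target != s.newParallelism();
            // Keep only vertices that still change parallelism after capping.
            if (target != s.currentParallelism()) {
                capped.put(entry.getKey(), new Summary(s.currentParallelism(), target));
            }
        }
        if (capped.isEmpty()) {
            return Optional.empty(); // Veto: capping left nothing to scale.
        }
        // Approve (original map) or Modify (capped copy).
        return Optional.of(modified ? capped : summaries);
    }
}
```

Returning the original map on approval, rather than a copy, keeps the no-op case cheap and lets tests assert that the decision passed through untouched.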
2. Executor Plugin Chain Integration
The executor plugin chain is invoked inside ScalingExecutor, after scaling summaries have been computed and after the scaling-blocked check has passed, but before parallelism overrides are applied. This placement gives executor plugins the final say on whether and how a scaling action proceeds.
The high-level flow becomes:
```
+-----------------------------------------------+
| Compute scaling summaries                     |
+-----------------------------------------------+
                        |
                        v
+-----------------------------------------------+
| Check if scaling is blocked                   |
| (quotas, excluded periods, other gates, ...)  |
+-----------------------------------------------+
                        |
                        v
+-----------------------------------------------+
| Apply ScalingExecutorPlugin chain             |
| (priority-ordered; lowest runs first)         |
+-----------------------------------------------+
                        |
                        v
+-----------------------------------------------+
| Executor Plugin 1 (lowest priority)           |
+-----------------------------------------------+
                        |
                        v
+-----------------------------------------------+
| Executor Plugin 2                             |
+-----------------------------------------------+
                        |
                        v
                       ...
                        |
                        v
+-----------------------------------------------+
| Executor Plugin N (highest priority)          |
+-----------------------------------------------+
                        |
                        v
+-----------------------------------------------+
| Outcome                                       |
| - Vetoed   -> skip scaling                    |
| - Modified -> use modified decision           |
| - Approved -> continue scaling                |
+-----------------------------------------------+
                        |
                        v
+-----------------------------------------------+
| Apply parallelism overrides                   |
+-----------------------------------------------+
```
3. Priority Ordering
Because executor plugin implementations are discovered dynamically (see section 5), their iteration order is not guaranteed. To ensure a stable and predictable execution sequence, ScalingExecutor sorts the discovered executor plugins by their priority() value at construction time and stores them as an immutable, ordered list. The ordered chain is logged at startup for operational visibility.
The priority() method uses a default implementation returning 0, so existing or simple executor plugins work without overriding it. Executor plugins that need explicit ordering override the method with a negative value (run earlier) or positive value (run later).
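A minimal sketch of the construction-time ordering, assuming hypothetical PrioritizedPlugin and NamedPlugin types in place of ScalingExecutorPlugin: the discovered plugins are copied into a list, sorted ascending by priority(), and frozen with List.copyOf. The describe helper is likewise illustrative; it produces the kind of one-line summary that could be logged at startup.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical minimal plugin type; only priority() matters for ordering.
interface PrioritizedPlugin {
    default int priority() {
        return 0;
    }

    String name();
}

// Hypothetical test double: a named plugin with a fixed priority.
record NamedPlugin(String name, int priority) implements PrioritizedPlugin {}

final class PluginOrdering {
    /** Returns an immutable list sorted ascending by priority() (lowest runs first). */
    static List<PrioritizedPlugin> order(Iterable<? extends PrioritizedPlugin> discovered) {
        List<PrioritizedPlugin> sorted = new ArrayList<>();
        discovered.forEach(sorted::add);
        // List.sort is stable: equal priorities keep their (unspecified) discovery order.
        sorted.sort(Comparator.comparingInt(PrioritizedPlugin::priority));
        return List.copyOf(sorted);
    }

    /** Renders the chain as "name(priority) -> ..." for a startup log line. */
    static String describe(List<PrioritizedPlugin> chain) {
        return chain.stream()
                .map(p -> p.name() + "(" + p.priority() + ")")
                .collect(Collectors.joining(" -> "));
    }
}
```

Because the sort is stable, plugins with equal priority simply keep whatever order discovery produced, which is consistent with the chain making no promise about their relative order.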
4. Chain Semantics
Executor plugins execute sequentially in priority order (ascending by priority() value). Each executor plugin receives the output of the previous executor plugin, forming a pipeline. The chain short-circuits immediately if any executor plugin vetoes (returns empty) or produces an empty summaries map. This design means:
- Coarse-grained gates (e.g., resource checks, coordination locks) should declare a low priority so they run first and short-circuit early.
- Fine-grained modifiers (e.g., parallelism alignment, cost caps) should declare a higher priority so they operate on already-approved decisions.
- Executor plugins with equal priority have no guaranteed relative ordering, so they should be designed to be independent of each other.
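The chain semantics can be sketched as a simple fold over the sorted plugins. The ChainPlugin and PluginChain names are hypothetical, and scaling summaries are reduced to a map from vertex name to proposed parallelism; the point is the control flow: each plugin receives its predecessor's output, and either an empty Optional or an empty map stops the chain immediately.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical simplified plugin: vertex name -> proposed parallelism.
interface ChainPlugin {
    default int priority() {
        return 0;
    }

    Optional<Map<String, Integer>> apply(Map<String, Integer> summaries);
}

final class PluginChain {
    private final List<ChainPlugin> ordered;

    PluginChain(List<ChainPlugin> plugins) {
        // Sort once, ascending by priority(); Stream.toList() is unmodifiable.
        this.ordered = plugins.stream()
                .sorted(Comparator.comparingInt(ChainPlugin::priority))
                .toList();
    }

    /** Returns the final decision, or an empty Optional if scaling was vetoed. */
    Optional<Map<String, Integer>> run(Map<String, Integer> summaries) {
        Map<String, Integer> current = summaries;
        for (ChainPlugin plugin : ordered) {
            Optional<Map<String, Integer>> result = plugin.apply(current);
            // Short-circuit on an explicit veto or an empty summaries map.
            if (result.isEmpty() || result.get().isEmpty()) {
                return Optional.empty();
            }
            current = result.get(); // the next plugin sees this plugin's output
        }
        return Optional.of(current);
    }
}
```

With no plugins registered, run returns the input unchanged, which matches the pass-through behavior required for backward compatibility.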
5. Plugin Discovery
Executor plugin implementations are discovered automatically at autoscaler startup, via the Flink plugin mechanism or Java's ServiceLoader depending on the deployment, requiring no configuration changes:
- Kubernetes Operator deployment: Executor plugins are loaded from the Flink plugins directory using PluginUtils (the same mechanism used for other Flink plugins). Users package their executor plugin JAR into the operator image under the plugins folder.
- Standalone Autoscaler deployment: Executor plugins are loaded via Java's ServiceLoader mechanism. Users place their executor plugin JAR on the classpath with a corresponding META-INF/services/org.apache.flink.autoscaler.ScalingExecutorPlugin file.
In both cases, multiple executor plugin implementations can be discovered and will be composed into the chain described above.
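For the standalone deployment, discovery relies on the JDK's ServiceLoader. The sketch below uses a hypothetical DiscoverablePlugin interface in place of ScalingExecutorPlugin and shows the load-then-sort step; it is an illustration, not the StandaloneAutoscalerEntrypoint code itself.

```java
import java.util.Comparator;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical stand-in for org.apache.flink.autoscaler.ScalingExecutorPlugin.
interface DiscoverablePlugin {
    default int priority() {
        return 0;
    }
}

final class StandaloneDiscovery {
    /**
     * Instantiates every provider listed in META-INF/services/<service binary name>
     * visible to the given class loader, then orders them by priority().
     * With no provider files on the classpath the result is an empty list.
     */
    static List<DiscoverablePlugin> discover(ClassLoader classLoader) {
        return ServiceLoader.load(DiscoverablePlugin.class, classLoader).stream()
                .map(ServiceLoader.Provider::get)
                .sorted(Comparator.comparingInt(DiscoverablePlugin::priority))
                .toList();
    }
}
```

A provider is registered by adding a file named META-INF/services/org.apache.flink.autoscaler.ScalingExecutorPlugin to the plugin JAR, listing the fully qualified implementation class name on its own line. Standard ServiceLoader rules apply to classpath providers: the implementation must be a public class with a public no-argument constructor. When no such file is present, the chain is empty and scaling proceeds exactly as before.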
Compatibility, Deprecation, and Migration Plan
- Backward compatible: All changes are additive. Existing code that constructs ScalingExecutor without executor plugins continues to work unchanged. The 2-arg and 3-arg constructors delegate to the full 4-arg constructor with an empty executor plugin collection.
- No deprecations: No existing APIs are deprecated.
- Migration: Users who want to use executor plugins simply deploy an executor plugin JAR. No code changes are required for users who do not use executor plugins.
Test Plan
- Unit tests for executor plugin chain semantics (ScalingExecutorTest):
  - Plugin’s apply approves scaling (returns summaries unchanged).
  - Plugin’s apply vetoes scaling (returns `Optional.empty()`).
  - Plugin’s apply modifies summaries (adjusted parallelism propagated).
  - Plugin’s apply returns an empty map (treated as veto).
  - Multiple executor plugins chained (each receives the previous executor plugin’s output).
  - Multiple executor plugins all approve.
  - No executor plugins registered (pass-through, backward compatible).
  - The executor plugin receives the correct context, configuration, metrics, and topology.
- Unit tests for the example executor plugin (TestScalingExecutorPluginTest):
  - CPU above threshold + source scales up → scaling approved and aligned.
  - CPU below threshold → scaling vetoed.
  - Source didn't scale up → scaling vetoed.
  - REST client failure → scaling vetoed (safety measure).
  - Boundary conditions (CPU exactly at threshold, single vertex, etc.).
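The test cases for the example executor plugin imply roughly the following decision logic, which combines the resource-aware gate and the source-aligned parallelism use cases from the Motivation section. This is a hedged reconstruction, not the plugin's actual code: the CpuGatedSourceAlignedPlugin and VertexScaling names are hypothetical, the average TaskManager CPU load is supplied by a Callable standing in for a REST query of Status.JVM.CPU.Load, and a map of current parallelism per vertex stands in for the JobTopology. Treating a value exactly at the threshold as not exceeding it is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;

// Hypothetical stand-in for ScalingSummary: current and proposed parallelism.
record VertexScaling(int current, int proposed) {}

final class CpuGatedSourceAlignedPlugin {
    private final double cpuThreshold;
    private final String sourceVertex;
    private final Callable<Double> avgTaskManagerCpu; // stands in for a REST metric query

    CpuGatedSourceAlignedPlugin(
            double cpuThreshold, String sourceVertex, Callable<Double> avgTaskManagerCpu) {
        this.cpuThreshold = cpuThreshold;
        this.sourceVertex = sourceVertex;
        this.avgTaskManagerCpu = avgTaskManagerCpu;
    }

    /** currentParallelism holds every vertex in the job (a stand-in for JobTopology). */
    Optional<Map<String, VertexScaling>> apply(
            Map<String, Integer> currentParallelism, Map<String, VertexScaling> summaries) {
        double cpu;
        try {
            cpu = avgTaskManagerCpu.call();
        } catch (Exception e) {
            return Optional.empty(); // fail safe: veto when the metric is unavailable
        }
        // Assumption: a value exactly at the threshold does not "exceed" it.
        if (cpu <= cpuThreshold) {
            return Optional.empty();
        }
        VertexScaling source = summaries.get(sourceVertex);
        if (source == null || source.proposed() <= source.current()) {
            return Optional.empty(); // only source-driven scale-ups are allowed
        }
        // Align every vertex in the job to the source's new parallelism.
        int target = source.proposed();
        Map<String, VertexScaling> aligned = new HashMap<>();
        currentParallelism.forEach((id, current) -> {
            if (current != target) {
                aligned.put(id, new VertexScaling(current, target));
            }
        });
        return Optional.of(aligned);
    }
}
```

Failing closed on a metric-fetch error (vetoing rather than approving) mirrors the "REST client failure → scaling vetoed" test case: when the plugin cannot verify its precondition, the safer default is to leave the job at its current parallelism.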
Rejected Alternatives
1. Kubernetes Environment Classes Extensibility via Plugin Discovery
Reference: PR #1081, FLINK-39390
This approach proposes making the Kubernetes-specific autoscaler environment classes (ScalingRealizer, AutoScalerEventHandler, AutoScalerStateStore, and ScalingMetricCollector) pluggable and discoverable via Flink's plugin mechanism.
While this provides broad extensibility for the Kubernetes operator's autoscaler wiring, it operates at the infrastructure/environment layer rather than the scaling decision layer. Making the entire ScalingRealizer or ScalingMetricCollector pluggable allows replacing how metrics are collected or how scaling actions are applied, but it does not provide a lightweight, composable hook for intercepting the scaling decision itself. Users who only need to gate or modify a scaling decision would be forced to replace or wrap an entire infrastructure component, which is disproportionately complex for the use case. Furthermore, this approach is specific to the Kubernetes operator and does not extend to the standalone autoscaler.
2. Autoscaler Extensibility via Custom Scaling Algorithm Replacement
Reference: Mailing list discussion, PR #1020
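This approach proposed replacing the autoscaler's scaling realizer entirely. It is rejected for the same reasons as the previous alternative: it replaces a whole component instead of offering a lightweight hook into the scaling decision itself.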
3. Custom Autoscale Algorithm (FLIP-543)
Reference: Mailing list discussion, FLIP-543
FLIP-543 proposes introducing a pluggable custom autoscaling algorithm that would allow users to replace the autoscaler's default scaling logic with their own implementation. While this provides maximum flexibility for organizations with fundamentally different scaling strategies, it requires users to take full ownership of the scaling computation, effectively reimplementing the autoscaler's core logic including metric windowing, utilization-based target parallelism computation, scaling effectiveness detection, backlog processing, delayed scale-down, and key group / partition alignment. This "replace the algorithm" model is appropriate when the default scaling logic is fundamentally unsuitable, but it is disproportionately heavy for the more common use case where users want to constrain or adjust the output of the existing algorithm rather than replace it entirely.
Current Limitations
- Executor plugins must self-serve additional metrics: The EvaluatedMetrics passed to executor plugins contains only the fixed set of ScalingMetric enum values (LOAD, TRUE_PROCESSING_RATE, GC_PRESSURE, HEAP_MAX_USAGE_RATIO, etc.). Executor plugins that need metrics outside this set (such as Status.JVM.CPU.Load, custom application metrics, or external monitoring system metrics) must query them independently via context.getAutoScalerContext().getRestClusterClient() or their own external clients. This has several drawbacks:
  - No temporal consistency: Standard metrics are collected at time T₁, but executor plugin-queried metrics are fetched at time T₂ (potentially seconds later).
  - No windowed averaging: Standard metrics benefit from the autoscaler's metric window (current value plus the average over the window). Executor plugin-queried metrics only get a single point-in-time snapshot.
  - Duplicate fetches: If multiple executor plugins need the same custom metric, each queries it independently.
  - Testing overhead: Executor plugins that perform I/O require REST client mocking in tests, rather than simply receiving pre-collected data.
- No executor plugin-specific state management: Executor plugins are stateless from the autoscaler's perspective. If an executor plugin needs to track state across scaling cycles (e.g., "how many times was scaling vetoed in a row"), it must manage its own state externally.
- No feedback mechanism: When an executor plugin vetoes scaling, the only feedback is a log message. There is no structured event or metric emitted that operators can alert on. Integration with AutoScalerEventHandler for executor plugin veto events is left for future work.
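Until the autoscaler offers state management for executor plugins, one minimal option is for a plugin to keep in-memory state in one of its own fields. The sketch below, using a hypothetical ConsecutiveVetoTracker keyed by the job key, counts vetoes in a row per job. Such state lives only in the autoscaler's JVM: it is lost on restart and is not shared between replicas, so anything that must survive restarts still needs an external store.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper a plugin could hold in a field. State is in-memory only:
// it is lost when the autoscaler restarts and is not shared across replicas.
final class ConsecutiveVetoTracker<KEY> {
    private final Map<KEY, Integer> vetoStreaks = new ConcurrentHashMap<>();

    /** Records a veto for this job and returns the updated streak length. */
    int recordVeto(KEY jobKey) {
        return vetoStreaks.merge(jobKey, 1, Integer::sum);
    }

    /** Resets the streak when the plugin approves or modifies a decision. */
    void recordApproval(KEY jobKey) {
        vetoStreaks.remove(jobKey);
    }

    /** Current number of consecutive vetoes for this job (0 if none). */
    int streak(KEY jobKey) {
        return vetoStreaks.getOrDefault(jobKey, 0);
    }
}
```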
Future Work
1. Custom Metric Provider SPI
A separate CustomMetricProvider SPI integrated into ScalingMetricCollector would allow executor plugin and evaluator implementations to declare additional metrics they need, rather than querying them ad-hoc:
```java
import org.apache.flink.annotation.Experimental;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.runtime.jobgraph.JobVertexID;

import java.util.List;
import java.util.Map;

@Experimental
public interface CustomMetricProvider<KEY, Context extends JobAutoScalerContext<KEY>> {

    /** TM-level Flink metrics to collect (e.g., "Status.JVM.CPU.Load"). */
    default List<String> getTaskManagerMetricNames() {
        return List.of();
    }

    /** Per-vertex Flink metrics to collect. */
    default Map<JobVertexID, List<String>> getVertexMetricNames(JobTopology topology) {
        return Map.of();
    }

    /** Metrics from external monitoring systems (Prometheus, Datadog, etc.). */
    default Map<String, Double> collectExternalMetrics(Context context) {
        return Map.of();
    }
}
```
This would be complementary to ScalingExecutorPlugin: the CustomMetricProvider handles what to collect, while the ScalingExecutorPlugin handles what to decide. The collected custom metrics would flow into EvaluatedMetrics (via a new Map<String, Double> customMetrics field or similar) so that both FLIP-514 custom evaluators and ScalingExecutorPlugin implementations can consume them without performing their own I/O.
Together with FLIP-514 (Custom Evaluator plugin for Flink Autoscaler), the full extensible pipeline would be:
```
ScalingMetricCollector
├── Standard metric collection (existing)
├── CustomMetricProvider: vertex metrics (future)
├── CustomMetricProvider: TM/JM metrics (future)
└── CustomMetricProvider: external system metrics (future)
↓
ScalingMetricEvaluator
├── Standard evaluation (existing)
└── Custom Evaluator plugins (FLIP-514)
↓
ScalingExecutor
├── Standard scaling summary computation (existing)
└── ScalingExecutorPlugin chain (this FLIP)
↓
Apply parallelism overrides
```
2. Executor Plugin Veto Events
Integrate with AutoScalerEventHandler to emit structured events when an executor plugin vetoes scaling, enabling alerting and observability.