Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: https://lists.apache.org/thread/bp9wymtrfmsjxk3hql3r4gp6yd0qyhxr
Vote thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA: FLINK-38283

Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation 

Flink Autoscaler has been widely used in production, in both Kubernetes and YARN environments, to handle traffic spikes in Flink applications, to speed up failure recovery, and to reduce over-provisioned resources. From operational experience, we have found several small gaps that, once closed, would make the existing autoscaling model even more powerful:

  • Extensibility: adding customized or external metrics to the autoscaling algorithm
  • A plugin mechanism to support customized autoscaling algorithms
  • Configuration changes currently require a Flink job restart; dynamic job configuration would make such changes lightweight

This FLIP proposes letting users customize the autoscaling algorithm to their business needs by allowing developers to define their own MetricsCollector, MetricsEvaluator, and ScalingRealizer.

Public Interfaces

To let the autoscaler leverage external metrics in its decisions, we need to make MetricsCollector, MetricsEvaluator, and ScalingRealizer extensible through a plugin model.


public interface MetricCollector<KEY, Context extends JobAutoScalerContext<KEY>> extends Plugin {

    CollectedMetricHistory updateMetrics(Context ctx, AutoScalerStateStore<KEY, Context> stateStore) throws Exception;

    Map<FlinkMetric, Metric> queryJmMetrics(Context ctx) throws Exception;

    Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) throws Exception;

    // New method: query external metric signals
    Map<FlinkMetric, Metric> queryExternalMetrics(Context ctx) throws Exception;
}

public interface MetricEvaluator extends Plugin {

    EvaluatedMetrics evaluate(Configuration conf, CollectedMetricHistory collectedMetrics, Duration restartTime);
}

// Allows a customized algorithm to change configuration other than parallelism and memory
public interface ScalingRealizer<KEY, Context extends JobAutoScalerContext<KEY>> extends Plugin {

    void realizeParallelismOverrides(Context context, Map<String, String> parallelismOverrides)
            throws Exception;

    void realizeConfigOverrides(Context context, ConfigChanges configChanges) throws Exception;
}
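To illustrate how an external metric query could feed a customized evaluation rule, here is a Java sketch that uses simplified, hypothetical stand-ins rather than the real Flink types. `ExternalMetricSource` plays the role of `queryExternalMetrics`, the metric name `external.backlog` is made up, and `targetParallelism` is only an example rule (serve the current input rate and drain an external backlog within a catch-up window). It is not the built-in autoscaler algorithm.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical, simplified stand-ins for the proposed collector/evaluator split. */
public class ExternalBacklogScalingSketch {

    // Stand-in for MetricCollector#queryExternalMetrics: metric name -> value.
    interface ExternalMetricSource {
        Map<String, Double> queryExternalMetrics() throws Exception;
    }

    // Hypothetical collector that reads a backlog value from some external system.
    static class BacklogCollector implements ExternalMetricSource {
        private final Supplier<Double> backlogReader;

        BacklogCollector(Supplier<Double> backlogReader) {
            this.backlogReader = backlogReader;
        }

        @Override
        public Map<String, Double> queryExternalMetrics() {
            Map<String, Double> metrics = new HashMap<>();
            metrics.put("external.backlog", backlogReader.get());
            return metrics;
        }
    }

    /**
     * Hypothetical evaluation rule: handle the current input rate and drain the
     * external backlog within catchUpSeconds, given a per-subtask capacity,
     * clamped to [1, maxParallelism].
     */
    static int targetParallelism(double inputRatePerSec, double backlog,
                                 double catchUpSeconds, double perSubtaskCapacity,
                                 int maxParallelism) {
        double requiredRate = inputRatePerSec + backlog / catchUpSeconds;
        int target = (int) Math.ceil(requiredRate / perSubtaskCapacity);
        return Math.max(1, Math.min(target, maxParallelism));
    }

    public static void main(String[] args) throws Exception {
        ExternalMetricSource collector = new BacklogCollector(() -> 60_000.0);
        double backlog = collector.queryExternalMetrics().get("external.backlog");
        // 1000 rec/s input + 60000 records drained over 300 s = 1200 rec/s required
        int p = targetParallelism(1000.0, backlog, 300.0, 250.0, 32);
        System.out.println("target parallelism = " + p);
    }
}
```

In this sketch the collector only fetches data and the scaling rule lives separately, mirroring the proposed split between MetricCollector and MetricEvaluator.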


Proposed Changes

Extensible and pluggable modules

Each component implementation is selected per job through the following options:

job.autoscaler.metric.evaluator.class = org.apache.flink.autoscaler.CustomizedMetricsEvaluator

job.autoscaler.metrics.collector.class = org.apache.flink.autoscaler.CustomizedMetricsCollector

job.autoscaler.scaling.realizer.class = org.apache.flink.autoscaler.realizer.CustomizedScalingRealizer

Dynamically loading autoscaling components

The PluginManager will be used to load the configured MetricsEvaluator, MetricsCollector, and ScalingRealizer implementations based on each job's Flink configuration.
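A minimal sketch of how a configured class name could select among loaded plugins, falling back to the default implementation when the option is unset. The candidate iterator stands in for what Flink's `PluginManager#load(Class)` returns; `MetricEvaluator`, `DefaultMetricEvaluator`, and `CustomizedMetricsEvaluator` here are hypothetical stand-ins, and throwing on an unknown class name is an assumed design choice rather than part of the proposal.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class PluginSelectionSketch {

    // Hypothetical stand-in for the proposed MetricEvaluator plugin interface.
    interface MetricEvaluator {
        String name();
    }

    static class DefaultMetricEvaluator implements MetricEvaluator {
        @Override
        public String name() {
            return "default";
        }
    }

    static class CustomizedMetricsEvaluator implements MetricEvaluator {
        @Override
        public String name() {
            return "customized";
        }
    }

    /**
     * Returns the loaded plugin whose class name matches the job's configured value,
     * or the default implementation when the option is not set.
     */
    static <P> P select(Iterator<P> candidates, Map<String, String> jobConf,
                        String key, P defaultImpl) {
        String className = jobConf.get(key);
        if (className == null) {
            return defaultImpl; // option unset: keep the existing behavior
        }
        while (candidates.hasNext()) {
            P candidate = candidates.next();
            if (candidate.getClass().getName().equals(className)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("No plugin found for " + key + " = " + className);
    }

    public static void main(String[] args) {
        String key = "job.autoscaler.metric.evaluator.class";
        List<MetricEvaluator> loaded =
                List.of(new DefaultMetricEvaluator(), new CustomizedMetricsEvaluator());

        MetricEvaluator custom = select(loaded.iterator(),
                Map.of(key, CustomizedMetricsEvaluator.class.getName()), key,
                new DefaultMetricEvaluator());
        MetricEvaluator fallback = select(loaded.iterator(), Map.of(), key,
                new DefaultMetricEvaluator());

        System.out.println(custom.name() + ", " + fallback.name());
    }
}
```

Falling back to the default when the option is absent is what leaves jobs without these options unaffected.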

Applying configuration changes through the dynamic configuration API

FLIP-530 proposes changing a Flink job's configuration after the job has been submitted. For some use cases this reduces job interruption time, because configuration updates go through the AdaptiveScheduler and the job change is aligned with checkpoint completion. If a customized autoscaling algorithm wants to change both the checkpoint interval and the TM parallelism, this approach can apply both with minimal downtime.

Once the dynamic configuration API is ready to use, we will implement a default ScalingRealizer based on it.
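One possible shape for such a realizer is to merge parallelism and configuration overrides into a single update, so that a change to both the checkpoint interval and parallelism reaches the job once. This is a sketch under assumptions: `DynamicConfigClient` and `updateJobConfiguration` are hypothetical placeholders for whatever FLIP-530 exposes, the combined `realize` method is illustrative (the proposed ScalingRealizer keeps two separate methods), and parallelism overrides are assumed to travel as Flink's `pipeline.jobvertex-parallelism-overrides` option in `vertexId:parallelism,...` form.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class DynamicConfigRealizerSketch {

    /** Hypothetical client for a FLIP-530 style dynamic configuration endpoint. */
    interface DynamicConfigClient {
        void updateJobConfiguration(Map<String, String> changes) throws Exception;
    }

    // Assumption: parallelism overrides are passed as Flink's existing override option.
    static final String PARALLELISM_OVERRIDES_KEY = "pipeline.jobvertex-parallelism-overrides";

    static class DynamicConfigScalingRealizer {
        private final DynamicConfigClient client;

        DynamicConfigScalingRealizer(DynamicConfigClient client) {
            this.client = client;
        }

        /** Sends parallelism and config overrides to the job as one combined update. */
        void realize(Map<String, String> parallelismOverrides,
                     Map<String, String> configOverrides) throws Exception {
            Map<String, String> changes = new LinkedHashMap<>(configOverrides);
            if (!parallelismOverrides.isEmpty()) {
                // Encode as "vertexId:parallelism,..." with vertices sorted for a stable order.
                String encoded = new TreeMap<>(parallelismOverrides).entrySet().stream()
                        .map(e -> e.getKey() + ":" + e.getValue())
                        .collect(Collectors.joining(","));
                changes.put(PARALLELISM_OVERRIDES_KEY, encoded);
            }
            client.updateJobConfiguration(changes);
        }
    }

    public static void main(String[] args) throws Exception {
        // Record calls instead of talking to a real cluster.
        List<Map<String, String>> calls = new ArrayList<>();
        DynamicConfigScalingRealizer realizer = new DynamicConfigScalingRealizer(calls::add);

        realizer.realize(
                Map.of("vertex-b", "4", "vertex-a", "8"),
                Map.of("execution.checkpointing.interval", "60s"));

        System.out.println(calls.size() + " update(s): " + calls.get(0));
    }
}
```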

Compatibility, Deprecation, and Migration Plan

After this FLIP, the default autoscaling component implementations are loaded unless a job configures its own, so no migration is needed for existing users.


Rejected Alternatives

Making the autoscaler interface itself pluggable. This is a heavyweight design: rather than promoting a clean modular and pluggable design, it mainly makes it easier to create effective forks of the codebase without having to rebuild the operator.
