Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Flink Autoscaler is widely used in production, on both Kubernetes and YARN, to handle traffic spikes in Flink applications, to support fast failure recovery, and to reduce over-provisioned resources. From our operational experience, we have found several small gaps; closing them would make the existing autoscaling model even more powerful.
- Extensibility to add customized or external metrics to the autoscaling algorithm
- A plugin solution to support customized autoscaling algorithms
- Changing configuration currently requires a Flink job restart; applying changes through dynamic job configuration would be more lightweight
This FLIP proposes letting users customize their autoscaling algorithm based on their business needs by allowing developers to define their own MetricsCollector, MetricsEvaluator, and ScalingRealizer.
Public Interfaces
To help the autoscaler leverage external metrics info for decision making, we need to make the MetricsCollector, MetricsEvaluator, and ScalingRealizer extendable by using a plugin model.
public interface MetricCollector<KEY, Context extends JobAutoScalerContext<KEY>> extends Plugin {
CollectedMetricHistory updateMetrics(Context ctx, AutoScalerStateStore<KEY, Context> stateStore) throws Exception;
Map<FlinkMetric, Metric> queryJmMetrics(Context ctx) throws Exception;
Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) throws Exception;
// New method for querying external metrics or signals
Map<FlinkMetric, Metric> queryExternalMetrics(Context ctx) throws Exception;
}
public interface MetricEvaluator extends Plugin {
EvaluatedMetrics evaluate(Configuration conf, CollectedMetricHistory collectedMetrics, Duration restartTime);
}
// Allows customized algorithm to change configuration other than parallelism and memory
public interface ScalingRealizer<KEY, Context extends JobAutoScalerContext<KEY>> extends Plugin {
void realizeParallelismOverrides(Context context, Map<String, String> parallelismOverrides)
throws Exception;
void realizeConfigOverrides(Context context, ConfigChanges configChanges) throws Exception;
}
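To illustrate how an external signal might feed into the collected metrics, here is a minimal, self-contained sketch. It does not use the real autoscaler types: `ExternalMetricSource`, `mergeMetrics`, the `external.` key prefix, and the metric names are all hypothetical stand-ins chosen for this example, and plain `String`/`Double` maps replace `FlinkMetric` and `Metric`.

```java
import java.util.HashMap;
import java.util.Map;

public class ExternalMetricsSketch {

    /** Hypothetical stand-in for the proposed queryExternalMetrics hook. */
    interface ExternalMetricSource {
        Map<String, Double> queryExternalMetrics(String jobId) throws Exception;
    }

    /**
     * Merges external signals into the metrics reported by Flink.
     * External keys get an "external." prefix (an assumption of this sketch)
     * so they cannot overwrite a Flink-reported metric with the same name.
     */
    static Map<String, Double> mergeMetrics(
            Map<String, Double> flinkMetrics, Map<String, Double> externalMetrics) {
        Map<String, Double> merged = new HashMap<>(flinkMetrics);
        for (Map.Entry<String, Double> entry : externalMetrics.entrySet()) {
            merged.put("external." + entry.getKey(), entry.getValue());
        }
        return merged;
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical external signal, e.g. consumer lag from a monitoring system.
        ExternalMetricSource lagSource = jobId -> Map.of("consumerLag", 1200.0);
        Map<String, Double> flinkMetrics = Map.of("busyTimeMsPerSecond", 850.0);

        Map<String, Double> merged =
                mergeMetrics(flinkMetrics, lagSource.queryExternalMetrics("job-1"));
        System.out.println(merged);
    }
}
```

A customized MetricsEvaluator could then read the prefixed entries alongside the Flink-reported ones when computing its scaling decision.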
Proposed Changes
Extendable and pluggable modules
job.autoscaler.metric.evaluator.class = org.apache.flink.autoscaler.CustomizedMetricsEvaluator.class
job.autoscaler.metrics.collector.class = org.apache.flink.autoscaler.CustomizedMetricsCollector.class
job.autoscaler.scaling.realizer.class = org.apache.flink.autoscaler.realizer.CustomizedScalingRealizer.class
Dynamically load autoscaling components
The PluginManager will load the configured MetricsEvaluator, MetricsCollector, and ScalingRealizer implementations, based on each job's Flink configuration.
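The selection step could look roughly like the sketch below. It is self-contained and uses stand-in types rather than Flink classes: `MetricEvaluator`, `DefaultEvaluator`, `CustomizedEvaluator`, and `select` are hypothetical names for this example. The sketch assumes the plugin loader hands back an iterator over discovered implementations, and that an unset option means "use the default" while an unknown class name is an error; the FLIP does not specify either behavior.

```java
import java.util.Iterator;
import java.util.List;

public class PluginSelectionSketch {

    /** Hypothetical stand-in for the proposed MetricEvaluator plugin interface. */
    interface MetricEvaluator {
        String name();
    }

    static class DefaultEvaluator implements MetricEvaluator {
        public String name() {
            return "default";
        }
    }

    static class CustomizedEvaluator implements MetricEvaluator {
        public String name() {
            return "customized";
        }
    }

    /**
     * Picks the discovered implementation whose class name matches the
     * configured value. Falls back to the default when nothing is configured
     * and fails loudly when the configured class was not discovered
     * (both are assumptions of this sketch).
     */
    static <T> T select(Iterator<T> discovered, String configuredClass, T fallback) {
        if (configuredClass == null || configuredClass.isEmpty()) {
            return fallback;
        }
        while (discovered.hasNext()) {
            T candidate = discovered.next();
            if (candidate.getClass().getName().equals(configuredClass)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("No plugin found for class: " + configuredClass);
    }

    public static void main(String[] args) {
        List<MetricEvaluator> discovered =
                List.of(new DefaultEvaluator(), new CustomizedEvaluator());
        // In a real job this value would come from the per-job configuration.
        String configured = CustomizedEvaluator.class.getName();

        MetricEvaluator chosen =
                select(discovered.iterator(), configured, new DefaultEvaluator());
        System.out.println(chosen.name());
    }
}
```

Failing on an unknown class name, rather than silently using the default, makes a misconfigured job visible early; the opposite choice would also be defensible.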
Apply configuration changes through dynamic configuration API
FLIP-530 proposes changing a Flink job's configuration after the job has been submitted. This is useful for reducing job interruption time in some use cases, because the configuration is updated through the AdaptiveScheduler and the job change is aligned with checkpoint completion. If a customized autoscaling algorithm wants to change both the checkpoint interval and TM parallelism, the change can then be handled with minimal downtime.
If the dynamic configuration API is ready to use, we will implement a default ScalingRealizer on top of it.
Compatibility, Deprecation, and Migration Plan
After this FLIP, the default autoscaling component implementations will still be loaded by default. No migration is needed for existing users.
Rejected Alternatives
Make the autoscaler interface itself pluggable. This is a heavyweight design. It does not promote a modular, pluggable design; instead, it just makes it easier to create effective forks of the codebase without having to rebuild the operator.