Status

Motivation

One of the most challenging aspects of running always-on streaming pipeline is the correct sizing of Flink deployments. Too few resources lead to unstable Flink jobs which requires users to scale up their deployments manually. On the other hand, a surplus of resources can become very costly to users over time. Even once optimized, the resource demands can change depending on the time of the day or due to seasonal patterns. Clearly, it would be desirable to automatically adjust the resources for Flink deployments. This process is referred to as autoscaling.

The key problem in autoscaling is to decide when and how much to scale up and down. With Flink, each scaling decision has an associated cost because Flink cannot do in-place autoscaling. Flink requires the application to save its state, stop, and restart from the saved state with a new configuration. Hence, the scaling algorithm must account for the scaling costs. This is especially important in the case of backlog, i.e. queued data awaiting to be processed. Every scaling decision must make sure that the backlog can be fully processed as configured by the user, or by the end of the retention time at the latest.

It has been argued that Flink jobs can be autoscaled by just changing the job parallelism, similarly to how the Kubernetes Autoscaler works. However, this is not a sensible general solution because most large-scale jobs are very heterogeneous when it comes to the load distribution of operators. Often, they are tuned in a way that the parallelism of individual operators were adjusted to ensure optimal data flow. Such jobs may contain multiple (disjoint) branches which need completely different parallelisms. In the following, we describe an autoscaling solution which is “Flink-native”, i.e. a solution which understands how Flink internally processes data. A solution which aims at providing the most optimal, yet cost-efficient data flow.

Proposed Changes

We propose to add autoscaling functionality to the Flink Kubernetes operator. We believe this is the most natural place to implement autoscaling because the operator is highly available, has access to all relevant deployment metrics, and is able to reconfigure the deployment for the rescaling. In addition to the existing deployment and management capabilities, the Kubernetes operator will also monitor the load of Flink pipelines and take measures to make effective use of the assigned resources, as well as achieving a backpressure-free data flow.

Goals

  1. Support autoscaling Flink jobs deployed on Kubernetes via the Flink Kubernetes operator
  2. Provide a default Flink job metric collector via the Flink metrics API
  3. Provide a robust default scaling algorithm
    1. Ensure scaling yields effective usage of assigned task slots, i.e. prevents backpressure but does not overprovision
    2. Ramp up in case of any backlog to ensure it gets processed in a timely manner
    3. Minimize the number of scaling decisions to prevent costly rescale operation
  4. Allow plugging in a custom scaling algorithm and metric collection code

Goals (for now)

  1. Vertical scaling via adjusting container resources or changing the number of task slots per TM
  2. Autoscaling in standalone mode
  3. Integrate with the k8s scheduler to reserve required resources for rescaling

Scaling Algorithm

The proposed scaling algorithm is based on [1] which has been proven to yield a resource efficient backpressure-free configuration in very few amount of scaling decisions. The algorithm adjusts parallelisms of the job vertices in the JobGraph according to their resource needs which are expressed in terms of the minimum parallelism to run backpressure free. The key to achieving this state efficiently is to accurately predict the effects of parallelism changes on downstream vertices before executing the scaling decision. This is done by measuring the capacity of the vertices moving topologically from sources to sinks which allows us to predict how the capacity of downstream tasks will change based on upstream parallelism changes.

0. Fetch the job topology of the running Flink deployment

We don't strictly need the JobGraph. It is sufficient to have all Job vertices, their vertex ids, their parallelisms, and the edges. This information can be retrieved from the JsonPlan which is exposed via the JobDetailsInfo Rest API.

1. Metric Collection

The first step is collecting metrics for all JobVertices by combining metrics from all the runtime subtasks and computing the average. For now, we plan to query the metrics from the JobManager. The following metrics are accumulated:

  • busyTimeMsPerSecond
    The average number of milliseconds in a second spent processing data, excluding backpressure
  • numRecordsInPerSecond
    The average number of inflowing records per second
  • numRecordsOutPerSecond
    The average number of outflowing records per second

  • Backlog size (if available)
    The current size of the backlog of the source, e.g. FLIP-33: pending_records
  • Backlog growth rate (if available)
    The current growth rate of the backlog of the source, e.g. rate of growth of FLIP-33: pending_records
  • Number of splits (if available)
    The total number of splits that the source can process in parallel

2. Set source parallelisms (backlog-based scaling)

For sources, adjust the parallelism of the sources such that they read from the backlog fast enough to clear the backlog. Put simply, this is done with the following formula:

The new processing rate is calculated by looking at the current processing rate and the backlog size, as well as the estimated downtime during rescale. Further, users can configure a maximum time to fully process the backlog.
Current backlog size information should come from the connector metrics (when available) and the estimated time to rescale could either be provided by a user (in this we need to create a config option for this) or the operator could keep track of past scaling actions and measure their time to determine this automatically. However in the first iteration it should be enough to scale based on the current backlog at the time of scaling, and the spare capacity allocated should allow for catching up the lag growth during the scling itself.

 

For most sources like Kafka there is an upper bound on the parallelism based on the number of partitions. If backlog information is not available, users can set desired parallelism or target rate. We also aim at choosing a parallelism which yields an equal distribution of the splits among the subtasks.

Sidenote about scaling sources without backlog information: Backlog growth information is necessary to scale sources succesfully under backpressure. However there is nothing stopping us from scaling the sources on a busyTime/CPU time basis once backpressure has been eliminated by scaling up downstream operators. This was not part of our initial prototype implementation but it could be extended easily, as it does not affect the algorithm at large.
Most regular streaming sources should in any case provide backlog / input rate information even if they are not currently exposed under pendingRecords. It should be our goal to improve connector metrics, not only to provide the best autoscaling capability but good operational experience in general.

3. Set downstream parallelisms (utilization-based scaling)

We topologically traverse the JobGraph, computing the parallelism of each vertex such that the aggregated true output rate of the inputs of each vertex matches its true processing rate. The true output / processing rate is a means to normalize the true capacity of a vertex based on how many records can be outputted / processed per time unit. This allows us to predict the new rates when lowering or increasing the parallelism of a vertex.

  1. True Processing Rate = numRecordsInPerSecond / (busyTimeMsPerSecond / 1000)
  2. True Output Rate = numRecordsOutPerSecond / (busyTimeMsPerSecond / 1000)

 

After we have updated the parallelism, we recompute the true output rate for the current vertex before recursively continuing to topologically traverse the JobGraph.

Example

In this diagram, we show an example dataflow which reads from two different sources. Log 1 has a backlog growth rate of 100 records per time unit. Similarly, Log 2 has a backlog growth of 500. This means that without any processing, the backlog grows by the 100 or 500 records, respectively. Source 1 is able to read 10 records per time unit, Source 2 reads 50 records per time unit. The downstream operators process the records from the source and produce new records for their downstream operators. For simplicity, the number of inflowing and outflowing records for an operator is always the same here. The actual algorithm doesn't make that assumption.

Now let's evaluate one round by the algorithm:

When the scaling is strictly linear, this will lead the following optimal result:


According to the paper, in the worst case it takes up to three attempts to reach this state. However, it can already converge after one scaling attempt. Note that the algorithm handles up- and downscaling. In the example, one of the vertices is scaled by parallelism 1 which won't change the parallelism, but it is also possible for vertices to be scaled down if the scaling factor is lower than 1.

The target utilization as defined by the busy time metric is 100% in this example but users will be able to configure it. Arguably, a target utilization of less than 100% will provide more stable scaling decisions.

Configuration


KeyDefaultDescription

job.autoscaler.enabled

false

Enable Job Autoscaler

job.autoscaler.scaling.enabled

false

Enable Job Vertex Scaling actions (if disabled only metrics will ble collected)

job.autoscaler.stabilization.interval

5min

Stabilization period in which no new scaling will be executed

job.autoscaler.scaling.sources.enabled

true

Whether to enable scaling source vertices.

job.autoscaler.target.utilization

0.7

Target vertex utilization

job.autoscaler.target.utilization.boundary

0.1

Target vertex utilization boundary. Scaling won't be performed if utilization is within (target - boundary, target + boundary

job.autoscaler.scale-up.grace-period

10min

Period in which no scale down is allowed after a scale up

job.autoscaler.scale-down.max-factor

0.6

Max scale down factor. 1 means no limit on scale down, 0.6 means job can only be scaled down with 60% of the original parallelism.

job.autoscaler.catch-up.duration

5min

The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling.

job.autoscaler.vertex.max-parallelism


Max vertex parallelism after scale up.

job.autoscaler.vertex.min-parallelism


Min vertex parallelism after scale down.
job.autoscaler.metrics.history.duration10min

job.autoscaler.metrics.history.max-size


Maximum number of metric reports collected for each job.
job.autoscaler.metrics.history.min-size
Minimum number of metric reports to collect before scaling logic is executed

Scaling Execution

Once the scaling algorithm has computed the updates, the JobGraph needs to be updated. The most straight-forward way would be to supply parallelism overrides as part of the Flink configuration as proposed in FLINK-29501 - Getting issue details... STATUS . The operator would just modify the deployment spec and the job would redeploy with the updated parallelisms. Eventually, we could add support for Flink's /rescale API which would allow us to in-place rescale the cluster without tearing it down. However, this has the downside of losing the immutability of the deployment.

Scaling Metrics

The autoscaler component itself will publish metrics to provide insight into how the decision was made. In particular those metrics will be:

  • the utilization of each job vertex based on the current autoscaler configuration
  • the true processing and output rates of each job vertex based on the targeted utilization
  • the ideal parallelism for each job vertex

Rejected Alternatives:

Job-wide parallelism tuning

Configuring the parallelism job-wide won't work with more complex jobs which either already have fine-tuned operator-level parallelisms, or have drastically different requirements for its branches, as it is often the case. Even if we managed to find a way to preserve existing parallelism configurations, we can't expect a global scaling factor to yield a satisfying configuration where neither the vertices are under- nor overprovisioned.

Scale based on CPU usage

We have deliberately rejected scaling only based on the CPU usage because determining the scale up/down factor is much less accurate as with the proposed solution. In particular this is true for backpressured tasks which often only show a load increase when downstream operators have been scale up. This in turn will trigger further scaling up of the upstream operator which may again get backpressured by the downstream task. The resulting scaling configuration takes many more attempts than if we except the actual processing capabilities of the job vertices as proposed in this FLIP.

Extra autoscaling component

We could create a dedicated autoscaling component, such that the operator doesn't have to deal with metric collection. We will continue to evaluate whether that will be necessary but for the first version we want to keep it simple and chose to directly integrate autoscaling with the main deployment reconciliation loop.

References

[1] Kalavri, Liagouris, Hoffmann, and Dimitrova, Forshaw, Roscoe: Three steps is all you need: fast, accurate, automatic scaling decisions for distributed streaming dataflows

  • No labels