Status
...
...
JIRA:
Released:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Note: The Adaptive Scheduler was initially called Declarative Scheduler, but has been renamed.
In order to support the reactive mode (FLIP-159) we need a different type of scheduler which first announces the required resources and only decides on the actual parallelism with which to execute the job after having received the resources. This has the benefit that this scheduler can schedule jobs even if not all required resources can be fulfilled. Moreover, it allows to continue executing jobs even after TaskManagers have been lost. The adaptive scheduler builds upon the declarative resource management (FLIP-138).
Proposed Changes
The adaptive scheduler will first work for streaming jobs only. This simplifies things considerably because we always have to schedule all operators. Moreover, by treating every failure as a global failover which restarts the whole topology, we can further simplify the scheduler. This failover behaviour is the default for many streaming topologies anyway if they don't consist of disjoint graphs. Given these assumptions we want to develop the following scheduler:
...
If the system cannot recover from a local failover because it does not have enough slots available, the failover must be escalated to a global failover. A global failover allows the system to rescale the whole job.
State machine of the scheduler
Given the description above we propose the following state machine to model the behaviour of the adaptive scheduler:
PlantUML:

```plantuml
@startuml
hide empty description
[*] -> Created
Created --> Waiting : Start scheduling
state "Waiting for resources" as Waiting
Waiting --> Waiting : Resources are not stable yet
Waiting --> Executing : Resources are stable
Waiting --> Finished : Cancel, suspend or\nnot enough resources for executing
Executing --> Canceling : Cancel
Executing --> Failing : Unrecoverable fault
Executing --> Finished : Suspend or job reached terminal state
Executing --> Restarting : Recoverable fault
Restarting --> Finished : Suspend
Restarting --> Canceling : Cancel
Restarting --> Waiting : Cancelation complete
Canceling --> Finished : Cancelation complete
Failing --> Finished : Failing complete
Finished -> [*]
@enduml
```
The states have the following semantics:
...
Since we have a couple of asynchronous operations (resource timeout in "Waiting for resources" state, restart delay in Restarting) which only work if no other state change has happened in the meantime, we need to introduce a state version which can be used to filter out outdated operations.
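To illustrate the idea, such a state version could guard asynchronous operations as in the following minimal sketch (the class and method names are illustrative assumptions, not the actual Flink implementation):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: every state transition bumps a version counter, and an
// asynchronous operation only runs if the scheduler is still in the version it
// was scheduled in; otherwise it is dropped as outdated.
public class StateVersionGuard {

    private final AtomicInteger stateVersion = new AtomicInteger(0);

    /** Called on every state transition; invalidates all pending async operations. */
    public int bumpVersion() {
        return stateVersion.incrementAndGet();
    }

    /** Runs the action only if no state change has happened since it was scheduled. */
    public boolean runIfStillValid(int scheduledAtVersion, Runnable action) {
        if (stateVersion.get() == scheduledAtVersion) {
            action.run();
            return true;
        }
        return false; // outdated: a state transition happened in the meantime
    }
}
```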
Stable set of resources
The "Waiting for resources" state has the purpose to wait for the required resources. Since the cluster might not be able to provide all of the declared resources, the system needs to handle this situation as well. Hence, this state waits until either all required resources have arrived or until the set of available resources has stabilised. A set of resources has stabilised if the system expects that it won't change anymore. There are different ways to achieve this; one possible approach is to set an upper limit for the waiting time, and this is the approach we want to implement in the first version of the scheduler. Consequently, whenever the scheduler enters the "Waiting for resources" state, it registers a timeout after which it will try to go into the Executing state. If the job cannot be executed with the available resources, then the scheduler will fail it.
In the future we might take a look at Kafka's consumer protocol and how consumer changes are handled there and how to decide on a stable set of consumers/resources.
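The waiting logic described above can be condensed into a pure decision function, sketched here with purely illustrative names (none of these are Flink classes):

```java
// Hypothetical sketch of the decision made in the "Waiting for resources" state.
public class WaitingForResourcesDecision {

    public enum Action { KEEP_WAITING, EXECUTE, FAIL }

    /**
     * @param allRequiredResourcesArrived all declared resources are available
     * @param resourceTimeoutFired the registered waiting timeout has elapsed
     * @param executableWithAvailableResources the job can run (possibly downscaled)
     *     with the currently available resources
     */
    public static Action decide(
            boolean allRequiredResourcesArrived,
            boolean resourceTimeoutFired,
            boolean executableWithAvailableResources) {
        if (allRequiredResourcesArrived) {
            return Action.EXECUTE; // all required resources have arrived
        }
        if (!resourceTimeoutFired) {
            return Action.KEEP_WAITING; // resources have not stabilised yet
        }
        // Timeout elapsed: run with whatever is available, otherwise fail the job.
        return executableWithAvailableResources ? Action.EXECUTE : Action.FAIL;
    }
}
```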
Automatic scaling
In order to support automatic scaling, we ask a ScaleUpController whenever new slots arrive and the scheduler is in state Executing whether the job can be scaled up. If this is the case, then the scheduler transitions into the Restarting state, which triggers a global failover and a restart that will make use of the available resources. It is important to note that scale-down actions will be triggered by failures of tasks whose slots have been removed.
Components of the scheduler
The scheduler consists of the following services to accomplish its job. These services are used by the different states to decide on state transitions and to perform certain operations.
PlantUML:

```plantuml
@startuml
package "Adaptive Scheduler" {
  [SlotAllocator]
  [FailureHandler]
  [ScaleUpController]
}
@enduml
```
SlotAllocator
The SlotAllocator is the component responsible for determining the resource requirements and mapping a JobGraph and its contained vertices to slots.
This consists of two parts:
- Calculating the resources required for scheduling a JobGraph / set of vertices
...
- Calculating a mapping of vertices to be scheduled to free slots, and optionally rescaling vertices.
The interface will look like this:
...
- calculateDesiredSlots returns a ResourceCounter that describes the ideal amount of resources for the job.
- determineParallelism accepts a JobInformation and a collection of free slots, and attempts to find the optimal parallelism for every operator given the available set of slots. If no such mapping can be found, an empty Optional is returned to signal the scheduler that this topology cannot be scheduled at this time. This method may be called by the scheduler irrespective of whether it has received the desired slots. If not enough slots have been provided, the allocator may decide to downscale the job accordingly; on the flip side, if more slots are available than necessary, it may also upscale the job. The decision whether to scale up a job or not will be handled by the ScaleUpController.
- assignResources assigns the available slots to the ExecutionVertices and reserves the slots according to the provided assignments parameter.
The first implementation of the SlotAllocator interface will support slot sharing without respecting previous allocations and input preferences. Moreover, it will distribute the available slots equally across the different slot sharing groups. The SlotAllocator implementation will respect the configured parallelism and never decide on a parallelism which exceeds the configured maxParallelism of an operator.
FailureHandler
In order to handle failures, the adaptive scheduler will support the same RestartBackoffTimeStrategy as used by the pipelined region scheduler. Hence, all current RestartBackoffTimeStrategies will be supported. The failure handling procedure is the following:
- Check whether the failure is recoverable. If not, then go to Failing state
- Ask the configured RestartBackoffTimeStrategy whether we can restart. If not, then go to Failing state
- Ask the configured RestartBackoffTimeStrategy for the backoff time between failure and restart
...
- Go into the Restarting state with the returned backoff time
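The steps above can be sketched as a small decision routine. BackoffStrategy below mirrors the two relevant methods of Flink's RestartBackoffTimeStrategy; all other names are illustrative assumptions:

```java
// Hypothetical sketch of the failure handling procedure described above.
public class FailureHandlerSketch {

    /** Minimal stand-in for the relevant parts of RestartBackoffTimeStrategy. */
    public interface BackoffStrategy {
        boolean canRestart();
        long getBackoffTime(); // milliseconds between failure and restart
    }

    public enum NextState { RESTARTING, FAILING }

    public static class HandlingResult {
        public final NextState nextState;
        public final long backoffMillis; // only meaningful for RESTARTING

        HandlingResult(NextState nextState, long backoffMillis) {
            this.nextState = nextState;
            this.backoffMillis = backoffMillis;
        }
    }

    public static HandlingResult handle(boolean recoverable, BackoffStrategy strategy) {
        // Step 1: unrecoverable failures go straight to the Failing state.
        if (!recoverable) {
            return new HandlingResult(NextState.FAILING, 0L);
        }
        // Step 2: ask the strategy whether a restart is allowed.
        if (!strategy.canRestart()) {
            return new HandlingResult(NextState.FAILING, 0L);
        }
        // Steps 3 + 4: go to Restarting with the strategy's backoff time.
        return new HandlingResult(NextState.RESTARTING, strategy.getBackoffTime());
    }
}
```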
ScaleUpController
Whenever the scheduler is in the Executing state and receives new slots, it checks whether the job can be run with an increased parallelism. If this is the case, then the scheduler will ask the ScaleUpController, given the old and new cumulative parallelism of all operators, whether it should scale up or not.
Code Block:

```java
/**
 * Simple controller for controlling the scale up behavior of the {@link
 * org.apache.flink.runtime.scheduler.declarative.AdaptiveScheduler}.
 */
public interface ScaleUpController {

    /**
     * This method gets called whenever new resources are available to the scheduler to scale up.
     *
     * @param currentCumulativeParallelism Cumulative parallelism of the currently running job
     *     graph.
     * @param newCumulativeParallelism Potential new cumulative parallelism with the additional
     *     resources.
     * @return true if the policy decided to scale up based on the provided information.
     */
    boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
}
```
A basic default implementation will only scale up if newCumulativeParallelism - currentCumulativeParallelism >= increaseThreshold.
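A minimal sketch of such a default implementation could look like this (the class name and constructor are assumptions; to stay self-contained it does not implement the ScaleUpController interface shown above):

```java
// Sketch of the basic default scale-up behaviour described above.
public class ThresholdScaleUpController {

    private final int increaseThreshold;

    public ThresholdScaleUpController(int increaseThreshold) {
        this.increaseThreshold = increaseThreshold;
    }

    /** Scale up only if the parallelism gain reaches the configured threshold. */
    public boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism) {
        // A restart is only worthwhile if the new slots buy enough extra parallelism.
        return newCumulativeParallelism - currentCumulativeParallelism >= increaseThreshold;
    }
}
```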
How to distinguish streaming jobs
Since we cannot execute batch jobs with the adaptive scheduler, we need to be able to detect whether a job is a batch or a streaming job. For this purpose, we are introducing a new enum field in the JobGraph, called JobType. The default JobType of a JobGraph will be BATCH.
For batch jobs (from the DataSet API), setting this field is trivial (in the JobGraphGenerator).
...
Lastly, the Blink Table API / SQL Planner also generates StreamGraph instances, which contain batch jobs. We are tagging the StreamGraph as a batch job in the ExecutorUtils.setBatchProperties() method.
If we detect that the adaptive scheduler has been configured for a batch job, we will fall back to another scheduler supporting batch jobs (currently the pipelined region scheduler).
Configuration
We intend to extend/introduce the following new configuration values/parameters:
- Extend jobmanager.scheduler to accept the new value adaptive in order to activate the adaptive scheduler
- Introduce adaptive-scheduler.resource-timeout to configure the resource timeout for the "Waiting for resources" state
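For illustration, enabling the adaptive scheduler in a cluster configuration might then look like the following snippet (the exact duration syntax for the timeout is an assumption):

```yaml
# Activate the adaptive scheduler (beta feature).
jobmanager.scheduler: adaptive
# How long the "Waiting for resources" state waits before trying to run
# with the currently available resources (duration format is an assumption).
adaptive-scheduler.resource-timeout: 10 s
```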
Compatibility, Deprecation, and Migration Plan
The adaptive scheduler will be a beta feature which the user has to activate explicitly by setting the config option jobmanager.scheduler: adaptive. This entails that Flink's default behaviour won't change.
If the adaptive scheduler is activated, then it will only be chosen if the user submitted a streaming job. If the user submitted a batch job, then Flink will fall back to the pipelined region scheduler.
Limitations & future improvements
The first version of the adaptive scheduler will come with a handful of limitations in order to reduce its scope.
Streaming jobs only
The adaptive scheduler runs streaming jobs only. When a batch job is submitted, the default scheduler will be used.
No support for local recovery
In the first version of the scheduler we don't intend to support local recovery. Adding it should be possible, and we intend to do so as a follow-up.
No support for local failovers
Supporting local failovers is another feature which we want to add as a follow-up. Adding support for it avoids having to restart the whole job. One idea could be to extend the existing state machine by a new state "Restarting locally":
PlantUML:

```plantuml
@startuml
hide empty description
[*] -> Created
Created --> Waiting : Start scheduling
state "Waiting for resources" as Waiting
state "Restarting globally" as RestartingG
state "Restarting locally" as RestartingL
Waiting --> Waiting : Resources are not stable yet
Waiting --> Executing : Resources are stable
Waiting --> Finished : Cancel, suspend or\nnot enough resources for executing
Executing --> Canceling : Cancel
Executing --> Failing : Unrecoverable fault
Executing --> Finished : Suspend or job reached terminal state
Executing --> RestartingG : Recoverable global fault
Executing --> RestartingL : Recoverable local fault
RestartingL --> Executing : Recovered locally
RestartingL --> RestartingL : Recoverable local fault
RestartingL --> RestartingG : Local recovery timeout
RestartingL --> Canceling : Cancel
RestartingL --> Finished : Suspend
RestartingL --> Failing : Unrecoverable fault
RestartingG --> Finished : Suspend
RestartingG --> Canceling : Cancel
RestartingG --> Waiting : Cancelation complete
Canceling --> Finished : Cancelation complete
Failing --> Finished : Failing complete
Finished -> [*]
@enduml
```
No integration with Flink's web UI
The adaptive scheduler allows that a job's parallelism can change over its lifetime. This means that we have to extend the web UI to be able to display different forms of a job. One idea would be to have a timeline which allows to pick a time for which the web UI displays the current job. This will require changes on the backend as well as frontend side.
No support for fine grained resource specifications
For the sake of simplicity and narrowing down the scope, the adaptive scheduler will ignore any resource specifications. In the future when having different resource profiles to fulfil, it will be the task of the ResourceManager to make sure that different resource requirements are fulfilled equally well.
Non-zero downtime rescaling
Rescaling happens through restarting the job; thus, jobs with large state might need a lot of resources and time to rescale. Rescaling causes downtime for the job, but no data loss.
Per-job configuration
It might be useful to select the used scheduler on a per-job basis. Within the scope of this FLIP, the scheduler will only be configurable for the whole cluster. Hence, introducing a job configuration for selecting which scheduler to use could be a good follow-up.
Slow performance when recovering from a fault
Since creating an ExecutionGraph is a costly operation (see FLINK-21110) which can also involve IO operations if certain sources/sinks are used, the failover might not be very fast. If this becomes a problem, then we have to think about pulling one-time initialisation tasks out of the ExecutionGraph and speeding up its creation in order to speed up the failover.
Test Plan
The new scheduler needs extensive unit, IT and end-to-end testing because it is a crucial component which is at the heart of Flink.
Rejected Alternatives
We also tried to find a design for an adaptive scheduler which supports batch and streaming jobs at the same time. This design turned out to be a bit too complex and therefore we rejected it. The details for this design can be found here.