...
Since we have a couple of asynchronous operations (resource timeout in the "Waiting for resources" state, restart delay in Restarting) which are only valid if no other state change has happened in the meantime, we need to introduce a state version which can be used to filter out outdated operations.
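The versioning idea can be sketched as follows. This is a minimal illustration, not the actual scheduler API; the class and method names are hypothetical. Every state transition bumps a version counter, and an asynchronous operation captures the version at scheduling time and is dropped if the version has changed when it fires.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: asynchronous operations capture the state version at
// scheduling time and are ignored if the scheduler has transitioned since.
class VersionedScheduler {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private int stateVersion = 0;

    // Every state transition invalidates all pending asynchronous operations.
    synchronized void transitionToNewState() {
        stateVersion++;
    }

    synchronized int currentVersion() {
        return stateVersion;
    }

    // Schedule an operation that only runs if no state change happened in between.
    synchronized void runIfStateUnchanged(Runnable operation, long delayMillis) {
        final int expectedVersion = stateVersion;
        executor.schedule(() -> {
            synchronized (this) {
                if (stateVersion == expectedVersion) {
                    operation.run(); // still in the same logical state
                }
                // otherwise: outdated operation, silently dropped
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }
}
```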
...
The purpose of the "Waiting for resources" state is to wait for the required resources. Since the cluster might not be able to provide all of the declared resources, the system needs to handle this situation as well. Hence, this state waits until either all required resources have arrived or until the set of available resources has stabilised. A set of resources has stabilised if the system expects that it won't change anymore. One possible approach is to set an upper limit for the waiting time, and this is the approach we want to implement in the first version of the scheduler. Consequently, whenever the scheduler enters the "Waiting for resources" state, it registers a timeout after which it will try to go into the Executing state. If the job cannot be executed with the available resources, then the scheduler will fail it.
In the future we might take a look at Kafka's consumer protocol, how consumer changes are handled there, and how it decides on a stable set of consumers/resources.
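The decision logic of the "Waiting for resources" state can be sketched as follows. All names here (`requiredSlots`, `minimumSlots`, the state names returned as strings) are illustrative assumptions, not the actual scheduler API.

```java
// Hypothetical sketch of the "Waiting for resources" decision logic.
class WaitingForResources {
    private final int requiredSlots; // slots for the desired parallelism
    private final int minimumSlots;  // slots below which the job cannot run at all

    WaitingForResources(int requiredSlots, int minimumSlots) {
        this.requiredSlots = requiredSlots;
        this.minimumSlots = minimumSlots;
    }

    // Called whenever new resources arrive: leave the state early if the
    // desired set of resources is complete.
    boolean haveDesiredResources(int availableSlots) {
        return availableSlots >= requiredSlots;
    }

    // Called when the stabilisation timeout fires: execute with what we have
    // if possible, otherwise fail the job.
    String onTimeout(int availableSlots) {
        if (availableSlots >= minimumSlots) {
            return "Executing"; // possibly with reduced parallelism
        }
        return "Failing";       // not enough resources to run the job
    }
}
```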
Automatic scaling
In order to support automatic scaling, we ask a ScaleUpController whenever new slots arrive and the scheduler is in state Executing whether the job can be scaled up. If this is the case, then the scheduler transitions into the Restarting state, which triggers a global failover and a restart that will make use of the available resources. It is important to note that scale-down actions will be triggered by failures of tasks whose slots have been removed.
Components of the scheduler
...
- Check whether the failure is recoverable. If not, then go to Failing state
- Ask the configured RestartBackoffTimeStrategy whether we can restart. If not, then go to Failing state
- Ask the configured RestartBackoffTimeStrategy for the backoff time between failure and restart
- Go into the Restarting state with the returned backoff time
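The steps above can be sketched as follows. The minimal `RestartBackoffTimeStrategy` interface here only loosely mirrors the real Flink interface, and `FailureHandler` with its string-valued result is a hypothetical simplification for illustration.

```java
// Minimal stand-in for the configured restart strategy.
interface RestartBackoffTimeStrategy {
    boolean canRestart();
    long getBackoffTime(); // milliseconds between failure and restart
}

// Hypothetical sketch of the failure-handling steps listed above.
class FailureHandler {
    private final RestartBackoffTimeStrategy strategy;

    FailureHandler(RestartBackoffTimeStrategy strategy) {
        this.strategy = strategy;
    }

    // Returns the next state: Restarting with a backoff, or Failing.
    String handleFailure(boolean recoverable) {
        if (!recoverable) {
            return "Failing";                     // unrecoverable fault
        }
        if (!strategy.canRestart()) {
            return "Failing";                     // restart budget exhausted
        }
        long backoff = strategy.getBackoffTime(); // delay before the restart
        return "Restarting(" + backoff + "ms)";   // enter Restarting state
    }
}
```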
ScaleUpController
Whenever the scheduler is in the Executing state and receives new slots, it checks whether the job can be run with an increased parallelism. If this is the case, then the scheduler asks the ScaleUpController, given the old and new cumulative parallelism of all operators, whether it should scale up or not.
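Based on the description above, the interface could look roughly as follows; the exact signature and the example policy are assumptions for illustration, not the final API.

```java
// Sketch of the ScaleUpController contract described above.
interface ScaleUpController {
    // Decide, given the old and new cumulative parallelism of all operators,
    // whether a restart for scale-up is worthwhile.
    boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
}

// Example policy: only scale up if the cumulative parallelism grows by at
// least a configured minimum, to avoid restarting for negligible gains.
class MinimumGainScaleUpController implements ScaleUpController {
    private final int minimumGain;

    MinimumGainScaleUpController(int minimumGain) {
        this.minimumGain = minimumGain;
    }

    @Override
    public boolean canScaleUp(int current, int proposed) {
        return proposed - current >= minimumGain;
    }
}
```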
...
Since we cannot execute batch jobs with the declarative scheduler, we need to be able to detect whether a job is a batch or a streaming job. For this purpose, we are introducing a new enum field in the JobGraph, called JobType. The default JobType of a JobGraph will be BATCH.
For batch jobs (from the DataSet API), setting this field is trivial (in the JobGraphGenerator).
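A sketch of the new field, assuming a two-valued enum; the accessor names and the simplified `JobGraphSketch` class are illustrative, not the actual JobGraph API.

```java
// Sketch of the new JobType enum field described above.
enum JobType {
    BATCH,
    STREAMING
}

class JobGraphSketch {
    // The default JobType of a JobGraph is BATCH, so the DataSet API's
    // JobGraphGenerator does not need to change it.
    private JobType jobType = JobType.BATCH;

    void setJobType(JobType jobType) {
        this.jobType = jobType;
    }

    JobType getJobType() {
        return jobType;
    }
}
```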
...
In the first version of the scheduler we don't intend to support local recovery. Adding support for it should be possible, and we intend to do so as a follow-up.
No support for local failovers
Supporting local failovers is another feature which we want to add as a follow-up. Adding support for it avoids having to restart the whole job. One idea could be to extend the existing state machine by a new state "Restarting locally":
...
```plantuml
@startuml
hide empty description
[*] -> Created
Created --> Waiting : Start scheduling
state "Waiting for resources" as Waiting
state "Restarting globally" as RestartingG
state "Restarting locally" as RestartingL
Waiting --> Waiting : Resources are not stable yet
Waiting --> Executing : Resources are stable
Waiting --> Finished : Cancel, suspend or \nnot enough resources for executing
Executing --> Canceling : Cancel
Executing --> Failing : Unrecoverable fault
Executing --> Finished : Suspend or job reached terminal state
Executing --> RestartingG : Recoverable global fault
Executing --> RestartingL : Recoverable local fault
RestartingL --> Executing : Recovered locally
RestartingL --> RestartingL : Recoverable local fault
RestartingL --> RestartingG : Local recovery timeout
RestartingL --> Canceling : Cancel
RestartingL --> Finished : Suspend
RestartingL --> Failing : Unrecoverable fault
RestartingG --> Finished : Suspend
RestartingG --> Canceling : Cancel
RestartingG --> Waiting : Cancelation complete
Canceling --> Finished : Cancelation complete
Failing --> Finished : Failing complete
Finished -> [*]
@enduml
```
...
No integration with Flink's web UI
...