Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Currently, the AdaptiveScheduler is primarily used in the so-called “reactive mode,” which assumes that there is only a single job running in the cluster (“application mode”). Reactive mode greedily assigns all resources available in the cluster to that job, which “adapts” to these resources by automatically rescaling from the latest available checkpoint.
To quickly recall the current internal workings of the Adaptive Scheduler, let’s take a look at the illustration below (1):
As you can see, instead of asking for an exact number of slots, the JobMaster declares its desired resources (for reactive mode, the range is set from one to infinity) to the ResourceManager, which then tries to fulfill those requirements.
When the JobMaster gets more resources at runtime, as outlined in (2), it automatically rescales the job using the latest available checkpoint, eliminating the need for external orchestration.
The resource requirements are currently either automatically derived from the JobGraph or, in the case of the reactive mode, set to “infinity.”
This FLIP proposes adding the ability to externally set per-job resource requirements through the REST API, which gives us a building block for external tooling such as a horizontal auto-scaler.
The proposed endpoints would look as follows:
This could be further extended to support ResourceProfiles (fine-grained resource management) without breaking backward compatibility. The PUT API is idempotent and always requires resource requirements to be set for all vertices (it is designed for machines, not humans).
Setting the lower or upper bound to -1 resets it to its default value (1 for the lower bound, and the configured parallelism or maxParallelism for the upper bound).
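To make the request shape concrete, here is a minimal Java sketch of the body of the idempotent PUT described above. The class, the JSON field names, and the example vertex ID are illustrative assumptions, not the final API; the sketch also shows the -1 reset semantics.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Illustrative sketch only: the JSON field names and helpers here are
 * assumptions, not the final FLIP API. Models the body of an idempotent PUT
 * that declares parallelism bounds for every vertex of a job.
 */
public class ResourceRequirementsPayload {

    /** Builds a JSON body mapping each vertex ID to its parallelism bounds. */
    static String buildBody(Map<String, int[]> boundsPerVertex) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, int[]> e : boundsPerVertex.entrySet()) {
            if (!first) sb.append(",");
            first = false;
            sb.append("\"").append(e.getKey()).append("\":")
              .append("{\"parallelism\":{\"lowerBound\":").append(e.getValue()[0])
              .append(",\"upperBound\":").append(e.getValue()[1]).append("}}");
        }
        return sb.append("}").toString();
    }

    /** -1 resets a bound to its default (1 for lower, maxParallelism for upper). */
    static int[] resolveDefaults(int lower, int upper, int maxParallelism) {
        return new int[] {lower == -1 ? 1 : lower, upper == -1 ? maxParallelism : upper};
    }

    public static void main(String[] args) {
        Map<String, int[]> vertices = new LinkedHashMap<>();
        // Hypothetical vertex ID, with the upper bound reset to maxParallelism.
        vertices.put("bc764cd8ddf7a0cff126f51c16239658", resolveDefaults(1, -1, 128));
        System.out.println(buildBody(vertices));
    }
}
```

Because the API is idempotent and requires all vertices, a client always sends the complete map rather than a delta.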
Expected behavior of the declarative API
The vital thing to note about the API outlined above is that it is declarative: we declare the desired state to which we want the job to converge. If, after an update, the job no longer holds the desired resources (fewer than the lower bound), it will be canceled and transition back into the waiting-for-resources state.
In some use cases, you might always want to rescale to the upper bound (this goes along the lines of "preallocating resources" and minimizing the number of rescales, which is especially useful with large state). This can be controlled by two knobs that already exist:
1) "jobmanager.adaptive-scheduler.min-parallelism-increase" - this affects the minimal parallelism-increase step of a running job; we'll slightly change the semantics and trigger rescaling either once this condition is met or when the ceiling (the upper bound) is reached; setting this to a high number ensures that you always rescale to the upper bound
2) "jobmanager.adaptive-scheduler.resource-stabilization-timeout" - for new and restarting jobs, we'll always respect this timeout, which allows you to wait for more resources even after you already hold more than the lower bound requires; again, once we reach the ceiling (the upper bound), we'll transition into the executing state.
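The interplay of these knobs with the upper bound can be captured in a single decision rule. The method below is an illustrative sketch of that rule, not the actual AdaptiveScheduler code; names and signature are assumptions.

```java
/**
 * Sketch (assumption, not Flink's actual code) of the rescale-trigger rule
 * described above: rescale a running job once the parallelism gain reaches
 * min-parallelism-increase, or as soon as the upper bound ("the ceiling")
 * becomes reachable.
 */
public class RescaleDecision {

    static boolean shouldRescale(
            int currentParallelism,
            int achievableParallelism, // what the currently held slots allow
            int upperBound,            // declared upper bound for the vertex
            int minParallelismIncrease // jobmanager.adaptive-scheduler.min-parallelism-increase
    ) {
        // We can never scale beyond the declared upper bound.
        int reachable = Math.min(achievableParallelism, upperBound);
        if (reachable <= currentParallelism) {
            return false; // no gain, nothing to do
        }
        // Rescale when the gain is big enough, or when we can hit the ceiling.
        return reachable - currentParallelism >= minParallelismIncrease
                || reachable == upperBound;
    }
}
```

With `minParallelismIncrease` set to a very high number, the first branch never fires and the job only rescales when it can reach the upper bound, which matches the "preallocating resources" use case above.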
Implementation of the RPC layer
We need to extend the following RPC interfaces: DispatcherGateway and JobMasterGateway.
The Dispatcher needs to validate the requested resource overrides (making sure we don't go beyond an operator's maxParallelism, for example) and notify the Scheduler (via the JobMasterGateway API) about the override. When validation fails, the returned future will be completed exceptionally with a RestHandlerException (400 status code) and an intuitive description:
- Unable to set the upper bound for operator [%s] to [%d] because it’s beyond the operator’s max parallelism [%d].
- Operator [%s] was not found in the JobGraph.
- The request is incomplete, missing operator [%s] resource requirements.
- (all other errors)
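A hedged sketch of how this validation step might look; the class and method names are hypothetical, and only the error messages themselves come from the list above.

```java
import java.util.Map;
import java.util.Optional;

/**
 * Illustrative sketch of the Dispatcher-side validation, not Flink's actual
 * API. Returns the message that would complete the future exceptionally with
 * a 400 RestHandlerException, or empty if the request is valid.
 */
public class RequirementsValidator {

    static Optional<String> validate(
            Map<String, Integer> maxParallelismPerVertex, // from the deployed JobGraph
            Map<String, Integer> requestedUpperBounds) {  // from the PUT request
        for (Map.Entry<String, Integer> e : requestedUpperBounds.entrySet()) {
            Integer max = maxParallelismPerVertex.get(e.getKey());
            if (max == null) {
                return Optional.of(String.format(
                        "Operator [%s] was not found in the JobGraph.", e.getKey()));
            }
            if (e.getValue() > max) {
                return Optional.of(String.format(
                        "Unable to set the upper bound for operator [%s] to [%d] "
                                + "because it's beyond the operator's max parallelism [%d].",
                        e.getKey(), e.getValue(), max));
            }
        }
        // The PUT API requires requirements for every vertex to be present.
        for (String vertexId : maxParallelismPerVertex.keySet()) {
            if (!requestedUpperBounds.containsKey(vertexId)) {
                return Optional.of(String.format(
                        "The request is incomplete, missing operator [%s] resource requirements.",
                        vertexId));
            }
        }
        return Optional.empty(); // valid request
    }
}
```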
Corresponding methods for the JobMasterGateway:
The above method will propagate new requirements to the Scheduler.
These are synchronous equivalents of the JobMasterGateway’s methods. We’ll only have a proper implementation for the AdaptiveScheduler (and the TestingScheduler); all other schedulers will throw a descriptive exception explaining why the method is not supported (something along the lines of “Defining resource requirements is currently only supported by the AdaptiveScheduler”).
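A hedged sketch of how the gateway method and the unsupported-scheduler behavior might look; the names and signatures here are assumptions for illustration, not the final interfaces.

```java
import java.util.concurrent.CompletableFuture;

/**
 * Illustrative sketch, not Flink's actual interfaces: an async gateway method
 * that propagates new requirements to the scheduler, and the exception that
 * schedulers other than the AdaptiveScheduler would throw.
 */
public class SchedulerRpcSketch {

    /** Stand-in for the per-vertex requirements payload. */
    interface JobResourceRequirements {}

    interface JobMasterGatewaySketch {
        // Asynchronously propagates the new requirements to the Scheduler.
        CompletableFuture<Void> updateResourceRequirements(JobResourceRequirements requirements);
    }

    /** Default behavior for schedulers other than the AdaptiveScheduler. */
    static void updateResourceRequirementsUnsupported() {
        throw new UnsupportedOperationException(
                "Defining resource requirements is currently only supported by the AdaptiveScheduler.");
    }
}
```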
Implementation of the AdaptiveScheduler layer
We’ll transform the ResourceConsumer interface into a ResourceListener that watches for both newly available resources and overrides of resource requirements.
The interface will be implemented by the Executing and WaitingForResources states of the AdaptiveScheduler. The mechanics remain the same as today; we extend them by also checking for rescaling opportunities when new resource requirements are pushed into the Scheduler.
We also need to add a new method to CreatingExecutionGraph.Context for freeing up excess slots in case the resource requirements are lower than the resources we hold after a downscale. The new method should only mark slots as free and leave the decision of returning them to the ResourceManager to the DeclarativeSlotPool. We need to ensure that idle-slot timeouts work correctly in the AdaptiveScheduler (currently they are ignored, because we never had to return slots in reactive mode).
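The separation of concerns described above can be sketched as follows; the names are illustrative, not Flink's actual slot-pool API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch, not Flink's actual code: after a downscale, the
 * scheduler only decides which slots to mark as free; the DeclarativeSlotPool
 * later returns them to the ResourceManager via the idle-slot timeout.
 */
public class ExcessSlotRelease {

    /** Returns the IDs of slots to mark as free, keeping only requiredSlots. */
    static List<Integer> slotsToMarkFree(List<Integer> heldSlotIds, int requiredSlots) {
        List<Integer> toFree = new ArrayList<>();
        for (int i = requiredSlots; i < heldSlotIds.size(); i++) {
            // Marked free here, but not yet handed back to the ResourceManager.
            toFree.add(heldSlotIds.get(i));
        }
        return toFree;
    }
}
```

Keeping the "mark free" and "return to RM" decisions separate is what makes the idle-slot timeout meaningful: freed slots can still be reused if requirements grow again before the timeout fires.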
Implementation of the High Availability layer
To make this work correctly with high-availability services, we need to persist an updated JobGraph every time we override the resource requirements. Persisting the JobGraph ensures we can recover with the exact resource requirements in case of a JobMaster failover.
Since JobResourceRequirements are tightly coupled with the JobGraph, we’ll reuse the existing JobGraphStore component for persisting them. To avoid extending the JobGraph itself, we’ll introduce a new internal ConfigOption and reuse the JobGraph’s existing configuration field. Any modifications to the JobGraphStore need to happen on the Dispatcher layer to avoid problems with multiple writers.
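The persistence idea above can be sketched with the JobGraph's configuration modeled as a plain map; the option key and the serialized form are assumptions, not the actual internal names.

```java
import java.util.Map;

/**
 * Illustrative sketch of the persistence approach: serialize the requirements
 * into the JobGraph's existing configuration under a hypothetical internal
 * key, so the unchanged JobGraphStore persists them alongside the JobGraph.
 */
public class RequirementsPersistence {

    // Hypothetical internal option key; the real name would live in Flink's codebase.
    static final String KEY = "$internal.job-resource-requirements";

    static void writeInto(Map<String, String> jobGraphConfig, String serializedRequirements) {
        jobGraphConfig.put(KEY, serializedRequirements);
    }

    static String readFrom(Map<String, String> jobGraphConfig) {
        return jobGraphConfig.get(KEY); // null if requirements were never overridden
    }
}
```

On JobMaster failover, recovery reads the JobGraph back from the JobGraphStore; if the key is present, the overridden requirements win over the ones derived from the JobGraph.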
Demo of the prototype
Compatibility, Deprecation, and Migration Plan
Nothing to do.
The change will be covered with unit and integration tests.
This effort was initially started by Till Rohrmann and is heavily based on his prototype.