Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This FLIP proposes an extension to the Adaptive Scheduler (FLIP-160) and Declarative Resource Management (FLIP-138), allowing external tools to declare job resource requirements using the REST API.

Currently, the AdaptiveScheduler is primarily used in the so-called “reactive mode,” which assumes that only a single job runs in the cluster (“application mode”) and greedily assigns all resources available in the cluster to that job; the job “adapts” to these resources by automatically rescaling from the latest available checkpoint.

Proposed Changes

Overview

To quickly recall the current internal workings of the Adaptive Scheduler, let’s take a look at the illustration below (1):

(1) job submission

As you can see, instead of asking for an exact number of slots, the JobMaster declares its desired resources (in reactive mode the range is set from one to infinity) to the ResourceManager, which then tries to fulfill those requirements.

When the JobMaster gets more resources at runtime, as outlined in (2), it will automatically rescale the job using the latest available checkpoint, eliminating the need for external orchestration.

(2) rescaling

The resource requirements are currently either automatically derived from the JobGraph or, in the case of the reactive mode, set to “infinity.”

This FLIP proposes to add the ability to externally set per-job resource requirements using the REST API, which gives us a building block for external tooling such as a horizontal auto-scaler.

The proposed endpoints would look as follows:

GET /jobs/<job-id>/resource-requirements

RESPONSE BODY:
{
    "<first-vertex-id>": {
        "parallelism": {
            "lowerBound": 1,
            "upperBound": 4
        }
    },
    "<second-vertex-id>": {
        "parallelism": {
            "lowerBound": 1,
            "upperBound": 3
        }
    }
}

PUT /jobs/<job-id>/resource-requirements

REQUEST BODY:
{
    "<first-vertex-id>": {
        "parallelism": {
            "lowerBound": 3,
            "upperBound": 5
        }
    },
    "<second-vertex-id>": {
        "parallelism": {
            "lowerBound": 2,
            "upperBound": 3
        }
    }
}

This could be further extended to support ResourceProfiles (fine-grained resource management) without breaking backward compatibility. The PUT API is idempotent and always requires resource requirements for all vertices to be set (it is designed for machines, not humans).

Setting the lower or the upper bound to -1 will reset the value to its default (1 for the lower bound, and the vertex's parallelism or maxParallelism for the upper bound).
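
To make the intended usage by external tooling concrete, below is a minimal sketch of driving the PUT endpoint with a plain Java HTTP client. The REST address, job id, and vertex ids are placeholders, and the request body mirrors the format shown above.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class UpdateJobResourceRequirements {

    public static void main(String[] args) throws Exception {
        // Placeholders: the JobManager REST address, the job id, and the vertex ids
        // would come from the environment / the GET endpoint above.
        String restAddress = "http://localhost:8081";
        String jobId = args[0];
        String firstVertexId = args[1];
        String secondVertexId = args[2];

        String requestBody =
                "{"
                        + "\"" + firstVertexId + "\": {\"parallelism\": {\"lowerBound\": 3, \"upperBound\": 5}},"
                        + "\"" + secondVertexId + "\": {\"parallelism\": {\"lowerBound\": 2, \"upperBound\": 3}}"
                        + "}";

        HttpRequest request =
                HttpRequest.newBuilder(
                                URI.create(restAddress + "/jobs/" + jobId + "/resource-requirements"))
                        .header("Content-Type", "application/json")
                        .PUT(HttpRequest.BodyPublishers.ofString(requestBody))
                        .build();

        // Print the status code and body; a 400 indicates a validation failure (see below).
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + ": " + response.body());
    }
}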

Expected behavior of the declarative API

The vital thing to note about the API outlined above is that it is declarative: we declare the desired state to which we want the job to converge. If, after the update, the job no longer holds the desired resources (fewer resources than the lower bound), it will be canceled and transition back into the waiting-for-resources state.

In some use cases, you might always want to rescale to the upper bound (this goes along the lines of "preallocating resources" and minimizing the number of rescales, which is especially useful with large state). This can be controlled by two knobs that already exist (see the configuration sketch after this list):

1) "jobmanager.adaptive-scheduler.min-parallelism-increase" - this affects a minimal parallelism increase step of a running job; we'll slightly change the semantics, and we'll trigger rescaling either once this condition is met or when you hit the ceiling; setting this to the high number will ensure that you always rescale to the upper bound

2) "jobmanager.adaptive-scheduler.resource-stabilization-timeout" - for new and already restarting jobs, we'll always respect this timeout, which allows you to wait for more resources even though you already have more resources than defined in the lower bound; again, in the case we reach the ceiling (the upper bound), we'll transition into the executing state.

Implementation of the RPC layer

We need to extend the following RPC interfaces: DispatcherGateway and JobMasterGateway.

DispatcherGateway:

/**
 * For the GET /jobs/<job-id>/resource-requirements endpoint.
 * 
 * Returns the resource requirements for a particular job.
 */
CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId);

/**
 * For the PUT /jobs/<job-id>/resource-requirements endpoint.
 * 
 * Overrides the resource requirements for a particular job.
 *
 * Note that, matching the PUT endpoint above, resource requirements must always be
 * provided for all vertices of the job.
 */
CompletableFuture<Acknowledge> updateJobResourceRequirements(
    JobID jobId, JobResourceRequirements requirements);

The dispatcher needs to validate the requested resource overrides (making sure we don’t go beyond the operator’s maxParallelism, for example) and notify the Scheduler (via the JobMasterGateway API) about the override. When the validation fails, the returned future will be completed exceptionally with a RestHandlerException (400 status code) and a descriptive message.
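
As a rough sketch (not the final implementation), the Dispatcher-side validation could look roughly like this; the accessor on JobResourceRequirements is an assumption made for illustration:

private static void validateResourceRequirements(
        JobGraph jobGraph, JobResourceRequirements requirements) throws RestHandlerException {
    for (JobVertex vertex : jobGraph.getVertices()) {
        // Assumed accessor; the real class may expose the parallelism bounds differently.
        int requestedUpperBound = requirements.getUpperBoundFor(vertex.getID());
        if (requestedUpperBound > vertex.getMaxParallelism()) {
            throw new RestHandlerException(
                    String.format(
                            "Unable to set the upper bound for operator [%s] to [%d] because "
                                    + "it's beyond the operator's max parallelism [%d].",
                            vertex.getID(), requestedUpperBound, vertex.getMaxParallelism()),
                    HttpResponseStatus.BAD_REQUEST);
        }
    }
}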

Possible errors:

Code  Message
400   Unable to set the upper bound for operator [%s] to [%d] because it’s beyond the operator’s max parallelism [%d].
400   Operator [%s] was not found in the JobGraph.
400   The request is incomplete, missing operator [%s] resource requirements.
500   (all other errors)

Corresponding methods for the JobMasterGateway:

CompletableFuture<JobResourceRequirements> requestJobResourceRequirements();

CompletableFuture<Acknowledge> updateJobResourceRequirements(JobResourceRequirements requirements);

The latter method will propagate the new requirements to the Scheduler.
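
For illustration, the JobMaster side could be a thin delegation to the scheduler; this wiring is an assumption, not part of the public API proposed here:

@Override
public CompletableFuture<Acknowledge> updateJobResourceRequirements(
        JobResourceRequirements jobResourceRequirements) {
    // Forward the override to the scheduler and acknowledge the RPC.
    schedulerNG.updateJobResourceRequirements(jobResourceRequirements);
    return CompletableFuture.completedFuture(Acknowledge.get());
}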

SchedulerNG:

These are the synchronous equivalents of the JobMasterGateway’s methods. We’ll only have a proper implementation for the AdaptiveScheduler (and the TestingScheduler); all other schedulers will throw a descriptive exception explaining why the method is not supported (something along the lines of “Defining resource requirements is currently only supported by the AdaptiveScheduler”).

JobResourceRequirements requestJobResourceRequirements();

void updateJobResourceRequirements(JobResourceRequirements requirements);
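
For schedulers other than the AdaptiveScheduler, a possible shape of the fallback (whether this ends up as a default method is an implementation detail and an assumption here) could be:

default void updateJobResourceRequirements(JobResourceRequirements requirements) {
    // Only the AdaptiveScheduler (and the TestingScheduler) will override this.
    throw new UnsupportedOperationException(
            "Defining resource requirements is currently only supported by the AdaptiveScheduler.");
}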

Implementation of the AdaptiveScheduler layer

We’ll transform the ResourceConsumer interface into a ResourceListener that watches for both newly available resources and overrides of the resource requirements.

/**
 * Interface which denotes that a {@link State} can react to newly available resources (slots) and
 * changes in resource requirements.
 */
interface ResourceListener {

    /** Notifies that new resources are available. */
    void onNewResourcesAvailable();

    /** Notifies that the desired resources have changed. */
    void onNewResourceRequirements();
}


The interface will be implemented by the Executing and WaitingForResources states of the AdaptiveScheduler. The mechanics stay the same as today; we extend them by also checking for rescaling opportunities when new resource requirements are pushed into the Scheduler.
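
A simplified sketch of how the Executing state could funnel both notifications into the same rescaling check; the RescaleCheck collaborator is a hypothetical stand-in for the scheduler's existing rescaling logic, not the actual implementation:

final class Executing implements ResourceListener {

    private final RescaleCheck rescaleCheck;

    Executing(RescaleCheck rescaleCheck) {
        this.rescaleCheck = rescaleCheck;
    }

    @Override
    public void onNewResourcesAvailable() {
        // Same check as today: new slots may allow scaling up.
        rescaleCheck.maybeRescale();
    }

    @Override
    public void onNewResourceRequirements() {
        // New with this FLIP: updated requirements may allow (or force) a rescale.
        rescaleCheck.maybeRescale();
    }

    /** Hypothetical hook standing in for the existing rescaling decision. */
    interface RescaleCheck {
        void maybeRescale();
    }
}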

We also need to add a new method to CreatingExecutionGraph.Context for freeing up excessive slots in case the resource requirements are lower than the resources we hold after a downscale. The new method should only mark slots as free and leave the decision about returning the slots to the ResourceManager to the DeclarativeSlotPool. We need to ensure that idle slot timeouts work correctly in the AdaptiveScheduler (currently, they are ignored because we didn’t have to return slots in the reactive mode).

/**
 * Marks excessive reserved slots as free. This will mark them as eligible for being returned
 * back to the ResourceManager (which might be postponed due to the idle slot timeout).
 */
void freeExcessiveReservedSlots();

Implementation of the High Availability layer

To make this work correctly with high-availability services, we need to persist an updated JobGraph any time we override the resource requirements. Persisting the JobGraph will ensure we can recover with the exact resource requirements in case of JobMaster failover.

Since JobResourceRequirements are tightly coupled with the JobGraph, we’ll reuse the existing JobGraphStore component for persisting them. To avoid any extension of the JobGraph class, we’ll introduce a new internal ConfigOption and reuse the JobGraph’s existing configuration field. Any modifications to the JobGraphStore need to happen on the Dispatcher layer to avoid problems with multiple writers.
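
As a sketch of that approach (the option key and the string encoding are illustrative assumptions, not the final design), the requirements could be piggybacked on the JobGraph's configuration roughly like this:

// Internal, non-documented option; the key name below is a placeholder.
private static final ConfigOption<String> JOB_RESOURCE_REQUIREMENTS =
        ConfigOptions.key("$internal.job-resource-requirements")
                .stringType()
                .noDefaultValue();

// Invoked on the Dispatcher only (single writer), followed by persisting the updated
// JobGraph via the existing JobGraphStore#putJobGraph.
static void storeInJobGraph(JobGraph jobGraph, String serializedRequirements) {
    jobGraph.getJobConfiguration().set(JOB_RESOURCE_REQUIREMENTS, serializedRequirements);
}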

Demo of the prototype

https://drive.google.com/file/d/1Vp8W-7Zk_iKXPTAiBT-eLPmCMd_I57Ty/view?usp=share_link

Compatibility, Deprecation, and Migration Plan

Nothing to do.

Test Plan

The change will be covered with unit and integration tests.

Rejected Alternatives

None.

Credit

This effort was initially started by Till Rohrmann and is heavily based on his prototype.