Status

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In order to work better under constrained resources (e.g. when Flink cannot obtain all the requested resources), the JobMaster cannot expect that all of its slot requests will be fulfilled. Currently, JobMasters ask for every slot individually and fail the job if a slot cannot be obtained. Instead of deciding on the required set of slots before asking the ResourceManager, it would be more flexible if the JobMaster first declared what it needs to run the job. Based on what the ResourceManager actually assigns to it, the JobMaster can then decide to run the job with an adjusted parallelism. That way the JobMaster is able to react if it gets fewer slots than it declared.

Proposed Solution

The solution proposal consists of two parts: Making the slot allocation protocol declarative and changing the scheduling to lazily construct the ExecutionGraph.


Declarative slot protocol

Instead of asking for every slot individually by calling ResourceManagerGateway.requestSlot(JobMasterId, SlotRequest), the JobMaster should declare the collection of required slots via a new RPC ResourceManagerGateway.declareRequiredResources(JobMasterId jobMasterId, ResourceRequirements resourceRequirements).

ResourceRequirements
public class ResourceRequirements implements Serializable {
    private final JobID jobId;

    private final String targetAddress;

    private final Collection<ResourceRequirement> resourceRequirements;
}
ResourceRequirement
public class ResourceRequirement implements Serializable {
    private final ResourceProfile resourceProfile;
    private final int numberOfRequiredSlots;
}


The jobMasterId is used to filter out messages coming from an old JobMaster leader.
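To make the shape of the new call concrete, a hypothetical declaration from the JobMaster side could look as follows. This is only a sketch: it assumes straightforward constructors for the two classes above, and largeProfile as well as the gateway/field references are placeholders, not part of this proposal.

Declaring requirements (sketch)
// Hypothetical usage on the JobMaster side: build the full set of required
// slots for the job and declare it to the ResourceManager in a single call.
ResourceRequirements requirements = new ResourceRequirements(
        jobId,
        jobMasterAddress,
        Arrays.asList(
                new ResourceRequirement(ResourceProfile.UNKNOWN, 4), // 4 slots of any size
                new ResourceRequirement(largeProfile, 2)));          // 2 slots matching a specific profile

resourceManagerGateway.declareRequiredResources(jobMasterId, requirements);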

Announcing resources

Requirements are declared by the JobMaster at the very start of the job and are also continuously updated throughout the job execution. Requirements may increase (e.g., if we want to schedule another stage in addition to what we have scheduled so far) or decrease (e.g., when a stage has finished).

The absolute resource requirements are sent to the ResourceManager, which will update the internal bookkeeping by taking the last announced value as the ground truth.

If a heartbeat between the ResourceManager and JobMaster times out, then the requirements for the respective job will be set to 0, to prevent us from allocating slots to jobs that no longer exist.
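A minimal sketch of the bookkeeping this implies on the ResourceManager side: each declaration simply overwrites the previous one, and a heartbeat timeout clears the entry. Class and method names are illustrative, and getters on ResourceRequirements are assumed.

Requirement bookkeeping on the ResourceManager (sketch)
class JobRequirementTracker {

    // last declared requirements per job; the latest declaration is the ground truth
    private final Map<JobID, Collection<ResourceRequirement>> requirementsByJob = new HashMap<>();

    void onRequirementsDeclared(ResourceRequirements declaration) {
        // absolute values: replace, do not merge with, the previous declaration
        requirementsByJob.put(declaration.getJobId(), declaration.getResourceRequirements());
    }

    void onJobMasterHeartbeatTimeout(JobID jobId) {
        // treat the job as gone so that no further slots are allocated for it
        requirementsByJob.remove(jobId);
    }
}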


Fulfilling resources

The ResourceManager will try to fulfill the last received ResourceRequirements for every job.

The component responsible for fulfilling the resource requirements on the ResourceManager side is the SlotManager. The SlotManager manages the available slots in the cluster and matches them with jobs depending on their declared requirements.

SlotManager

The SlotManager will try to fulfill outstanding requirements whenever a requirement is increased, a new slot is added (either due to a TaskManager coming online or a slot being freed) or an existing allocated slot is removed (usually due to a TaskManager shutdown).
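Conceptually, all three triggers funnel into a single matching routine, roughly as sketched below (method names are illustrative); the matching routine itself is discussed in the following sections.

SlotManager triggers (sketch)
// All three events end up in the same matching routine.
void onResourceRequirementsIncreased(JobID jobId) {
    checkResourceRequirements();
}

void onSlotAdded(SlotID slotId) {   // TaskManager registered or slot freed
    checkResourceRequirements();
}

void onSlotRemoved(SlotID slotId) { // usually a TaskManager shutdown
    checkResourceRequirements();
}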

How to distribute slots across different jobs

A simple strategy for fulfilling outstanding requirements is to satisfy them in a first-come-first-serve fashion: jobs which register their requirements first take precedence over other jobs, even if the requirements change during runtime. This approach is straightforward and prevents situations where resources are distributed in such a way that no job gets enough to run.

For the time being (and in order to keep the protocol simpler), the ResourceManager won’t revoke resources/slots which are assigned to a different job. This means that the ResourceManager will only assign free resources in order to fulfill the resource requirements. In a future version, we might think about letting the ResourceManager balance resources across jobs. This would entail that the ResourceManager may ask the JobMaster to release slots.
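A rough sketch of this first-come-first-serve strategy follows. Jobs are visited in the order in which they first declared requirements, and only free slots are handed out; slots assigned to another job are never revoked. The data structures and helper methods (numberOfMissingSlots, assignSlotToJob, requestNewTaskExecutor, findMatchingFreeSlot) are illustrative; the matching helper is sketched in the next section.

First-come-first-serve distribution (sketch)
// Jobs in the order in which they first declared requirements.
private final LinkedHashMap<JobID, Collection<ResourceRequirement>> requirementsByJob =
        new LinkedHashMap<>();

void checkResourceRequirements() {
    for (Map.Entry<JobID, Collection<ResourceRequirement>> entry : requirementsByJob.entrySet()) {
        JobID jobId = entry.getKey();
        for (ResourceRequirement requirement : entry.getValue()) {
            int missing = numberOfMissingSlots(jobId, requirement);
            for (int i = 0; i < missing; i++) {
                // only free slots are considered; slots assigned to other jobs are never revoked
                Optional<TaskManagerSlot> freeSlot = findMatchingFreeSlot(requirement.getResourceProfile());
                if (freeSlot.isPresent()) {
                    assignSlotToJob(freeSlot.get(), jobId);
                } else {
                    // no free slot available: try to allocate a new TaskExecutor (see next section)
                    requestNewTaskExecutor(requirement.getResourceProfile());
                }
            }
        }
    }
}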

How to match slots to requirements

Since slots have a specific resource profile which may or may not fulfill a given requirement, we need a strategy for finding a matching slot. The first version will use the first encountered slot that matches the requirement, but more sophisticated algorithms can be used in the future to improve resource utilization. The Gale-Shapley algorithm can provide an optimal match between the sets of available and required slots, but is relatively expensive due to its polynomial runtime.
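A sketch of the first-fit matching described above; TaskManagerSlot, the freeSlots collection, and the matching predicate on ResourceProfile are assumptions, and more sophisticated strategies can be plugged in here later.

First-fit slot matching (sketch)
// Return the first free slot whose profile can satisfy the required profile.
Optional<TaskManagerSlot> findMatchingFreeSlot(ResourceProfile requiredProfile) {
    for (TaskManagerSlot slot : freeSlots) {
        if (slot.getResourceProfile().isMatching(requiredProfile)) {
            return Optional.of(slot);
        }
    }
    return Optional.empty();
}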

If no free slots are available the SlotManager will try to allocate a new TaskExecutor.

Once a matching slot has been found, the TaskExecutor is told to offer the slot to the JobMaster. Should this request fail, the SlotManager will try to find another matching slot. If the request went through but is considered failed due to a timeout, then it is the responsibility of the JobMaster to eventually release the excess slot.

Failing unfulfillable slot requests early

Currently, the SlotManager supports failing unfulfillable slot requests by calling ResourceActions.notifyAllocationFailure. A slot request is unfulfillable if the SlotManager neither has a registered slot nor can allocate a new slot that could fulfill the request. This works because we have individual slot requests which are identified by an AllocationID. With declarative resource management, we cannot fail individual slot requests. However, we can let the JobMaster know if we cannot fulfill the resource requirements for a job after resourcemanager.standalone.start-up-time has passed. In order to send this notification we have to introduce a new RPC JobMaster.notifyNotEnoughResourcesAvailable(Collection<ResourceRequirement> acquiredResources), where acquiredResources is the collection of resources that have been acquired for the job.

This signal is sent whenever the SlotManager tried to fulfill the requirements for a job but failed to do so.

Timeouts

The SlotManager will no longer time out slot requests. Instead, it will keep trying to acquire enough resources to fulfill the declared requirements until the requirements are changed. Failing the job execution if it cannot acquire enough resources within a defined period of time then becomes the responsibility of the JobMaster and the SlotPool.
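On the JobMaster side this could be realized with a simple resource timeout, sketched below. All names (startResourceTimeout, areRequirementsFulfilled, failJob) are illustrative, and the exact trigger is left open.

JobMaster-side resource timeout (sketch)
// Started when the requirements are declared (and/or when a
// notifyNotEnoughResourcesAvailable signal arrives); cancelled once the
// declared requirements are fulfilled.
void startResourceTimeout(Duration resourceTimeout) {
    resourceTimeoutFuture = mainThreadExecutor.schedule(
            () -> {
                if (!slotPool.areRequirementsFulfilled()) {
                    failJob(new NoResourceAvailableException(
                            "Could not acquire the required slots within " + resourceTimeout));
                }
            },
            resourceTimeout.toMillis(),
            TimeUnit.MILLISECONDS);
}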

Interface changes

In order to enable the SlotManager to process resource declarations, we need to extend the interface with an additional method:

SlotManager interface extension
interface SlotManager {
    /**
     * Process the given resource requirements. The resource requirements define the
     * required resources for the specified job. The SlotManager will try to fulfill
     * these requirements.
     *
     * @param resourceRequirements the resource requirements for a job
     */
    void processResourceRequirements(ResourceRequirements resourceRequirements);
}


In order to enable the SlotManager to notify the JobMaster about not enough resources, we need to extend the JobMasterGateway with an additional method:

JobMasterGateway interface extension
interface JobMasterGateway {
    /**
     * Notifies that not enough resources are available to fulfill the resource requirements of a job.
     *
     * @param acquiredResources the resources that have been acquired for the job
     */
    void notifyNotEnoughResourcesAvailable(Collection<ResourceRequirement> acquiredResources);
}


Accepting resources

On the JobMaster side, the SlotPool is responsible for accepting offered slots, and matching these against the requirements of the job. It has to follow the same logic for matching slots as the SlotManager.

Since slot offers can arrive in an arbitrary order, the SlotPool may have to redo the matching of all slots to the requirements, which may also involve the cancelling of ongoing executions.
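A rough sketch of how the SlotPool could accept offers without AllocationIDs follows. The signature is simplified, and matchOfferAgainstRequirements is an illustrative placeholder; the actual matching must mirror the SlotManager's logic.

Accepting slot offers in the SlotPool (sketch)
// Accept an offer only if some unfulfilled requirement matches its profile; otherwise
// reject it so the ResourceManager can give the slot to another job.
Collection<SlotOffer> offerSlots(TaskManagerLocation location, Collection<SlotOffer> offers) {
    Collection<SlotOffer> accepted = new ArrayList<>();
    for (SlotOffer offer : offers) {
        if (matchOfferAgainstRequirements(offer.getResourceProfile())) {
            accepted.add(offer);
        }
        // since offers arrive in arbitrary order, accepting a later offer may require
        // re-matching all slots against the requirements, possibly cancelling executions
    }
    return accepted;
}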

This is a significant difference from the current protocol, where each request for a slot is identified by an AllocationID which is also passed with the slot offer, allowing easy matching.

Whenever new slots are available the SlotPool notifies the Scheduler, which then may start the scheduling process if enough slots have been received.

If the SlotPool is provided with more slots than are currently required, then it will return these slots after the idle slot timeout has passed. This serves as a sort of grace period, potentially allowing us to make use of excess slots later on without having to do another round-trip to the ResourceManager.


Note: Depending on the scheduling requirements, it might make sense to reuse slots which have been freed on the JobMaster (because it reduces latency), or to return them and ask for properly sized slots instead (because it improves resource utilization, assuming different resource requirements). At the moment, we assume that reusing slots is possible. In the future we might have to make this behaviour configurable.

Releasing resources

Resources/Slots are released by the JobMaster by calling TaskExecutorGateway.freeSlot() and by updating the required resources by calling ResourceManagerGateway.declareRequiredResources with the updated resource requirements.
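Put together, releasing a slot is a two-step operation, roughly as sketched below. The method name, parameters, and the decrement helper are illustrative; note that the two messages go to different components.

Releasing a slot (sketch)
void releaseSlot(SlotID slotId, ResourceProfile profile) {
    // 1. free the slot on the TaskExecutor that hosts it
    taskExecutorGateway.freeSlot(slotId);

    // 2. lower the declared requirements so the ResourceManager does not acquire
    //    a replacement slot for this job
    currentRequirements = decrement(currentRequirements, profile);
    resourceManagerGateway.declareRequiredResources(jobMasterId, currentRequirements);
}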

There is no guarantee for the order in which these messages arrive at the ResourceManager, but for production use cases we can generally assume the declaration to arrive first, since the ResourceManager is co-located with the JobMaster. This may be a bigger concern during testing, however.

There are two cases to distinguish:

  1. If the reduction of declared resources arrives first, then the ResourceManager will update the ResourceRequirements for the JobMaster, but not take any further action. We assume that any reduction will eventually coincide with the JobMaster freeing a slot on the TaskExecutor.
  2. If the slot release message arrives first, then the ResourceManager will attempt to acquire a new slot to fulfill the resource requirements of the job. Should the reduction arrive before this attempt completes (for example, because no slot is available at the time), then the attempt will be cancelled. If the slot allocation has already started, then it is the responsibility of the JobMaster to return the slot eventually, be it by rejecting the offer immediately (based on whether it has outstanding requests/requirements) or via the idle slot timeout.


It has to be emphasized that the ResourceManager never releases resources. In the future it may ask the JobMaster to release a slot for resource-balancing purposes, but is in the end reliant on the JobMaster's cooperation.

Lazy ExecutionGraph construction

In order to first declare the required resources before deciding on the actual parallelism, we need to create the ExecutionGraph lazily. Based on the JobGraph one can calculate the required resources for the next schedulable unit (pipelined region or set of pipelined regions) and inform the ResourceManager about it.
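As an illustration of this first step, the required resources for the next schedulable unit could be derived directly from the JobGraph, before any ExecutionGraph exists. All names below are hypothetical, and the assumption that the vertices of a region share slots (so that the largest vertex parallelism bounds the slot count per profile) is a simplification.

Computing requirements for a schedulable unit (sketch)
ResourceRequirements computeRequirements(JobID jobId, String jobMasterAddress, Iterable<JobVertex> region) {
    Map<ResourceProfile, Integer> slotsPerProfile = new HashMap<>();
    for (JobVertex vertex : region) {
        // with slot sharing, the largest vertex parallelism bounds the number of slots per profile
        slotsPerProfile.merge(getSlotProfile(vertex), vertex.getParallelism(), Math::max);
    }

    Collection<ResourceRequirement> requirements = new ArrayList<>();
    for (Map.Entry<ResourceProfile, Integer> entry : slotsPerProfile.entrySet()) {
        requirements.add(new ResourceRequirement(entry.getKey(), entry.getValue()));
    }
    return new ResourceRequirements(jobId, jobMasterAddress, requirements);
}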

Next, one needs to wait for the resources to arrive. Here it is important to employ a robust stabilization protocol because it can happen that some resources arrive late or that other resources disappear (e.g. because a TaskExecutor dies). In the first version we can make the assumption that the cluster has enough resources and that they will eventually become available. Once we relax this assumption (e.g. to support scheduling under constrained resources), we should take a look at Kafka's consumer group protocol, which has a similar problem to solve.

Implementation Plan

The implementation plan can also be split into two phases: 

  1. Declarative slot protocol (FLIP-138)
  2. Lazy ExecutionGraph construction (future flip)

Declarative slot protocol

The new protocol will work as a drop-in replacement for the current SlotManager and SlotPool.

In the first version, the SlotPool will aggregate the individual slot requests issued by the Scheduler into a ResourceRequirements instance and announce them to the ResourceManager. Once a matching slot is returned, the corresponding request future can be completed.
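The aggregation could look roughly like the sketch below: N pending requests with the same profile collapse into a single ResourceRequirement with numberOfRequiredSlots = N. PendingRequest and the surrounding fields are illustrative.

Aggregating slot requests into requirements (sketch)
ResourceRequirements aggregateRequirements(Collection<PendingRequest> pendingRequests) {
    Map<ResourceProfile, Integer> countsPerProfile = new HashMap<>();
    for (PendingRequest request : pendingRequests) {
        countsPerProfile.merge(request.getResourceProfile(), 1, Integer::sum);
    }

    Collection<ResourceRequirement> requirements = new ArrayList<>();
    for (Map.Entry<ResourceProfile, Integer> entry : countsPerProfile.entrySet()) {
        requirements.add(new ResourceRequirement(entry.getKey(), entry.getValue()));
    }
    return new ResourceRequirements(jobId, jobMasterAddress, requirements);
}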

The new SlotManager will internally compute slot requests based on the difference between the declared resource requirements and the resources currently allocated to the job, and then follow code paths similar to the current version.

Lazy ExecutionGraph construction

Creating the ExecutionGraph lazily will be the bigger change. In order to guarantee stability of the system, I would suggest developing this feature by providing a different SchedulerNG implementation.

The basic idea is to split the scheduling process into the following steps:

  1. Find schedulable units of the JobGraph
  2. Calculate the required resources
  3. Declare the required resources
  4. Wait until the resources are available
  5. Instantiate the ExecutionGraph with a parallelism which can be supported by the current set of resources
  6. Match Executions to available slots
  7. Deploy Executions

Compatibility, Deprecation, and Migration Plan

Overall there should be no impact on users, and they should not even be aware of this change.

The slotmanager.request-timeout option will no longer have an effect.

Follow ups

Removing the AllocationID

Once the old SlotPool implementations are removed, it might be possible to remove the AllocationID and to identify slots via the SlotID.