Status

Current state: Under Discussion

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-56-Dynamic-Slot-Allocation-td31960.html

JIRA:

Released:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently (Flink 1.9), Flink adopts a coarse grained resource management approach: the tasks of a job are deployed into as many predefined slots as the job’s max parallelism, regardless of how many resources each task / operator can use.

The current approach is easy to set up, but may not achieve optimal performance and resource utilization.

  • Tasks may have different parallelisms, so not all slots contain an entire pipeline of tasks. For slots with fewer tasks, the slot resources predefined for an entire pipeline may be wasted.
  • It can be hard to align slot resources with task requirements in all resource aspects (heap, network, managed, etc.).

We propose fine grained resource management, which optimizes resource utilization in cases where the resource requirements of individual tasks are known or can be tuned.

We propose to improve Flink’s resource management mechanism, so that:

  • It works well for both Streaming and Batch jobs.
  • It works well whether tasks’ resource requirements are specified or unknown.

This FLIP focuses on the slot allocation aspect of fine grained resource management.

Public Interfaces

  • RestAPI / WebUI (Need to adapt the RestAPI and WebUI to the dynamic slot model.)

Proposed Changes

Dynamic Slot Model


Currently (Flink 1.9), a task executor contains a fixed number of slots, whose resources are derived from the total task executor resources and the number of slots per task executor. These slots share the same life span as the task executor. Slots are initially free, and are assigned to and freed by job masters.

With fine grained resource requirements, we may have slot requests with different resources. The current static slot approach may not achieve satisfactory resource utilization. In order to fulfill all the slot requests, we have to predefine the slots with enough resources for the largest slot requests, which wastes resources on the smaller requests.

We propose the dynamic slot model in this FLIP to address the problem above. The key changes are as follows.

  • Slots in the same task executor can have different resources. Ideally, to improve overall resource utilization, we should allocate to a task a subset of resources that exactly matches its resource requirements. Since individual tasks may have different resource needs, the slots should also have different resources.
  • Dynamically create and destroy slots. Different jobs may need to partition the task executor’s resources into slots differently, depending on the particular resource requirements of their tasks. Even within the same job, later tasks that try to reuse resources released by previously finished tasks may prefer a different partitioning of the resources. Thus, we propose to partition a task executor’s resources dynamically, creating slots from available resources on demand, and destroying slots when they are released.


Task executors are launched with total resources but no predefined slots. When making an allocation, instead of requesting a particular existing slot, the resource manager requests a slot with certain resources from the task executor. The task executor then creates a new slot with the requested resources out of its available resources and offers the slot to the job master. As soon as the slot is released by the job master, it is destroyed and its resources are returned to the task executor as available resources.
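The slot life cycle described above can be illustrated with a minimal sketch. All class and method names here (SimpleResources, DynamicSlotExecutor, createSlot, releaseSlot) are hypothetical and are not part of Flink; resources are reduced to CPU cores and memory for brevity.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified resource description (CPU cores and memory only). */
final class SimpleResources {
    final double cpuCores;
    final int memoryMb;

    SimpleResources(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    /** True if the request fits into these resources in every dimension. */
    boolean canFit(SimpleResources request) {
        return request.cpuCores <= cpuCores && request.memoryMb <= memoryMb;
    }

    SimpleResources minus(SimpleResources other) {
        return new SimpleResources(cpuCores - other.cpuCores, memoryMb - other.memoryMb);
    }

    SimpleResources plus(SimpleResources other) {
        return new SimpleResources(cpuCores + other.cpuCores, memoryMb + other.memoryMb);
    }
}

/** Hypothetical stand-in for a task executor that starts with no predefined slots. */
final class DynamicSlotExecutor {
    private SimpleResources available;
    private final Map<Integer, SimpleResources> slots = new HashMap<>();
    private int nextSlotId = 0;

    DynamicSlotExecutor(SimpleResources total) {
        this.available = total;
    }

    /** Creates a slot with exactly the requested resources, or fails if they do not fit. */
    int createSlot(SimpleResources request) {
        if (!available.canFit(request)) {
            throw new IllegalStateException("Insufficient available resources");
        }
        available = available.minus(request);
        int slotId = nextSlotId++;
        slots.put(slotId, request);
        return slotId;
    }

    /** Destroys the slot and returns its resources to the available pool. */
    void releaseSlot(int slotId) {
        SimpleResources freed = slots.remove(slotId);
        if (freed != null) {
            available = available.plus(freed);
        }
    }

    SimpleResources available() {
        return available;
    }

    int slotCount() {
        return slots.size();
    }
}

final class DynamicSlotDemo {
    public static void main(String[] args) {
        DynamicSlotExecutor executor = new DynamicSlotExecutor(new SimpleResources(4.0, 4096));
        int large = executor.createSlot(new SimpleResources(2.0, 2048));
        executor.createSlot(new SimpleResources(0.5, 512));
        executor.releaseSlot(large); // the large slot is destroyed, its resources become available again
        System.out.println("slots=" + executor.slotCount()
                + ", available memory MB=" + executor.available().memoryMb);
    }
}
```

In this sketch a slot exists only while it is allocated, so the executor never tracks a "free slot", only a pool of available resources.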

Unknown Resource Requirements

The resource manager should always request slots from task executors with specified resource requirements. For slot requests with unknown resource requirements that it receives from job masters, it should allocate slots with the task executor’s default slot resource profile.

We introduce a config option defaultSlotFraction to configure what fraction of the task executor’s available resources a default slot should take. For compatibility, if defaultSlotFraction is not specified, we calculate it as 1 / numOfSlot, so that by default the task executor’s resources are partitioned in the same way as in the static slot model.
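As an illustration of the fallback rule, the following sketch resolves the fraction and derives a default slot memory size. The class and method names are hypothetical. Applying the fraction to the task executor's total memory and rounding down are assumptions of this sketch, not something the proposal specifies.

```java
/** Hypothetical helper illustrating the defaultSlotFraction fallback. */
final class DefaultSlotFractionSketch {

    /** Returns the configured fraction, or 1 / numOfSlot when it is not specified (null). */
    static double resolveFraction(Double defaultSlotFraction, int numOfSlot) {
        if (defaultSlotFraction != null) {
            if (defaultSlotFraction <= 0.0 || defaultSlotFraction > 1.0) {
                throw new IllegalArgumentException("defaultSlotFraction must be in (0, 1]");
            }
            return defaultSlotFraction;
        }
        if (numOfSlot <= 0) {
            throw new IllegalArgumentException("numOfSlot must be positive");
        }
        return 1.0 / numOfSlot;
    }

    /** Assumption: the fraction is applied to total memory and rounded down to whole MB. */
    static int defaultSlotMemoryMb(int totalMemoryMb, double fraction) {
        return (int) Math.floor(totalMemoryMb * fraction);
    }

    public static void main(String[] args) {
        double fraction = resolveFraction(null, 4); // legacy behavior: 4 equally sized slots
        System.out.println("default slot memory MB = " + defaultSlotMemoryMb(4096, fraction));
    }
}
```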

Given that in standalone clusters different task executors may have different default slot resources, task executors need to report their default slot resource profile to the resource manager on registration. The default slot resource profile should be calculated only in the startup script (standalone) or in the resource manager (yarn / mesos / k8s), and passed into task executors as environment variables.

Protocol Changes

TaskExecutorGateway

Replace the requestSlot interface with a new requestResource interface.


  • requestSlot
    • Parameters: SlotID, JobID, AllocationID, TargetAddress, ResourceManagerID
    • Return Value: Acknowledge
  • requestResource
    • Parameters: ResourceProfile, JobID, AllocationID, TargetAddress, ResourceManagerID
    • Return Value: SlotID
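The difference between the two interfaces can be sketched as a pair of Java interfaces. The stub types below (AckStub, SlotIdStub, ResourceProfileStub) and the use of plain strings for the ids are hypothetical simplifications, and wrapping the return values in CompletableFuture is an assumption reflecting asynchronous RPC; this is not the actual Flink signature.

```java
import java.util.concurrent.CompletableFuture;

// Placeholder types standing in for the real classes; all are hypothetical simplifications.
enum AckStub { INSTANCE }

final class SlotIdStub {
    final String taskExecutorId;
    final int slotNumber;

    SlotIdStub(String taskExecutorId, int slotNumber) {
        this.taskExecutorId = taskExecutorId;
        this.slotNumber = slotNumber;
    }
}

final class ResourceProfileStub {
    final double cpuCores;
    final int memoryMb;

    ResourceProfileStub(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }
}

/** Static model: the resource manager names an existing slot and gets an acknowledgement. */
interface StaticSlotGateway {
    CompletableFuture<AckStub> requestSlot(
            SlotIdStub slotId, String jobId, String allocationId,
            String targetAddress, String resourceManagerId);
}

/** Dynamic model: the resource manager names the resources and gets back the id of the created slot. */
interface DynamicSlotGateway {
    CompletableFuture<SlotIdStub> requestResource(
            ResourceProfileStub resourceProfile, String jobId, String allocationId,
            String targetAddress, String resourceManagerId);
}
```

The only parameter that changes is the first one (a slot id becomes a resource profile), while the slot id moves to the return value because the task executor chooses it when creating the slot.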


SlotReport

A slot report that task executors send to the resource manager (in registration or heartbeats) now consists of two kinds of information.

  • SlotStatus of allocated slots. A SlotStatus consists of the SlotID, ResourceProfile, JobID and AllocationID. Since slots are dynamically created and destroyed, there should not be any “free slot”. Therefore, the allocation id should never be null.
  • Available resources of the task executor at the time of the report. Because communication is asynchronous, a resource manager may first make an allocation and then receive a slot report with outdated available resources. Later allocations based on the outdated available resources could fail due to insufficient available resources. In these cases, TaskExecutorGateway#requestResource should throw an exception indicating allocation failure.
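The two parts of the report, and the failure caused by an outdated report, can be sketched as follows. The class names are hypothetical, resources are reduced to memory, and IllegalStateException stands in for whatever exception type the real implementation would use.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Hypothetical slot status: every reported slot is allocated, so allocationId is never null. */
final class SlotStatusSketch {
    final String slotId;
    final int memoryMb;
    final String jobId;
    final String allocationId;

    SlotStatusSketch(String slotId, int memoryMb, String jobId, String allocationId) {
        this.slotId = slotId;
        this.memoryMb = memoryMb;
        this.jobId = jobId;
        this.allocationId = Objects.requireNonNull(allocationId, "no free slots in the dynamic model");
    }
}

/** Hypothetical report: allocated slots plus the remaining available resources. */
final class SlotReportSketch {
    final List<SlotStatusSketch> allocatedSlots;
    final int availableMemoryMb;

    SlotReportSketch(List<SlotStatusSketch> allocatedSlots, int availableMemoryMb) {
        this.allocatedSlots = allocatedSlots;
        this.availableMemoryMb = availableMemoryMb;
    }
}

/** Hypothetical task executor side that produces reports and rejects oversized requests. */
final class ReportingExecutorSketch {
    private final List<SlotStatusSketch> allocated = new ArrayList<>();
    private int availableMemoryMb;
    private int nextIndex = 0;

    ReportingExecutorSketch(int totalMemoryMb) {
        this.availableMemoryMb = totalMemoryMb;
    }

    /** Fails when the caller relied on an outdated view of the available resources. */
    String requestResource(int memoryMb, String jobId, String allocationId) {
        if (memoryMb > availableMemoryMb) {
            throw new IllegalStateException("Allocation failed: insufficient available resources");
        }
        availableMemoryMb -= memoryMb;
        String slotId = "slot-" + nextIndex++;
        allocated.add(new SlotStatusSketch(slotId, memoryMb, jobId, allocationId));
        return slotId;
    }

    /** Snapshot of the current state; it may be outdated by the time it is processed. */
    SlotReportSketch createReport() {
        return new SlotReportSketch(
                Collections.unmodifiableList(new ArrayList<>(allocated)), availableMemoryMb);
    }
}
```

For example, if a report taken before an allocation still shows 1024 MB available, a later 512 MB request based on that report fails once 768 MB have already been allocated in between.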

Implementation Steps

Step 1. Introduce a switch for enabling dynamic slot allocation

Introduce a temporary config option as a switch between the current static and the new dynamic slot allocation code paths. This allows us to implement and test the new code paths without affecting the existing code paths and behaviors.

Step 2. Extend RM-TM interfaces

  • Add TaskExecutorGateway#requestResource, for dynamically allocating slots with resource profiles instead of slot ids.
  • Extend SlotReport to contain task executor available resources.
  • Extend ResourceManagerGateway#registerTaskExecutor to register task executor with default slot resource profile.

This step should not introduce any behavior changes.

Step 3. TaskExecutor derives and registers with default slot resource profile

  • Introduce config option for defaultSlotFraction
  • Derive default slot resource profile from the new config option, or the legacy config option "taskmanager.numberOfTaskSlots".
  • Register task executor with the default slot resource profile.

This step should not introduce any behavior changes.

Step 4. Extend TaskExecutor to support dynamic slot allocation

  • TaskSlotTable
    • Bookkeep task manager available resources
    • Add and implement an interface for dynamically allocating slots (with resource profile instead of slot index)
    • Create slot reports with dynamically allocated slots and remaining available resources
  • TaskExecutor
    • Implement the interface in TaskExecutor, by calling the corresponding new interface of TaskSlotTable

Implement this step as separate code paths only for the new mode.

Step 5. Extend SlotManager to support dynamic slot allocation with specified resource profiles

  • Bookkeep task manager available resources
  • Match between slot requests and task executor resources
    • Find task executors with matching available resources for slot requests
    • Find matching pending slot requests for task executors with new available resources
  • Create TaskManagerSlot on allocation and remove on free.

Implement this step as separate code paths only for the new mode.
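The two matching directions listed in Step 5 can be sketched as follows. This is a hypothetical, memory-only illustration and not Flink's SlotManager; the first-fit strategy and the in-order retry of pending requests are arbitrary choices made for the sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, memory-only sketch of matching slot requests against executor resources. */
final class MatchingSlotManagerSketch {

    /** A slot request that could not be fulfilled yet. */
    private static final class PendingRequest {
        final String allocationId;
        final int memoryMb;

        PendingRequest(String allocationId, int memoryMb) {
            this.allocationId = allocationId;
            this.memoryMb = memoryMb;
        }
    }

    private final Map<String, Integer> availableMemoryMb = new LinkedHashMap<>();
    private final Map<String, String> executorByAllocation = new LinkedHashMap<>();
    private final List<PendingRequest> pending = new ArrayList<>();

    /** Direction 1: find a task executor with enough available resources for a new request. */
    void requestSlot(String allocationId, int memoryMb) {
        if (!tryAllocate(allocationId, memoryMb)) {
            pending.add(new PendingRequest(allocationId, memoryMb));
        }
    }

    /** Direction 2: when resources become available, retry the pending requests in order. */
    void addAvailableResources(String executorId, int memoryMb) {
        availableMemoryMb.merge(executorId, memoryMb, Integer::sum);
        for (Iterator<PendingRequest> it = pending.iterator(); it.hasNext(); ) {
            PendingRequest request = it.next();
            if (tryAllocate(request.allocationId, request.memoryMb)) {
                it.remove();
            }
        }
    }

    /** First-fit over executors in registration order (an arbitrary choice for this sketch). */
    private boolean tryAllocate(String allocationId, int memoryMb) {
        for (Map.Entry<String, Integer> entry : availableMemoryMb.entrySet()) {
            if (entry.getValue() >= memoryMb) {
                entry.setValue(entry.getValue() - memoryMb); // bookkeep the remaining resources
                executorByAllocation.put(allocationId, entry.getKey());
                return true;
            }
        }
        return false;
    }

    String executorFor(String allocationId) {
        return executorByAllocation.get(allocationId);
    }

    int availableOn(String executorId) {
        return availableMemoryMb.getOrDefault(executorId, 0);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

A real implementation would also return resources when a slot is freed, which would go through the same retry path as newly registered resources.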

Step 6. Extend SlotManager to support dynamic slot allocation with unknown resource profiles

  • Record task executors' default slot resource profiles.
  • Allocate slots with unknown resource profiles according to task executors' default resource profiles.

Implement this step as separate code paths only for the new mode.

Step 7. Extend SlotManager to support dynamic slot allocation on pending task executors

  • Introduce PendingTaskManagerResources
  • Create PendingTaskManagerSlot on allocation, from PendingTaskManagerResources
  • Map registered task executors to matching PendingTaskManagerResources, and allocate slots for corresponding PendingTaskManagerSlots

Implement this step as separate code paths only for the new mode.

Step 8. Update RestAPI / Web UI

  • Update RestAPI / WebUI to properly display information of available resources and allocated slots of task executors.

Implement this step as separate code paths only for the new mode.

Step 9. Clean-up of legacy mode

  • Fix / update / remove test cases for legacy mode
  • Remove legacy code paths
  • Remove the switch for legacy / new mode

Compatibility, Deprecation, and Migration Plan

  • This FLIP deprecates the configuration "taskmanager.numberOfTaskSlots", but stays compatible with it.

Test Plan

  • We need to update existing integration tests and add new ones dedicated to validating the new fine grained resource management behaviors.
  • Other regular integration and end-to-end tests are also expected to fail if this is broken.

Rejected Alternatives

None.
