Discussion thread
Vote thread
JIRA

Release 1.11


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

With widespread advances in machine learning (or deep learning), more and more enterprises are beginning to incorporate ML models across a number of products. Supporting ML scenarios is one of the targets on Flink's roadmap, and GPUs are widely used as accelerators in the ML community. It is therefore necessary to add GPU support to Flink.

Currently, Flink only supports requesting GPU resources in its Mesos integration, while most users and enterprises deploy Flink on Yarn/Kubernetes or in standalone mode. Thus, we propose to add GPU support to Flink. As a first step, we propose to:

Public Interfaces

Introduce the external resource framework for external resource allocation and management. The pattern of configuration options is:
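Based on the option names that appear verbatim later in this proposal ("external-resource.list", "external-resource.gpu.param.discovery-script"), the pattern can be sketched as follows. The amount, driver-factory, and Yarn/Kubernetes config-key option names and values below are illustrative assumptions, not confirmed names:

```yaml
# Hypothetical configuration for an external resource named "gpu".
external-resource.list: gpu
external-resource.gpu.amount: 2
external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory
# Driver-specific options follow the "param" pattern:
external-resource.gpu.param.discovery-script: /path/to/gpu-discovery.sh
# Config keys forwarded to the external resource managers (values are examples):
external-resource.gpu.yarn.config-key: yarn.io/gpu
external-resource.gpu.kubernetes.config-key: nvidia.com/gpu
```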

On the TaskExecutor side, introduce the ExternalResourceDriverFactory and ExternalResourceDriver interfaces, which take the responsibility to manage and provide information about the external resource. Users can implement their own third-party ExternalResourceDriver for other external resources they want to leverage.

public interface ExternalResourceDriverFactory {
    /**
     * Construct the ExternalResourceDriver from configuration.
     */
    ExternalResourceDriver createExternalResourceDriver(Configuration config);
}

public interface ExternalResourceDriver {
    /**
     * Retrieve the information of the external resources according to the amount.
     */
    Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount);
}

Introduce the ExternalResourceInfo class, which contains the information of the external resources.

public interface ExternalResourceInfo {
}
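To illustrate how a third party might plug into this framework, here is a minimal sketch of a driver for a hypothetical FPGA resource. The FPGADriver/FPGAInfo names and the fixed index assignment are assumptions for illustration, and the proposed interfaces are inlined so the sketch is self-contained:

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Inlined stand-ins for the interfaces proposed above.
interface ExternalResourceInfo {}

interface ExternalResourceDriver {
    Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount);
}

// Hypothetical resource info: one FPGA card identified by its index.
class FPGAInfo implements ExternalResourceInfo {
    final int index;

    FPGAInfo(int index) {
        this.index = index;
    }
}

// Hypothetical driver that hands out the first `amount` card indexes.
class FPGADriver implements ExternalResourceDriver {
    @Override
    public Set<FPGAInfo> retrieveResourceInfo(long amount) {
        Set<FPGAInfo> infos = new LinkedHashSet<>();
        for (int i = 0; i < amount; i++) {
            infos.add(new FPGAInfo(i));
        }
        return infos;
    }
}
```

A TaskExecutor configured with such a driver would call retrieveResourceInfo with the configured amount and expose the returned set to operators through the RuntimeContext.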

Operators and functions can get that information from the RuntimeContext.

public interface RuntimeContext {
    /**
     * Get the specific external resource information by the resourceName.
     */
    Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName);
}

For GPU resource, we introduce the following configuration options:

We provide a GPUDriver, which discovers GPU resources through a user-defined discovery script and provides the available GPU indexes.

For GPU resources, we introduce the GPUInformation class, which only contains the index of a GPU card.

Proposed Changes

Overview

Introduce external resource framework for external resource allocation and management

To provide extensibility and decouple the TaskExecutor/ResourceManager from external resource management/allocation (following the separation of concerns principle), we introduce the external resource framework for external resource allocation and management. This framework can be extended by third parties for other external resources they want to leverage.

The external resource framework drives the end-to-end workflow of external resource allocation and management. All enabled external resources should be added to "external-resource.list".

On the ResourceManager side, the user defines the amount of the external resource. The framework takes the responsibility to allocate resources from the external resource managers (Yarn/Kubernetes). The user needs to specify the configuration key of that external resource on Yarn/Kubernetes. The Yarn/KubernetesResourceManager then forwards this external resource request to the external resource manager.

On the TaskExecutor side, we introduce the ExternalResourceDriver, which takes the responsibility to detect and provide information about external resources. The TaskExecutor does not need to manage a specific external resource by itself; operators and functions get the ExternalResourceInfo from the RuntimeContext.

Regarding the configuration, the common config keys are the amount of the external resource and the class name of the ExternalResourceDriver. Besides, each driver can define its own configs following the specific pattern. In summary:

The definition of ExternalResourceDriver and ExternalResourceInfo is:


public interface ExternalResourceDriverFactory {
    /**
     * Construct the ExternalResourceDriver from configuration.
     */
    ExternalResourceDriver createExternalResourceDriver(Configuration config);
}

public interface ExternalResourceDriver {
    /**
     * Retrieve the information of the external resources according to the amount.
     */
    Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount);
}

public interface ExternalResourceInfo {
    String getProperty(String key);
    Collection<String> getKeys();
}
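As a sketch of how this property-based ExternalResourceInfo could be implemented, the following hypothetical GPU info exposes its card index under an "index" key. The class name and the key name are assumptions for illustration, with the interface inlined so the sketch is self-contained:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Inlined stand-in for the interface proposed above.
interface ExternalResourceInfo {
    String getProperty(String key);
    Collection<String> getKeys();
}

// Hypothetical GPU info exposing its card index as a string property.
class GPUInfoSketch implements ExternalResourceInfo {
    private final Map<String, String> properties = new HashMap<>();

    GPUInfoSketch(int index) {
        properties.put("index", String.valueOf(index));
    }

    @Override
    public String getProperty(String key) {
        // Returns null for unknown keys.
        return properties.get(key);
    }

    @Override
    public Collection<String> getKeys() {
        return properties.keySet();
    }
}
```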

Guarantee the required GPU resources are accessible to task executors

On the ResourceManager side, Flink requires that the environment in which task executors run has the required GPU resources and that those GPU resources are accessible to the task executors.

Regarding the amount of GPU resources:

Regarding the accessibility of the GPU resources:


Note: To make GPU resources accessible, certain setup/preparation is needed depending on your environment. See the External Requirements section.

Introduce GPUDriver

Once the required GPU resources are accessible to task executors, the GPUDriver needs to discover the GPU resources and provide the GPU resource information to operators.

GPUDriver will execute the discovery script and get the available GPU resources from its output. Since GPUDriver is one of the ExternalResourceDrivers, operators and functions can then get the GPU information from the RuntimeContext.

The GPUInformation contains the specific index of the GPU.
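The step of turning the script's output into GPUInformation can be sketched as below. The comma-separated output format and the parser class are assumptions for illustration; the actual contract between GPUDriver and the discovery script is defined in the Discovery Script section:

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Sketch: parse discovery-script output such as "0,1,3" into a set of GPU indexes,
// assuming the script prints the available indexes as a comma-separated list.
class GPUOutputParser {
    static Set<Integer> parseIndexes(String scriptOutput) {
        Set<Integer> indexes = new LinkedHashSet<>();
        for (String token : scriptOutput.trim().split(",")) {
            String trimmed = token.trim();
            if (!trimmed.isEmpty()) {
                indexes.add(Integer.parseInt(trimmed));
            }
        }
        return indexes;
    }
}
```

Each parsed index would then back one GPUInformation instance exposed to operators.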

Discovery Script

We introduce the configuration options “external-resource.gpu.param.discovery-script” and “external-resource.gpu.param.discovery-script.args”, which define the path of the discovery script and its arguments. GPUDriver will execute the script's allocate function and get the available GPU resources from the output when it is opened, and execute the deallocateAll function when it is closed.

When executed, the discovery script should:

We provide a default script:

To achieve worker-level isolation in standalone mode, we provide a privilege option for the default script. Users need to add "--privilege" to “external-resource.gpu.param.discovery-script.args” to turn it on. For more discussion of worker-level isolation, see the Worker-level Isolation section.

Users can also provide their own discovery scripts addressing custom requirements, e.g., dynamically deciding the number of returned GPU indexes.

GPU resource isolation

Unlike CPU, GPU resources have two dimensions: cores and video memory. While the cores can be shared by multiple jobs, video memory should be exclusive. Sharing the same GPU across multiple operators may result in OOMs and job failures if the total video memory limit is exceeded. Thus, we need isolation to make sure video memory usage does not exceed the physical limit.

Worker-level Isolation

For Yarn/Kubernetes mode, the underlying system guarantees that the visible GPU resources on the container strictly match the requirement. Thus, worker-level isolation is achieved without extra work.

For standalone mode, multiple task executors may be co-located on the same device, and each GPU is visible to all of them. To achieve worker-level isolation in such scenarios, we need to decide which task executor uses which GPU in a cooperative way. We provide a privilege mode for this in the default script.

Privilege Mode of Default Discovery Script

In privilege mode, we use a common assignment file to synchronize the GPU assignment across different task executors. After retrieving the indexes of all visible GPUs, the script opens the assignment file, checks which GPUs are already in use, and records which GPUs it decides to use (choosing from the unused ones) together with the PID of the task executor. We leverage the “flock” mechanism to guarantee that accesses to the assignment file are exclusive. Since only one process can access the assignment file at a time, we ensure that no GPU is used by multiple workers.
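The flock-protected assignment step can be sketched as a shell function. The assignment-file format (one "index PID" line per GPU) and the function name are assumptions; the real script also obtains the visible indexes itself (e.g., from nvidia-smi) and handles "--check-dead" and deallocation:

```shell
#!/usr/bin/env bash
# Sketch of the privilege-mode assignment step.
# Usage: assign_gpu <assignment-file> "<visible gpu indexes>"
assign_gpu() {
  local assign_file=$1
  local visible_gpus=$2        # in reality: indexes reported by the GPU tooling
  (
    flock -x 9                 # exclusive lock: one task executor at a time
    for idx in $visible_gpus; do
      # Skip indexes already recorded by another task executor.
      if ! grep -q "^$idx " "$assign_file" 2>/dev/null; then
        echo "$idx $$" >> "$assign_file"   # record chosen index + owner PID
        echo "$idx"                        # report the chosen GPU index
        exit 0
      fi
    done
    exit 1                     # no unused GPU left
  ) 9>>"$assign_file"          # fd 9 backs the lock and creates the file
}
```

Because the lock is held for the whole check-then-record sequence, two task executors started concurrently cannot pick the same index.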

When a task executor stops, its record may not be cleaned up. In this scenario, new task executors started afterwards will read the stale data, which may cause a task executor to mistakenly report that there are not enough GPU resources. To address this issue, the script provides a “--check-dead” option. If it is added to “external-resource.gpu.param.discovery-script.args”, then when there are not enough unrecorded GPUs, the allocate function will check whether the processes associated with existing records are still alive, and take over the GPUs whose associated processes are already dead.


For example, if users want to enable privilege mode, they could set “external-resource.gpu.param.discovery-script.args” to "--privilege --check-dead --assign-file /tmp/flink-assign". This executes the default discovery script in privilege mode, checks whether dead processes occupy GPU resources, and places the resource assignment file at "/tmp/flink-assign".

Note: This approach cannot ensure that the GPUs are isolated from other applications (another Flink cluster or non-Flink applications).

Operator-level Isolation

We do not guarantee operator-level isolation in the first step. All the operators on the same task executor can see all the visible GPU indexes.

The approach for operator-level isolation depends on fine-grained resource management, which is not ready at the moment. We may revisit this once fine-grained resource management is completed. An alternative solution is to have operators cooperatively share all GPU resources, but this is out of the scope of this proposal.

External Requirements

We list the external requirements for using GPU resources:

Implementation Steps

Known Limitations

Test Plan

Future work
