FlinkFlink 扩展资源框架
Posted 九师兄
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkFlink 扩展资源框架相关的知识,希望对你有一定的参考价值。
1.概述
近年来,随着人工智能领域的不断发展,深度学习模型已经被应用到了各种各样的生产需求中,比较典型的场景如推荐系统,广告推送,智能风险控制。这些也是 Flink 一直以来被广泛使用的场景,因此,支持人工智能一直以来都是 Flink 社区的长远目标之一。针对这个目标,目前已经有了很多第三方的开源扩展工作。由阿里巴巴开源的工作主要有两个:
- 一个是 Flink AI Extended 的项目,是基于 Flink 的深度学习扩展框架,目前支持 TensorFlow、PyTorch 等框架的集成,它使用户可以将 TensorFlow 当做一个算子,放在 Flink 任务中。
- 另一个是 Alink,它是一个基于 Flink 的通用算法平台,里面也内置了很多常用的机器学习算法。
以上的两个工作都是从功能性上对 Flink 进行扩展,然而从算力的角度上讲,深度学习模型亦或机器学习算法,通常都是整个任务的计算瓶颈所在。GPU 则是这个领域被广泛使用用来加速训练或者预测的资源。因此,支持 GPU 资源来加速计算是 Flink 在 AI 领域的发展过程中必不可少的功能。
2.使用扩展资源
目前 Flink 支持用户配置的资源维度只有 CPU 与内存,而在实际使用中,不仅是 GPU,我们还会遇到其他资源需求,如 SSD 或 RDMA 等网络加速设备。因此,我们希望提供一个通用的扩展资源框架,任何扩展资源都可以以插件的形式来加入这个框架,GPU 只是其中的一种扩展资源。
对于扩展资源的使用,可以抽象出两个通用需求:
-
需要支持该类扩展资源的配置与调度。用户可以在配置中指明对这类扩展资源的需求,如每个 TaskManager 上需要有一块 GPU 卡,并且当 Flink 被部署在 Kubernetes/Yarn 这类资源底座上时,需要将用户对扩展资源的需求进行转发,以保证申请到的 Container/Pod 中存在对应的扩展资源。
-
需要向算子提供运行时的扩展资源信息。用户在自定义算子中可能需要一些运行时的信息才能使用扩展资源,以 GPU 为例,算子需要知道它内部的模型可以部署在那一块 GPU 卡上,因此,需要向算子提供这些信息。
3.扩展资源框架使用方法
使用资源框架我们可以分为以下这 3 个步骤:
- 首先为该扩展资源设置相关配置;
- 然后为所需的扩展资源准备扩展资源框架中的插件;
- 最后在算子中,从 RuntimeContext 来获取扩展资源的信息并使用这些资源
3.1 配置参数
# 定义扩展资源名称,“gpu”
external-resources: gpu
# 定义每个 TaskManager 所需的 GPU 数量
external-resource.gpu.amount: 1
# 定义Yarn或Kubernetes中扩展资源的配置键
external-resource.gpu.yarn.config-key: yarn.io/gpu
external-resource.gpu.kubernetes.config-key: nvidia.com/gpu
# 定义插件 GPUDriver 的工厂类。
external-resource.gpu.driver-factory.class:
org.apache.flink.externalresource.gpu.GPUDriverFactory
以上是使用 GPU 资源的配置示例:
- 对于任何扩展资源,用户首先需要将它的名称加入 “external-resources” 中,这个名称也会被用作该扩展资源其他相关配置的前缀来使用。示例中,我们定义了一种名为 “gpu” 的资源。
- 在调度层,目前支持用户在 TaskManager 的粒度来配置扩展资源需求。示例中,我们定义每个 TaskManager 上的 GPU 设备数为 1。
- 将 Flink 部署在 Kubernetes 或是 Yarn 上时,我们需要配置扩展资源在对应的资源底座上的配置键,以便 Flink 对资源需求进行转发。示例中展示了 GPU 对应的配置。
- 如果提供了插件,则需要将插件的工厂类名放入配置中。
3.2 前置准备
在实际使用扩展资源前,还需要做一些前置准备工作,以 GPU 为例:
-
在 Standalone 模式下,集群管理员需要保证 GPU 资源对 TaskManager 进程可见。
-
在 Kubernetes 模式下,需要集群支持 Device Plugin[6],对应的 Kubernetes 版本为 1.10,并且在集群中安装了 GPU 对应的插件。
-
在 Yarn 模式下,GPU 调度需要集群 Hadoop 版本在 2.10 或 3.1 以上,并正确配置了 resource-types.xml 等文件。
3.3 扩展资源框架插件
完成了对扩展资源的调度后,用户自定义算子可能还需要运行时扩展资源的信息才能使用它。扩展资源框架中的插件负责完成该信息的获取,它的接口如下:
public interface ExternalResourceDriverFactory
/**
* 根据提供的设置创建扩展资源的Driver
*/
ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;
public interface ExternalResourceDriver
/**
* 获取所需数量的扩展资源信息
*/
Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) throws Exception;
ExternalResourceDriver 会在各个 TaskManager 上启动,扩展资源框架会调用各个 Driver 的 retrieveResourceInfo 接口来获得 TaskManager 上的扩展资源信息,并将得到的信息传到算子的 RuntimeContext。ExternalResourceDriverFactory 则为插件的工厂类。
4. GPU 插件
Flink 目前内置了针对 GPU 资源的插件,其内部通过执行名为 Discovery Script 的脚本来获取当前环境可用的 GPU 信息,目前信息中包含了 GPU 设备的 Index。
Flink 提供了一个默认脚本,位于项目的 “plugins/external-resource-gpu/” 目录,用户也可以实现自定义的 Discovery Script 并通过配置来指定使用自定义脚本。该脚本与 GPU 插件的协议为:
-
当调用脚本时,所需要的 GPU 数量将作为第一个参数输入,之后为用户自定义参数列表。
-
若脚本执行正常,则输出 GPU Index 列表,以逗号分隔。
-
若脚本出错或执行结果不符合预期,则脚本以非零值退出,这会导致 TaskManager 初始化失败,并在日志中打印脚本的错误信息。
Flink 提供的默认脚本是通过 “nvidia-smi” 工具来获取当前的机器中可用的 GPU 数量以及 index,并根据所需要的 GPU 数量返回对应数量的 GPU Index 列表。当无法获取到所需数量的 GPU 时,脚本将以非零值退出。
GPU 设备的资源分为两个维度,流处理器与显存,其显存资源只支持独占使用。因此,当多个 TaskManager 运行在同一台机器上时,若一块 GPU 被多个进程使用,可能导致其显存 OOM。因此,Standalone 模式下,需要 TaskManager 级别的资源隔离机制。
默认脚本提供了 Coordination Mode 来支持单机中多个 TaskManager 进程之间的 GPU 资源隔离。该模式通过使用文件锁来实现多进程间 GPU 使用信息同步,协调同一台机器上多个 TaskManager 进程对 GPU 资源的使用。
5. 在算子中获取扩展资源信息
在用户自定义算子中,可使用在 “external-resources” 中定义的资源名称来调用 RuntimeContext 的 getExternalResourceInfos
接口获取对应扩展资源的信息。以 GPU 为例,得到的每个 ExternalResourceInfo 代表一块 GPU 卡,而其中包含名为 “index” 的字段代表该 GPU 卡的设备 Index。
public class ExternalResourceMapFunction extends RichMapFunction<String, String>
private static finalRESOURCE_NAME="gpu";
@Override
public String map(String value)
Set<ExternalResourceInfo> gpuInfos = getRuntimeContext().getExternalResourceInfos(RESOURCE_NAME);
List<String> indexes = gpuInfos.stream()
.map(gpuInfo -> gpuInfo.getProperty("index").get()).collect(Collectors.toList());
// Map function with GPU// ...
6. MNIST Demo
下图以 MNIST 数据集的识别任务来演示使用 GPU 加速 Flink 作业。
MNIST 如上图所示,为手写数字图片数据集,每个图片可表示为为 28*28 的矩阵。在该任务中,我们使用预训练好的 DNN 模型,图片输入经过一层全连接网络得到一个 10 维向量,该向量最大元素的下标即为识别结果。
我们在一台拥有两块 GPU 卡的 ECS 上启动一个有两个 TaskManager 进程的 Standalone 集群。借助默认脚本提供的 Coordination Mode 功能,我们可以保证每个 TaskManager 各使用其中一块 GPU 卡。
该任务的核心算子为图像识别函数 MNISTClassifier,核心实现如下所示
class MNISTClassifier extends RichMapFunction<List<Float>, Integer>
@Override
public void open(Configuration parameters)
//获取GPU信息并且选择第一块GPU
Set<ExternalResourceInfo> externalResourceInfos = getRuntimeContext().getExternalResourceInfos(resourceName);
final Optional<String> firstIndexOptional = externalResourceInfos.iterator().next().getProperty("index");
// 使用第一块GPU的index初始化JCUDA组件
JCuda.cudaSetDevice(Integer.parseInt(firstIndexOptional.get()));
JCublas.cublasInit();
在 Open 方法中,从 RuntimeContext 获取当前 TaskManager 可用的 GPU,并选择第一块来初始化 JCuda 以及 JCublas 库。
class MNISTClassifier extends RichMapFunction<List<Float>, Integer>
@Override
public Integer map(List<Float> value)
// 使用Jucblas做矩阵算法
JCublas.cublasSgemv('n', DIMENSIONS.f1, DIMENSIONS.f0, 1.0f,
matrixPointer, DIMENSIONS.f1, inputPointer, 1, 0.0f, outputPointer, 1);
// 获得乘法结果并得出该图所表示的数字
JCublas.cublasGetVector(DIMENSIONS.f1, Sizeof.FLOAT, outputPointer, 1, Pointer.to(output), 1);
JCublas.cublasFree(inputPointer);
JCublas.cublasFree(outputPointer);
int result = 0;
for (int i = 0; i < DIMENSIONS.f1; ++i)
result = output[i] > output[result] ? i : result;
return result;
在 Map 方法中,将预先训练好的模型参数与输入矩阵放入 GPU 显存,使用 JCublas 进行 GPU 中的矩阵乘法运算,最后将结果向量从 GPU 显存中取出并得到识别结果数字。
具体案例演示流程可以前往观看视频或者参考 Github 上面的链接动手尝试。
M.扩展
Accelerating your workload with GPU and other external resources
FLIP-108: Add GPU support in Flink
以上是关于FlinkFlink 扩展资源框架的主要内容,如果未能解决你的问题,请参考以下文章