Airflow:如何指定资源池的定量使用?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Airflow:如何指定资源池的定量使用?相关的知识,希望对你有一定的参考价值。

我正在寻找几个开源工作流程调度程序,用于具有异构RAM使用的DAG作业。调度程序不仅应该调度少于最大线程数,还应该将所有并发任务的RAM总量保持在可用内存之下。

在这个Luigi Q&A,有人解释说

您可以设置配置中可用资源的数量,然后设置任务消耗的资源数量作为任务的属性。这将限制您一次运行该任务的n

在配置中:

[resources]
api=1

在任务代码中:

resources = {"api": 1}

对于Airflow,我无法在其文档中找到相同的功能。最好的方法是specify a number of available slots in a resource pool,并指定任务实例使用资源池中的单个插槽。但是,似乎无法指定任务实例在池中使用多个插槽。

问题:专门针对Airflow,如何指定任务实例的定量资源使用?

答案

假设您正在使用CeleryExecutor,那么从气流版本1.9.0开始,您可以管理Celery的任务并发性。这不是您一直在询问的内存管理,而是执行任务的并发工作线程数。

Tweakable参数称为CELERYD_CONCURRENCYhere非常好地解释了如何在Airflow中管理芹菜相关配置。

[编辑]

实际上,Pools也可以用来限制并发性。假设您想限制资源饥饿的task_id,以便同时只运行2个实例。你唯一需要做的是:

  • 创建池(在UI中:Admin - > Pools)为其分配名称,例如my_pool并在字段Slots中定义任务的并发性(在本例中为2
  • 当实例化将执行此Operatortask_id时,传递定义的池名称(pool=my_pool

以上是关于Airflow:如何指定资源池的定量使用?的主要内容,如果未能解决你的问题,请参考以下文章

Airflow 中文文档:保护连接

airflow并发慢

如何在 Airflow 中运行 Spark 代码?

从简单代码入手,分析线程池原理

如何使用 CloudFormation 添加用户池的资源服务器?

CDH中yarn的动态资源池的相关配置