如何控制 Airflow 安装的并行性或并发性?

Posted

技术标签:

【中文标题】如何控制 Airflow 安装的并行性或并发性?【英文标题】:How to control the parallelism or concurrency of an Airflow installation? 【发布时间】:2019-10-15 15:58:00 【问题描述】:

在我的一些 Apache Airflow 安装中,即使调度程序似乎没有完全加载,计划运行的 DAG 或任务也不会运行。如何增加可以同时运行的 DAG 或任务的数量?

同样,如果我的安装处于高负载状态,并且我想限制我的 Airflow 工作人员拉取排队任务的速度(例如减少资源消耗),我可以调整什么来减少平均负载?

【问题讨论】:

【参考方案1】:

这是自 Airflow v1.10.2 以来可用的配置选项的扩展列表。有些可以在每个 DAG 或每个操作员的基础上设置,但如果未指定,也可能会回退到设置范围的默认值。


可以在每个 DAG 基础上指定的选项

concurrency:允许在此设置的 DAG 的所有活动运行中同时运行的任务实例数。如果未设置,则默认为 core.dag_concurrency max_active_runs:此 DAG 的最大活动运行数。一旦达到此限制,调度程序将不会创建新的活动 DAG 运行。如果未设置,则默认为 core.max_active_runs_per_dag

例子:

# Only allow one run of this DAG to be running at any given time
dag = DAG('my_dag_id', max_active_runs=1)

# Allow a maximum of 10 tasks to be running across a max of 2 active DAG runs
dag = DAG('example2', concurrency=10, max_active_runs=2)

可以在每个操作员的基础上指定的选项

pool:执行任务的池。Pools 可用于限制仅一个子集任务的并行度 task_concurrency:同一任务跨多个 DAG 运行的并发限制

例子:

t1 = BaseOperator(pool='my_custom_pool', task_concurrency=12)

在整个 Airflow 设置中指定的选项

core.parallelism:在整个 Airflow 安装中运行的最大任务数 core.dag_concurrency:每个 DAG 可以运行的最大任务数(跨多个 DAG 运行core.non_pooled_task_slot_count: 分配给不在池中运行的任务的任务槽数 core.max_active_runs_per_dag:每个 DAG 运行的最大活动 DAG 数 scheduler.max_threads:调度程序进程应该使用多少线程来调度 DAG celery.worker_concurrency:工作人员一次处理的最大任务实例数如果使用 CeleryExecutor celery.sync_parallelism: CeleryExecutor 用于同步任务状态的进程数

【讨论】:

很好的答案,非常感谢!所有与并发相关的选项的清晰说明,集中在一处。 如果你看下面的评论和插图,我认为task_concurrency 的定义不正确。定义应该是: > 同一任务跨多次执行的任务运行的并发限制。 @PhilippJohannis 谢谢!我已经编辑了答案。【参考方案2】:

三个主要并发控制变量的说明:

从气流版本 2.2 开始,task_concurrency 参数已被 max_active_tis_per_dag 弃用。

https://airflow.apache.org/docs/stable/faq.html#how-can-my-airflow-dag-run-faster

【讨论】:

【参考方案3】:

检查使用 core.executor 的气流配置。 SequentialExecutor 将按顺序执行,因此您可以选择并行执行任务的 Local Executor 或 Clery Executor。 之后,您可以使用@hexacyanide 提到的其他选项

【讨论】:

以上是关于如何控制 Airflow 安装的并行性或并发性?的主要内容,如果未能解决你的问题,请参考以下文章

UIPageViewController 可访问性或画外音

如何限制 kotlin 协程的最大并发性

k8s集群Job负载支持多个Pod可靠并发,如何权衡利弊选择适合的并行计算模式?

GPU 中的并行性 - CUDA / OpenCL

并发编程专题-并发与多线程

Golang 可见性或 CPU 线程缓存问题