Python笔记 · Airflow中的DAG与With语法
Posted bluishglc
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python笔记 · Airflow中的DAG与With语法相关的知识,希望对你有一定的参考价值。
在《Python笔记 · With语法糖》这篇文章中我们提到:
在Airflow中通过With构建DAG时,不必显示地将Operator添加到DAG中,只要是在With语句块内声明的Operator,都会被自动添加到DAG里
本文我们来详细说明一下这件事是怎么做到的。首先,我们知道,可以配合With关键字一起使用的Class必定实现了__enter__
和__exit__
两个魔法方法,DAG也不例外,我们来看一下DAG这两个方法的实现(本文使用的是Airflow 2.1.0版本的源码):
@functools.total_ordering
class DAG(LoggingMixin):
....
def __enter__(self):
DagContext.push_context_managed_dag(self)
return self
def __exit__(self, _type, _value, _tb):
DagContext.pop_context_managed_dag()
....
两个方法都很直白,在enter时将当前DAG添加到DagContext中,在exit时再从DagContext移除。我们跟进一层,看看DagContext的这两个push和pop操作:
class DagContext:
_context_managed_dag: Optional[DAG] = None
_previous_context_managed_dags: List[DAG] = []
@classmethod
def push_context_managed_dag(cls, dag: DAG):
if cls._context_managed_dag:
cls._previous_context_managed_dags.append(cls._context_managed_dag)
cls._context_managed_dag = dag
@classmethod
def pop_context_managed_dag(cls) -> Optional[DAG]:
old_dag = cls._context_managed_dag
if cls._previous_context_managed_dags:
cls._context_managed_dag = cls._previous_context_managed_dags.pop()
else:
cls._context_managed_dag = None
return old_dag
@classmethod
def get_current_dag(cls) -> Optional[DAG]:
return cls._context_managed_dag
从代码可知:DagContext会维护一个“当前DAG的引用”和“此前N个DAG的引用列表”,注意:它们都是静态全局变量。push操作会先检查一下当前上下文中是否已经存在一个DAG,如果有,得先将其压入历史DAG列表中,然后把传入的DAG(就是由with引入的当前DAG)设置为当前DAG。pop操作将与之相反,它会将历史DAG列表中的栈顶元素弹出,并设为当前DAG。(注:之所以历史DAG使用的是一个列表,显然是考虑到了DAG嵌套的问题)
综合以上两段代码不难看出:Airflow维护一个全局变量(DagContext)来keep当前的DAG,以便**“在需要的时候”不需传递参数即可在上下文中获取当前DAG的引用**。那么这个“需要的时候”是什么时候呢?答案就在BaseOperator
中:
class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta):
def __init__(
....
):
....
self.dag = dag or DagContext.get_current_dag()
....
@dag.setter
def dag(self, dag: Any):
"""
Operators can be assigned to one DAG, one time. Repeat assignments to
that same DAG are ok.
"""
from airflow.models.dag import DAG
if dag is None:
self._dag = None
return
if not isinstance(dag, DAG):
raise TypeError(f'Expected DAG; received dag.__class__.__name__')
elif self.has_dag() and self.dag is not dag:
raise AirflowException(f"The DAG assigned to self can not be changed.")
elif self.task_id not in dag.task_dict:
dag.add_task(self)
elif self.task_id in dag.task_dict and dag.task_dict[self.task_id] is not self:
dag.add_task(self)
self._dag = dag
每一个Operator都会继承BaseOperator,在BaseOperator的初始化中可以看到:每一个Operator在初始化时都会从全局变量(DagContext)中取得当前上下文中的DAG作为自己隶属的DAG,而在set dag的方法中会调用DAG的add_task方法将自己(Operator)添加到DAG中,从而完成了Operator(Task)和DAG的双向关联!
以上就是在Airflow中使用With创建DAG的时无需显示将Task添加到DAG中的原因,使用With创建TaskGroup也是同样的道理!
以上是关于Python笔记 · Airflow中的DAG与With语法的主要内容,如果未能解决你的问题,请参考以下文章