Python笔记 · Airflow中的DAG与With语法

Posted bluishglc

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python笔记 · Airflow中的DAG与With语法相关的知识,希望对你有一定的参考价值。

《Python笔记 · With语法糖》这篇文章中我们提到:

在Airflow中通过With构建DAG时,不必显示地将Operator添加到DAG中,只要是在With语句块内声明的Operator,都会被自动添加到DAG里

本文我们来详细说明一下这件事是怎么做到的。首先,我们知道,可以配合With关键字一起使用的Class必定实现了__enter____exit__两个魔法方法,DAG也不例外,我们来看一下DAG这两个方法的实现(本文使用的是Airflow 2.1.0版本的源码):

@functools.total_ordering
class DAG(LoggingMixin):
	....
	def __enter__(self):
	    DagContext.push_context_managed_dag(self)
	    return self
	
	def __exit__(self, _type, _value, _tb):
	    DagContext.pop_context_managed_dag()
	....

两个方法都很直白,在enter时将当前DAG添加到DagContext中,在exit时再从DagContext移除。我们跟进一层,看看DagContext的这两个push和pop操作:

class DagContext:
    _context_managed_dag: Optional[DAG] = None
    _previous_context_managed_dags: List[DAG] = []
    
    @classmethod
    def push_context_managed_dag(cls, dag: DAG):
        if cls._context_managed_dag:
            cls._previous_context_managed_dags.append(cls._context_managed_dag)
        cls._context_managed_dag = dag
    
    @classmethod
    def pop_context_managed_dag(cls) -> Optional[DAG]:
        old_dag = cls._context_managed_dag
        if cls._previous_context_managed_dags:
            cls._context_managed_dag = cls._previous_context_managed_dags.pop()
        else:
            cls._context_managed_dag = None
        return old_dag
        
    @classmethod
    def get_current_dag(cls) -> Optional[DAG]:
        return cls._context_managed_dag

从代码可知:DagContext会维护一个“当前DAG的引用”和“此前N个DAG的引用列表”,注意:它们都是静态全局变量。push操作会先检查一下当前上下文中是否已经存在一个DAG,如果有,得先将其压入历史DAG列表中,然后把传入的DAG(就是由with引入的当前DAG)设置为当前DAG。pop操作将与之相反,它会将历史DAG列表中的栈顶元素弹出,并设为当前DAG。(注:之所以历史DAG使用的是一个列表,显然是考虑到了DAG嵌套的问题)

综合以上两段代码不难看出:Airflow维护一个全局变量(DagContext)来keep当前的DAG,以便**“在需要的时候”不需传递参数即可在上下文中获取当前DAG的引用**。那么这个“需要的时候”是什么时候呢?答案就在BaseOperator中:

class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta):
    
    def __init__(
    ....
    ):
        ....
        self.dag = dag or DagContext.get_current_dag()
        ....
    
    @dag.setter
    def dag(self, dag: Any):
        """
        Operators can be assigned to one DAG, one time. Repeat assignments to
        that same DAG are ok.
        """
        from airflow.models.dag import DAG

        if dag is None:
            self._dag = None
            return
        if not isinstance(dag, DAG):
            raise TypeError(f'Expected DAG; received dag.__class__.__name__')
        elif self.has_dag() and self.dag is not dag:
            raise AirflowException(f"The DAG assigned to self can not be changed.")
        elif self.task_id not in dag.task_dict:
            dag.add_task(self)
        elif self.task_id in dag.task_dict and dag.task_dict[self.task_id] is not self:
            dag.add_task(self)

        self._dag = dag

每一个Operator都会继承BaseOperator,在BaseOperator的初始化中可以看到:每一个Operator在初始化时都会从全局变量(DagContext)中取得当前上下文中的DAG作为自己隶属的DAG,而在set dag的方法中会调用DAG的add_task方法将自己(Operator)添加到DAG中,从而完成了Operator(Task)和DAG的双向关联!

以上就是在Airflow中使用With创建DAG的时无需显示将Task添加到DAG中的原因,使用With创建TaskGroup也是同样的道理!

以上是关于Python笔记 · Airflow中的DAG与With语法的主要内容,如果未能解决你的问题,请参考以下文章

Apache Airflow Celery Executor。导入一个本地自定义的python包

大数据调度平台Airflow:Airflow使用

如何删除气流中的默认示例 dag

如何与客户运营商验证气流 DAG?

Apache Airflow单机/分布式环境搭建

Airflow dag 中的 postgres_operator 问题