airflow实战系列 基于 python 的调度和监控工作流的平台

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了airflow实战系列 基于 python 的调度和监控工作流的平台相关的知识,希望对你有一定的参考价值。

简介

airflow 是一个使用python语言编写的data pipeline调度和监控工作流的平台。Airflow被Airbnb内部用来创建、监控和调整数据管道。任何工作流都可以在这个使用Python来编写的平台上运行。

Airflow是一种允许工作流开发人员轻松创建、维护和周期性地调度运行工作流(即有向无环图或成为DAGs)的工具。在Airbnb中,这些工作流包括了如数据存储、增长分析、Email发送、A/B测试等等这些跨越多部门的用例。

这个平台拥有和 Hive、Presto、MySQL、HDFS、Postgres和S3交互的能力,并且提供了钩子使得系统拥有很好地扩展性。除了一个命令行界面,该工具还提供了一个基于Web的用户界面让您可以可视化管道的依赖关系、监控进度、触发任务等。

传统Workflow通常使用TextFiles(json,xml/etc)来定义DAG,然后Scheduler解析这些DAG文件形成具体的TaskObject执行;Airflow没这么干,它直接用Python写DAGdefinition,一下子突破了文本文件表达能力的局限,定义DAG变得简单。

Airflow的架构

在一个可扩展的生产环境中,Airflow含有以下组件:

  • 一个元数据库(MySQL或Postgres)

  • 一组Airflow工作节点

  • 一个调节器(Redis或RabbitMQ)

  • 一个Airflow Web服务器

所有这些组件可以在一个机器上随意扩展运行。如果使用LocalExcuter来适度的安装则可以获得相当多的额外性能。

技术分享

技术分享

优点

  • python脚本实现DAG,非常容易扩展

  • 工作流依赖可视化

  • no XML

  • 可测试

  • 可作为crontab的替代

  • 可实现复杂的依赖规则

  • Pools

  • CLI和Web UI

功能简介

技术分享

常见命令

  • initdb,初始化元数据DB,元数据包括了DAG本身的信息、运行信息等;

  • resetdb,清空元数据DB;

  • list_dags,列出所有DAG;

  • list_tasks,列出某DAG的所有task;

  • test,测试某task的运行状况;

  • backfill,测试某DAG在设定的日期区间的运行状况;

  • webserver,开启webserver服务;

  • scheduler,用于监控与触发DAG。

ETL

ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库。

Airflow设计时,只是为了很好的处理ETL任务而已,但是其精良的设计,正好可以用来解决任务的各种依赖问题。

任务依赖

通常,在一个运维系统,数据分析系统,或测试系统等大型系统中,我们会有各种各样的依赖需求。比如:

  • 时间依赖:任务需要等待某一个时间点触发

  • 外部系统依赖:任务依赖Mysql中的数据,HDFS中的数据等等,这些不同的外部系统需要调用接口去访问

  • 机器依赖:任务的执行只能在特定的某一台机器的环境中,可能这台机器内存比较大,也可能只有那台机器上有特殊的库文件

  • 任务间依赖:任务A需要在任务B完成后启动,两个任务互相间会产生影响

  • 资源依赖:任务消耗资源非常多,使用同一个资源的任务需要被限制,比如跑个数据转换任务要10个G,机器一共就30个G,最多只能跑两个,我希望类似的任务排个队

  • 权限依赖:某种任务只能由某个权限的用户启动

也许大家会觉得这些是在任务程序中的逻辑需要处理的部分,但是我认为,这些逻辑可以抽象为任务控制逻辑的部分,和实际任务执行逻辑解耦合。

如何理解Crontab

现在让我们来看下最常用的依赖管理系统,Crontab

在各种系统中,总有些定时任务需要处理,每当在这个时候,我们第一个想到的总是crontab。

确实,crontab可以很好的处理定时执行任务的需求,但是对于crontab来说,执行任务,只是调用一个程序如此简单,而程序中的各种逻辑都不属于crontab的管辖范围(很好的遵循了KISS)

所以我们可以抽象的认为:

crontab是一种依赖管理系统,而且只管理时间上的依赖。

Airflow的处理依赖的方式

Airflow的核心概念,是DAG(有向无环图),DAG由一个或多个TASK组成,而这个DAG正是解决了上文所说的任务间依赖。Task A 执行完成后才能执行 Task B,多个Task之间的依赖关系可以很好的用DAG表示完善

Airflow完整的支持crontab表达式,也支持直接使用python的datatime表述时间,还可以用datatime的delta表述时间差。这样可以解决任务的时间依赖问题。

Airflow在CeleryExecuter下可以使用不同的用户启动Worker,不同的Worker监听不同的Queue,这样可以解决用户权限依赖问题。Worker也可以启动在多个不同的机器上,解决机器依赖的问题。

Airflow可以为任意一个Task指定一个抽象的Pool,每个Pool可以指定一个Slot数。每当一个Task启动时,就占用一个Slot,当Slot数占满时,其余的任务就处于等待状态。这样就解决了资源依赖问题。

Airflow中有Hook机制(其实我觉得不应该叫Hook),作用时建立一个与外部数据系统之间的连接,比如Mysql,HDFS,本地文件系统(文件系统也被认为是外部系统)等,通过拓展Hook能够接入任意的外部系统的接口进行连接,这样就解决的外部系统依赖问题。

参考

http://wingerted.com/2017/02/20/introduce-to-airflow/

https://www.youtube.com/watch?v=cHATHSB_450

https://www.youtube.com/watch?v=Pr0FrvIIfTU

 

此文已由作者授权腾讯云技术社区发布,转载请注明文章出处

 

 

以上是关于airflow实战系列 基于 python 的调度和监控工作流的平台的主要内容,如果未能解决你的问题,请参考以下文章

使用AirFlow调度MaxCompute

大数据调度平台Airflow:什么是Airflow

airflow+k8s 多用户-分布式-跨集群-容器化调度

大数据调度平台Airflow:Airflow单机搭建

Apache Airflow单机/分布式环境搭建

大数据调度平台Airflow:Airflow使用