airflow实战系列 基于 python 的调度和监控工作流的平台
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了airflow实战系列 基于 python 的调度和监控工作流的平台相关的知识,希望对你有一定的参考价值。
简介
airflow 是一个使用python语言编写的data pipeline调度和监控工作流的平台。Airflow被Airbnb内部用来创建、监控和调整数据管道。任何工作流都可以在这个使用Python来编写的平台上运行。
Airflow是一种允许工作流开发人员轻松创建、维护和周期性地调度运行工作流(即有向无环图或成为DAGs)的工具。在Airbnb中,这些工作流包括了如数据存储、增长分析、Email发送、A/B测试等等这些跨越多部门的用例。
这个平台拥有和 Hive、Presto、MySQL、HDFS、Postgres和S3交互的能力,并且提供了钩子使得系统拥有很好地扩展性。除了一个命令行界面,该工具还提供了一个基于Web的用户界面让您可以可视化管道的依赖关系、监控进度、触发任务等。
传统Workflow通常使用TextFiles(json,xml/etc)来定义DAG,然后Scheduler解析这些DAG文件形成具体的TaskObject执行;Airflow没这么干,它直接用Python写DAGdefinition,一下子突破了文本文件表达能力的局限,定义DAG变得简单。
Airflow的架构
在一个可扩展的生产环境中,Airflow含有以下组件:
-
一个元数据库(MySQL或Postgres)
-
一组Airflow工作节点
-
一个调节器(Redis或RabbitMQ)
-
一个Airflow Web服务器
所有这些组件可以在一个机器上随意扩展运行。如果使用LocalExcuter来适度的安装则可以获得相当多的额外性能。
优点
-
python脚本实现DAG,非常容易扩展
-
工作流依赖可视化
-
no XML
-
可测试
-
可作为crontab的替代
-
可实现复杂的依赖规则
-
Pools
-
CLI和Web UI
功能简介
常见命令
-
initdb,初始化元数据DB,元数据包括了DAG本身的信息、运行信息等;
-
resetdb,清空元数据DB;
-
list_dags
,列出所有DAG; -
list_tasks
,列出某DAG的所有task; -
test,测试某task的运行状况;
-
backfill,测试某DAG在设定的日期区间的运行状况;
-
webserver,开启webserver服务;
-
scheduler,用于监控与触发DAG。
ETL
ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库。
Airflow设计时,只是为了很好的处理ETL任务而已,但是其精良的设计,正好可以用来解决任务的各种依赖问题。
任务依赖
通常,在一个运维系统,数据分析系统,或测试系统等大型系统中,我们会有各种各样的依赖需求。比如:
-
时间依赖:任务需要等待某一个时间点触发
-
外部系统依赖:任务依赖Mysql中的数据,HDFS中的数据等等,这些不同的外部系统需要调用接口去访问
-
机器依赖:任务的执行只能在特定的某一台机器的环境中,可能这台机器内存比较大,也可能只有那台机器上有特殊的库文件
-
任务间依赖:任务A需要在任务B完成后启动,两个任务互相间会产生影响
-
资源依赖:任务消耗资源非常多,使用同一个资源的任务需要被限制,比如跑个数据转换任务要10个G,机器一共就30个G,最多只能跑两个,我希望类似的任务排个队
-
权限依赖:某种任务只能由某个权限的用户启动
也许大家会觉得这些是在任务程序中的逻辑需要处理的部分,但是我认为,这些逻辑可以抽象为任务控制逻辑的部分,和实际任务执行逻辑解耦合。
如何理解Crontab
现在让我们来看下最常用的依赖管理系统,Crontab
在各种系统中,总有些定时任务需要处理,每当在这个时候,我们第一个想到的总是crontab。
确实,crontab可以很好的处理定时执行任务的需求,但是对于crontab来说,执行任务,只是调用一个程序如此简单,而程序中的各种逻辑都不属于crontab的管辖范围(很好的遵循了KISS)
所以我们可以抽象的认为:
crontab是一种依赖管理系统,而且只管理时间上的依赖。
Airflow的处理依赖的方式
Airflow的核心概念,是DAG(有向无环图),DAG由一个或多个TASK组成,而这个DAG正是解决了上文所说的任务间依赖。Task A 执行完成后才能执行 Task B,多个Task之间的依赖关系可以很好的用DAG表示完善
Airflow完整的支持crontab表达式,也支持直接使用python的datatime表述时间,还可以用datatime的delta表述时间差。这样可以解决任务的时间依赖问题。
Airflow在CeleryExecuter下可以使用不同的用户启动Worker,不同的Worker监听不同的Queue,这样可以解决用户权限依赖问题。Worker也可以启动在多个不同的机器上,解决机器依赖的问题。
Airflow可以为任意一个Task指定一个抽象的Pool,每个Pool可以指定一个Slot数。每当一个Task启动时,就占用一个Slot,当Slot数占满时,其余的任务就处于等待状态。这样就解决了资源依赖问题。
Airflow中有Hook机制(其实我觉得不应该叫Hook),作用时建立一个与外部数据系统之间的连接,比如Mysql,HDFS,本地文件系统(文件系统也被认为是外部系统)等,通过拓展Hook能够接入任意的外部系统的接口进行连接,这样就解决的外部系统依赖问题。
参考
http://wingerted.com/2017/02/20/introduce-to-airflow/
https://www.youtube.com/watch?v=cHATHSB_450
https://www.youtube.com/watch?v=Pr0FrvIIfTU
此文已由作者授权腾讯云技术社区发布,转载请注明文章出处
以上是关于airflow实战系列 基于 python 的调度和监控工作流的平台的主要内容,如果未能解决你的问题,请参考以下文章