Airflow简单介绍及测试安装

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Airflow简单介绍及测试安装相关的知识,希望对你有一定的参考价值。

参考技术A

Airflow主要用于执行预定的批处理作业。它能够很好地管理不同的批量作业的关系,并给将复杂的关系图形化展示。

一句话:任何批量任务或者需要手动去触发执行的任务都可以考虑一下Airflow

airlow使用ansible role方式安装 https://github.com/idealista/airflow-role

更改role下的config.yml及templates/airflow-***.service.j2文件
将 /usr/local/bin/airflow 改为 /usr/bin/airflow

目的该role使用pkg只能在Debian 或 Ubuntu 环境下安装,更改后,可以在centos环境下安装

执行完以上操作后本机就安装好了airflow,同时也配置好了service等。

安装好后展示

Airflow中文文档
任务调度神器 airflow 之初体验
airflow 安装,部署,填坑
airflow 配置 CeleryExecutor
浅谈调度工具——Airflow
如何部署一个健壮的 apache-airflow 调度系统
Airflow工作模式及适用场景
浅谈调度工具——Airflow

调度系统Airflow1.10.4调研与介绍和docker安装

Airflow1.10.4介绍与安装

现在是9102年,8月中旬。airflow当前版本是1.10.4.

随着公司调度任务增大,原有的,基于crontab和mysql的任务调度方案已经不太合适了,需要寻找一个可以支持分布式扩容的调度系统解决方案。

最初瞄准azkaban来着,想着基于这个的二次开发。对比功能和社区热度之后,Airflow比较符合我们寻找的调度系统。

什么是Airflow

Airflow是一个以编程方式创作,安排和监控工作流程的平台。对比crontab来看,它是一个可以定时调度任务的系统,只不过,airflow的调度更容易管理。

  • airflow支持任务依赖pipeline, 这是crontab以及quartz所不支持的。
  • airflow调度系统和业务系统解耦。业务单独编写流程,支持任务热加载。
  • airflow支持crontab定时格式
  • airflow通过Python来定义task,可以实现复杂的逻辑,支持分支条件等
  • airflow有一套完整的UI和管理系统
  • airflow有强大的插件扩展方式,各种插件很丰富,很容易二次开发,添加新功能
  • airflow是分布式设计,支持水平扩容
  • airflow支持task实例,并支持数据业务日期bizdate, 也叫 execution_date.
  • airflow支持任务补录backfill
  • airflow支持任务之间数据传递(这个任务依赖于上个任务的变量)
  • airflow支持序列执行(这个周期的任务依赖于上一个周期的执行结果是否成功)

Airflow 于 2014 年 10 月由 Airbnb 的 Maxime Beauchemin 开始。它是第一次提交的开源,并在 2015 年 6 月宣布正式加入 Airbnb Github。

该项目于 2016 年 3 月加入了 Apache Software Foundation 的孵化计划。

关于airflow具体使用细节,后面再详细介绍,这里就是一些在调度系统选型过程中查找的资料。

阿里基于airflow二次开发了调度平台Maat:

有赞基于airflow二次开发了大数据任务调度平台:

Google cloud提供了基于airflow的数据分析产品:
技术图片

微软Azure支持airflow的运行:
技术图片

当然,这些云厂商很可能是为了让你使用他们的数据产品,比如对象存储,lambda等。

社区异常活跃,star破万,更新频繁, Apache背书。据说作者早期在Facebook搞过一套调度系统,到airbnb就开源了airflow。大公司背书。

技术图片

slack群组也很活跃
技术图片

虽然是Python开发的,我也没玩过Python web, 但调研结果就是: 用Airflow吧。

Airflow的安装

官方文档有非常详细的安装教程。这里不再赘述。

想要记录的是基于docker安装airflow,以及做了一些特定的修改。

最终docker镜像为: https://github.com/Ryan-Miao/docker-airflow

使用方式很简单:

  1. clone 项目
  2. 构建airflow镜像
make build
  1. 启动
docker-compose -f docker-compose-CeleryExecutor.yml up -d
  1. 浏览器访问localhost:8089可以查看dag

技术图片

  1. 浏览器访问localhost:5555可以查看worker

扩容3个worker

docker-compose -f docker-compose-CeleryExecutor.yml scale worker=3

技术图片

所做的一些修改

修改时区为utc+8

Docker容器的时区

ENV LANGUAGE zh_CN.UTF-8
ENV LANG zh_CN.UTF-8
ENV LC_ALL zh_CN.UTF-8
ENV LC_CTYPE zh_CN.UTF-8
ENV LC_MESSAGES zh_CN.UTF-8

sed -i 's/^# zh_CN.UTF-8 UTF-8$/zh_CN.UTF-8 UTF-8/g' /etc/locale.gen     && locale-gen 
/bin/cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo 'Asia/Shanghai' >/etc/timezone

web server ui显示的时区,以及任务运行时的ds等时区:

参考https://blog.csdn.net/Crazy__Hope/article/details/83688986,

airflow.cfg

default_timezone = Asia/Shanghai

/usr/local/lib/python3.7/site-packages/airflow/utils/timezone.py

在 utc = pendulum.timezone(‘UTC’) 这行(第27行)代码下添加,

from airflow import configuration as conf
try:
    tz = conf.get("core", "default_timezone")
    if tz == "system":
        utc = pendulum.local_timezone()
    else:
        utc = pendulum.timezone(tz)
except Exception:
    pass

修改utcnow()函数 (在第69行)

原代码 d = dt.datetime.utcnow() 
修改为 d = dt.datetime.now()

/usr/local/lib/python3.7/site-packages/airflow/utils/sqlalchemy.py

在utc = pendulum.timezone(‘UTC’) 这行(第37行)代码下添加

from airflow import configuration as conf
try:
    tz = conf.get("core", "default_timezone")
    if tz == "system":
        utc = pendulum.local_timezone()
    else:
        utc = pendulum.timezone(tz)
except Exception:
    pass

/usr/local/lib/python3.7/site-packages/airflow/www/templates/admin/master.html

把代码 var UTCseconds = (x.getTime() + x.getTimezoneOffset()*60*1000); 
改为 var UTCseconds = x.getTime();

把代码 "timeFormat":"H:i:s %UTC%",
改为  "timeFormat":"H:i:s",

webserver查看日志,中文乱码问题

容器编码设置没问题,进去看日志文件也没问题,但是webserver查看的时候日志中文乱码。原因是http请求的mime
没设置编码。

/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py
修改mime

mimetype="application/json;charset=utf-8",

Hive beeline认证

airflow支持beeline, 在connection里填写beeline的配置后,使用HiveOperator进行hive操作。我们的hive
没有使用kerberos, 而是ldap的账号密码认证。需要对后台的hvie任务做认证的修改。

修改hive_hooks.py的认证部分即可。Dockerfile注释掉的部分就是。

添加hive的支持

github的airflow docker没有hive相关的lib。我在Dockerfile里添加了hive的环境,这个后面再做优化,针对
不同的pool,安装不同的依赖。

ldap配置

参见https://www.cnblogs.com/woshimrf/p/ldap.html 配置我们的ldap服务。

然后修改airflow.cfg. 找到263行

authenticate = False
# 设置为True并打开ldap即可使用ldap配置
# auth_backend = airflow.contrib.auth.backends.ldap_auth

以及518行

[ldap]
# set this to ldaps://<your.ldap.server>:<port>
uri = ldap://192.168.2.2:389
user_filter = objectClass=inetOrgPerson
user_name_attr = sn
group_member_attr = memberOf
superuser_filter = memberOf=cn=g-admin,ou=group,dc=demo,dc=com
data_profiler_filter = memberOf=cn=g-users,ou=group,dc=demo,dc=com
bind_user = cn=admin,dc=demo,dc=com
bind_password = admin
basedn = dc=demo,dc=com
cacert = 
search_scope = SUBTREE

参考

以上是关于Airflow简单介绍及测试安装的主要内容,如果未能解决你的问题,请参考以下文章

gtest单元测试框架介绍及简单使用

大数据调度平台Airflow:Airflow分布式集群搭建及测试

[AirFlow]AirFlow使用指南三 第一个DAG示例

接口及接口测试简单介绍

调度系统Airflow1.10.4调研与介绍和docker安装

RabbitMQ简单介绍及安装使用