airflow集成EMR使用

Posted zackstang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了airflow集成EMR使用相关的知识,希望对你有一定的参考价值。

1. 准备工作

1.1. 安装并初始化airflow,参考以下文档:

https://www.cnblogs.com/zackstang/p/11082322.html

 

其中还要额外安装的是:

sudo pip-3.6 install -i https://pypi.tuna.tsinghua.edu.cn/simple ‘apache-airflow[celery]‘

sudo pip-3.6 install -i https://pypi.tuna.tsinghua.edu.cn/simple boto3

 

1.2. 配置好本地AWS Credentials,此credential需有启动EMR 的权限。

 

1.3. 置数据库为外部数据库:

编辑 airflow.cfg 文件,修改数据库连接配置(需提前在数据库中创建好airflowdb 的数据库):

sql_alchemy_conn = mysql://user:password@database_location/airflowdb

 

使用下面的命令检查并初始化:

airflow initdb

 

1.4. 配置executor 为 CeleryExecutor

编辑airflow.cfg 文件,修改executor配置:

executor = CeleryExecutor

 

修改后可以保证相互无依赖的任务可以并行执行。默认为SequentialExecutor,也就是按顺序执行。

 

1.5 配置broker_url 与 result_backend

airflow.cfg 文件中修改以下两个条目:

broker_url = sqla+mysql:// user:password@database_location:3306/airflowdb

result_backend = db+mysql:// user:password@database_location:3306/airflowdb

 

配置完后启动airflow 的web ui,worker,flower以及scheduler:

airflow webserver -p 8080 &

airflow worker &

airflow flower &

airflow scheduler &

 

2. 定义工作流

创建dag_trasform.py 文件,在文件中定义工作流

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor

default_args = {
    owner: Airflow,
    depends_on_past: False,
    start_date: datetime.now().replace(microsecond=0),
    email: [‘xxxxxx@qq.com],
    email_on_failure: False,
    email_on_retry: False,
    retries: 0,
    retry_delay: timedelta(minutes=5),
    # ‘queue‘: ‘bash_queue‘,
    # ‘pool‘: ‘backfill‘,
    # ‘priority_weight‘: 10,
    # ‘end_date‘: datetime(2016, 1, 1),
}

dag = DAG(dag_transform, default_args=default_args,
          schedule_interval=timedelta(days=1))

# create emr cluster
t0 = BashOperator(
    task_id=create_emr_cluster,
    bash_command=python3 /home/hadoop/scripts/launch_emr.py,
    dag=dag)

# do wordcount
t1 = BashOperator(
    task_id=spark_job,
    bash_command=python3 /home/hadoop/scripts/submit_spark_job.py,
    dag=dag)

# check result in s3
t2 = BashOperator(
    task_id=check_s3,
    bash_command=python3 /home/hadoop/scripts/check_s3_result.py,
    dag=dag)

# hive query
t3 = BashOperator(
    task_id=query,
    bash_command=python3 /home/hadoop/scripts/query_result.py,
    dag=dag)

# terminate cluster
t4 = BashOperator(
    task_id=terminate_cluster,
    bash_command=python3 /home/hadoop/scripts/terminate_cluster.py,
    dag=dag)

# define airflow DAG
t0 >> t1
t1 >> t2
t2 >> t3
t3 >> t4

 

其中各个BashOperator中的脚本需自行实现,根据需求实现即可。

 

3. 重制Airflow数据库

将 dag_transform.py 文件放入 airflow/dags/ 下,然后重置 airflow 数据库:airflow resetdb

 

4. 运行

在airflow里手动执行这个DAG,可以看到这个DAG已经开始运行:

 技术图片

 

查看 dag_transform 可以看到已经在运行启动emr的脚本了

[[2020-03-12 12:42:54,197] {bash_operator.py:105} INFO - Temporary script location: /tmp/airflowtmptwdg7a_6/create_emr_clusterlbzuu36e
[2020-03-12 12:42:54,197] {bash_operator.py:115} INFO - Running command: python3 /home/hadoop/scripts/launch_emr.py

可以看到 EMR 集群正在启动:

 技术图片

 

t1 spark wordcount 开始执行:

技术图片

 

t2 完成后,t3 hive query 开始执行:

技术图片

 

最后,整个DAG执行完毕:

技术图片

 

我们也可以看到EMR集群开始自动终止:

技术图片

 

参考文档:

https://aws.amazon.com/cn/blogs/china/dynamic-start-stop-of-emr-cluster-with-airflow-and-remote-submission-of-tasks-via-livy/

以上是关于airflow集成EMR使用的主要内容,如果未能解决你的问题,请参考以下文章

在 Airflow EMR 操作员步骤中使用 Json 输入变量

从 Airflow(使用气流 Livy 运算符)向 Livy(在 EMR 中)提交 Spark 作业

AWS EMR Airflow:Postgresql 连接器

AWS EMR 上的持续集成

使用 Airflow 进行集成测试

Airflow 中文文档:保护连接