从 Airflow(使用气流 Livy 运算符)向 Livy(在 EMR 中)提交 Spark 作业
Posted
技术标签:
【中文标题】从 Airflow(使用气流 Livy 运算符)向 Livy(在 EMR 中)提交 Spark 作业【英文标题】:Submitting Spark Job to Livy (in EMR) from Airflow (using airflow Livy operator) 【发布时间】:2021-02-09 21:19:20 【问题描述】:我正在尝试使用气流livy operator 在 EMR 中安排工作。这是我关注的example code。这里的问题是……没有指定 Livy 连接字符串(主机名和端口)。如何为运营商提供 Livy Server 主机名和端口?
此外,运算符具有参数livy_conn_id
,在示例中设置为livy_conn_default
。这是正确的值吗?...还是我设置了其他值?
【问题讨论】:
【参考方案1】:您应该在 Airflow 仪表板的“管理”选项卡中的“连接”下拥有 livy_conn_default
,如果设置正确,那么是的,您可以使用它。否则,您可以更改它或创建另一个连接 ID 并在 livy_conn_id
中使用它
【讨论】:
livy_conn_default 如何知道您的 EMR 集群的 IP 地址是什么?【参考方案2】:我们可以使用 2 个 API 来连接 Livy 和 Airflow:
-
使用 LivyBatchOperator
使用 LivyOperator
在以下示例中,我将介绍 LivyOperator API。
LivyOperator
Step1:更新 livy 配置:
登录到气流 ui --> 点击 Admin 选项卡 --> 连接 --> 搜索 livy。点击编辑按钮并更新 Host 和 Port 参数。
第二步:安装 apache-airflow-providers-apache-livy
pip install apache-airflow-providers-apache-livy
Step3:在$AIRFLOW_HOME/dags
目录下创建数据文件。
vi $AIRFLOW_HOME/dags/livy_operator_sparkpi_dag.py
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.apache.livy.operators.livy import LivyOperator
default_args =
'owner': 'RangaReddy',
"retries": 3,
"retry_delay": timedelta(minutes=5),
# Initiate DAG
livy_operator_sparkpi_dag = DAG(
dag_id = "livy_operator_sparkpi_dag",
default_args=default_args,
schedule_interval='@once',
start_date = datetime(2022, 3, 2),
tags=['example', 'spark', 'livy']
)
# define livy task with LivyOperator
livy_sparkpi_submit_task = LivyOperator(
file="/root/spark-3.2.1-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.1.jar",
class_name="org.apache.spark.examples.SparkPi",
driver_memory="1g",
driver_cores=1,
executor_memory="1g",
executor_cores=2,
num_executors=1,
name="LivyOperator SparkPi",
task_id="livy_sparkpi_submit_task",
dag=livy_operator_sparkpi_dag,
)
begin_task = DummyOperator(task_id="begin_task")
end_task = DummyOperator(task_id="end_task")
begin_task >> livy_sparkpi_submit_task >> end_task
LIVY_HOST=192.168.0.1
curl http://$LIVY_HOST:8998/batches/0/log | python3 -m json.tool
输出:
"Pi is roughly 3.14144103141441"
【讨论】:
以上是关于从 Airflow(使用气流 Livy 运算符)向 Livy(在 EMR 中)提交 Spark 作业的主要内容,如果未能解决你的问题,请参考以下文章