从 Airflow(使用气流 Livy 运算符)向 Livy(在 EMR 中)提交 Spark 作业

Posted

技术标签:

【中文标题】从 Airflow(使用气流 Livy 运算符)向 Livy(在 EMR 中)提交 Spark 作业【英文标题】:Submitting Spark Job to Livy (in EMR) from Airflow (using airflow Livy operator) 【发布时间】:2021-02-09 21:19:20 【问题描述】:

我正在尝试使用气流livy operator 在 EMR 中安排工作。这是我关注的example code。这里的问题是……没有指定 Livy 连接字符串(主机名和端口)。如何为运营商提供 Livy Server 主机名和端口?

此外,运算符具有参数livy_conn_id,在示例中设置为livy_conn_default。这是正确的值吗?...还是我设置了其他值?

【问题讨论】:

【参考方案1】:

您应该在 Airflow 仪表板的“管理”选项卡中的“连接”下拥有 livy_conn_default,如果设置正确,那么是的,您可以使用它。否则,您可以更改它或创建另一个连接 ID 并在 livy_conn_id 中使用它

【讨论】:

livy_conn_default 如何知道您的 EMR 集群的 IP 地址是什么?【参考方案2】:

我们可以使用 2 个 API 来连接 Livy 和 Airflow:

    使用 LivyBatchOperator 使用 LivyOperator

在以下示例中,我将介绍 LivyOperator API。

LivyOperator

Step1:更新 livy 配置:

登录到气流 ui --> 点击 Admin 选项卡 --> 连接 --> 搜索 livy。点击编辑按钮并更新 HostPort 参数。

第二步:安装 apache-airflow-providers-apache-livy

pip install apache-airflow-providers-apache-livy

Step3:在$AIRFLOW_HOME/dags目录下创建数据文件。

vi $AIRFLOW_HOME/dags/livy_operator_sparkpi_dag.py

from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.apache.livy.operators.livy import LivyOperator

default_args = 
    'owner': 'RangaReddy',
    "retries": 3,
    "retry_delay": timedelta(minutes=5),


# Initiate DAG
livy_operator_sparkpi_dag = DAG(
    dag_id = "livy_operator_sparkpi_dag",
    default_args=default_args,
    schedule_interval='@once',
    start_date = datetime(2022, 3, 2),
    tags=['example', 'spark', 'livy']
)

# define livy task with LivyOperator
livy_sparkpi_submit_task = LivyOperator(
    file="/root/spark-3.2.1-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.1.jar",
    class_name="org.apache.spark.examples.SparkPi",
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    name="LivyOperator SparkPi",
    task_id="livy_sparkpi_submit_task",
    dag=livy_operator_sparkpi_dag,
)

begin_task = DummyOperator(task_id="begin_task")
end_task = DummyOperator(task_id="end_task")

begin_task >> livy_sparkpi_submit_task >> end_task
LIVY_HOST=192.168.0.1
curl http://$LIVY_HOST:8998/batches/0/log | python3 -m json.tool

输出:

"Pi is roughly 3.14144103141441"

【讨论】:

以上是关于从 Airflow(使用气流 Livy 运算符)向 Livy(在 EMR 中)提交 Spark 作业的主要内容,如果未能解决你的问题,请参考以下文章

Airflow 中文文档:使用Mesos扩展(社区贡献)

气流操作员从外部Rest API提取数据

气流 - 试图循环操作员。执行不是等待实际操作完成

使用气流进行实时工作编排

(Django)气流中的 ORM - 有可能吗?

气流.providers 和气流.contrib 之间的差异