在 Airflow EMR 操作员步骤中使用 Json 输入变量
Posted
技术标签:
【中文标题】在 Airflow EMR 操作员步骤中使用 Json 输入变量【英文标题】:Using Json Input Variables In Airflow EMR Operator Steps 【发布时间】:2019-10-04 20:12:14 【问题描述】:我目前正在遵循此处给出的模板:https://github.com/apache/airflow/blob/master/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py 创建一个 DAG 以使用 spark 提交调用 emr 实例。设置 spark_test_steps 时,我需要包含从 POST Json 传入的变量以填充 spark 提交,如下所示:
SPARK_TEST_STEPS = [
'Name': 'calculate_pi',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep':
'Jar': 'command-runner.jar',
'Args': [
'/usr/lib/spark/bin/run-example',
'SparkPi',
kwargs['dag_run'].conf['var_1']
kwargs['dag_run'].conf['var_2']
'10'
]
]
如何传递 POST Json 给出的变量,同时仍遵循 git 链接中给出的格式,如下所示?
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
DEFAULT_ARGS =
'owner': 'Airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False
dag = DAG(
'emr_job_flow_manual_steps_dag',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
schedule_interval='0 3 * * *'
)
var_1 = ''
var_2 = ''
SPARK_TEST_STEPS = []
def define_param(**kwargs):
global var_1
global var_2
global SPARK_TEST_STEPS
var_1 = str(kwargs['dag_run'].conf['var_1'])
var_2 = str(kwargs['dag_run'].conf['var_2'])
SPARK_TEST_STEPS = [
'Name': 'calculate_pi',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep':
'Jar': 'command-runner.jar',
'Args': [
'/usr/lib/spark/bin/run-example',
'SparkPi',
kwargs['dag_run'].conf['var_1']
kwargs['dag_run'].conf['var_2']
'10'
]
]
return SPARK_TEST_STEPS
DEFINE_PARAMETERS = PythonOperator(
task_id='DEFINE_PARAMETERS',
python_callable=define_param,
provide_context=True,
dag=dag)
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
dag=dag
)
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id=" task_instance.xcom_pull('create_job_flow', key='return_value') ",
aws_conn_id='aws_default',
steps=' ti.xcom_pull(task_ids="DEFINE_PARAMETERS") ',
dag=dag
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id=" task_instance.xcom_pull('create_job_flow', key='return_value') ",
step_id=" task_instance.xcom_pull('add_steps', key='return_value')[0] ",
aws_conn_id='aws_default',
dag=dag
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id=" task_instance.xcom_pull('create_job_flow', key='return_value') ",
aws_conn_id='aws_default',
dag=dag
)
cluster_creator.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
step_checker.set_downstream(cluster_remover)
我不能使用 Variable.get 和 Variable.set,因为由于气流全局变量的不断变化,这将不允许同时对不同变量类型进行多个 dag 调用。我曾尝试使用 xcom 调用 SPARK_TEST_STEPS,但 xcom 的返回类型是字符串,并且 EmrAddStepsOperator 步骤需要一个列表。
【问题讨论】:
您如何使用xcom
s 并解决..return type of xcom is string and EmrAddStepsOperator steps requires a list..
的限制但使用json.loads(value_returned_by_xcom
)`
我已经尝试使用 json.loads 但我仍然遇到同样的错误。
【参考方案1】:
我通过创建一个在执行之前解析 json 的自定义运算符解决了类似的问题。问题的原因是当你通过steps=' ti.xcom_pull(task_ids="DEFINE_PARAMETERS") ',
。您实际上是在传递一个带有模板引擎插值的字符串,它没有被反序列化。
from airflow.contrib.hooks.emr_hook import EmrHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
import json
class DynamicEmrStepsOperator(EmrAddStepsOperator):
template_fields = ['job_flow_id', 'steps']
template_ext = ()
ui_color = '#f9c915'
@apply_defaults
def __init__(
self,
job_flow_id=None,
steps="[]",
*args, **kwargs):
super().__init__(
job_flow_id = job_flow_id,
steps = steps,
*args, **kwargs)
def execute(self, context):
self.steps = json.loads(self.steps)
return super().execute(context)
【讨论】:
以上是关于在 Airflow EMR 操作员步骤中使用 Json 输入变量的主要内容,如果未能解决你的问题,请参考以下文章
从 Airflow(使用气流 Livy 运算符)向 Livy(在 EMR 中)提交 Spark 作业
AWS EMR Airflow:Postgresql 连接器
Airflow dag 中的 postgres_operator 问题