气流 DAG EMR EmrCreateJobFlowOperator 不执行任何操作
Posted
技术标签:
【中文标题】气流 DAG EMR EmrCreateJobFlowOperator 不执行任何操作【英文标题】:Airflow DAG EMR EmrCreateJobFlowOperator Doesn't do anythong 【发布时间】:2019-02-04 14:21:59 【问题描述】:我正在尝试运行创建 EMR 集群的 Airflow dag,它添加了一些步骤、检查它们并最终终止创建的 EMR 集群。 但是当我运行 Airflow Dag 时,它一直处于运行状态,并且没有显示任何错误或日志。
谁能告诉我我在这里做错了什么? 我应该添加任何缺少的参数吗? 还是dag时间表的问题?
import airflow
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import
EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import
EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator import
EmrTerminateJobFlowOperator
DEFAULT_ARGS =
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False
HIVE_CLOUDFRONT = [
'Name': 'cloudfront',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep':
'Jar': 'command-runner.jar',
'Args': [
'hive-script',
'--run-hive-script',
'--args',
'-f',
's3://BUCKET/xnder/scripts/Hive_CloudFront.q',
'-d',
'INPUT=s3://BUCKET/',
'-d',
'OUTPUT=s3://BUCKET/output5/'
]
]
JOB_FLOW_OVERRIDES =
'Name' : 'test1212',
'LogUri' : 's3://BUCKET/log.txt',
'ReleaseLabel' : 'emr-4.1.0',
'Instances' :
'InstanceGroups': [
'Name': 'Master nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm1.large',
'InstanceCount': 1,
,
'Name': 'Slave nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',
'InstanceType': 'm1.large',
'InstanceCount': 1,
],
'KeepJobFlowAliveWhenNoSteps': True,
'TerminationProtected': False
,
'Applications':[
'Name': 'Hadoop'
],
'JobFlowRole':'EMR_EC2_DefaultRole',
'ServiceRole':'EMR_DefaultRole'
dag = DAG(
'emr_test_manual',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
#schedule_interval='0 3 * * *'
#schedule_interval=timedelta(seconds=10)
schedule_interval='@once'
)
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow_cluster',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
dag=dag
)
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id=" task_instance.xcom_pull('create_job_flow', key='return_value') ",
aws_conn_id='aws_default',
steps=HIVE_CLOUDFRONT,
dag=dag
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id=" task_instance.xcom_pull('create_job_flow', key='return_value') ",
step_id=" task_instance.xcom_pull('add_steps', key='return_value')[0] ",
aws_conn_id='aws_default',
dag=dag
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id=" task_instance.xcom_pull('create_job_flow', key='return_value') ",
aws_conn_id='aws_default',
dag=dag
)
cluster_creator.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
step_checker.set_downstream(cluster_remover)
【问题讨论】:
我们需要更多信息来帮助您。这一次, start_date 应该是固定的,而不是动态的。如果单击第一个任务实例,状态是什么?在“任务实例”页面上,还有可能会阻止任务的信息。 我解决了这个问题。问题在于服务器而不是代码。但现在我收到一个新错误 - “指定的工作流程 ID 无效” 我很难找到一些有效的 EMR 流程示例 - 这很棒,让我启动并运行 - 谢谢。 只是在玩弄这个工具。很高兴听到它对您有所帮助!不客气! 【参考方案1】:删除字段“市场”:“ON_DEMAND”
【讨论】:
以上是关于气流 DAG EMR EmrCreateJobFlowOperator 不执行任何操作的主要内容,如果未能解决你的问题,请参考以下文章