气流 DAG EMR EmrCreateJobFlowOperator 不执行任何操作

Posted

技术标签:

【中文标题】气流 DAG EMR EmrCreateJobFlowOperator 不执行任何操作【英文标题】:Airflow DAG EMR EmrCreateJobFlowOperator Doesn't do anythong 【发布时间】:2019-02-04 14:21:59 【问题描述】:

我正在尝试运行创建 EMR 集群的 Airflow dag,它添加了一些步骤、检查它们并最终终止创建的 EMR 集群。 但是当我运行 Airflow Dag 时,它一直处于运行状态,并且没有显示任何错误或日志。

谁能告诉我我在这里做错了什么? 我应该添加任何缺少的参数吗? 还是dag时间表的问题?

import airflow
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import 
EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import 
EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator import 
EmrTerminateJobFlowOperator

DEFAULT_ARGS = 
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False



HIVE_CLOUDFRONT = [

    'Name': 'cloudfront',
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': 
        'Jar': 'command-runner.jar',
        'Args': [
            'hive-script',
             '--run-hive-script',
             '--args',
             '-f', 
             's3://BUCKET/xnder/scripts/Hive_CloudFront.q',
              '-d',
                            'INPUT=s3://BUCKET/',
             '-d',
                            'OUTPUT=s3://BUCKET/output5/'
        ]
    

]

JOB_FLOW_OVERRIDES = 
'Name' : 'test1212',
'LogUri' : 's3://BUCKET/log.txt',
'ReleaseLabel' : 'emr-4.1.0',
'Instances' : 
  'InstanceGroups': [
        
            'Name': 'Master nodes',
            'Market': 'ON_DEMAND',
            'InstanceRole': 'MASTER',
            'InstanceType': 'm1.large',
            'InstanceCount': 1,
        ,
        
            'Name': 'Slave nodes',
            'Market': 'ON_DEMAND',
            'InstanceRole': 'CORE',
            'InstanceType': 'm1.large',
            'InstanceCount': 1,
        
    ],
    'KeepJobFlowAliveWhenNoSteps': True,
    'TerminationProtected': False
,
'Applications':[ 
    'Name': 'Hadoop'
 ],
'JobFlowRole':'EMR_EC2_DefaultRole',
'ServiceRole':'EMR_DefaultRole'


dag = DAG(
'emr_test_manual',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
#schedule_interval='0 3 * * *'
#schedule_interval=timedelta(seconds=10)
schedule_interval='@once'
)

cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow_cluster',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
dag=dag
)

step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id=" task_instance.xcom_pull('create_job_flow', key='return_value') ",
aws_conn_id='aws_default',
steps=HIVE_CLOUDFRONT,
dag=dag
)

step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id=" task_instance.xcom_pull('create_job_flow', key='return_value') ",
step_id=" task_instance.xcom_pull('add_steps', key='return_value')[0] ",
aws_conn_id='aws_default',
dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id=" task_instance.xcom_pull('create_job_flow', key='return_value') ",
aws_conn_id='aws_default',
dag=dag
)

cluster_creator.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
step_checker.set_downstream(cluster_remover)

【问题讨论】:

我们需要更多信息来帮助您。这一次, start_date 应该是固定的,而不是动态的。如果单击第一个任务实例,状态是什么?在“任务实例”页面上,还有可能会阻止任务的信息。 我解决了这个问题。问题在于服务器而不是代码。但现在我收到一个新错误 - “指定的工作流程 ID 无效” 我很难找到一些有效的 EMR 流程示例 - 这很棒,让我启动并运行 - 谢谢。 只是在玩弄这个工具。很高兴听到它对您有所帮助!不客气! 【参考方案1】:

删除字段“市场”:“ON_DEMAND”

【讨论】:

以上是关于气流 DAG EMR EmrCreateJobFlowOperator 不执行任何操作的主要内容,如果未能解决你的问题,请参考以下文章

Apache Manged Airflow EMR 操作员 DAG 失败

气流 - Pytest - 未找到夹具“dag”

在气流上部署 dag 文件的有效方法

气流 DAG 步骤依赖项

气流:如何删除 DAG?

从 Airflow(使用气流 Livy 运算符)向 Livy(在 EMR 中)提交 Spark 作业