Apache Airflow 1.9: Dataflow exception at the job's end

Posted: 2018-08-15 07:59:18

Question:

Using the DataFlowJavaOperator, I am launching Dataflow jobs on Google Cloud Platform (GCP) with Airflow 1.9.

Here is the code used to launch the Dataflow job from the Airflow DAG:

df_dispatch_data = DataFlowJavaOperator(
    task_id='df-dispatch-data',  # Equivalent to JobName
    jar="/path/of/my/dataflow/jar",
    gcp_conn_id="my_connection_id",
    dataflow_default_options={
        'project': my_project_id,
        'zone': 'europe-west1-b',
        'region': 'europe-west1',
        'stagingLocation': 'gs://my-bucket/staging',
        'tempLocation': 'gs://my-bucket/temp'
    },
    options={
        'workerMachineType': 'n1-standard-1',
        'diskSizeGb': '50',
        'numWorkers': '1',
        'maxNumWorkers': '50',
        'schemaBucket': 'schemas_needed_to_dispatch',
        'autoscalingAlgorithm': 'THROUGHPUT_BASED',
        'readQuery': 'my_query'
    }
)

However, even though everything is fine on the GCP side because the job succeeds, an exception is raised at the end of the Dataflow job on the machine running Airflow. It is thrown by gcp_dataflow_hook.py:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 528, in test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1584, in run
    session=session)
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1493, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/dataflow_operator.py", line 121, in execute
    hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 152, in start_java_dataflow
    task_id, variables, dataflow, name, ["java", "-jar"])
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 146, in _start_dataflow
    self.get_conn(), variables['project'], name).wait_for_done()
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 31, in __init__
    self._job = self._get_job()
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 48, in _get_job
    job = self._get_job_id_from_name()
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 40, in _get_job_id_from_name
    for job in jobs['jobs']:
KeyError: 'jobs'
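
For reference, the hook looks the job up by name through the Dataflow REST API and then iterates over the 'jobs' key of the response. Below is a simplified sketch of that lookup, not the actual Airflow 1.9 source; the project id is a placeholder and application-default credentials are assumed. It shows why a response containing no jobs raises this KeyError:

from googleapiclient.discovery import build

# Sketch of the polling step performed by gcp_dataflow_hook.py
# (simplified, standalone version; 'my_project_id' is a placeholder).
service = build('dataflow', 'v1b3')

# Without an explicit location, the API only lists jobs in the
# default region (us-central1).
response = service.projects().jobs().list(projectId='my_project_id').execute()

# When no jobs are found there, the response omits the 'jobs' field
# entirely, so indexing it directly fails:
for job in response['jobs']:  # KeyError: 'jobs' when the field is absent
    print(job['name'], job['currentState'])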

Do you have any ideas?

Comments:

Answer 1:

This issue is caused by the options used to launch the Dataflow job. If --zone or --region is passed, the Google API call that the hook uses to fetch the job status does not find the job; it only works with the default zone and region (US / us-central1). In that case the job list returned to the hook contains no 'jobs' entry, which is what triggers the KeyError above.
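
A quick way to see this is to compare a plain jobs.list call with a region-aware one. The sketch below is illustrative only: the project id is a placeholder, and the region-scoped projects.locations.jobs.list call is an assumption based on the Dataflow v1b3 API, not something from the original answer.

from googleapiclient.discovery import build

service = build('dataflow', 'v1b3')

# Lookup the way the Airflow 1.9 hook does it: no location, so only
# jobs running in the default region (us-central1) are returned.
default_region = service.projects().jobs().list(
    projectId='my_project_id').execute()
print(default_region.get('jobs', []))  # empty for a europe-west1 job

# Region-aware lookup: this one does find a job launched with
# region=europe-west1.
europe = service.projects().locations().jobs().list(
    projectId='my_project_id', location='europe-west1').execute()
print(europe.get('jobs', []))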

Comments:

Does this mean we cannot run Dataflow jobs anywhere other than US/us-central1?

I managed to get around this by reducing the DataFlowPythonOperator's 'poll_sleep' parameter from 10 to 1, because my test Dataflow job finished before the polling interval.
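
For anyone wanting to try the poll_sleep workaround mentioned in the comment above: it is a constructor argument of the contrib Dataflow operators. A minimal sketch, assuming DataFlowJavaOperator in Airflow 1.9 accepts it the same way the DataFlowPythonOperator used by the commenter does:

from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

df_dispatch_data = DataFlowJavaOperator(
    task_id='df-dispatch-data',
    jar="/path/of/my/dataflow/jar",
    gcp_conn_id="my_connection_id",
    poll_sleep=1,  # default is 10; poll the job status every second
    dataflow_default_options={
        'project': my_project_id,
        'stagingLocation': 'gs://my-bucket/staging',
        'tempLocation': 'gs://my-bucket/temp',
    },
)

Note that this only hides the problem when the job finishes before the first status poll; the underlying region lookup issue remains.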

以上是关于Apache Airflow 1.9:作业结束时的数据流异常的主要内容,如果未能解决你的问题,请参考以下文章

apache Apache Airflow/Cloud Composer 中多个客户端的最佳实践?

Apache Manged Airflow EMR 操作员 DAG 失败

使用 Apache Airflow 更新和维护 postgres 表

Airflow 中文文档:教程

Airflow - BigQuery 作业状态检查失败。最终错误是:%s'

通过 UI 将参数传递给 Airflow 的作业