Apache Airflow 1.9: Dataflow exception at the job's end
Posted: 2018-08-15 07:59:18

Using the DataflowJavaOperator, I am launching Dataflow jobs on Google Cloud Platform (GCP) with Airflow 1.9.
Below is the code used to launch Dataflow from an Airflow DAG:
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

df_dispatch_data = DataFlowJavaOperator(
    task_id='df-dispatch-data',  # Equivalent to JobName
    jar="/path/of/my/dataflow/jar",
    gcp_conn_id="my_connection_id",
    dataflow_default_options={
        'project': my_project_id,
        'zone': 'europe-west1-b',
        'region': 'europe-west1',
        'stagingLocation': 'gs://my-bucket/staging',
        'tempLocation': 'gs://my-bucket/temp'
    },
    options={
        'workerMachineType': 'n1-standard-1',
        'diskSizeGb': '50',
        'numWorkers': '1',
        'maxNumWorkers': '50',
        'schemaBucket': 'schemas_needed_to_dispatch',
        'autoscalingAlgorithm': 'THROUGHPUT_BASED',
        'readQuery': 'my_query'
    }
)
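For completeness, here is a minimal sketch of how such a task is typically attached to a DAG in Airflow 1.9; the dag_id, start_date and schedule are assumptions, not taken from the original post:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

# Hypothetical DAG wrapper; only the operator arguments shown above come from the question.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 8, 1),
}

with DAG(dag_id='dispatch_data',
         default_args=default_args,
         schedule_interval='@daily') as dag:
    df_dispatch_data = DataFlowJavaOperator(
        task_id='df-dispatch-data',
        jar="/path/of/my/dataflow/jar",
        gcp_conn_id="my_connection_id",
        dataflow_default_options={'project': 'my-project-id'},  # placeholder
        options={'readQuery': 'my_query'},
    )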
However, even though everything goes fine on the GCP side because the job succeeds, an exception is raised at the end of the Dataflow job on my Airflow machine. It is thrown by gcp_dataflow_hook.py:
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 528, in test
ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1584, in run
session=session)
File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1493, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/dataflow_operator.py", line 121, in execute
hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 152, in start_java_dataflow
task_id, variables, dataflow, name, ["java", "-jar"])
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 146, in _start_dataflow
self.get_conn(), variables['project'], name).wait_for_done()
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 31, in __init__
self._job = self._get_job()
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 48, in _get_job
job = self._get_job_id_from_name()
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 40, in _get_job_id_from_name
for job in jobs['jobs']:
KeyError: 'jobs'
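For context, here is a simplified sketch of what the failing hook method roughly does, reconstructed from the traceback above rather than copied from the Airflow source, so the actual code in gcp_dataflow_hook.py may differ in detail:

# Simplified reconstruction of the failing code path (Airflow 1.9, gcp_dataflow_hook.py).
# The projects().jobs().list() call takes no location/region argument, so it only
# returns jobs from the default region; for a job running in europe-west1 the
# response can be empty and carry no 'jobs' key at all, hence the KeyError above.
def _get_job_id_from_name(self):
    jobs = self._dataflow.projects().jobs().list(
        projectId=self._project_number
    ).execute()
    for job in jobs['jobs']:   # KeyError: 'jobs' when the response is empty
        if job['name'] == self._job_name:
            self._job_id = job['id']
            return job['id']
    return None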
Do you have any ideas?
Answer 1: This problem is caused by the options used to launch Dataflow. If --zone or --region is passed, the Google API call used to fetch the job status does not work; it only works with the default zone and region (US / us-central1).
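For reference, a minimal sketch of how the job status can be looked up through the regional endpoint with google-api-python-client; the project ID, region and credentials setup are placeholders, and this is a workaround sketch rather than the hook's own code:

from googleapiclient.discovery import build

# projects().locations().jobs() accepts a 'location' parameter, unlike the plain
# projects().jobs().list() call used by the Airflow 1.9 hook, which only sees jobs
# in the default region (us-central1).
dataflow = build('dataflow', 'v1b3')
response = dataflow.projects().locations().jobs().list(
    projectId='my-project-id',   # placeholder
    location='europe-west1'      # the region the job actually runs in
).execute()

for job in response.get('jobs', []):   # .get() avoids the KeyError when no jobs are returned
    print(job['name'], job['currentState'])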
Comments:
Does this mean we cannot run Dataflow jobs anywhere other than US / us-central1?

I managed to work around this by reducing the 'poll_sleep' parameter of DataFlowPythonOperator from 10 to 1, because my test Dataflow job finished before the polling interval.
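For illustration, a sketch of that workaround; whether the operator accepts poll_sleep directly depends on the Airflow version, and the file path and pipeline options below are placeholders:

from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

# Hypothetical task: poll the Dataflow job status every second instead of every
# 10 seconds, so a short-lived test job is observed before it finishes.
df_test_job = DataFlowPythonOperator(
    task_id='df-test-job',
    py_file='/path/to/my_pipeline.py',                       # placeholder
    gcp_conn_id='my_connection_id',
    dataflow_default_options={'project': 'my-project-id'},   # placeholder
    poll_sleep=1,                                            # default is 10 seconds
)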