Google Cloud DataFlow 作业尚不可用.. 在 Airflow

Posted

技术标签:

【中文标题】Google Cloud DataFlow 作业尚不可用.. 在 Airflow【英文标题】:Google Cloud DataFlow job not available yet.. in Airflow 【发布时间】:2019-03-07 13:08:26 【问题描述】:

我正在从 Airflow 运行数据流作业。我需要说我是 Airflow 的新手。数据流(从 Airflow 运行)运行成功,但我可以看到 Airflow 在获取作业状态方面存在一些问题,并且我收到无限消息,例如:

Google Cloud DataFlow 作业尚不可用..

以下是向数据流添加所有步骤后的日志(我将 projectID 和 jobID 放在原来的位置):

[2018-10-01 13:00:13,987] logging_mixin.py:95 INFO - [2018-10-01 13:00:13,987] gcp_dataflow_hook.py:128 WARNING - b'INFO: Staging pipeline description to gs://my-project/staging'

[2018-10-01 13:00:13,987] logging_mixin.py:95 INFO - [2018-10-01 13:00:13,987] gcp_dataflow_hook.py:128 WARNING - b'Oct 01, 2018 1:00:13 PM org.apache.beam.runners.dataflow.DataflowRunner run'

[2018-10-01 13:00:13,988] logging_mixin.py:95 INFO - [2018-10-01 13:00:13,988] gcp_dataflow_hook.py:128 WARNING - b'INFO: To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-10-01_06_00_12-jobID?project=projectID'

[2018-10-01 13:00:13,988] logging_mixin.py:95 INFO - [2018-10-01 13:00:13,988] gcp_dataflow_hook.py:128 WARNING - b'Oct 01, 2018 1:00:13 PM org.apache.beam.runners.dataflow.DataflowRunner run'

[2018-10-01 13:00:13,988] logging_mixin.py:95 INFO - [2018-10-01 13:00:13,988] gcp_dataflow_hook.py:128 WARNING - b"INFO: To cancel the job using the 'gcloud' tool, run:"

[2018-10-01 13:00:13,988] logging_mixin.py:95 INFO - [2018-10-01 13:00:13,988] gcp_dataflow_hook.py:128 WARNING - b'> gcloud dataflow jobs --project=projectID cancel --region=us-central1 2018-10-01_06_00_12-jobID'

[2018-10-01 13:00:13,990] logging_mixin.py:95 INFO - [2018-10-01 13:00:13,990] discovery.py:267 INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/dataflow/v1b3/rest

[2018-10-01 13:00:14,417] logging_mixin.py:95 INFO - [2018-10-01 13:00:14,417] discovery.py:866 INFO - URL being requested: GET https://dataflow.googleapis.com/v1b3/projects/projectID/locations/us-central1/jobs?alt=json

[2018-10-01 13:00:14,593] logging_mixin.py:95 INFO - [2018-10-01 13:00:14,593] gcp_dataflow_hook.py:77 INFO - Google Cloud DataFlow job not available yet..

[2018-10-01 13:00:29,614] logging_mixin.py:95 INFO - [2018-10-01 13:00:29,614] discovery.py:866 INFO - URL being requested: GET https://dataflow.googleapis.com/v1b3/projects/projectID/locations/us-central1/jobs?alt=json

[2018-10-01 13:00:29,772] logging_mixin.py:95 INFO - [2018-10-01 13:00:29,772] gcp_dataflow_hook.py:77 INFO - Google Cloud DataFlow job not available yet..

[2018-10-01 13:00:44,790] logging_mixin.py:95 INFO - [2018-10-01 13:00:44,790] discovery.py:866 INFO - URL being requested: GET https://dataflow.googleapis.com/v1b3/projects/projectID/locations/us-central1/jobs?alt=json

[2018-10-01 13:00:44,937] logging_mixin.py:95 INFO - [2018-10-01 13:00:44,937] gcp_dataflow_hook.py:77 INFO - Google Cloud DataFlow job not available yet..

你知道是什么原因造成的吗?我找不到与此问题相关的任何解决方案。 我应该提供更多信息吗?

这是我在 DAG 中的任务:

# dataflow task
dataflow_t=DataFlowJavaOperator(
task_id='mydataflow',
jar='/lib/dataflow_test.jar',
gcp_conn_id='my_gcp_conn',
delegate_to='service_account@projectID.iam.gserviceaccount.com',
dag=dag)

以及在 default_args 中连接到 DAG 中数据流的选项:

'dataflow_default_options': 
     'project': 'projectID',
     'stagingLocation': 'gs://my-project/staging'
    

【问题讨论】:

您是否在数据流运行程序文件中创建作业名称?如果可以共享 dataflow run.py 文件,调试会更容易。 我在 java 的 DataflowPipelineOptions 中指定了 jobName。喜欢: [options.setJobName("mydataflow");] 我也应该在 DAG 中指定它吗?我添加了如何在问题中指定任务 我遇到了同样的问题。我在 DataflowPipelineOptions 中创建了作业名称。 Airflow 还会根据您提供的任务 ID 创建作业名称。因此存在冲突,并且 Airflow 无法找到您通过 DataflowPipelineOptions 创建的实际作业名称。您应该从 DataflowPipelineOptions 中删除作业名称,它将起作用。 我确实看到气流正在创建它的 jobName,因为命令是:--jobName=mydataflow-53afd323 但在 GCP 上(从 DataflowPipelineOptions 删除作业名称后)我可以看到它的名称为“dataflowhelper -airflow-1004032710-72d76cab”,我还应该在 DAG 中提供一些 job_name 或 jobName 吗? 只需在任务中传递task_id即可。 【参考方案1】:

我遇到了同样的问题。我在 DataflowPipelineOptions 中创建了作业名称。 Airflow 还会根据您提供的任务 ID 创建作业名称。

So there is conflict and airflow is not able to find the actual job name which 
you created via DataflowPipelineOptions.

您只需从 DataflowPipelineOptions 中删除作业名称即可。

【讨论】:

我试过了,还是不行,还有什么建议吗?

以上是关于Google Cloud DataFlow 作业尚不可用.. 在 Airflow的主要内容,如果未能解决你的问题,请参考以下文章

在 google-cloud-dataflow 中使用文件模式匹配时如何获取文件名

到 Cloud Bigtable 的 Google Cloud Dataflow 管道中的异常

Google-cloud-dataflow:无法通过带有“BigQueryDisposition.WRITE_TRUNCATE”的“WriteToBigQuery/BigQuerySink”将 jso

Google Cloud Dataflow 服务帐户未传播给工作人员?

Google Cloud DataFlow 随机化 WritetoBigQuery

通过 Google Cloud Dataflow 创建/写入 Parititoned BigQuery 表