Airflow GCS to BigQuery operator not able to recognize the Cloud Storage bucket URI

Posted: 2018-03-12 05:46:45

Question:

I have the following Airflow code, which basically reads a table from a MySQL DB into Google Cloud Storage and then into Google BigQuery.

I have added the connection details for MySQL and GCP, including the service account, in Airflow's Admin tab.

extract = MySqlToGoogleCloudStorageOperator(
    task_id='extract_actors',
    mysql_conn_id='factset_test',
    google_cloud_storage_conn_id='gcp_test',
    sql='SELECT * FROM mysql.time_zone',
    bucket='airflow_1',
    filename='actors/actors.json',
    schema_filename='schemas/actors.json',
    dag=dag)

load = GoogleCloudStorageToBigQueryOperator(
            task_id="load_bq_time_zone",
            bigquery_conn_id='gcp_test',
            google_cloud_storage_conn_id='gcp_test',
            bucket='airflow_1',
            destination_project_dataset_table="airflow.mysql_time_zone",
            source_objects='actors/actors0.json',
            schema_object='schemas/actors.json',
            source_format='NEWLINE_DELIMITED_JSON',
            create_disposition='CREATE_IF_NEEDED',
            write_disposition='WRITE_TRUNCATE',
            dag=dag)

I can see that the table time_zone from the MySQL database does get copied to the Cloud Storage bucket airflow_1. But when Airflow tries to copy the data from Cloud Storage to BigQuery, it complains that it cannot find the Cloud Storage bucket. Here are the log details:

[2018-03-12 02:16:59,031] models.py:167 INFO - Filling up the DagBag from /airflow/dags/mysql_to_gcs.py
[2018-03-12 02:16:59,601] base_task_runner.py:112 INFO - Running: ['bash', '-c', u'airflow run mysql_to_gcs load_bq_time_zone 2018-03-12T02:16:48.974591 --job_id 465 --raw -sd DAGS_FOLDER/mysql_to_gcs.py']
[2018-03-12 02:16:59,822] base_task_runner.py:95 INFO - Subtask: /usr/local/lib/python2.7/dist-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2018-03-12 02:16:59,822] base_task_runner.py:95 INFO - Subtask:   """)
[2018-03-12 02:16:59,858] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:16:59,857] __init__.py:57 INFO - Using executor SequentialExecutor
[2018-03-12 02:16:59,949] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:16:59,949] driver.py:120 INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2018-03-12 02:16:59,973] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:16:59,973] driver.py:120 INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2018-03-12 02:17:00,157] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,157] models.py:167 INFO - Filling up the DagBag from /airflow/dags/mysql_to_gcs.py
[2018-03-12 02:17:00,712] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,712] models.py:1126 INFO - Dependencies all met for <TaskInstance: mysql_to_gcs.load_bq_time_zone 2018-03-12 02:16:48.974591 [queued]>
[2018-03-12 02:17:00,721] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,720] models.py:1126 INFO - Dependencies all met for <TaskInstance: mysql_to_gcs.load_bq_time_zone 2018-03-12 02:16:48.974591 [queued]>
[2018-03-12 02:17:00,721] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,721] models.py:1318 INFO - 
[2018-03-12 02:17:00,721] base_task_runner.py:95 INFO - Subtask: --------------------------------------------------------------------------------
[2018-03-12 02:17:00,722] base_task_runner.py:95 INFO - Subtask: Starting attempt 1 of 2
[2018-03-12 02:17:00,722] base_task_runner.py:95 INFO - Subtask: --------------------------------------------------------------------------------
[2018-03-12 02:17:00,722] base_task_runner.py:95 INFO - Subtask: 
[2018-03-12 02:17:00,738] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,737] models.py:1342 INFO - Executing <Task(GoogleCloudStorageToBigQueryOperator): load_bq_time_zone> on 2018-03-12 02:16:48.974591
[2018-03-12 02:17:00,792] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,792] gcp_api_base_hook.py:81 INFO - Getting connection using a JSON key file.
[2018-03-12 02:17:00,795] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,795] __init__.py:44 WARNING - file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:00,795] base_task_runner.py:95 INFO - Subtask: Traceback (most recent call last):
[2018-03-12 02:17:00,795] base_task_runner.py:95 INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/__init__.py", line 41, in autodetect
[2018-03-12 02:17:00,796] base_task_runner.py:95 INFO - Subtask:     from . import file_cache
[2018-03-12 02:17:00,796] base_task_runner.py:95 INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 41, in <module>
[2018-03-12 02:17:00,796] base_task_runner.py:95 INFO - Subtask:     'file_cache is unavailable when using oauth2client >= 4.0.0')
[2018-03-12 02:17:00,796] base_task_runner.py:95 INFO - Subtask: ImportError: file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:00,796] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,795] discovery.py:274 INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/storage/v1/rest
[2018-03-12 02:17:00,796] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,795] transport.py:157 INFO - Attempting refresh to obtain initial access_token
[2018-03-12 02:17:00,838] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,838] client.py:777 INFO - Refreshing access_token
[2018-03-12 02:17:00,910] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,909] discovery.py:868 INFO - URL being requested: GET https://www.googleapis.com/storage/v1/b/airflow_1/o/%2Fschemas%2Factors.json?alt=media
[2018-03-12 02:17:01,209] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:01,208] gcp_api_base_hook.py:81 INFO - Getting connection using a JSON key file.
[2018-03-12 02:17:01,210] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:01,210] __init__.py:44 WARNING - file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:01,210] base_task_runner.py:95 INFO - Subtask: Traceback (most recent call last):
[2018-03-12 02:17:01,211] base_task_runner.py:95 INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/__init__.py", line 41, in autodetect
[2018-03-12 02:17:01,211] base_task_runner.py:95 INFO - Subtask:     from . import file_cache
[2018-03-12 02:17:01,211] base_task_runner.py:95 INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 41, in <module>
[2018-03-12 02:17:01,211] base_task_runner.py:95 INFO - Subtask:     'file_cache is unavailable when using oauth2client >= 4.0.0')
[2018-03-12 02:17:01,211] base_task_runner.py:95 INFO - Subtask: ImportError: file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:01,211] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:01,210] discovery.py:274 INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2018-03-12 02:17:01,212] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:01,210] transport.py:157 INFO - Attempting refresh to obtain initial access_token
[2018-03-12 02:17:01,248] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:01,248] client.py:777 INFO - Refreshing access_token
[2018-03-12 02:17:01,325] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:01,325] bigquery_hook.py:961 INFO - project not included in destination_project_dataset_table: airflow.mysql_time_zone; using project "bigquery-1210"
[2018-03-12 02:17:01,339] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:01,339] discovery.py:868 INFO - URL being requested: POST https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs?alt=json
[2018-03-12 02:17:01,888] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:01,887] discovery.py:868 INFO - URL being requested: GET https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L?alt=json
[2018-03-12 02:17:02,064] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:02,062] models.py:1417 ERROR - BigQuery job failed. Final error was: u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'. The job was: u'status': u'state': u'DONE', u'errors': [u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'], u'errorResult': u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t', u'kind': u'bigquery#job', u'statistics': u'endTime': u'1520821021658', u'creationTime': u'1520821021410', u'startTime': u'1520821021658', u'jobReference': u'projectId': u'bigquery-1210', u'jobId': u'job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'etag': u'"OhENgf8ForUUnKbYWWdbr5aJHYs/FMMuqgSZof0jtlMAniagOYxmgWA"', u'user_email': u'@bigquery-1210.iam.gserviceaccount.com', u'configuration': u'load': u'sourceFormat': u'NEWLINE_DELIMITED_JSON', u'destinationTable': u'projectId': u'bigquery-1210', u'tableId': u'mysql_time_zone', u'datasetId': u'airflow', u'writeDisposition': u'WRITE_TRUNCATE', u'sourceUris': [u'gs://airflow_1/p', u'gs://airflow_1/u', u'gs://airflow_1/s', u'gs://airflow_1/h', u'gs://airflow_1/k', u'gs://airflow_1/a', u'gs://airflow_1/r', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1/0', u'gs://airflow_1/.', u'gs://airflow_1/j', u'gs://airflow_1/s', u'gs://airflow_1/o', u'gs://airflow_1/n'], u'createDisposition': u'CREATE_IF_NEEDED', u'schema': u'fields': [u'type': u'INTEGER', u'name': u'Time_zone_id', u'mode': u'REQUIRED', u'type': u'STRING', u'name': u'Use_leap_seconds', u'mode': u'REQUIRED'], u'id': u'bigquery-1210:job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'selfLink': u'https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'
[2018-03-12 02:17:02,065] base_task_runner.py:95 INFO - Subtask: Traceback (most recent call last):
[2018-03-12 02:17:02,065] base_task_runner.py:95 INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
[2018-03-12 02:17:02,065] base_task_runner.py:95 INFO - Subtask:     result = task_copy.execute(context=context)
[2018-03-12 02:17:02,065] base_task_runner.py:95 INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/gcs_to_bq.py", line 153, in execute
[2018-03-12 02:17:02,065] base_task_runner.py:95 INFO - Subtask:     schema_update_options=self.schema_update_options)
[2018-03-12 02:17:02,066] base_task_runner.py:95 INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/bigquery_hook.py", line 476, in run_load
[2018-03-12 02:17:02,066] base_task_runner.py:95 INFO - Subtask:     return self.run_with_configuration(configuration)
[2018-03-12 02:17:02,066] base_task_runner.py:95 INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/bigquery_hook.py", line 513, in run_with_configuration
[2018-03-12 02:17:02,066] base_task_runner.py:95 INFO - Subtask:     job['status']['errorResult'], job
[2018-03-12 02:17:02,069] base_task_runner.py:95 INFO - Subtask: Exception: BigQuery job failed. Final error was: u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'. The job was: u'status': u'state': u'DONE', u'errors': [u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'], u'errorResult': u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t', u'kind': u'bigquery#job', u'statistics': u'endTime': u'1520821021658', u'creationTime': u'1520821021410', u'startTime': u'1520821021658', u'jobReference': u'projectId': u'bigquery-1210', u'jobId': u'job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'etag': u'"OhENgf8ForUUnKbYWWdbr5aJHYs/FMMuqgSZof0jtlMAniagOYxmgWA"', u'user_email': u'@bigquery-1210.iam.gserviceaccount.com', u'configuration': u'load': u'sourceFormat': u'NEWLINE_DELIMITED_JSON', u'destinationTable': u'projectId': u'bigquery-1210', u'tableId': u'mysql_time_zone', u'datasetId': u'airflow', u'writeDisposition': u'WRITE_TRUNCATE', u'sourceUris': [u'gs://airflow_1/p', u'gs://airflow_1/u', u'gs://airflow_1/s', u'gs://airflow_1/h', u'gs://airflow_1/k', u'gs://airflow_1/a', u'gs://airflow_1/r', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1/0', u'gs://airflow_1/.', u'gs://airflow_1/j', u'gs://airflow_1/s', u'gs://airflow_1/o', u'gs://airflow_1/n'], u'createDisposition': u'CREATE_IF_NEEDED', u'schema': u'fields': [u'type': u'INTEGER', u'name': u'Time_zone_id', u'mode': u'REQUIRED', u'type': u'STRING', u'name': u'Use_leap_seconds', u'mode': u'REQUIRED'], u'id': u'bigquery-1210:job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'selfLink': u'https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'
[2018-03-12 02:17:02,069] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:02,068] models.py:1433 INFO - Marking task as UP_FOR_RETRY
[2018-03-12 02:17:02,087] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:02,085] models.py:1462 ERROR - BigQuery job failed. Final error was: u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'. The job was: u'status': u'state': u'DONE', u'errors': [u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'], u'errorResult': u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t', u'kind': u'bigquery#job', u'statistics': u'endTime': u'1520821021658', u'creationTime': u'1520821021410', u'startTime': u'1520821021658', u'jobReference': u'projectId': u'bigquery-1210', u'jobId': u'job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'etag': u'"OhENgf8ForUUnKbYWWdbr5aJHYs/FMMuqgSZof0jtlMAniagOYxmgWA"', u'user_email': u'@bigquery-1210.iam.gserviceaccount.com', u'configuration': u'load': u'sourceFormat': u'NEWLINE_DELIMITED_JSON', u'destinationTable': u'projectId': u'bigquery-1210', u'tableId': u'mysql_time_zone', u'datasetId': u'airflow', u'writeDisposition': u'WRITE_TRUNCATE', u'sourceUris': [u'gs://airflow_1/p', u'gs://airflow_1/u', u'gs://airflow_1/s', u'gs://airflow_1/h', u'gs://airflow_1/k', u'gs://airflow_1/a', u'gs://airflow_1/r', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1/0', u'gs://airflow_1/.', u'gs://airflow_1/j', u'gs://airflow_1/s', u'gs://airflow_1/o', u'gs://airflow_1/n'], u'createDisposition': u'CREATE_IF_NEEDED', u'schema': u'fields': [u'type': u'INTEGER', u'name': u'Time_zone_id', u'mode': u'REQUIRED', u'type': u'STRING', u'name': u'Use_leap_seconds', u'mode': u'REQUIRED'], u'id': u'bigquery-1210:job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'selfLink': u'https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'

Comments:

I can't find any problem with the bucket in the logs. I can, however, see some kind of authentication error; more specifically here: ***.com/questions/40154672/…

If you look at the last line of the log: ERROR - BigQuery job failed. Final error was: u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'.

Also, since Airflow did not complain about an authentication error when writing the MySQL table to Cloud Storage, I'm not sure why it complains when reading the data from Cloud Storage into BigQuery.

Answer 1:

The problem is source_objects='actors/actors0.json'

It needs to be source_objects=['actors/actors0.json']

From the documentation:

:param source_objects: List of Google cloud storage URIs to load from. (templated)
    If source_format is 'DATASTORE_BACKUP', the list must only contain a single URI.
:type object: list

Here is the code:

source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
               for source_object in self.source_objects]

So it iterates over whatever you pass in. When that is a plain string, it iterates character by character, which is why the job's sourceUris end up as single-character paths like gs://airflow_1/t.
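That character-by-character expansion is easy to reproduce outside Airflow. A minimal sketch (bucket name and object path taken from the question's code; this only illustrates the hook's URI building, not the operator itself):

bucket = 'airflow_1'
source_objects = 'actors/actors0.json'  # passed as a bare string, as in the question

# Iterating over a string yields one character at a time, so every character
# becomes its own "object" in the generated URI list.
source_uris = ['gs://{}/{}'.format(bucket, source_object)
               for source_object in source_objects]
print(source_uris[:4])  # ['gs://airflow_1/a', 'gs://airflow_1/c', 'gs://airflow_1/t', 'gs://airflow_1/o']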

Comments:

True, source_objects is of type list; this solved my problem after I had been stuck for hours. source_objects=['my_file.csv']

Answer 2:

source_objects needs to be given as a list, i.e. source_objects=['actors/actors0.json'], since several files can be loaded at once.

It is always best to check how the fields are defined in the operator's code, e.g. https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/contrib/operators/gcs_to_bq.py#L65
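Putting this together, a sketch of the corrected load task from the question (connection IDs, bucket, object paths and dispositions carried over unchanged; the only change is that source_objects is now a list):

load = GoogleCloudStorageToBigQueryOperator(
            task_id="load_bq_time_zone",
            bigquery_conn_id='gcp_test',
            google_cloud_storage_conn_id='gcp_test',
            bucket='airflow_1',
            destination_project_dataset_table="airflow.mysql_time_zone",
            source_objects=['actors/actors0.json'],  # a list, not a bare string
            schema_object='schemas/actors.json',
            source_format='NEWLINE_DELIMITED_JSON',
            create_disposition='CREATE_IF_NEEDED',
            write_disposition='WRITE_TRUNCATE',
            dag=dag)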

Comments:

Answer 3:

Looking at the error message, the problem is that the bucket you specified, airflow_1, does not exist. In fact, I tried to create a bucket with this name in my own project, and the name was still available (a bucket name must be globally unique across Cloud Storage).

Also, note that BigQuery does not support source URIs that contain multiple consecutive slashes after the initial double slash.
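If you want to double-check from the same Airflow environment whether the bucket and objects really exist, one option is the contrib GCS hook (a sketch, assuming the gcp_test connection from the question and the GoogleCloudStorageHook shipped with Airflow 1.9's contrib package):

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

# Uses the same connection as the operators, so it also confirms the credentials work.
hook = GoogleCloudStorageHook(google_cloud_storage_conn_id='gcp_test')
print(hook.exists(bucket='airflow_1', object='actors/actors0.json'))
print(hook.exists(bucket='airflow_1', object='schemas/actors.json'))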

Comments:
