Airflow GCS to BigQuery operator not able to recognize the Cloud Storage bucket URI
【Posted】: 2018-03-12 05:46:45
【Question】: I have the following Airflow code, which basically reads a table from a MySQL database into Google Cloud Storage and then loads it into Google BigQuery.
I have added the connection details, along with the service accounts for MySQL and GCP, under Airflow's Admin tab.
extract = MySqlToGoogleCloudStorageOperator(
    task_id='extract_actors',
    mysql_conn_id='factset_test',
    google_cloud_storage_conn_id='gcp_test',
    sql='SELECT * FROM mysql.time_zone',
    bucket='airflow_1',
    filename='actors/actors.json',
    schema_filename='schemas/actors.json',
    dag=dag)

load = GoogleCloudStorageToBigQueryOperator(
    task_id="load_bq_time_zone",
    bigquery_conn_id='gcp_test',
    google_cloud_storage_conn_id='gcp_test',
    bucket='airflow_1',
    destination_project_dataset_table="airflow.mysql_time_zone",
    source_objects='actors/actors0.json',
    schema_object='schemas/actors.json',
    source_format='NEWLINE_DELIMITED_JSON',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    dag=dag)
I can see that the table time_zone from the MySQL database is copied to the Cloud Storage bucket airflow_1. But when Airflow tries to copy the data from Cloud Storage to BigQuery, it complains that it cannot find the Cloud Storage bucket. Here are the log details:
[2018-03-12 02:16:59,031] models.py:167 INFO - Filling up the DagBag from /airflow/dags/mysql_to_gcs.py
[2018-03-12 02:16:59,601] base_task_runner.py:112 INFO - Running: ['bash', '-c', u'airflow run mysql_to_gcs load_bq_time_zone 2018-03-12T02:16:48.974591 --job_id 465 --raw -sd DAGS_FOLDER/mysql_to_gcs.py']
[2018-03-12 02:16:59,822] base_task_runner.py:95 INFO - Subtask: /usr/local/lib/python2.7/dist-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2018-03-12 02:16:59,822] base_task_runner.py:95 INFO - Subtask: """)
[2018-03-12 02:16:59,858] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:16:59,857] __init__.py:57 INFO - Using executor SequentialExecutor
[2018-03-12 02:16:59,949] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:16:59,949] driver.py:120 INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2018-03-12 02:16:59,973] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:16:59,973] driver.py:120 INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2018-03-12 02:17:00,157] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,157] models.py:167 INFO - Filling up the DagBag from /airflow/dags/mysql_to_gcs.py
[2018-03-12 02:17:00,712] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,712] models.py:1126 INFO - Dependencies all met for <TaskInstance: mysql_to_gcs.load_bq_time_zone 2018-03-12 02:16:48.974591 [queued]>
[2018-03-12 02:17:00,721] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,720] models.py:1126 INFO - Dependencies all met for <TaskInstance: mysql_to_gcs.load_bq_time_zone 2018-03-12 02:16:48.974591 [queued]>
[2018-03-12 02:17:00,721] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,721] models.py:1318 INFO -
[2018-03-12 02:17:00,721] base_task_runner.py:95 INFO - Subtask: --------------------------------------------------------------------------------
[2018-03-12 02:17:00,722] base_task_runner.py:95 INFO - Subtask: Starting attempt 1 of 2
[2018-03-12 02:17:00,722] base_task_runner.py:95 INFO - Subtask: --------------------------------------------------------------------------------
[2018-03-12 02:17:00,722] base_task_runner.py:95 INFO - Subtask:
[2018-03-12 02:17:00,738] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,737] models.py:1342 INFO - Executing <Task(GoogleCloudStorageToBigQueryOperator): load_bq_time_zone> on 2018-03-12 02:16:48.974591
[2018-03-12 02:17:00,792] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,792] gcp_api_base_hook.py:81 INFO - Getting connection using a JSON key file.
[2018-03-12 02:17:00,795] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,795] __init__.py:44 WARNING - file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:00,795] base_task_runner.py:95 INFO - Subtask: Traceback (most recent call last):
[2018-03-12 02:17:00,795] base_task_runner.py:95 INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/__init__.py", line 41, in autodetect
[2018-03-12 02:17:00,796] base_task_runner.py:95 INFO - Subtask: from . import file_cache
[2018-03-12 02:17:00,796] base_task_runner.py:95 INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 41, in <module>
[2018-03-12 02:17:00,796] base_task_runner.py:95 INFO - Subtask: 'file_cache is unavailable when using oauth2client >= 4.0.0')
[2018-03-12 02:17:00,796] base_task_runner.py:95 INFO - Subtask: ImportError: file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:00,796] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,795] discovery.py:274 INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/storage/v1/rest
[2018-03-12 02:17:00,796] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,795] transport.py:157 INFO - Attempting refresh to obtain initial access_token
[2018-03-12 02:17:00,838] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,838] client.py:777 INFO - Refreshing access_token
[2018-03-12 02:17:00,910] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:00,909] discovery.py:868 INFO - URL being requested: GET https://www.googleapis.com/storage/v1/b/airflow_1/o/%2Fschemas%2Factors.json?alt=media
[2018-03-12 02:17:01,209] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:01,208] gcp_api_base_hook.py:81 INFO - Getting connection using a JSON key file.
[2018-03-12 02:17:01,210] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:01,210] __init__.py:44 WARNING - file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:01,210] base_task_runner.py:95 INFO - Subtask: Traceback (most recent call last):
[2018-03-12 02:17:01,211] base_task_runner.py:95 INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/__init__.py", line 41, in autodetect
[2018-03-12 02:17:01,211] base_task_runner.py:95 INFO - Subtask: from . import file_cache
[2018-03-12 02:17:01,211] base_task_runner.py:95 INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 41, in <module>
[2018-03-12 02:17:01,211] base_task_runner.py:95 INFO - Subtask: 'file_cache is unavailable when using oauth2client >= 4.0.0')
[2018-03-12 02:17:01,211] base_task_runner.py:95 INFO - Subtask: ImportError: file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:01,211] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:01,210] discovery.py:274 INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2018-03-12 02:17:01,212] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:01,210] transport.py:157 INFO - Attempting refresh to obtain initial access_token
[2018-03-12 02:17:01,248] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:01,248] client.py:777 INFO - Refreshing access_token
[2018-03-12 02:17:01,325] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:01,325] bigquery_hook.py:961 INFO - project not included in destination_project_dataset_table: airflow.mysql_time_zone; using project "bigquery-1210"
[2018-03-12 02:17:01,339] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:01,339] discovery.py:868 INFO - URL being requested: POST https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs?alt=json
[2018-03-12 02:17:01,888] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:01,887] discovery.py:868 INFO - URL being requested: GET https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L?alt=json
[2018-03-12 02:17:02,064] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:02,062] models.py:1417 ERROR - BigQuery job failed. Final error was: u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'. The job was: u'status': u'state': u'DONE', u'errors': [u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'], u'errorResult': u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t', u'kind': u'bigquery#job', u'statistics': u'endTime': u'1520821021658', u'creationTime': u'1520821021410', u'startTime': u'1520821021658', u'jobReference': u'projectId': u'bigquery-1210', u'jobId': u'job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'etag': u'"OhENgf8ForUUnKbYWWdbr5aJHYs/FMMuqgSZof0jtlMAniagOYxmgWA"', u'user_email': u'@bigquery-1210.iam.gserviceaccount.com', u'configuration': u'load': u'sourceFormat': u'NEWLINE_DELIMITED_JSON', u'destinationTable': u'projectId': u'bigquery-1210', u'tableId': u'mysql_time_zone', u'datasetId': u'airflow', u'writeDisposition': u'WRITE_TRUNCATE', u'sourceUris': [u'gs://airflow_1/p', u'gs://airflow_1/u', u'gs://airflow_1/s', u'gs://airflow_1/h', u'gs://airflow_1/k', u'gs://airflow_1/a', u'gs://airflow_1/r', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1/0', u'gs://airflow_1/.', u'gs://airflow_1/j', u'gs://airflow_1/s', u'gs://airflow_1/o', u'gs://airflow_1/n'], u'createDisposition': u'CREATE_IF_NEEDED', u'schema': u'fields': [u'type': u'INTEGER', u'name': u'Time_zone_id', u'mode': u'REQUIRED', u'type': u'STRING', u'name': u'Use_leap_seconds', u'mode': u'REQUIRED'], u'id': u'bigquery-1210:job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'selfLink': u'https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'
[2018-03-12 02:17:02,065] base_task_runner.py:95 INFO - Subtask: Traceback (most recent call last):
[2018-03-12 02:17:02,065] base_task_runner.py:95 INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
[2018-03-12 02:17:02,065] base_task_runner.py:95 INFO - Subtask: result = task_copy.execute(context=context)
[2018-03-12 02:17:02,065] base_task_runner.py:95 INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/gcs_to_bq.py", line 153, in execute
[2018-03-12 02:17:02,065] base_task_runner.py:95 INFO - Subtask: schema_update_options=self.schema_update_options)
[2018-03-12 02:17:02,066] base_task_runner.py:95 INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/bigquery_hook.py", line 476, in run_load
[2018-03-12 02:17:02,066] base_task_runner.py:95 INFO - Subtask: return self.run_with_configuration(configuration)
[2018-03-12 02:17:02,066] base_task_runner.py:95 INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/bigquery_hook.py", line 513, in run_with_configuration
[2018-03-12 02:17:02,066] base_task_runner.py:95 INFO - Subtask: job['status']['errorResult'], job
[2018-03-12 02:17:02,069] base_task_runner.py:95 INFO - Subtask: Exception: BigQuery job failed. Final error was: u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'. The job was: u'status': u'state': u'DONE', u'errors': [u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'], u'errorResult': u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t', u'kind': u'bigquery#job', u'statistics': u'endTime': u'1520821021658', u'creationTime': u'1520821021410', u'startTime': u'1520821021658', u'jobReference': u'projectId': u'bigquery-1210', u'jobId': u'job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'etag': u'"OhENgf8ForUUnKbYWWdbr5aJHYs/FMMuqgSZof0jtlMAniagOYxmgWA"', u'user_email': u'@bigquery-1210.iam.gserviceaccount.com', u'configuration': u'load': u'sourceFormat': u'NEWLINE_DELIMITED_JSON', u'destinationTable': u'projectId': u'bigquery-1210', u'tableId': u'mysql_time_zone', u'datasetId': u'airflow', u'writeDisposition': u'WRITE_TRUNCATE', u'sourceUris': [u'gs://airflow_1/p', u'gs://airflow_1/u', u'gs://airflow_1/s', u'gs://airflow_1/h', u'gs://airflow_1/k', u'gs://airflow_1/a', u'gs://airflow_1/r', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1/0', u'gs://airflow_1/.', u'gs://airflow_1/j', u'gs://airflow_1/s', u'gs://airflow_1/o', u'gs://airflow_1/n'], u'createDisposition': u'CREATE_IF_NEEDED', u'schema': u'fields': [u'type': u'INTEGER', u'name': u'Time_zone_id', u'mode': u'REQUIRED', u'type': u'STRING', u'name': u'Use_leap_seconds', u'mode': u'REQUIRED'], u'id': u'bigquery-1210:job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'selfLink': u'https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'
[2018-03-12 02:17:02,069] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:02,068] models.py:1433 INFO - Marking task as UP_FOR_RETRY
[2018-03-12 02:17:02,087] base_task_runner.py:95 INFO - Subtask: [2018-03-12 02:17:02,085] models.py:1462 ERROR - BigQuery job failed. Final error was: u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'. The job was: u'status': u'state': u'DONE', u'errors': [u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'], u'errorResult': u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t', u'kind': u'bigquery#job', u'statistics': u'endTime': u'1520821021658', u'creationTime': u'1520821021410', u'startTime': u'1520821021658', u'jobReference': u'projectId': u'bigquery-1210', u'jobId': u'job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'etag': u'"OhENgf8ForUUnKbYWWdbr5aJHYs/FMMuqgSZof0jtlMAniagOYxmgWA"', u'user_email': u'@bigquery-1210.iam.gserviceaccount.com', u'configuration': u'load': u'sourceFormat': u'NEWLINE_DELIMITED_JSON', u'destinationTable': u'projectId': u'bigquery-1210', u'tableId': u'mysql_time_zone', u'datasetId': u'airflow', u'writeDisposition': u'WRITE_TRUNCATE', u'sourceUris': [u'gs://airflow_1/p', u'gs://airflow_1/u', u'gs://airflow_1/s', u'gs://airflow_1/h', u'gs://airflow_1/k', u'gs://airflow_1/a', u'gs://airflow_1/r', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1/0', u'gs://airflow_1/.', u'gs://airflow_1/j', u'gs://airflow_1/s', u'gs://airflow_1/o', u'gs://airflow_1/n'], u'createDisposition': u'CREATE_IF_NEEDED', u'schema': u'fields': [u'type': u'INTEGER', u'name': u'Time_zone_id', u'mode': u'REQUIRED', u'type': u'STRING', u'name': u'Use_leap_seconds', u'mode': u'REQUIRED'], u'id': u'bigquery-1210:job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'selfLink': u'https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'
【Comments】:
I can't see a problem with the bucket in the logs. I can, however, see what looks like an authentication error; more specifically here: ***.com/questions/40154672/…
If you look at the last line of the log: ERROR - BigQuery job failed. Final error was: u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'.
Also, since Airflow did not complain about an authentication error when writing the MySQL table to Cloud Storage, I'm not sure why it would complain when reading the data from Cloud Storage into BigQuery.
【Answer 1】: The problem is source_objects='actors/actors0.json'; it needs to be source_objects=['actors/actors0.json'].
From the documentation:
:param source_objects: List of Google cloud storage URIs to load from. (templated)
If source_format is 'DATASTORE_BACKUP', the list must only contain a single URI.
:type object: list
Here is the relevant code from the operator:
source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
               for source_object in self.source_objects]
So the operator iterates over source_objects. Because a plain string was passed instead of a list, Python iterates over it character by character, which is why the job was handed one URI per character (gs://airflow_1/a, gs://airflow_1/c, gs://airflow_1/t, and so on), as the sketch below shows.
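A minimal sketch of the effect, using only the bucket and object names from the question. It just mimics the list comprehension above; it is not Airflow code:

bucket = 'airflow_1'

# Passing a list, as the operator expects: one URI per object.
source_objects = ['actors/actors0.json']
print(['gs://{}/{}'.format(bucket, obj) for obj in source_objects])
# ['gs://airflow_1/actors/actors0.json']

# Passing a bare string: iterating a string yields single characters,
# so every character becomes its own (nonexistent) GCS URI.
source_objects = 'actors/actors0.json'
print(['gs://{}/{}'.format(bucket, obj) for obj in source_objects])
# ['gs://airflow_1/a', 'gs://airflow_1/c', 'gs://airflow_1/t', ...]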
【Comments】:
True, source_objects expects a list; this solved my problem after I had been stuck on it for hours. source_objects=['my_file.csv']
【Answer 2】: source_objects needs to be given as a list, i.e. source_objects=['actors/actors0.json'], since more than one file can be loaded at a time. A corrected version of the load task is shown below.
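For reference, here is the load task from the question with only source_objects changed to a list; every other argument is kept exactly as posted (whether the bucket and schema object themselves are correct is a separate matter):

load = GoogleCloudStorageToBigQueryOperator(
    task_id="load_bq_time_zone",
    bigquery_conn_id='gcp_test',
    google_cloud_storage_conn_id='gcp_test',
    bucket='airflow_1',
    destination_project_dataset_table="airflow.mysql_time_zone",
    source_objects=['actors/actors0.json'],  # a list, even for a single file
    schema_object='schemas/actors.json',
    source_format='NEWLINE_DELIMITED_JSON',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    dag=dag)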
It is always best to check the field definitions in the operator's source code, for example https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/contrib/operators/gcs_to_bq.py#L65
【Comments】:
【Answer 3】: Judging from the error message, the problem is that the bucket you specified, airflow_1, does not exist. In fact, I tried to create a bucket with that name in my own project and the name was still available (a bucket name must be globally unique across Cloud Storage).
Also, note that BigQuery does not support source URIs that include multiple consecutive slashes after the initial double slash.
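If you want to rule that out, here is a quick sketch using the google-cloud-storage client library; the key file path is a placeholder, and you would point it at the same service-account key configured in the gcp_test connection:

from google.cloud import storage

# Placeholder path; use the key file from the 'gcp_test' connection.
client = storage.Client.from_service_account_json('/path/to/keyfile.json')

bucket = client.lookup_bucket('airflow_1')  # returns None if the bucket does not exist
print('bucket exists:', bucket is not None)
if bucket is not None:
    print('object exists:', bucket.blob('actors/actors0.json').exists())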
【Comments】: