Error while exporting from Google Storage to BQ with Airflow
Posted: 2018-04-20 16:24:35
Question: I am trying to export a file from Google Cloud Storage and load it into BigQuery. While doing so, I get the following error:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 285, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/root/airflow/dags/mysql_bi_invoices.py", line 8, in <module>
    from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperatorfrom
ImportError: cannot import name MySqlToGoogleCloudStorageOperatorfrom
Done.
[2018-04-20 15:21:30,773] __init__.py:45 INFO - Using executor SequentialExecutor
[2018-04-20 15:21:30,858] models.py:189 INFO - Filling up the DagBag from /root/airflow/dags
[2018-04-20 15:21:31,333] models.py:288 ERROR - Failed to import: /root/airflow/dags/mysql_bi_invoices.py
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 285, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/root/airflow/dags/mysql_bi_invoices.py", line 8, in <module>
    from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperatorfrom
ImportError: cannot import name MySqlToGoogleCloudStorageOperatorfrom
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 516, in test
    dag = dag or get_dag(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 130, in get_dag
    'parse.'.format(args.dag_id))
airflow.exceptions.AirflowException: dag_id could not be found: test. Either the dag did not exist or it failed to parse.
My DAG looks like this:
import airflow
from datetime import timedelta, datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperatorfrom
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'email': ['test@test'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}
with DAG('test',
         schedule_interval='04 04 * * *',
         default_args=default_args) as dag:

    load_to_bq = GoogleCloudStorageToBigQueryOperator(
        task_id='test_to_bq',
        bucket='test',
        source_objects='gs://test/test_to_bq_folder',
        schema_object='test/file_to_extract.json',
        destination_project_dataset_table='test.final_table',
        source_format='JSON',
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE',
        google_cloud_storage_conn_id='google_cloud',
        bigquery_conn_id='google_cloud',
        dag=dag
    )
I have tried adding and changing the DAG's parameters, but without success. Any insight would be helpful.
Answer 1: This error has nothing to do with GBQ; see the error message:
airflow.exceptions.AirflowException: dag_id could not be found: test. Either the dag did not exist or it failed to parse.
First, check whether you can list the DAGs at all:
airflow list_dags
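You can also surface the full import error directly by running the DAG file through the Python interpreter (a quick sanity check; the file path below is the one from the traceback in the question):

python /root/airflow/dags/mysql_bi_invoices.py

Any ImportError or SyntaxError in the file is printed immediately, without going through the scheduler.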
If that does not work, there is an error in your DAG. In fact, the cause is already present in your output:
ImportError: cannot import name MySqlToGoogleCloudStorageOperatorfrom
This looks like a typo; it should be
MySqlToGoogleCloudStorageOperator
in your import.
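For reference, here is a corrected version of line 8 of the DAG file, with only the stray trailing "from" removed (everything else stays as in the question):

from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator

After fixing the import, airflow list_dags should parse the file again and the test DAG will be found.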