Error while exporting from Google Storage to BQ with Airflow

Posted: 2018-04-20 16:24:35

I am trying to export a file from Google Cloud Storage and load it into BigQuery. When I do so, I get the following error:

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 285, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/root/airflow/dags/mysql_bi_invoices.py", line 8, in <module>
    from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperatorfrom
ImportError: cannot import name MySqlToGoogleCloudStorageOperatorfrom
Done.
[2018-04-20 15:21:30,773] __init__.py:45 INFO - Using executor SequentialExecutor
[2018-04-20 15:21:30,858] models.py:189 INFO - Filling up the DagBag from /root/airflow/dags
[2018-04-20 15:21:31,333] models.py:288 ERROR - Failed to import: /root/airflow/dags/mysql_bi_invoices.py
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 285, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/root/airflow/dags/mysql_bi_invoices.py", line 8, in <module>
    from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperatorfrom
ImportError: cannot import name MySqlToGoogleCloudStorageOperatorfrom
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 516, in test
    dag = dag or get_dag(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 130, in get_dag
    'parse.'.format(args.dag_id))
airflow.exceptions.AirflowException: dag_id could not be found: test. Either the dag did not exist or it failed to parse.

My DAG looks like this:

import airflow
from datetime import timedelta, datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperatorfrom
from airflow.contrib.hooks.bigquery_hook import BigQueryHook


default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'email': ['test@test'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG('test',
        schedule_interval='04 04 * * *',
        default_args=default_args) as dag:

    load_to_bq = GoogleCloudStorageToBigQueryOperator(
        task_id='test_to_bq',
        bucket='test',
        source_objects = 'gs://test/test_to_bq_folder',
        schema_object = 'test/file_to_extract.json',
        destination_project_dataset_table='test.final_table',
        source_format='JSON',
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE',
        google_cloud_storage_conn_id='google_cloud',
        bigquery_conn_id='google_cloud',
        dag = dag
    )

I have tried adding and changing the DAG's parameters, without success. Any insight would be helpful.


Answer 1:

This error has nothing to do with GBQ; look at the error message:

airflow.exceptions.AirflowException: dag_id could not be found: test. Either the dag did not exist or it failed to parse.

First check whether you can list your DAGs:

airflow list_dags
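
As another quick check, you can run the DAG file directly with the Python interpreter (using the path shown in your traceback); any import error will surface immediately:

python /root/airflow/dags/mysql_bi_invoices.py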

If either command fails, there is an error in your DAG file. And indeed, the cause is already visible in the output above:

ImportError: cannot import name MySqlToGoogleCloudStorageOperatorfrom

This looks like a typo; it should be

MySqlToGoogleCloudStorageOperator

in your import.
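
For reference, the corrected import line would be:

from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator

Separately, once the file parses, two follow-up issues are likely in the task itself (a sketch only, assuming the same Airflow 1.x contrib operators used in the question): source_objects expects a list of object paths relative to bucket rather than a full gs:// URI, and BigQuery's load API calls the JSON format NEWLINE_DELIMITED_JSON. Inside the same with DAG(...) as dag: block:

load_to_bq = GoogleCloudStorageToBigQueryOperator(
    task_id='test_to_bq',
    bucket='test',                                # bucket name only, no gs:// prefix
    source_objects=['test_to_bq_folder/*.json'],  # hypothetical paths, relative to the bucket
    schema_object='test/file_to_extract.json',    # schema file, also relative to the bucket
    destination_project_dataset_table='test.final_table',
    source_format='NEWLINE_DELIMITED_JSON',       # BigQuery's name for newline-delimited JSON input
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    google_cloud_storage_conn_id='google_cloud',
    bigquery_conn_id='google_cloud',
)

The explicit dag=dag argument can also be dropped: tasks created inside the with block are attached to the DAG automatically.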

