Airflow: Unable to create view using BigQuery hooks

Posted: 2020-10-01 04:36:17

Question:

I am trying to create a BigQuery view in Airflow using the BigQuery hook, and it gives me the error below. Here is the code snippet:

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

BigQueryHook().create_empty_table(
    dataset_id="data_loader_view",
    table_id="customerdata",
    view={
        "query": "SELECT * FROM data_loader.customerdata",
        "useLegacySql": False
    }
)

Traceback:

Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 930, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/bigquery_view_dag.py", line 37, in create_bq_view
    view_util.create_bq_view_util(context['final_dict'])
  File "/home/airflow/gcs/dags/utils/view.py", line 29, in create_bq_view_util
    "useLegacySql": False
  File "/usr/local/lib/airflow/airflow/providers/google/common/hooks/base_google.py", line 356, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 339, in create_empty_table
    retry=retry
  File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/bigquery/client.py", line 544, in create_table
    retry, method="POST", path=path, data=data, timeout=timeout
  File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/bigquery/client.py", line 556, in _call_api
    return call()
  File "/opt/python3.6/lib/python3.6/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
    on_error=on_error,
  File "/opt/python3.6/lib/python3.6/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/_http.py", line 423, in api_request
    raise exceptions.from_http_response(response)
google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/<project_name>/datasets/data_loader_view/tables: Invalid JSON payload received. Unknown name "query" at 'table.view': Proto field is not repeating, cannot start list
[2020-10-01 04:25:30,857] base_task_runner.py:115 INFO - Job 53554: Subtask create_bq_view [2020-10-01 04:25:30,612] taskinstance.py:1059 ERROR - 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/<project_name>/datasets/data_loader_view/tables: Invalid JSON payload received. Unknown name "query" at 'table.view': Proto field is not repeating, cannot start list.
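The 400 response says the API expected `table.view` to be a JSON object but received a list ("Proto field is not repeating, cannot start list"). One possible cause, assumed here and not confirmed in this thread, is a `view` value that is not a plain dict, for example a one-element tuple produced by a trailing comma in an assignment such as `view = {...},`. The sketch below uses only the standard library to show how each shape serializes; `check_view_arg` is a hypothetical helper, not part of Airflow or the BigQuery client.

```python
import json
from typing import Any


def check_view_arg(view: Any) -> dict:
    """Hypothetical pre-flight check for the `view` argument.

    Assumption: the BigQuery API expects `view` to serialize to a JSON
    object with a string "query" and an optional boolean "useLegacySql".
    """
    if not isinstance(view, dict):
        raise TypeError(f"view must be a dict, got {type(view).__name__}")
    if not isinstance(view.get("query"), str):
        raise TypeError('view["query"] must be a string')
    if not isinstance(view.get("useLegacySql", False), bool):
        raise TypeError('view["useLegacySql"] must be a bool')
    return view


good = {"query": "SELECT * FROM data_loader.customerdata", "useLegacySql": False}
# What `view = {...},` (a stray trailing comma in an assignment) would produce.
bad = (good,)

print(json.dumps({"view": good}))  # "view" serializes as a JSON object
print(json.dumps({"view": bad}))   # "view" serializes as a JSON array (list)

check_view_arg(good)
try:
    check_view_arg(bad)
except TypeError as exc:
    print("rejected:", exc)
```

Running a check like this before calling the hook turns a remote 400 into a local, more descriptive error.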

Reference documentation: https://airflow.apache.org/docs/stable/_api/airflow/contrib/hooks/bigquery_hook/index.html#airflow.contrib.hooks.bigquery_hook.BigQueryBaseCursor.create_empty_table

Please help me resolve this.

Answer 1:

Try using the code below as an example:

#!/usr/bin/python
# -*- coding: utf-8 -*-
import datetime

from airflow import models
from airflow.operators import python_operator
from airflow.providers.google.cloud.hooks import bigquery


# Callable that creates the view through the BigQuery hook
def create_view(ds, **kwargs):
    bigquery.BigQueryHook().create_empty_table(
        dataset_id='<my-dataset>',
        table_id='customerdata',
        view={
            'query': 'SELECT * FROM `<my-dataset>.<my-source-table>`',
            'useLegacySql': False,
        },
    )


default_dag_args = {
    'start_date': datetime.datetime(2020, 10, 2, 11, 5),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=1),
    'project_id': '<my-project-id>',
}

with models.DAG('bigquery_airflow', schedule_interval='*/5 * * * *',
                default_args=default_dag_args) as dag:
    # provide_context=True passes `ds` and the rest of the task context to the callable
    create_view_task = python_operator.PythonOperator(
        task_id='succeeded',
        provide_context=True,
        python_callable=create_view)
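For reference, the failing request in the question is a POST to `.../projects/<project_name>/datasets/data_loader_view/tables`. According to the BigQuery REST API reference for the Table resource, that request body carries a `tableReference` and a `view` object holding `query` and `useLegacySql`. The sketch below builds such a body by hand with the standard library so the expected JSON shape can be inspected; `build_view_table_resource` is a hypothetical helper, the placeholders are the ones from the answer, and this is not necessarily byte-for-byte what `BigQueryHook` sends.

```python
import json


def build_view_table_resource(project_id: str, dataset_id: str, table_id: str,
                              query: str, use_legacy_sql: bool = False) -> dict:
    """Hypothetical helper: a BigQuery REST Table resource describing a view."""
    return {
        "tableReference": {
            "projectId": project_id,
            "datasetId": dataset_id,
            "tableId": table_id,
        },
        # `view` is a single JSON object, never a list.
        "view": {"query": query, "useLegacySql": use_legacy_sql},
    }


resource = build_view_table_resource(
    "<my-project-id>", "<my-dataset>", "customerdata",
    "SELECT * FROM `<my-dataset>.<my-source-table>`",
)
print(json.dumps(resource, indent=2))
```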

Answer 2:

Have you tried putting your actual project name in the code?

google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/<project_name>/datasets/data_loader_view/tables

Also, in the documentation you linked, the query includes the project ID, whereas your table reference has only two components (data_loader.customerdata):

view = {
    "query": "SELECT * FROM `test-project-id.test_dataset_id.test_table_prefix*` LIMIT 1000",
    "useLegacySql": False
}
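Answer 2's point is that in standard SQL a table can be referenced by its full `project.dataset.table` path, and a project ID containing hyphens must be quoted with backticks. A small sketch for building such a query string follows; `qualified_table` and `view_query` are hypothetical helpers, and `test-project-id` is the placeholder from the documentation example.

```python
def qualified_table(project_id: str, dataset_id: str, table_id: str) -> str:
    """Hypothetical helper: backtick-quoted `project.dataset.table` path."""
    for part in (project_id, dataset_id, table_id):
        if not part or "`" in part:
            raise ValueError(f"invalid identifier part: {part!r}")
    return f"`{project_id}.{dataset_id}.{table_id}`"


def view_query(project_id: str, dataset_id: str, table_id: str) -> str:
    """Hypothetical helper: SELECT * over a fully qualified table."""
    return f"SELECT * FROM {qualified_table(project_id, dataset_id, table_id)}"


view = {
    "query": view_query("test-project-id", "data_loader", "customerdata"),
    "useLegacySql": False,
}
print(view["query"])  # SELECT * FROM `test-project-id.data_loader.customerdata`
```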

Comments:

I have already added the project ID to both the query and the code. It still doesn't work.