引入整行数据(通过气流)时,Google GCS 到 BIGQUERY 失败

Posted

技术标签:

【中文标题】引入整行数据(通过气流)时,Google GCS 到 BIGQUERY 失败【英文标题】:Google GCS to BIGQUERY fails when bringing in whole rows of data (via airflow) 【发布时间】:2020-05-14 23:38:11 【问题描述】:

我使用 GoogleCloudStorageToBigQueryOperator 的目的是将一系列类似结构的文件带入大查询中。

因为我知道由于摄取的限制,需要在 bigquery 中解析文件,所以我想将每一行数据作为单个数据元素(一个字段)引入。以下是我的代码:

GCS_to_GBQ_Raw = GoogleCloudStorageToBigQueryOperator(
    task_id='GCS_to_GBQ_Raw',
    bucket='files',
    source_objects=['To_Process/*.csv'],
    source_format='CSV',
    destination_project_dataset_table='DS.RAW',
    schema_fields=[
       'name': 'datarow', 'type': 'STRING', 'mode': 'NULLABLE'
    ],
    field_delimiter='\t',
    autodetect=False,
    skip_leading_rows=1,
    write_disposition='WRITE_TRUNCATE',
    quote_character='µ',
    google_cloud_storage_conn_id='GCP_Staging',
    bigquery_conn_id='GCP_Staging',
    dag=dag)

我尝试了以下方法:

    field_delimiter - 我尝试过模糊的 ascii 字符,例如节字符、管道和制表符。我知道文件中的数据没有这些。 autodetect=真假。 quote_character=''、'""'、当前字符 mu,以及我知道的其他字符不在文件中。

我每次运行时都会收到一组随机错误:

列太多 引用和字段结尾之间的数据 正在尝试追加

我该如何进行这项工作?如何引入完整的行,以便在 bq 中解析它们?

谢谢!

【问题讨论】:

那么,您的 BQ 表将只有一列,该列将包含文件中一行中的所有数据? 你能提供一个输入数据的例子吗? 【参考方案1】:

我不确定您的文件看起来如何,如果您可以提供一些示例记录,那么我们可以提供更好的帮助。 如果我正确理解了您的问题,那么下面的代码应该可以工作:

import datetime as dt
from airflow.models import DAG
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
# from airflow.utils.dates import days_ago
lasthour = dt.datetime.utcnow() - dt.timedelta(hours=1)

args = 
 'owner': 'airflow',
 'start_date': lasthour,
 'depends_on_past': False,
 'job_name': 'employee-test',

dag = DAG(
 dag_id='just-check',
 schedule_interval=None,
 default_args=args
)

load_csv = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq_example',
    bucket='testing-bucket',
    source_objects=['employee/*.csv'],
    field_delimiter='|',
    skip_leading_rows=1,
    autodetect=False,
    destination_project_dataset_table='project_id.raw.gcs_to_bq_table',
    schema_fields=[
       'name': 'datarow', 'type': 'STRING', 'mode': 'NULLABLE'
    ],
    write_disposition='WRITE_TRUNCATE',
    dag=dag)

运行此 DAG 后,我的 BQ 表的每一行数据都有一列。

【讨论】:

确实做到了。该文件正在推送 field_delimiter 检测到的随机字符....我必须搜索它才能找到它们。非常感谢

以上是关于引入整行数据(通过气流)时,Google GCS 到 BIGQUERY 失败的主要内容,如果未能解决你的问题,请参考以下文章

尝试在 Dataflow 中使用 Apache Beam 将数据从 Google PubSub 写入 GCS 时出错

运行 BigQuery 查询并使用气流将数据写入 Parquet 中的云存储桶

气流导出模式仅从 PostgreSQL 到 bigquery

Airflow GCS 到 BQuery 操作员无法识别云存储桶 URI

将 XLS 文件从 GCS 导入 BigQuery

如何将 Google Analytics 数据导出到 Google GCS 存储桶或 BigQuery?