引入整行数据(通过气流)时,Google GCS 到 BIGQUERY 失败
Posted
技术标签:
【中文标题】引入整行数据(通过气流)时,Google GCS 到 BIGQUERY 失败【英文标题】:Google GCS to BIGQUERY fails when bringing in whole rows of data (via airflow) 【发布时间】:2020-05-14 23:38:11 【问题描述】:我使用 GoogleCloudStorageToBigQueryOperator 的目的是将一系列类似结构的文件带入大查询中。
因为我知道由于摄取的限制,需要在 bigquery 中解析文件,所以我想将每一行数据作为单个数据元素(一个字段)引入。以下是我的代码:
GCS_to_GBQ_Raw = GoogleCloudStorageToBigQueryOperator(
task_id='GCS_to_GBQ_Raw',
bucket='files',
source_objects=['To_Process/*.csv'],
source_format='CSV',
destination_project_dataset_table='DS.RAW',
schema_fields=[
'name': 'datarow', 'type': 'STRING', 'mode': 'NULLABLE'
],
field_delimiter='\t',
autodetect=False,
skip_leading_rows=1,
write_disposition='WRITE_TRUNCATE',
quote_character='µ',
google_cloud_storage_conn_id='GCP_Staging',
bigquery_conn_id='GCP_Staging',
dag=dag)
我尝试了以下方法:
-
field_delimiter - 我尝试过模糊的 ascii 字符,例如节字符、管道和制表符。我知道文件中的数据没有这些。
autodetect=真假。
quote_character=''、'""'、当前字符 mu,以及我知道的其他字符不在文件中。
我每次运行时都会收到一组随机错误:
列太多 引用和字段结尾之间的数据 正在尝试追加我该如何进行这项工作?如何引入完整的行,以便在 bq 中解析它们?
谢谢!
【问题讨论】:
那么,您的 BQ 表将只有一列,该列将包含文件中一行中的所有数据? 你能提供一个输入数据的例子吗? 【参考方案1】:我不确定您的文件看起来如何,如果您可以提供一些示例记录,那么我们可以提供更好的帮助。 如果我正确理解了您的问题,那么下面的代码应该可以工作:
import datetime as dt
from airflow.models import DAG
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
# from airflow.utils.dates import days_ago
lasthour = dt.datetime.utcnow() - dt.timedelta(hours=1)
args =
'owner': 'airflow',
'start_date': lasthour,
'depends_on_past': False,
'job_name': 'employee-test',
dag = DAG(
dag_id='just-check',
schedule_interval=None,
default_args=args
)
load_csv = GoogleCloudStorageToBigQueryOperator(
task_id='gcs_to_bq_example',
bucket='testing-bucket',
source_objects=['employee/*.csv'],
field_delimiter='|',
skip_leading_rows=1,
autodetect=False,
destination_project_dataset_table='project_id.raw.gcs_to_bq_table',
schema_fields=[
'name': 'datarow', 'type': 'STRING', 'mode': 'NULLABLE'
],
write_disposition='WRITE_TRUNCATE',
dag=dag)
运行此 DAG 后,我的 BQ 表的每一行数据都有一列。
【讨论】:
确实做到了。该文件正在推送 field_delimiter 检测到的随机字符....我必须搜索它才能找到它们。非常感谢以上是关于引入整行数据(通过气流)时,Google GCS 到 BIGQUERY 失败的主要内容,如果未能解决你的问题,请参考以下文章
尝试在 Dataflow 中使用 Apache Beam 将数据从 Google PubSub 写入 GCS 时出错
运行 BigQuery 查询并使用气流将数据写入 Parquet 中的云存储桶
气流导出模式仅从 PostgreSQL 到 bigquery