Bigquery:如果不存在则创建表并使用 Python 和 Apache AirFlow 加载数据

Posted

技术标签:

【中文标题】Bigquery:如果不存在则创建表并使用 Python 和 Apache AirFlow 加载数据【英文标题】:Bigquery : Create table if not exist and load data using Python and Apache AirFlow 【发布时间】:2019-08-16 09:34:01 【问题描述】:

首先,我使用 mysql 查询从生产数据库中获取所有数据,然后将该数据作为 NEW LINE DELIMITED JSON 存储在谷歌云存储中,我想要做的是: 1。检查表是否存在 2。如果表不存在,则使用自动检测模式创建表 3。存储数据

所有这些都将安排在气流中。真正让我困惑的是数字2,我怎样才能在 Python 中做到这一点?还是 Airflow 可以自动执行此操作?

【问题讨论】:

【参考方案1】:

Airflow 可以自动执行此操作。 create_disposition 参数在需要时创建表。 autodetect 参数正是您所需要的。这适用于 Airflow 1.10.2

GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    source_objects=['folder1/*.csv', 'folder2/*.csv'],
    destination_project_dataset_table='dest_table',
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    bigquery_conn_id='bq-conn',
    google_cloud_storage_conn_id='gcp-conn',
    autodetect=True, # This uses autodetect
    dag=dag
)

【讨论】:

我也有同样的疑问,但这是针对 BigQueryCreateEmptyTableOperator 的。当我们使用运算符创建表时。我们需要在参数中指定 table_id='Emp_7 是表名。我的问题是如果表已经存在所以它不应该创建一个表,如果表不存在那么只应该创建它。我们怎么能做到这一点??【参考方案2】:

在 BigQuery 命令行中,如果您的 json 文件位于 GCS 上,则 Loading JSON data with schema auto-detection 在一个命令中为您执行 2 + 3。

查看 AirFlow 文档,GoogleCloudStorageToBigQueryOperator 似乎在做同样的事情。我检查了它的source,它只是调用 BigQuery load api。我相信它会做你想做的。

当不清楚每个参数的含义时,您可以使用参数名称搜索BigQuery Jobs api。

例如,要在您的任务列表中实现 1,您只需指定:

write_disposition (string) – 如果表已存在,则写入配置。

但为了知道您需要作为 write_disposition 传递的字符串,您必须在 BigQuery 文档中进行搜索。

【讨论】:

我也有同样的疑问,但这是针对 BigQueryCreateEmptyTableOperator 的。当我们使用运算符创建表时。我们需要在参数中指定 table_id='Emp_7 是表名。我的问题是如果表已经存在所以它不应该创建一个表,如果表不存在那么只应该创建它。我们怎么能做到这一点?? 它被称为 CreateDisposition :cloud.google.com/bigquery/docs/reference/auditlogs/rest/…

以上是关于Bigquery:如果不存在则创建表并使用 Python 和 Apache AirFlow 加载数据的主要内容,如果未能解决你的问题,请参考以下文章

如果我在流式传输之前先删除表并创建表,Google BigQuery Streaming 有时会失败

如何在 google bigquery 数据集中创建动态表并在 tableau 中访问?

如何使用 ruby​​ api 创建一个 bigquery 表并从云存储导入

MySQL 仅在表不存在时插入数据

您可以 SQL 填充 BigQuery 表并在同一个 API 调用中设置表列模式吗?

比较 2 个表并返回 MySQL 中的变化