Bigquery:如果不存在则创建表并使用 Python 和 Apache AirFlow 加载数据
Posted
技术标签:
【中文标题】Bigquery:如果不存在则创建表并使用 Python 和 Apache AirFlow 加载数据【英文标题】:Bigquery : Create table if not exist and load data using Python and Apache AirFlow 【发布时间】:2019-08-16 09:34:01 【问题描述】:首先,我使用 mysql 查询从生产数据库中获取所有数据,然后将该数据作为 NEW LINE DELIMITED JSON
存储在谷歌云存储中,我想要做的是:
1。检查表是否存在
2。如果表不存在,则使用自动检测模式创建表
3。存储数据
所有这些都将安排在气流中。真正让我困惑的是数字2
,我怎样才能在 Python 中做到这一点?还是 Airflow 可以自动执行此操作?
【问题讨论】:
【参考方案1】:Airflow 可以自动执行此操作。 create_disposition
参数在需要时创建表。 autodetect
参数正是您所需要的。这适用于 Airflow 1.10.2。
GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
task_id='gcs_to_bq',
bucket='test_bucket',
source_objects=['folder1/*.csv', 'folder2/*.csv'],
destination_project_dataset_table='dest_table',
source_format='CSV',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id='bq-conn',
google_cloud_storage_conn_id='gcp-conn',
autodetect=True, # This uses autodetect
dag=dag
)
【讨论】:
我也有同样的疑问,但这是针对 BigQueryCreateEmptyTableOperator 的。当我们使用运算符创建表时。我们需要在参数中指定 table_id='Emp_7 是表名。我的问题是如果表已经存在所以它不应该创建一个表,如果表不存在那么只应该创建它。我们怎么能做到这一点??【参考方案2】:在 BigQuery 命令行中,如果您的 json 文件位于 GCS 上,则 Loading JSON data with schema auto-detection 在一个命令中为您执行 2 + 3。
查看 AirFlow 文档,GoogleCloudStorageToBigQueryOperator 似乎在做同样的事情。我检查了它的source,它只是调用 BigQuery load api。我相信它会做你想做的。
当不清楚每个参数的含义时,您可以使用参数名称搜索BigQuery Jobs api。
例如,要在您的任务列表中实现 1,您只需指定:
write_disposition (string) – 如果表已存在,则写入配置。
但为了知道您需要作为 write_disposition 传递的字符串,您必须在 BigQuery 文档中进行搜索。
【讨论】:
我也有同样的疑问,但这是针对 BigQueryCreateEmptyTableOperator 的。当我们使用运算符创建表时。我们需要在参数中指定 table_id='Emp_7 是表名。我的问题是如果表已经存在所以它不应该创建一个表,如果表不存在那么只应该创建它。我们怎么能做到这一点?? 它被称为 CreateDisposition :cloud.google.com/bigquery/docs/reference/auditlogs/rest/…以上是关于Bigquery:如果不存在则创建表并使用 Python 和 Apache AirFlow 加载数据的主要内容,如果未能解决你的问题,请参考以下文章
如果我在流式传输之前先删除表并创建表,Google BigQuery Streaming 有时会失败
如何在 google bigquery 数据集中创建动态表并在 tableau 中访问?
如何使用 ruby api 创建一个 bigquery 表并从云存储导入