Airflow 中是不是有操作员可以根据 BigQuery 中的查询创建表?

Posted

技术标签:

【中文标题】Airflow 中是不是有操作员可以根据 BigQuery 中的查询创建表?【英文标题】:Is there an operator in Airflow to create a table from a query in BigQuery?Airflow 中是否有操作员可以根据 BigQuery 中的查询创建表? 【发布时间】:2021-07-29 11:49:10 【问题描述】:

我正在寻找类似的东西

CreateBQTableOperator(
    query='select * from my_table',
    output_table='my_other_table'
)

我正在寻找已经存在的运算符或此类运算符的代码。运算符应采用另一个参数来决定是在重新创建表之前删除表(如果表存在)还是将查询附加到当前表。

【问题讨论】:

【参考方案1】:

您可以将BigQueryExecuteQueryOperatordestination_dataset_table 参数用作:

from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

execute_query_save = BigQueryExecuteQueryOperator(
    task_id="execute_query_save",
    sql="SELECT * FROM my_data_set.table1",
    use_legacy_sql=False,
    destination_dataset_table="my_data_set.table2",
    location="southamerica-east1",
    write_disposition="WRITE_EMPTY",
    create_disposition="CREATE_IF_NEEDED",
)

您可以通过设置参数值来控制请求的行为(参考来自Google docs 的值)。

write_disposition 选项是:

WRITE_TRUNCATE:如果表已存在,BigQuery 会覆盖表数据并使用查询结果中的架构。

WRITE_APPEND:如果表已存在,BigQuery 会将数据附加到表中。

WRITE_EMPTY:如果表已存在且包含数据,则作业结果中返回“重复”错误。

create_disposition 选项是:

CREATE_IF_NEEDED:如果表不存在,BigQuery 会创建表。

CREATE_NEVER:表必须已经存在。如果没有,则在作业结果中返回“notFound”错误。

【讨论】:

以上是关于Airflow 中是不是有操作员可以根据 BigQuery 中的查询创建表?的主要内容,如果未能解决你的问题,请参考以下文章

大数据调度平台Airflow:Airflow WebUI操作介绍

通过 UI 将参数传递给 Airflow 的作业

BigQueryCheckOperator 在 Cloud Composer 中失败,出现 404 错误

(Django)气流中的 ORM - 有可能吗?

airflow并发慢

Airflow SSH 操作员错误:遇到 RSA 密钥,应为 OPENSSH 密钥