如何自动化 BigQuery SQL 管道

Posted

技术标签:

【中文标题】如何自动化 BigQuery SQL 管道【英文标题】:How to automate a BigQuery SQL pipeline 【发布时间】:2020-04-21 13:59:06 【问题描述】:

我使用 BigQuery SQL 创建了一个数据管道。 首先从 Cloud Storage 导入 CSV 文件,然后进行不同的分析,包括使用 BigQueryML 进行预测建模 使用地理函数进行地理计算,以及 使用分析函数计算 KPI。

我能够成功地手动运行不同的查询,现在我想自动化数据管道。

我的第一选择是 DataFlow SQL,但事实证明 Dataflow SQL 查询语法不支持地理函数。

DataFlow python 不是一个选项,因为完整的分析是在 SQL 中完成的,我想保持这种方式。

我的问题是可用于自动化数据管道的其他 GCP 选项是什么。

【问题讨论】:

您需要多复杂?你能简单地使用预定查询吗?如果你需要更复杂的我会使用 python,即使你所有的步骤都是函数式 SQL,你也可以使用 python 来编排它们。 就我个人而言,我发现 Dataflow 复杂且过于繁重,因为我需要运行多个 SQL 查询。 CRMint 是自动化 SQL(和其他)作业的好工具,也许这对您来说是一个很好的中间立场? github.com/google/crmint cloud.getdbt.com 这是我们目前使用的一个很好的解决方案。您基本上用几乎纯 SQL 编排数据管道。 查看 Magnus - Workflow Automator,它是 Potens.io Suite 的一部分 - 支持所有 BigQuery、Cloud Storage 和大多数 Google API 以及多个简单的实用程序类型任务,例如 BigQuery 任务、导出到存储任务、循环任务等等,以及高级调度、触发等。也可在Marketplace 获得。披露:我是这些工具的创造者和 Potens 团队的领导者 如果您的查询像管道一样工作,换句话说,如果您需要完成一个查询才能运行另一个查询,我建议您查看 Cloud Composer 【参考方案1】:

正如我在评论中提到的,如果您需要编排查询,可以使用 Cloud Composer,一个完全托管的 Airflow 集群。

我创建了下面的代码,或多或少地向您展示如何使用此工具编排查询。请注意,这是一个基本代码,可以在编码标准方面进行改进。 该代码基本上编排了 3 个查询:

    第一个从公共表读取并写入项目中的另一个表 第二个读取在第一个查询中创建的表,并根据日期列选择 10000 条最新的行。之后,它会将结果保存到项目中的表中。

    第三个读取步骤 2 中创建的表并计算一些聚合。之后,它将结果保存到项目中的另一个表中。

    import datetime
    from airflow import models
    from airflow.contrib.operators import bigquery_operator
    
    """The condiguration presented below will run your DAG every five minutes as specified in the 
    schedule_interval property starting from the datetime specified in the start_date property"""
    
    default_dag_args = 
        'start_date': datetime.datetime(2020, 4, 22, 15, 40), 
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': datetime.timedelta(minutes=1),
        'project_id': "<your_project_id>",
    
    
    with models.DAG(
            'composer_airflow_bigquery_orchestration',
            schedule_interval = "*/5 * * * *",
            default_args=default_dag_args) as dag:
    
        run_first_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT * FROM `bigquery-public-data.catalonian_mobile_coverage.mobile_data_2015_2017`",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_1",
            task_id = 'xxxxxxxx',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
        run_second_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT * FROM `<your_project>.orchestration_1` ORDER BY date LIMIT 10000 ",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_2",
            task_id = 'yyyyyyyy',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
        run_third_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT round(lat) r_lat, round(long) r_long, count(1) total FROM`<your_project>.orchestration_2` GROUP BY r_lat,r_long",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_3",
            task_id = 'zzzzzzzz',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
    
       # Define DAG dependencies.
        run_first_query >> run_second_query >> run_third_query
    

一步一步来:

首先,它导入了一些 Airflow 库,如模型和 bigquery_operator

from airflow import models
from airflow.contrib.operators import bigquery_operator

然后它定义了一个名为 default_dag_args 的字典,将在您创建 DAG 时进一步使用。

default_dag_args = 
    'start_date': datetime.datetime(2020, 4, 22, 15, 40), 
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=1),
    'project_id': "<your_project_id>",

创建 DAG 时,将 default_dag_args dict 作为默认参数传递,并添加将定义 DAG 何时运行的 schedule interval 参数。您可以将此参数与一些预设表达式或使用 CRON 表达式一起使用,如您所见here

with models.DAG(
        'composer_airflow_bigquery_orchestration',
        schedule_interval = "*/5 * * * *",
        default_args=default_dag_args) as dag:

之后,您可以创建运营商的实例。在这种情况下,我们只使用BigQueryOperator

    run_first_query = bigquery_operator.BigQueryOperator(
        sql = "SELECT * FROM `bigquery-public-data.catalonian_mobile_coverage.mobile_data_2015_2017`",
        destination_dataset_table = "<your_project>.<your_dataset>.orchestration_1",
        task_id = 'xxxxxxxx',
        write_disposition = "WRITE_TRUNCATE",
        #create_disposition = "",
        allow_large_results = True,
        use_legacy_sql = False
    )

    run_second_query = bigquery_operator.BigQueryOperator(
        sql = "SELECT * FROM `<your_project>.orchestration_1` ORDER BY date LIMIT 10000 ",
        destination_dataset_table = "<your_project>.<your_dataset>.orchestration_2",
        task_id = 'yyyyyyyy',
        write_disposition = "WRITE_TRUNCATE",
        #create_disposition = "",
        allow_large_results = True,
        use_legacy_sql = False
    )

    run_third_query = bigquery_operator.BigQueryOperator(
        sql = "SELECT round(lat) r_lat, round(long) r_long, count(1) total FROM`<your_project>.orchestration_2` GROUP BY r_lat,r_long",
        destination_dataset_table = "<your_project>.<your_dataset>.orchestration_3",
        task_id = 'zzzzzzzz',
        write_disposition = "WRITE_TRUNCATE",
        #create_disposition = "",
        allow_large_results = True,
        use_legacy_sql = False
    )

作为最后一步,我们可以定义 DAG 的依赖关系。这段代码的意思是run_second_query的操作依赖于run_first_query的结论,这样就可以了。

    run_first_query >> run_second_query >> run_third_query

最后,我想添加这个article,讨论如何在使用 CRON 表达式时正确设置 start_date 和 schedule_interval。

【讨论】:

谢谢你,你的例子很有帮助。【参考方案2】:

BigQuery 有一个内置的调度机制,目前处于测试阶段。

要自动化 BQ 本机 SQL 管道,您可以使用此实用程序。 使用 CLI:

$ bq query \
--use_legacy_sql=false \
--destination_table=mydataset.mytable \
--display_name='My Scheduled Query' \
--replace=true \
'SELECT
1
FROM
mydataset.test'

【讨论】:

以上是关于如何自动化 BigQuery SQL 管道的主要内容,如果未能解决你的问题,请参考以下文章

BigQuery 与 Cloud SQL 自动扩缩?

BigQuery 自动化

用于 Linux 操作系统的 azure devops 发布管道中的自动化 SQL 部署任务

BigQuery SQL 通过应用程序脚本按 DATE 过滤

BigQuery 中的相关计划 SQL 查询

自动打开命名管道和 tcp\ip