气流与 DAG 外部的 BigQuery 交互,而不使用 BigQueryOperators

Posted

技术标签:

【中文标题】气流与 DAG 外部的 BigQuery 交互,而不使用 BigQueryOperators【英文标题】:Airflow to interact with BigQuery outside of the DAG not using BigQueryOperators 【发布时间】:2019-12-13 19:26:20 【问题描述】:

我正在努力寻找解决 Airflow 上以下问题的方法:

我在 BQ 上有一张包含产品列表的表格(定期增加)。每个产品在 BigQuery/GoogleCloud 上都有不同的项目。比方说:

产品 |身份证 |项目 ID | PARAM_1 | PARAM_2

我目前在 Jenkins 上的管道使用 for 循环为每个产品构建并行 DAG,并且运行良好。

当我翻译为 Airflow DAG 时,我能够实现以下目标:

...
product_params = 
    'Product1': 
        'project_id': 'product-1',
        'color': 'Blue'
    ,
    'Product2': 
        'project_id': 'product-2',
        'color': 'Red'
    ,...



my_dag = DAG(
    'My_Default_DAG',
    schedule_interval='@once',
    default_args=default_args
    )

dag_tasks = 

with firebase_dag as dag:
    for product_name, p_params in product_params.items():
        query_params = 
            'product_name': product_name,
            'product_project': product_params['project_id'],
            'color': product_params['color'],
            'event_date': '2019-12-01',
            'event_date_suffix': '20191201'
        

        dag_tasks[game] = 

        dag_tasks[game]['step_1'] = BigQueryOperator(
                task_id="0_step_1".format(product_name),
                bql='sql_folder/step-1.sql',
                use_legacy_sql=False,
                destination_dataset_table="0.prod_dataset.step1Table_1".format(product_params['project_id'], query_params['event_date_suffix']),
                write_disposition='WRITE_TRUNCATE',
                params=query_params
            )
       ### following steps...

理想情况下,我想直接在 BigQuery 上查询我的产品参数。而且我已经在 bitbucket 上为它开发了一个 Python 库,还有一堆 Jenkins 广泛使用的其他方法。

有什么方法可以将该库导入到气流中并在我的 dags 中使用它?

否则,除了 BigQueryOperators 之外,我还有其他方法可以构建与 bigquery 交互的方法吗?

【问题讨论】:

【参考方案1】:

是的,您可以在 DAG 中使用您的库并将其与 PythonOperator 一起使用。

【讨论】:

导入我自己的库的最佳方式是什么?我正在使用 Cloud Composer。【参考方案2】:

您可以通过private repository 或local file 在 Composer 中安装自己的 Python 库。

建议您在部署到生产环境之前test your PyPI packages locally in an Airflow worker container,因为自定义包可能会导致与 Airflow 所需的依赖项发生冲突。

【讨论】:

很好@Javier Bóbeda。我会调查的。谢谢 嗨@Roghen!它对你有用吗?如果是,请将其标记为正确的解决方案,这样我们将来可以帮助社区中的人们。

以上是关于气流与 DAG 外部的 BigQuery 交互,而不使用 BigQueryOperators的主要内容,如果未能解决你的问题,请参考以下文章

优化将BigQuery的数据传输到MongoDB的气流任务

运行 BigQuery 查询并使用气流将数据写入 Parquet 中的云存储桶

如何在另一个任务气流中使用查询结果(bigquery 运算符)

如何与客户运营商验证气流 DAG?

每月日期和时间的气流 DAG 调度

气流 - 试图循环操作员。执行不是等待实际操作完成