气流与 DAG 外部的 BigQuery 交互,而不使用 BigQueryOperators
Posted
技术标签:
【中文标题】气流与 DAG 外部的 BigQuery 交互,而不使用 BigQueryOperators【英文标题】:Airflow to interact with BigQuery outside of the DAG not using BigQueryOperators 【发布时间】:2019-12-13 19:26:20 【问题描述】:我正在努力寻找解决 Airflow 上以下问题的方法:
我在 BQ 上有一张包含产品列表的表格(定期增加)。每个产品在 BigQuery/GoogleCloud 上都有不同的项目。比方说:
产品 |身份证 |项目 ID | PARAM_1 | PARAM_2
我目前在 Jenkins 上的管道使用 for 循环为每个产品构建并行 DAG,并且运行良好。
当我翻译为 Airflow DAG 时,我能够实现以下目标:
...
product_params =
'Product1':
'project_id': 'product-1',
'color': 'Blue'
,
'Product2':
'project_id': 'product-2',
'color': 'Red'
,...
my_dag = DAG(
'My_Default_DAG',
schedule_interval='@once',
default_args=default_args
)
dag_tasks =
with firebase_dag as dag:
for product_name, p_params in product_params.items():
query_params =
'product_name': product_name,
'product_project': product_params['project_id'],
'color': product_params['color'],
'event_date': '2019-12-01',
'event_date_suffix': '20191201'
dag_tasks[game] =
dag_tasks[game]['step_1'] = BigQueryOperator(
task_id="0_step_1".format(product_name),
bql='sql_folder/step-1.sql',
use_legacy_sql=False,
destination_dataset_table="0.prod_dataset.step1Table_1".format(product_params['project_id'], query_params['event_date_suffix']),
write_disposition='WRITE_TRUNCATE',
params=query_params
)
### following steps...
理想情况下,我想直接在 BigQuery 上查询我的产品参数。而且我已经在 bitbucket 上为它开发了一个 Python 库,还有一堆 Jenkins 广泛使用的其他方法。
有什么方法可以将该库导入到气流中并在我的 dags 中使用它?
否则,除了 BigQueryOperators 之外,我还有其他方法可以构建与 bigquery 交互的方法吗?
【问题讨论】:
【参考方案1】:是的,您可以在 DAG 中使用您的库并将其与 PythonOperator 一起使用。
【讨论】:
导入我自己的库的最佳方式是什么?我正在使用 Cloud Composer。【参考方案2】:您可以通过private repository 或local file 在 Composer 中安装自己的 Python 库。
建议您在部署到生产环境之前test your PyPI packages locally in an Airflow worker container,因为自定义包可能会导致与 Airflow 所需的依赖项发生冲突。
【讨论】:
很好@Javier Bóbeda。我会调查的。谢谢 嗨@Roghen!它对你有用吗?如果是,请将其标记为正确的解决方案,这样我们将来可以帮助社区中的人们。以上是关于气流与 DAG 外部的 BigQuery 交互,而不使用 BigQueryOperators的主要内容,如果未能解决你的问题,请参考以下文章
运行 BigQuery 查询并使用气流将数据写入 Parquet 中的云存储桶