Airflow - 从 BigQuery 动态生成任务,但任务在之前完成之前重复运行
Posted
技术标签:
【中文标题】Airflow - 从 BigQuery 动态生成任务,但任务在之前完成之前重复运行【英文标题】:Airflow - Generating tasks dynamically from BigQuery, but tasks are run repeatedly before previous finishes 【发布时间】:2021-02-25 15:08:16 【问题描述】:上下文
我正在尝试使用 Composer、DataProc 和 BigQuery 在 Google Cloud Platform 上构建提取管道。我在 BigQuery 中有一个表,其中包含数据源及其相关文件的记录。因此,如果我需要提取 5 个文件,那么 BigQuery 表中有 5 条记录。明天可能是不同数量的文件。因此,我考虑在我的 DAG 中动态构建任务。
高层设计如下:
执行一个函数以从 BigQuery 获取数据作为 Pandas 数据帧(或 dict,两者都可以) 遍历数据框 对于数据框中的每一行,创建一个 DataProcSparkOperator,其中包含有关文件和相应参数的详细信息此设置运行良好。我可以在 Airflow UI 中看到我的 DAG 和所有动态生成的任务。
编辑:只是添加更多细节。 BigQuery 表将包含少于 25 条记录,因此无需担心查询表。每 30 秒查询一次表。其次,我只需要这个 DAG 每 4 小时左右运行一次。我不打算让我的作曲家在那段时间继续运行。我需要每 4 小时启动 Composer,运行 DAG 一次以处理所有可用文件,然后关闭。
问题
在执行这些 DataProc 任务时,大约几分钟后,Airflow 会刷新 DAG 并再次运行同一组任务。在 DataProc Jobs 控制台中,我看到同一任务的 2 个(有时是 3 个)实例处于运行状态。这是不可取的。
我的尝试
我在任务级别设置了retries=0
,在DAG 上我设置了catchup=False
、max_active_runs=1
和schedule_interval='@once'
。 DAG 的默认参数也有 retries=0
。
我认为问题在于我从 BigQuery 中提取记录的部分是普通函数的一部分,而不是本身的任务。我没有把它放在任务中的原因是因为我找不到解决方案来将从 BigQuery 获取的结果传递到我必须循环访问它们的后续任务中。
我尝试调用 PythonOperator 并在其中执行 Variable.set("df", df)
,希望可以循环遍历 Variable.get("df")
,但这也没有成功。
下面分享相关代码。
def fetch_pending_files_from_bq():
# fetch records from BigQuery and return as dataframe
default_args =
'start_date': yesterday,
'default_timezone': 'utc',
'retries': 0
dag = DAG(
dagid,
default_args=default_args,
catchup=False,
max_active_runs=1,
description='DAG to ingest data',
schedule_interval='@once'
)
start_dag = DummyOperator(task_id="start_dag", dag=dag)
end_dag = DummyOperator(task_id="end_dag", dag=dag)
pending_files_df = fetch_pending_files_from_bq()
for index, row in pending_files_df.iterrows():
task = DataProcSparkOperator(
dag=dag,
task_id=row["file_name"],
arguments=dataproc_args,
region="us-east1",
job_name="job_".format(task_id),
dataproc_spark_jars=dataproc_jars,
....
....
)
task.set_upstream(start_dag)
task.set_downstream(end_dag)
我得到了我想要的编排,唯一的问题是我的 DataProc 作业会自动重新运行。
感谢任何想法。
【问题讨论】:
Airflow 任务应该尽可能是静态的(慢慢改变也可以)。从不断更新的数据库表创建任务不是一个好主意。它还将在您的数据库连接上创建一个快速加载。特别是对于 BigQuery,这可能意味着您还需要支付很多钱。fetch_pending_files_from_bq
函数每 30 秒执行一次(默认为 min_file_process_interval
),因此您每 30 秒查询一次 BigQuery。取决于您扫描的数据规模,这可能会导致高额费用。
@Elad 就 BigQuery 而言,该表的记录将少于 25 条,因此查询成本不会太多。另外我必须补充一点,我可能只会每 3-4 小时执行一次此 DAG。虽然我不想每 30 秒查询一次 BQ,但这是正确的。我还有什么选择?即使我查询 BQ 并将结果写入磁盘然后循环该文件,写入过程也不会每 30 秒触发一次吗?
您可能需要另一个 ETL 将 BQ 导出到存储。无论如何,这条线很奇怪job_name="job_".format(task_id)
task_id 不是定义的参数。你确定那是你的真实代码吗?还请提供显示您所描述的“无法识别”任务出现的问题的图像/日志。
@Elad - 这是我的代码,但它不是完整的代码,因为我不能公开发布它。 task_id
确实是一个有效的参数,并且代码运行良好。让我今天运行它时尝试抓取一些屏幕截图。
在深入研究设计时,我意识到fetch_pending_files_from_bq
不是一项任务,因此每次刷新 dag 时都会执行它。这导致了多个查询,并且还导致意外创建重复任务。因此我放弃了这个设计。我能够使用 subdags 解决这个问题。第一个 subdag 从 BQ 读取并写入 GCS。第二个 subdag 从 GCS 读取文件并动态创建任务。
【参考方案1】:
在深入研究设计时,我意识到 fetch_pending_files_from_bq 不是一项任务,因此每次刷新 dag 时都会执行它。这导致了多个查询,并且还导致意外创建重复任务。因此我放弃了这个设计。
我能够使用 subdags 解决这个问题。第一个 subdag 从 BQ 读取并写入 GCS。第二个 subdag 从 GCS 读取文件并动态创建任务。
【讨论】:
以上是关于Airflow - 从 BigQuery 动态生成任务,但任务在之前完成之前重复运行的主要内容,如果未能解决你的问题,请参考以下文章