Airflow - 从 BigQuery 动态生成任务,但任务在之前完成之前重复运行

Posted

技术标签:

【中文标题】Airflow - 从 BigQuery 动态生成任务,但任务在之前完成之前重复运行【英文标题】:Airflow - Generating tasks dynamically from BigQuery, but tasks are run repeatedly before previous finishes 【发布时间】:2021-02-25 15:08:16 【问题描述】:

上下文

我正在尝试使用 Composer、DataProc 和 BigQuery 在 Google Cloud Platform 上构建提取管道。我在 BigQuery 中有一个表,其中包含数据源及其相关文件的记录。因此,如果我需要提取 5 个文件,那么 BigQuery 表中有 5 条记录。明天可能是不同数量的文件。因此,我考虑在我的 DAG 中动态构建任务。

高层设计如下:

执行一个函数以从 BigQuery 获取数据作为 Pandas 数据帧(或 dict,两者都可以) 遍历数据框 对于数据框中的每一行,创建一个 DataProcSparkOperator,其中包含有关文件和相应参数的详细信息

此设置运行良好。我可以在 Airflow UI 中看到我的 DAG 和所有动态生成的任务。

编辑:只是添加更多细节。 BigQuery 表将包含少于 25 条记录,因此无需担心查询表。每 30 秒查询一次表。其次,我只需要这个 DAG 每 4 小时左右运行一次。我不打算让我的作曲家在那段时间继续运行。我需要每 4 小时启动 Composer,运行 DAG 一次以处理所有可用文件,然后关闭。

问题

在执行这些 DataProc 任务时,大约几分钟后,Airflow 会刷新 DAG 并再次运行同一组任务。在 DataProc Jobs 控制台中,我看到同一任务的 2 个(有时是 3 个)实例处于运行状态。这是不可取的。

我的尝试

我在任务级别设置了retries=0,在DAG 上我设置了catchup=Falsemax_active_runs=1schedule_interval='@once'。 DAG 的默认参数也有 retries=0

我认为问题在于我从 BigQuery 中提取记录的部分是普通函数的一部分,而不是本身的任务。我没有把它放在任务中的原因是因为我找不到解决方案来将从 BigQuery 获取的结果传递到我必须循环访问它们的后续任务中。

我尝试调用 PythonOperator 并在其中执行 Variable.set("df", df),希望可以循环遍历 Variable.get("df"),但这也没有成功。

下面分享相关代码。

def fetch_pending_files_from_bq():
    # fetch records from BigQuery and return as dataframe

default_args = 
    'start_date': yesterday,
    'default_timezone': 'utc',
    'retries': 0


dag = DAG(
    dagid,
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    description='DAG to ingest data',
    schedule_interval='@once'
)

start_dag = DummyOperator(task_id="start_dag", dag=dag)
end_dag = DummyOperator(task_id="end_dag", dag=dag)

pending_files_df = fetch_pending_files_from_bq()

for index, row in pending_files_df.iterrows():
    task = DataProcSparkOperator(
        dag=dag,
        task_id=row["file_name"],
        arguments=dataproc_args,
        region="us-east1",
        job_name="job_".format(task_id),
        dataproc_spark_jars=dataproc_jars,
        ....
        ....
    )

    task.set_upstream(start_dag)
    task.set_downstream(end_dag)

我得到了我想要的编排,唯一的问题是我的 DataProc 作业会自动重新运行。

感谢任何想法。

【问题讨论】:

Airflow 任务应该尽可能是静态的(慢慢改变也可以)。从不断更新的数据库表创建任务不是一个好主意。它还将在您的数据库连接上创建一个快速加载。特别是对于 BigQuery,这可能意味着您还需要支付很多钱。 fetch_pending_files_from_bq 函数每 30 秒执行一次(默认为 min_file_process_interval ),因此您每 30 秒查询一次 BigQuery。取决于您扫描的数据规模,这可能会导致高额费用。 @Elad 就 BigQuery 而言,该表的记录将少于 25 条,因此查询成本不会太多。另外我必须补充一点,我可能只会每 3-4 小时执行一次此 DAG。虽然我不想每 30 秒查询一次 BQ,但这是正确的。我还有什么选择?即使我查询 BQ 并将结果写入磁盘然后循环该文件,写入过程也不会每 30 秒触发一次吗? 您可能需要另一个 ETL 将 BQ 导出到存储。无论如何,这条线很奇怪job_name="job_".format(task_id) task_id 不是定义的参数。你确定那是你的真实代码吗?还请提供显示您所描述的“无法识别”任务出现的问题的图像/日志。 @Elad - 这是我的代码,但它不是完整的代码,因为我不能公开发布它。 task_id 确实是一个有效的参数,并且代码运行良好。让我今天运行它时尝试抓取一些屏幕截图。 在深入研究设计时,我意识到fetch_pending_files_from_bq 不是一项任务,因此每次刷新 dag 时都会执行它。这导致了多个查询,并且还导致意外创建重复任务。因此我放弃了这个设计。我能够使用 subdags 解决这个问题。第一个 subdag 从 BQ 读取并写入 GCS。第二个 subdag 从 GCS 读取文件并动态创建任务。 【参考方案1】:

在深入研究设计时,我意识到 fetch_pending_files_from_bq 不是一项任务,因此每次刷新 dag 时都会执行它。这导致了多个查询,并且还导致意外创建重复任务。因此我放弃了这个设计。

我能够使用 subdags 解决这个问题。第一个 subdag 从 BQ 读取并写入 GCS。第二个 subdag 从 GCS 读取文件并动态创建任务。

【讨论】:

以上是关于Airflow - 从 BigQuery 动态生成任务,但任务在之前完成之前重复运行的主要内容,如果未能解决你的问题,请参考以下文章

使用 Airflow 将 Bigquery 查询结果发送到数据框

Airflow - BigQuery 架构字段中的值无效

在 DAG 运行期间动态生成 DAG - Airflow

Airflow 中的 BigQuery 参数化查询

从 BigQuery 导出到 MySQL 时出错

Bigquery:如果不存在则创建表并使用 Python 和 Apache AirFlow 加载数据