气流 - 如何获得所有未来的运行日期

Posted

技术标签:

【中文标题】气流 - 如何获得所有未来的运行日期【英文标题】:Airflow - how to get all the future run date 【发布时间】:2020-08-21 17:07:17 【问题描述】:

我正在安排气流作业。但是,为了验证我是否安排了正确的作业,我需要查看它将来何时运行。

Airflow 具有以下命令,可以让我进行下一次运行。但是,这对于某些用例来说还不够。例如,我安排了每隔一个星期五运行一次作业。如何验证。

airflow next_execution <dag_id>

有没有办法,我可以获得这个 dag 运行的所有未来日期。或至少几个?

【问题讨论】:

【参考方案1】:

虽然大多数进程使用croniter,但如果您有权访问您的安装,最好通过现有接口从“源”获取信息:

from airflow import models
from datetime import datetime, timedelta


dag_bag = models.DagBag()

dag_id = "dag_name"
dag = dag_bag.get_dag(dag_id)

now = datetime.now()
until = now + timedelta(days=21)

runs = dag.get_run_dates(start_date=now, end_date=until)
print(runs)

【讨论】:

我在 dag 上运行时遇到异常。 from sshtunnel import SSHTunnelForwarder ImportError: No module named sshtunnel 你是什么意思“在 dag 上奔跑”?从错误的外观来看,您缺少一个包。 包是。可用..我是。可以手动触发 dag 不,不是吗?无论执行在哪里,您的安装都无法找到 sshtunnel - 创建一个带有跟踪和详细信息的新问题以获取帮助。 [root@server airflow]# pip freeze | grep -i sshtunnel sshtunnel==0.1.5【参考方案2】:

Airflow 在钩子 croniter 下使用,用于example。按照croniter 文档中的示例,这可以按如下方式工作(例如,假设 dag 在每个星期五的下午 12 点运行,并且我们的基准日期是昨天的 8 月 20 日)。

from croniter import croniter 
from datetime import datetime

# Specify current date
base = datetime(2020, 8, 20, 0, 0)

# Set croniter
iter = croniter('0 12 * * 5', base)  

# Get next execution 
iter.get_next(datetime)
>>>
datetime.datetime(2020, 8, 21, 12, 0)

您可以在其中指定base 作为您的 dag (dag.latest_execution_date) 的最新执行日期。您可以通过调用 n 次 iter.get_next(datetime) 来获取它的以下执行情况。

【讨论】:

以上是关于气流 - 如何获得所有未来的运行日期的主要内容,如果未能解决你的问题,请参考以下文章

气流回填不起作用

气流 - 如何仅“填充 DagBag”一次

如何使用气流检查长时间运行的 http 任务的状态?

如何防止气流回填 dag 运行?

如何防止气流回填dag运行?

气流回填澄清