气流 - 如何获得所有未来的运行日期
Posted
技术标签:
【中文标题】气流 - 如何获得所有未来的运行日期【英文标题】:Airflow - how to get all the future run date 【发布时间】:2020-08-21 17:07:17 【问题描述】:我正在安排气流作业。但是,为了验证我是否安排了正确的作业,我需要查看它将来何时运行。
Airflow 具有以下命令,可以让我进行下一次运行。但是,这对于某些用例来说还不够。例如,我安排了每隔一个星期五运行一次作业。如何验证。
airflow next_execution <dag_id>
有没有办法,我可以获得这个 dag 运行的所有未来日期。或至少几个?
【问题讨论】:
【参考方案1】:虽然大多数进程使用croniter
,但如果您有权访问您的安装,最好通过现有接口从“源”获取信息:
from airflow import models
from datetime import datetime, timedelta
dag_bag = models.DagBag()
dag_id = "dag_name"
dag = dag_bag.get_dag(dag_id)
now = datetime.now()
until = now + timedelta(days=21)
runs = dag.get_run_dates(start_date=now, end_date=until)
print(runs)
【讨论】:
我在 dag 上运行时遇到异常。from sshtunnel import SSHTunnelForwarder ImportError: No module named sshtunnel
你是什么意思“在 dag 上奔跑”?从错误的外观来看,您缺少一个包。
包是。可用..我是。可以手动触发 dag
不,不是吗?无论执行在哪里,您的安装都无法找到 sshtunnel
- 创建一个带有跟踪和详细信息的新问题以获取帮助。
[root@server airflow]# pip freeze | grep -i sshtunnel sshtunnel==0.1.5
【参考方案2】:
Airflow 在钩子 croniter 下使用,用于example。按照croniter 文档中的示例,这可以按如下方式工作(例如,假设 dag 在每个星期五的下午 12 点运行,并且我们的基准日期是昨天的 8 月 20 日)。
from croniter import croniter
from datetime import datetime
# Specify current date
base = datetime(2020, 8, 20, 0, 0)
# Set croniter
iter = croniter('0 12 * * 5', base)
# Get next execution
iter.get_next(datetime)
>>>
datetime.datetime(2020, 8, 21, 12, 0)
您可以在其中指定base
作为您的 dag (dag.latest_execution_date
) 的最新执行日期。您可以通过调用 n 次 iter.get_next(datetime)
来获取它的以下执行情况。
【讨论】:
以上是关于气流 - 如何获得所有未来的运行日期的主要内容,如果未能解决你的问题,请参考以下文章