如何在 Python 中使用 Flask 执行周期性任务

Posted

技术标签:

【中文标题】如何在 Python 中使用 Flask 执行周期性任务【英文标题】:How to perform periodic task with Flask in Python 【发布时间】:2012-08-02 08:55:59 【问题描述】:

我一直在使用Flask 为我的k8055 USB 接口板提供一个简单的Web API;相当标准的吸气剂和推杆,Flask 真的让我的生活变得轻松多了。

但我希望能够在乳清发生时将状态更改注册为 / 附近。

例如,如果我有一个连接到电路板的按钮,我可以轮询该特定端口的 api。但是如果我想让输出直接反映输出,无论是否有人在与 api 交谈,我都会有这样的东西。

while True:
    board.read()
    board.digital_outputs = board.digital_inputs
    board.read()
    time.sleep(1)

每秒都会更新输出以匹配输入。

在Flask下有没有办法做这种事情?我以前在 Twisted 中做过类似的事情,但是 Flask 太方便了,这个特殊的应用程序还不能放弃它......

谢谢。

【问题讨论】:

【参考方案1】:

对于我的 Flask 应用程序,我考虑使用 Pashka 在他的 answer、schedule 库和 APScheduler 中描述的 cron 方法。

我发现 APScheduler 很简单,可以满足定期任务运行的目的,所以继续使用 APScheduler。

示例代码:

from flask import Flask

from apscheduler.schedulers.background import BackgroundScheduler


app = Flask(__name__)

def test_job():
    print('I am working...')

scheduler = BackgroundScheduler()
job = scheduler.add_job(test_job, 'interval', minutes=1)
scheduler.start()

【讨论】:

由于某种原因,当设置了FLASK_ENV=development时,每经过一个间隔,job就会执行两次。 @TobiasFeil 这是 Werkzeug 开发服务器重新加载行为。您可以使用flask run --no-reload 禁用重新加载器,或者将作业创建包含在if 条件if os.environ.get("WERKZEUG_RUN_MAIN") == "true": 中。请参阅***.com/a/9476701/3559967 了解更多信息。 我可以从烧瓶路由启动、停止并将变量传递给“”“test_job()”“”函数吗? @MikeSandstrom 您可以通过删除它来停止工作。在此处查看示例:apscheduler.readthedocs.io/en/latest/… 是的,你是对的,这就是你将变量传递给函数的方式。该函数也必须接受这些变量才能像任何其他函数一样工作。祝你好运!【参考方案2】:

您可以将 cron 用于简单任务。

为您的任务创建一个烧瓶视图。

# a separate view for periodic task
@app.route('/task')
def task():
    board.read()
    board.digital_outputs = board.digital_inputs

然后使用 cron,定期从该 url 下载

# cron task to run each minute
0-59 * * * * run_task.sh

run_task.sh 的内容在哪里

wget http://localhost/task

Cron 的运行频率不能超过一分钟一次。如果你需要更高的频率,(比如说,每 5 秒 = 每分钟 12 次),你必须在 tun_task.sh 中按以下方式进行

# loop 12 times with a delay
for i in 1 2 3 4 5 6 7 8 9 10 11 12
do
    # download url in background for not to affect delay interval much
    wget -b http://localhost/task
    sleep 5s
done

【讨论】:

【参考方案3】:

不,Flask 中不支持任务,但您可以使用 flask-celery 或简单地在单独的线程(greenlet)中运行您的函数。

【讨论】:

感谢您的建议。我沿着 gevent/greenlet 路线走,但似乎“主”线程没有屈服于循环线程(使用 gevent.sleep 而不是上面的时间)至于芹菜,肯定实现消息队列服务器对于某些事情来说是矫枉过正的“简单”(tm)?

以上是关于如何在 Python 中使用 Flask 执行周期性任务的主要内容,如果未能解决你的问题,请参考以下文章

使用 Flask 执行 python 脚本 [重复]

在Flask 应用程序中使用Celery的4种用例

在Flask中,g是什么?它的生命周期是?能做什么?

Flask:设置应用程序和请求特定的属性?

如何从 Flask App 执行 Shell 脚本 [重复]

Python笔记-Linux平台中Flask打包成执行程序