如何安排一个函数在 Flask 上每小时运行一次?
Posted
技术标签:
【中文标题】如何安排一个函数在 Flask 上每小时运行一次?【英文标题】:How to schedule a function to run every hour on Flask? 【发布时间】:2014-02-08 10:59:45 【问题描述】:我有一个无法访问 cron
命令的 Flask 虚拟主机。
如何每小时执行一些 Python 函数?
【问题讨论】:
【参考方案1】:您可以在 Flask 应用程序中使用 APScheduler
并通过其界面运行您的作业:
import atexit
# v2.x version - see https://***.com/a/38501429/135978
# for the 3.x version
from apscheduler.scheduler import Scheduler
from flask import Flask
app = Flask(__name__)
cron = Scheduler(daemon=True)
# Explicitly kick off the background thread
cron.start()
@cron.interval_schedule(hours=1)
def job_function():
# Do your work here
# Shutdown your cron thread if the web process is stopped
atexit.register(lambda: cron.shutdown(wait=False))
if __name__ == '__main__':
app.run()
【讨论】:
我可以问一个新手问题吗?为什么atexit.register
中有lambda
?
因为atexit.register
需要一个函数来调用。如果我们刚刚通过cron.shutdown(wait=False)
,我们将通过调用cron.shutdown
(可能是None
)的结果。因此,我们改为传递一个零参数函数,而不是给它一个名称并使用 statement def shutdown(): cron.shutdown(wait=False)
和 atexit.register(shutdown)
而是使用 lambda:
内联注册它(这是一个零-参数函数表达式。)
谢谢。所以问题是我们想将参数传递给函数,如果我理解正确的话。【参考方案2】:
您可以使用来自APScheduler 包(v3.5.3)的BackgroundScheduler()
:
import time
import atexit
from apscheduler.schedulers.background import BackgroundScheduler
def print_date_time():
print(time.strftime("%A, %d. %B %Y %I:%M:%S %p"))
scheduler = BackgroundScheduler()
scheduler.add_job(func=print_date_time, trigger="interval", seconds=60)
scheduler.start()
# Shut down the scheduler when exiting the app
atexit.register(lambda: scheduler.shutdown())
请注意,当 Flask 处于调试模式时,将启动其中两个调度程序。如需更多信息,请查看this 问题。
【讨论】:
@user5547025 如果我将内容放在 schedule.py 中,计划如何工作,它将如何自动运行? 我认为 user5547025 建议的调度是用于可以阻塞主线程的同步任务。您将需要启动一个工作线程以使其不阻塞。 如果flask
有App.runonce
或App.runForNseconds
你可以在schedule
和flask runner 之间切换,但情况并非如此,所以目前唯一的方法是使用这个
谢谢!在 if name__=="__main" 下,我应该在哪里插入这个函数?我们也可以用我们的函数替换 print_date_time 函数吗?
@Dewsworld 为什么在最后一行使用 lambda?为什么不代替atexit.register(lambda: scheduler.shutdown())
只做atexit.register(scheduler.shutdown)
【参考方案3】:
我尝试过使用烧瓶而不是简单的 apscheduler 你需要安装的是
pip3 安装 flask_apscheduler
以下是我的代码示例:
from flask import Flask
from flask_apscheduler import APScheduler
app = Flask(__name__)
scheduler = APScheduler()
def scheduleTask():
print("This test runs every 3 seconds")
if __name__ == '__main__':
scheduler.add_job(id = 'Scheduled Task', func=scheduleTask, trigger="interval", seconds=3)
scheduler.start()
app.run(host="0.0.0.0")
【讨论】:
【参考方案4】:你可以使用 flask-crontab 模块,非常简单。
第一步:pip install flask-crontab
第 2 步:
from flask import Flask
from flask_crontab import Crontab
app = Flask(__name__)
crontab = Crontab(app)
第 3 步:
@crontab.job(minute="0", hour="6", day="*", month="*", day_of_week="*")
def my_scheduled_job():
do_something()
第 4 步:在 cmd 上,点击
flask crontab add
完成。现在只需运行您的烧瓶应用程序,您就可以检查您的函数将在每天 6:00 调用。
您可以参考Here(官方文档)。
【讨论】:
【参考方案5】:我对应用程序调度程序的概念有点新意,但我在这里找到的 APScheduler v3.3.1 有点不同。我相信对于最新版本,包结构、类名等都发生了变化,所以我在这里放一个我最近制作的新解决方案,与基本的 Flask 应用程序集成:
#!/usr/bin/python3
""" Demonstrating Flask, using APScheduler. """
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask
def sensor():
""" Function for test purposes. """
print("Scheduler is alive!")
sched = BackgroundScheduler(daemon=True)
sched.add_job(sensor,'interval',minutes=60)
sched.start()
app = Flask(__name__)
@app.route("/home")
def home():
""" Function for test purposes. """
return "Welcome Home :) !"
if __name__ == "__main__":
app.run()
如果有人对此示例的更新感兴趣,我也将离开这个 Gist here。
这里有一些参考资料,供以后阅读:
APScheduler 文档:https://apscheduler.readthedocs.io/en/latest/ 守护进程=真:https://docs.python.org/3.4/library/threading.html#thread-objects【讨论】:
这很好用,希望随着更多人看到这个帖子,它的票数会更高。 你试过在 PythonAnywhere 等网络应用程序上使用它吗? 谢谢,@Mwspencer。是的,我使用过并且效果很好 :),尽管我建议您探索apscheduler.schedulers.background
提供的更多选项,因为您可能会遇到其他对您的应用程序有用的场景。问候。
应用存在时别忘了关闭调度器
您好!对于有多个 gunicorn 工人的情况,您能提供一些建议吗?我的意思是,调度程序会为每个工作人员执行一次吗?【参考方案6】:
对于一个简单的解决方案,您可以添加一个路由,例如
@app.route("/cron/do_the_thing", methods=['POST'])
def do_the_thing():
logging.info("Did the thing")
return "OK", 200
然后add a unix cron job 定期发布到此端点。例如,每分钟运行一次,在终端输入 crontab -e
并添加以下行:
* * * * * /opt/local/bin/curl -X POST https://YOUR_APP/cron/do_the_thing
(注意 curl 的路径必须是完整的,因为当作业运行时它不会有你的 PATH。你可以通过which curl
在你的系统上找到 curl 的完整路径) em>
我喜欢这个,因为它很容易手动测试作业,它没有额外的依赖项,而且没有任何特别的事情,很容易理解。
安全
如果您想用密码保护您的 cron 作业,您可以pip install Flask-BasicAuth
,然后将凭据添加到您的应用配置中:
app = Flask(__name__)
app.config['BASIC_AUTH_REALM'] = 'realm'
app.config['BASIC_AUTH_USERNAME'] = 'falken'
app.config['BASIC_AUTH_PASSWORD'] = 'joshua'
密码保护作业端点:
from flask_basicauth import BasicAuth
basic_auth = BasicAuth(app)
@app.route("/cron/do_the_thing", methods=['POST'])
@basic_auth.required
def do_the_thing():
logging.info("Did the thing a bit more securely")
return "OK", 200
然后从您的 cron 作业中调用它:
* * * * * /opt/local/bin/curl -X POST https://falken:joshua@YOUR_APP/cron/do_the_thing
【讨论】:
你是个天才!非常方便的提示。 这种方法的问题在于,很难记录应用程序有计划的组件。也就是说,我过去曾这样做过。 @SidKwakkel 包含“这个函数是从 cron 调用的”这样的评论很有帮助,但我知道你的意思。我有一些代码我无法快速回忆起它是如何(以及是否)定期执行的。【参考方案7】:使用调度和多处理的完整示例,带有开和关控制以及 run_job() 的参数
返回代码被简化,间隔设置为 10 秒,更改为 every(2).hour.do()
for 2 小时。时间表非常令人印象深刻,它不会漂移,而且我在安排时从未见过它超过 100 毫秒。使用多处理而不是线程,因为它有一个终止方法。
#!/usr/bin/env python3
import schedule
import time
import datetime
import uuid
from flask import Flask, request
from multiprocessing import Process
app = Flask(__name__)
t = None
job_timer = None
def run_job(id):
""" sample job with parameter """
global job_timer
print("timer job id=".format(id))
print("timer: :.4fsec".format(time.time() - job_timer))
job_timer = time.time()
def run_schedule():
""" infinite loop for schedule """
global job_timer
job_timer = time.time()
while 1:
schedule.run_pending()
time.sleep(1)
@app.route('/timer/<string:status>')
def mytimer(status, nsec=10):
global t, job_timer
if status=='on' and not t:
schedule.every(nsec).seconds.do(run_job, str(uuid.uuid4()))
t = Process(target=run_schedule)
t.start()
return "timer on with interval:sec\n".format(nsec)
elif status=='off' and t:
if t:
t.terminate()
t = None
schedule.clear()
return "timer off\n"
return "timer status not changed\n"
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
您只需发出以下命令进行测试:
$ curl http://127.0.0.1:5000/timer/on
timer on with interval:10sec
$ curl http://127.0.0.1:5000/timer/on
timer status not changed
$ curl http://127.0.0.1:5000/timer/off
timer off
$ curl http://127.0.0.1:5000/timer/off
timer status not changed
定时器每 10 秒启动一次,它会向控制台发出一条定时器消息:
127.0.0.1 - - [18/Sep/2018 21:20:14] "GET /timer/on HTTP/1.1" 200 -
timer job id=b64ed165-911f-4b47-beed-0d023ead0a33
timer: 10.0117sec
timer job id=b64ed165-911f-4b47-beed-0d023ead0a33
timer: 10.0102sec
【讨论】:
我不是多处理方面的专家,但如果你使用它,你很可能会遇到 pickle 错误。 @PatrickMutuku,我看到的数字序列化(cookies,临时文件)的唯一问题是异步和 websockets,但是 Flask 不是你的 api,看看github.com/kennethreitz/responder。 Flask 在纯 REST 上表现出色,在 apache wsgi 上有一个简单的前端。【参考方案8】:您可以尝试使用APScheduler's BackgroundScheduler 将间隔作业集成到您的 Flask 应用程序中。下面是使用蓝图和应用工厂 (init.py) 的示例:
from datetime import datetime
# import BackgroundScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask
from webapp.models.main import db
from webapp.controllers.main import main_blueprint
# define the job
def hello_job():
print('Hello Job! The time is: %s' % datetime.now())
def create_app(object_name):
app = Flask(__name__)
app.config.from_object(object_name)
db.init_app(app)
app.register_blueprint(main_blueprint)
# init BackgroundScheduler job
scheduler = BackgroundScheduler()
# in your case you could change seconds to hours
scheduler.add_job(hello_job, trigger='interval', seconds=3)
scheduler.start()
try:
# To keep the main thread alive
return app
except:
# shutdown if app occurs except
scheduler.shutdown()
希望对你有帮助:)
参考:
-
https://github.com/agronholm/apscheduler/blob/master/examples/schedulers/background.py
【讨论】:
我确信 return 语句永远不会引发异常【参考方案9】:另一种选择可能是使用与 Flask 配合得很好的 Flask-APScheduler,例如:
从 Flask 配置加载调度器配置, 从 Flask 配置加载作业定义更多信息在这里:
https://pypi.python.org/pypi/Flask-APScheduler
【讨论】:
【参考方案10】:您可能想使用一些队列机制和调度程序,如 RQ scheduler 或更重的东西,如 Celery(很可能是矫枉过正)。
【讨论】:
以上是关于如何安排一个函数在 Flask 上每小时运行一次?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Azure Logic 应用程序中为每两分钟触发器安排天数和小时数?