如何在Python Flask框架中运行重复任务
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在Python Flask框架中运行重复任务相关的知识,希望对你有一定的参考价值。
Flask是一个使用Python编写的轻量级Web应用框架,凭借更灵活、轻便、安全且容易上手的特性,成为企业常用的Python框架之一。在完成Web前端、Linux以及mysql相关的课程之后,专业的杭州Python学习班都会讲解Flask框架知识,以下是整理的相关知识点。Flask是一个基于Python开发并且依赖jinja2模板和Werkzeug WSGI服务的一个微型框架,对于Werkzeug本质是Socket服务端,其用于接收http请求并对请求进行预处理,然后触发Flask框架。开发人员基于Flask框架提供的功能对请求进行相应的处理,并返回给用户,如果要返回给用户复杂的内容时,需要借助jinja2模板来实现对模板的处理,即:将模板和数据进行渲染,将渲染后的字符串返回给用户浏览器。
默认情况下,Flask不包含数据库抽象层、表单验证,或是其它任何已有多种库可以胜任的功能。然而,Flask支持用扩展来给应用添加这些功能,如同是Flask本身实现的一样。众多的扩展提供了数据库集成、表单验证、上传处理、各种各样的开放认证技术等功能。
Flask框架的特点:
1)Flask自由、灵活,可扩展性强,第三方库的选择面广,开发时可以结合自己最喜欢用的轮子,也能结合最流行最强大的Python库;
2)入门简单,即便没有多少web开发经验,也能很快做出网站;
3)非常适用于小型网站;
4)非常适用于开发Web服务的API;
5)开发大型网站无压力,但代码架构需要自己设计,开发成本取决于开发者的能力和经验。
Flask框架运行解释
1.app = Flask(__name__)
创建Flask对象app,Flask类的构造函数只有一个必须指定的参数,即程序主模块或包的名字。在大多数程序中,Python的__name__变量就是所需要的值。
2.@app.route('/')
web浏览器把请求发送给Web服务器,Web服务器再把请求发送给Flask程序实例。程序实例需要知道对每个URL请求运行哪些代码,所以保存了一个URL到Python函数的映射关系。处理URL和函数之间的关系的程序称为路由。在Flask程序中定义路由的最简便方式,是使用程序实例提供的app.route修饰器,把修饰的函数注册为路由。route()装饰器告诉 Flask什么样的URL 能触发我们的函数。这和Java中的注释有异曲同工之妙。修饰器是Python语言的标准特性,可以使用不同的方式修改函数的行为。惯常用法是使用修饰器把函数注册为事件的处理程序。
3.def index():函数
index()函数放在@app.route('/')后面,所以就是把index()函数注册为路由。如果部署程序的服务器域名为http://127.0.0.1:5000/,在浏览器中访问http://127.0.0.1:5000/后,会触发服务器执行index()函数。
4.@app.route('/user/')
同@app.route('/'),如果部署程序的服务器域名为http://127.0.0.1:5000/,在浏览器中访问http://127.0.0.1:5000/后,会触发服务器执行下方修饰函数。
5.app.run(debug=True)
程序实例用run方法启动Flask继承Web服务器。
6.if __name__ == '__main__'
当Python解释器,读py文件,它会执行它发现的所有代码。在执行代码之前,它会定义一些变量。例如,如果这个py文件就是主程序,它会设置__name__变量为"__main__"。如果这个py被引入到别的模块,__name__会被设置为该模块的名字。 参考技术A (1)
您可以使用 app.app_context 上下文管理器来设置应用程序上下文。我想象的使用将是这样:
从apscheduler.scheduler import调度
def checkSecondApi ):
with app.app_context():
#做任何你正在做的检查第二个API
@ app.before_first_request
def initialize():
apsched = Scheduler()
apsched.start()
apsched.add_interval_job(checkFirstAPI,seconds = 5)
apsched.add_interval_job(checkSecondAPI,seconds = 5)
apsched.add_interval_job(checkThirdAPI,seconds = 5)
或者,可以使用装饰器
def with_application_context(app):
def inner(func):
@ functools.wraps
def wrapper(* args,** kwargs):
with app.app_context():
return func(* args,** kwargs)
return wrapper
return inner
@with_application_context(app)
def checkFirstAPI():
#检查前面的API
(2)
是的,它仍然可以工作。唯一(重要的)区别是,你的应用程序不会直接与世界沟通;它将通过反向代理或某事通过fastcgi / uwsgi /任何。唯一需要注意的是,如果应用程序启动的多个实例,则会创建多个调度程序。为了管理这个,我建议你移动你的后端任务从Flask应用程序,并使用专为运行任务工具定期(即芹菜)。这样做的缺点是,你将不能使用像Flask-Mail,但imo,它不是太好,以至于如此密切地关联到Flask生态系统;通过使用Flask-Mail在标准的非Flask邮件库中获得什么?
此外,分解应用程序可以更容易扩展单个组件与具有一个单片式Web应用程序相比,需要容量。本回答被提问者采纳
Python Flask后端异步处理
Flask是Python中有名的轻量级同步Web框架,但是在实际的开发中,可能会遇到需要长时间处理的任务,此时就需要使用异步的方式来实现,让长时间任务在后台运行,先将本次请求的相应状态返回给前端,不让前端界面卡顿。
在碎遮扫描系统的编写中,当对目标进行全方位扫描时,扫描这个动作是需要长时间处理的。因为现在还没有加快每个部分的扫描速度,所以想要将一个目标扫描完大概需要五到十分钟的时间,所以不可能让用户一直等待这么长时间的页面刷新,需要将任务放在后台执行,先给用户返回正常的界面。
本文介绍Python原生的方式,即多线程或者多进程来进行耗时任务的执行,实现后端的异步处理。这样也很好理解,直接暴力开启一个新的线程/进程来进行耗时任务的执行,主线程/进程用来返回正常前端界面给用户。
通过多线程来实现
import time from concurrent.futures import ThreadPoolExecutor executor = ThreadPoolExecutor() #executor = ThreadPoolExecutor(10)里面的数字是线程池所能同时进行的最大数量 def run(): time.sleep(10) print("耗时任务执行结束") @app.route(‘/test‘) def test(): # 交给线程去处理耗时任务 executor.submit(run) return "cheer!"
通过多进程来实现:
import time from concurrent.futures import ProcessPoolExecutor executor = ProcessPoolExecutor() #不限制数量的话最大进程数即为电脑CPU的核数 @app.route(‘/test_console‘, methods=[‘GET‘, ‘POST‘]) @login_required def console(): bugbit, bugtype = core.GetBit() counts = core.GetCounts() ports = core.GetPort() services = core.GetServices() target = core.GetTargetCount() try: lastscantime = BaseInfo.query.order_by(BaseInfo.id.desc()).first().date except: lastscantime = "暂无扫描" pass if request.method == ‘GET‘: return render_template(‘console.html‘, bugbit=bugbit, bugtype=bugtype, counts=counts, lastscantime=lastscantime, ports=ports, services=services, target=target) else: urls = request.form.get(‘urls‘) urls = urls.split() print(urls) for url in urls: redispool.hincrby(‘targetscan‘, ‘waitcount‘, 1) executor.submit(SZheConsole, urls) target = core.GetTargetCount() return render_template(‘console.html‘, bugbit=bugbit, bugtype=bugtype, counts=counts, lastscantime=lastscantime,ports=ports, services=services, target=target)
这里是碎遮扫描系统目前后端异步的多进程实现,之前时间匆忙所以实现的很简陋哈哈哈,今天准备弃掉原生的后端异步实现了,重构一下扫描系统的后端异步。
其中
executor.submit(SZheConsole, urls)
就是使用SZheConsole函数对输入的扫描目标进行全方位目标检测,在这里开启了一个新的进程执行扫描,接着就返回了前端界面,让用户能够正常看到界面。
return render_template(‘console.html‘, bugbit=bugbit, bugtype=bugtype, counts=counts, lastscantime=lastscantime, ports=ports, services=services, target=target)
虽然这样的原生实现异步处理很简单快捷,但是不足的地方在扫描系统的使用过程中已经有很多师傅提出来了:没有扫描的进度条显示,不能对扫描任务进行暂停,如果有个地方卡死了,就只能一直卡在那里,而不能进行丢弃等等,所以需要使用已存在的异步框架来优化异步处理:D
请听下回分解。
参考链接:
以上是关于如何在Python Flask框架中运行重复任务的主要内容,如果未能解决你的问题,请参考以下文章