如何在Python Flask框架中运行重复任务

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在Python Flask框架中运行重复任务相关的知识,希望对你有一定的参考价值。

Flask是一个使用Python编写的轻量级Web应用框架,凭借更灵活、轻便、安全且容易上手的特性,成为企业常用的Python框架之一。在完成Web前端、Linux以及mysql相关的课程之后,专业的杭州Python学习班都会讲解Flask框架知识,以下是整理的相关知识点。

Flask是一个基于Python开发并且依赖jinja2模板和Werkzeug WSGI服务的一个微型框架,对于Werkzeug本质是Socket服务端,其用于接收http请求并对请求进行预处理,然后触发Flask框架。开发人员基于Flask框架提供的功能对请求进行相应的处理,并返回给用户,如果要返回给用户复杂的内容时,需要借助jinja2模板来实现对模板的处理,即:将模板和数据进行渲染,将渲染后的字符串返回给用户浏览器。

默认情况下,Flask不包含数据库抽象层、表单验证,或是其它任何已有多种库可以胜任的功能。然而,Flask支持用扩展来给应用添加这些功能,如同是Flask本身实现的一样。众多的扩展提供了数据库集成、表单验证、上传处理、各种各样的开放认证技术等功能。

Flask框架的特点:
1)Flask自由、灵活,可扩展性强,第三方库的选择面广,开发时可以结合自己最喜欢用的轮子,也能结合最流行最强大的Python库;
2)入门简单,即便没有多少web开发经验,也能很快做出网站;
3)非常适用于小型网站;
4)非常适用于开发Web服务的API;
5)开发大型网站无压力,但代码架构需要自己设计,开发成本取决于开发者的能力和经验。

Flask框架运行解释
1.app = Flask(__name__)
创建Flask对象app,Flask类的构造函数只有一个必须指定的参数,即程序主模块或包的名字。在大多数程序中,Python的__name__变量就是所需要的值。

2.@app.route('/')
web浏览器把请求发送给Web服务器,Web服务器再把请求发送给Flask程序实例。程序实例需要知道对每个URL请求运行哪些代码,所以保存了一个URL到Python函数的映射关系。处理URL和函数之间的关系的程序称为路由。在Flask程序中定义路由的最简便方式,是使用程序实例提供的app.route修饰器,把修饰的函数注册为路由。route()装饰器告诉 Flask什么样的URL 能触发我们的函数。这和Java中的注释有异曲同工之妙。修饰器是Python语言的标准特性,可以使用不同的方式修改函数的行为。惯常用法是使用修饰器把函数注册为事件的处理程序。

3.def index():函数
index()函数放在@app.route('/')后面,所以就是把index()函数注册为路由。如果部署程序的服务器域名为http://127.0.0.1:5000/,在浏览器中访问http://127.0.0.1:5000/后,会触发服务器执行index()函数。

4.@app.route('/user/')
同@app.route('/'),如果部署程序的服务器域名为http://127.0.0.1:5000/,在浏览器中访问http://127.0.0.1:5000/后,会触发服务器执行下方修饰函数。

5.app.run(debug=True)
程序实例用run方法启动Flask继承Web服务器。

6.if __name__ == '__main__'
当Python解释器,读py文件,它会执行它发现的所有代码。在执行代码之前,它会定义一些变量。例如,如果这个py文件就是主程序,它会设置__name__变量为"__main__"。如果这个py被引入到别的模块,__name__会被设置为该模块的名字。
参考技术A (1)

您可以使用 app.app_context 上下文管理器来设置应用程序上下文。我想象的使用将是这样:

从apscheduler.scheduler import调度

def checkSecondApi ):
with app.app_context():
#做任何你正在做的检查第二个API

@ app.before_first_request
def initialize():
apsched = Scheduler()
apsched.start()

apsched.add_interval_job(checkFirstAPI,seconds = 5)
apsched.add_interval_job(checkSecondAPI,seconds = 5)
apsched.add_interval_job(checkThirdAPI,seconds = 5)

或者,可以使用装饰器

def with_application_context(app):
def inner(func):
@ functools.wraps
def wrapper(* args,** kwargs):
with app.app_context():
return func(* args,** kwargs)
return wrapper
return inner

@with_application_context(app)
def checkFirstAPI():
#检查前面的API

(2)

是的,它仍然可以工作。唯一(重要的)区别是,你的应用程序不会直接与世界沟通;它将通过反向代理或某事通过fastcgi / uwsgi /任何。唯一需要注意的是,如果应用程序启动的多个实例,则会创建多个调度程序。为了管理这个,我建议你移动你的后端任务从Flask应用程序,并使用专为运行任务工具定期(即芹菜)。这样做的缺点是,你将不能使用像Flask-Mail,但imo,它不是太好,以至于如此密切地关联到Flask生态系统;通过使用Flask-Mail在标准的非Flask邮件库中获得什么?

此外,分解应用程序可以更容易扩展单个组件与具有一个单片式Web应用程序相比,需要容量。本回答被提问者采纳

Python Flask后端异步处理

  Flask是Python中有名的轻量级同步Web框架,但是在实际的开发中,可能会遇到需要长时间处理的任务,此时就需要使用异步的方式来实现,让长时间任务在后台运行,先将本次请求的相应状态返回给前端,不让前端界面卡顿。

  在碎遮扫描系统的编写中,当对目标进行全方位扫描时,扫描这个动作是需要长时间处理的。因为现在还没有加快每个部分的扫描速度,所以想要将一个目标扫描完大概需要五到十分钟的时间,所以不可能让用户一直等待这么长时间的页面刷新,需要将任务放在后台执行,先给用户返回正常的界面。

  本文介绍Python原生的方式,即多线程或者多进程来进行耗时任务的执行,实现后端的异步处理。这样也很好理解,直接暴力开启一个新的线程/进程来进行耗时任务的执行,主线程/进程用来返回正常前端界面给用户。

  通过多线程来实现

import time
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor()
#executor = ThreadPoolExecutor(10)里面的数字是线程池所能同时进行的最大数量

def run():
    time.sleep(10)
    print("耗时任务执行结束")

@app.route(‘/test‘)
def test():
    # 交给线程去处理耗时任务
    executor.submit(run)
    return "cheer!"

  通过多进程来实现:

import time
from concurrent.futures import ProcessPoolExecutor

executor = ProcessPoolExecutor()
#不限制数量的话最大进程数即为电脑CPU的核数

@app.route(‘/test_console‘, methods=[‘GET‘, ‘POST‘])
@login_required
def console():
    bugbit, bugtype = core.GetBit()
    counts = core.GetCounts()
    ports = core.GetPort()
    services = core.GetServices()
    target = core.GetTargetCount()
    try:
        lastscantime = BaseInfo.query.order_by(BaseInfo.id.desc()).first().date
    except:
        lastscantime = "暂无扫描"
        pass
    if request.method == ‘GET‘:
        return render_template(‘console.html‘, bugbit=bugbit, bugtype=bugtype, counts=counts, lastscantime=lastscantime,
                               ports=ports, services=services, target=target)
    else:
        urls = request.form.get(‘urls‘)
        urls = urls.split()
        print(urls)
        for url in urls:
            redispool.hincrby(‘targetscan‘, ‘waitcount‘, 1)
        executor.submit(SZheConsole, urls)
        target = core.GetTargetCount()
        return render_template(‘console.html‘, bugbit=bugbit, bugtype=bugtype, counts=counts, lastscantime=lastscantime,ports=ports, services=services, target=target)

  这里是碎遮扫描系统目前后端异步的多进程实现,之前时间匆忙所以实现的很简陋哈哈哈,今天准备弃掉原生的后端异步实现了,重构一下扫描系统的后端异步。

  其中

executor.submit(SZheConsole, urls)

  就是使用SZheConsole函数对输入的扫描目标进行全方位目标检测,在这里开启了一个新的进程执行扫描,接着就返回了前端界面,让用户能够正常看到界面。

return render_template(‘console.html‘, bugbit=bugbit, bugtype=bugtype, counts=counts, lastscantime=lastscantime,
                               ports=ports, services=services, target=target)

  虽然这样的原生实现异步处理很简单快捷,但是不足的地方在扫描系统的使用过程中已经有很多师傅提出来了:没有扫描的进度条显示,不能对扫描任务进行暂停,如果有个地方卡死了,就只能一直卡在那里,而不能进行丢弃等等,所以需要使用已存在的异步框架来优化异步处理:D

  请听下回分解。咕咕咕

参考链接:

以上是关于如何在Python Flask框架中运行重复任务的主要内容,如果未能解决你的问题,请参考以下文章

如何在Flask框架中制作blog系统

解决多进程中APScheduler重复运行的问题

解决多进程中APScheduler重复运行的问题

解决多进程中APScheduler重复运行的问题

Python Flask后端异步处理

python之celery在flask中使用