Flask例子-实现Redis Task Queue(任务队列)
Posted Python程序员
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flask例子-实现Redis Task Queue(任务队列)相关的知识,希望对你有一定的参考价值。
Python部落组织翻译,禁止转载,欢迎转发
在本文中,我们加入基础的 Redis task queue来进行文本处理。
有许多的工具也能够实现,例如ReTask和HotQueue。我们将使用的是Python RQ。它是一个建立在Redis之上用来创建task queue(任务队列)的简单的库,并且容易设置和实现。
请牢记,我们是这样创建的:一个基于给定的URL中的文本用来统计单词的分布对的Flask应用。这是一个完整的教程。
1、 第一部分:设置本地开发环境,接着在Heroku云平台上部署staging环境和production环境。
2、 第二部分:设置一个带有SQLAlche和Alembic的PostgreSQL数据库来处理迁移。
3、 第三部分:加入后端逻辑到scrape,然后使用requests,BeautifulSoup和Natural Language Toolkit(NLTK)库来计算处理从页面中得到的单词。
4、 第四部分:实现一个Redis task queue 来进行文本处理。(本文描述的)
5、 第五部分:安装Angular前端框架来不断查看后端框架请求是否完成。
6、 第六部分:启动在Heroku云平台上的staging服务——设置Redis,详细介绍如何在单个Dyno上运行两个进程(web和worker)。(ps:在Heroku云平台上,对于每一个进程采用一个叫Dyno的单位来进行性能管理,增加这个值,则能提高应用的相应速度和吞吐量。有适用于Web server的Web dyno和作为后台进程的Worker dyno。)
7、 第七部分:更新前端使其用户界面更友好。
8、 第八部分:将D3库添加到混合频率分布图和柱状图。
需要代码吗?可以到repo(https://github.com/realpython/flask-by-example/releases)上抓取。
安装要求
工具:
Redis——http://redis.io/
Python Redis —— https://pypi.python.org/pypi/redis/2.10.5
RQ(Redis Queue)——http://python-rq.org/
首先从http://redis.io/下载和安装Redis(或者使用Homebrew命令——brew install redis),然后开启Redis服务。
$ redis-server
接下来在一个新的终端窗口安装Python Redis 库和RQ库。
$ workon wordcounts
$ pip install redis rq
$ pip freeze > requirements.txt
设置Worker
接下来我们开始创建一个worker进程来监听queued tasks(队列任务)。
import os
import redis
from rq import Worker, Queue, Connection
listen = ['default']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)
if __name__ == '__main__':
with Connection(conn):
worker = Worker(list(map(Queue, listen)))
worker.work()
保存以上文本为worker.py。在这段代码中,我们监听命名为default的queue(队列)并且与我们的Redis服务在localhost:6379 建立连接。
开启另一个终端窗口:
$ workon wordcounts
$ python worker.py
17:01:29 RQ worker started, version 0.4.6
17:01:29
17:01:29 *** Listening on default...
现在我们需要更新我们的app.py来向queue(队列)发送任务。
更新app.py
将下面一些模块import到app.py
from rq import Queue
from rq.job import Job
from worker import conn
然后更新配置部分:
app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
db = SQLAlchemy(app)
q = Queue(connection=conn)
from models import *
q = Queue(connection=conn)设置一个Redis连接并基于连接初始化队列。
现在我们将文本处理功能 搬出我们的索引路径并且添加一个叫做count_and_save_words()的函数。当我们从索引路径调用这个函数时需要传入一个URL参数让它接收。
def count_and_save_words(url):
errors = []
try:
r = requests.get(url)
except:
errors.append(
"Unable to get URL. Please make sure it's valid and try again."
)
return {"error": errors}
# text processing
raw = BeautifulSoup(r.text).get_text()
nltk.data.path.append('./nltk_data/') # set the path
tokens = nltk.word_tokenize(raw)
text = nltk.Text(tokens)
# remove punctuation, count raw words
nonPunct = re.compile('.*[A-Za-z].*')
raw_words = [w for w in text if nonPunct.match(w)]
raw_word_count = Counter(raw_words)
# stop words
no_stop_words = [w for w in raw_words if w.lower() not in stops]
no_stop_words_count = Counter(no_stop_words)
# save the results
try:
result = Result(
url=url,
result_all=raw_word_count,
result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
except:
errors.append("Unable to add item to database.")
return {"error": errors}
@app.route('/', methods=['GET', 'POST'])
def index():
results = {}
if request.method == "POST":
# get url that the person has entered
url = request.form['url']
if 'http://' not in url[:7]:
url = 'http://' + url
job = q.enqueue_call(
func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
return render_template('index.html', results=results)
注意下面的代码:
job = q.enqueue_call(
func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
在这里我们使用我们之前初始化的queue(队列)和命名为enqueue_call() 的函数。这里添加了一个运行将URL作为参数的count_and_save_words() 函数的新job到我们的queue中。result_ttl=5000 这一行的参数告知RQ为job结果等待多久——5000秒。然后我们输出job的id到终端。这个是必要的,用来查看这个job是否处理完成。
让我们来设置一条新的route(路径)
新的Route(路径)
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn)
if job.is_finished:
return str(job.result), 200
else:
return "Nay!", 202
让我们来测试这一点。
启动服务,进入http://localhost:5000/,使用URL http://realpython.com,然后从终端抓取这个job的id。接着使用这个id在 ‘/results/’末端。
只要例如:http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197
只要不超过5000秒前检查运行状态,你就可以看到id数值,这是我们将结果添加到数据库时产生的。
# save the results
try:
from models import Result
result = Result(
url=url,
result_all=raw_word_count,
result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
现在,让我们稍微重构route(路径)来用JSON将数据库的结果集返回。
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn)
if job.is_finished:
result = Result.query.filter_by(id=job.result).first()
results = sorted(
result.result_no_stop_words.items(),
key=operator.itemgetter(1),
reverse=True
)[:10]
return jsonify(results)
else:
return "Nay!", 202
确保import这个模块:
from flask import jsonify
再次测试这个点。如果一切顺利,你将在浏览器看到以下结果:
{
Course: 5,
Download: 4,
Python: 19,
Real: 11,
courses: 7,
development: 7,
return: 4,
sample: 4,
videos: 5,
web: 12
}
接下来是什么呢?
为了能够让所有整合到一起,我们将会添加AngularJS进行扩展,在下一个部分,创建一个基本轮询方式的服务端——每五秒发送一个请求或者直接在/results/<job_key> 末端请求更新。一旦数据是可用的,我们可以添加到DOM。
英文原文:https://realpython.com/blog/python/flask-by-example-implementing-a-redis-task-queue/
译者:Arvin
以上是关于Flask例子-实现Redis Task Queue(任务队列)的主要内容,如果未能解决你的问题,请参考以下文章