利用Redis作消息队列,实现生产消费和发布订阅

Posted YidaHu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了利用Redis作消息队列,实现生产消费和发布订阅相关的知识,希望对你有一定的参考价值。

在工作中,我们经常会使用队列,在Python中也有原生队列,但是原生的队列是存储在内存中,当重启系统后队列中的数据就会丢失,无法进行分布式。

消息队列最常被使用的三种场景:异步处理、流量控制和服务解耦。当然,消息队列的适用范围不仅仅局限于这些场景,还有包括:

  • 作为发布 / 订阅系统实现一个微服务级系统间的观察者模式;
  • 连接流计算任务和数据;
  • 用于将消息广播给大量接收者。

简单的说,我们在单体应用里面需要用队列解决的问题,在分布式系统中大多都可以用消息队列来解决。

Redis可以作为简单的消息队列来用,但是它毕竟不是专业的消息队列,如果对于有很大的消息队列需求的系统还是考虑使用专业的MQ,在这里只讲解一下利用Redis做简单的消息队列,实现生产消费和发布订阅的场景。

生产消费模型

生产者生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息;即对于每个消息只能被最多一个消费者拥有。

应用场景

在任务的处理时间比较长的情况下:

比如上传文件并处理,那么这个时候可以将用户上传和处理文件分成两个过程,用一个队列暂时存储用户上传文件的消息,然后立刻返回用户上传成功,然后有专门的线程处理队列中的文件。

在比如双十一的时候,会产生大量的订单,那么不可能同时处理那么多的订单,需要将订单放入一个队列里面,然后由专门的线程处理订单。当然这里需要更加专业的MQ去做了。

代码示例

生产者

import redis
import random
import logging
from flask import Flask, redirect

app = Flask(__name__)

rcon = redis.StrictRedis(host='localhost', db=5)
prodcons_queue = 'task:prodcons:queue'


@app.route('/producer')
def producer():
    elem = random.randrange(10)
    rcon.lpush(prodcons_queue, elem)
    logging.info("lpush {} -- {}".format(prodcons_queue, elem))
    return redirect('/')


if __name__ == '__main__':
    app.run(debug=True)

消费者

import redis


class Task(object):
    def __init__(self):
        self.rcon = redis.StrictRedis(host='localhost', db=5)
        self.queue = 'task:prodcons:queue'

    def consumer(self):
        while True:
            task = self.rcon.blpop(self.queue, 0)[1]
            print("Task get", task)


if __name__ == '__main__':
    print('listen task queue')
    Task().consumer()

发布订阅模型

发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息;即正常情况下每个消费者收到的消息应该都是一样的。

应用场景

  1. 应用程序需要向大量消费者广播信息。例如微信订阅号就是一个消费者量庞大的广播平台。
  2. 应用程序需要与一个或多个独立开发的应用程序或服务通信,这些应用程序或服务可能使用不同的平台、编程语言和通信协议。

代码示例

发布

import redis
import random
import logging
from flask import Flask, redirect

app = Flask(__name__)

rcon = redis.StrictRedis(host='localhost', db=5)
pubsub_channel = 'task:pubsub:channel'


@app.route('/pubsub')
def pubsub():
    ps = rcon.pubsub()
    ps.subscribe(pubsub_channel)
    elem = random.randrange(10)
    rcon.publish(pubsub_channel, elem)
    return redirect('/')


if __name__ == '__main__':
    app.run(debug=True)

订阅

import redis


class Task(object):

    def __init__(self):
        self.rcon = redis.StrictRedis(host='localhost', db=5)
        self.ps = self.rcon.pubsub()
        self.ps.subscribe('task:pubsub:channel')

    def listen_task(self):
        for i in self.ps.listen():
            if i['type'] == 'message':
                print("Task get", i['data'])


if __name__ == '__main__':
    print('listen task channel')
    Task().listen_task()

总结

Redis可以作为简单的消息队列来用,但是它毕竟不是专业的消息队列,如果对于有很大的消息队列需求的系统还是考虑使用专业的MQ等。



好文章,请在看❤️


以上是关于利用Redis作消息队列,实现生产消费和发布订阅的主要内容,如果未能解决你的问题,请参考以下文章

并发9借助redis 实现生产消费,消息订阅发布模式队列

redis消息队列有没有

Redis实现消息队列之发布订阅模式

Redis消息队列

消息队列中点对点与发布订阅区别

利用redis简单实现消息订阅和发布