Python之路48-RabbitMQ

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python之路48-RabbitMQ相关的知识,希望对你有一定的参考价值。

安装pika模块

linux下安装

pip3.5 install pika


一个简单的消息队列例子

发送端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

# 声明队列queue
channel.queue_declare(queue="test")

# RabbitMQ消息不能直接发送到队列,它总是需要经历一个交换
channel.basic_publish(exchange="", routing_key="test", body="Hello World!")
print("[x] sent ‘Hello World!‘")
connection.close()

接收端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

# 声明队列queue,receive声明队列的原因是不一定是发送端先启动,假如receive端先启动要先声明队列
channel.queue_declare(queue="test")


def callback(ch, method, properties, body):
    print("[x] received %r" % body)

channel.basic_consume(callback, queue="test", no_ack=True)

print("[*] waiting for messages, to exit press CTRL+C")
channel.start_consuming()

这种方式,RabbitMQ会将消息依次发送给接收者,跟负载均衡差不多


上面那种情况是接收端没有回应的,如果没有回应,接收端只要从队列中取走消息,队列中就已经没有这个数据了,有时为了避免这种请求,要求接收端必须接收消息并执行后,可以让接收端发送一个回应,然后RabbitMQ再将这条消息删除

发送端没有更改

接收端

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

# 声明队列queue,receive声明队列的原因是不一定是发送端先启动,假如receive端先启动要先声明队列
channel.queue_declare(queue="test")


def callback(ch, method, properties, body):
    print("[x] received %r" % body)
    time.sleep(30)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# channel.basic_consume(callback, queue="test", no_ack=True)
channel.basic_consume(callback, queue="test")

print("[*] waiting for messages, to exit press CTRL+C")
channel.start_consuming()


RabbitMQ持久化,只修改发送端就可以

发送端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

# 声明队列queue
# channel.queue_declare(queue="test")
channel.queue_declare(queue="test", durable=True)

# RabbitMQ消息不能直接发送到队列,它总是需要经历一个交换
# channel.basic_publish(exchange="",
#                       routing_key="test",
#                       body="Hello World!")
channel.basic_publish(exchange="",
                      routing_key="test",
                      body="Hello World!",
                      properties=pika.BasicProperties(delivery_mode=2))
print("[x] sent ‘Hello World!‘")
connection.close()


公平分发

如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

这里只需要修改接收端

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

# 声明队列queue,receive声明队列的原因是不一定是发送端先启动,假如receive端先启动要先声明队列
channel.queue_declare(queue="test")


def callback(ch, method, properties, body):
    print("ch", ch)
    print("method", method)
    print("properties", properties)
    print("[x] received %r" % body)
    time.sleep(30)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue="test")

print("[*] waiting for messages, to exit press CTRL+C")
channel.start_consuming()


消息发布和订阅

之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了

Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息


fanout: 所有bind到此exchange的queue都可以接收消息

direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息

topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

   表达式符号说明:#代表一个或多个字符,*代表任何字符

      例:#.a会匹配a.a,aa.a,aaa.a等

          *.a会匹配a.a,b.a,c.a等

     注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 

headers: 通过headers 来决定把消息发给哪些queue


fanout模式

发送端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

channel.exchange_declare(exchange="test", type="fanout")

message = "Hello World!"

channel.basic_publish(exchange="test",
                      routing_key="",
                      body=message)
print("[x] sent %r" % message)
connection.close()

接收端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

channel.exchange_declare(exchange="test", type="fanout")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange="test", queue=queue_name)

print("[*] waiting for messages, to exit press CTRL+C")


def callback(ch, method, properties, body):
    print("[x] received %r" % body)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()


direct模式

发送端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

channel.exchange_declare(exchange="test", type="direct")

message = "Hello World!"
severity = "test123"
channel.basic_publish(exchange="test",
                      routing_key="test123",
                      body=message)
print("[x] sent %r:%r" % (severity, message))
connection.close()

接收端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

channel.exchange_declare(exchange="test", type="direct")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severity = "test123"
channel.queue_bind(exchange="test", queue=queue_name, routing_key=severity)

print("[*] waiting for messages, to exit press CTRL+C")


def callback(ch, method, properties, body):
    print("[x] received %r:%r" % (method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()


topic模式

发送端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

channel.exchange_declare(exchange="test", type="topic")

message = "Hello World!"
routing_key = "Hello"
channel.basic_publish(exchange="test",
                      routing_key=routing_key,
                      body=message)
print("[x] sent %r:%r" % (routing_key, message))
connection.close()

接收端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

channel.exchange_declare(exchange="test", type="topic")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
routing_key = "Hello"
channel.queue_bind(exchange="test", queue=queue_name, routing_key=routing_key)

print("[*] waiting for messages, to exit press CTRL+C")


def callback(ch, method, properties, body):
    print("[x] received %r:%r" % (method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()


rpc

发送端

import pika
import uuid
 
class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=‘localhost‘))
 
        self.channel = self.connection.channel()
 
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
 
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)
 
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body
 
    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange=‘‘,
                                   routing_key=‘rpc_queue‘,
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
 
fibonacci_rpc = FibonacciRpcClient()
 
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

接收端

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
 
channel = connection.channel()
 
channel.queue_declare(queue=‘rpc_queue‘)
 
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)
 
def on_request(ch, method, props, body):
    n = int(body)
 
    print(" [.] fib(%s)" % n)
    response = fib(n)
 
    ch.basic_publish(exchange=‘‘,
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id =                                                          props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue=‘rpc_queue‘)
 
print(" [x] Awaiting RPC requests")
channel.start_consuming()


本文出自 “八英里” 博客,请务必保留此出处http://5921271.blog.51cto.com/5911271/1912393

以上是关于Python之路48-RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

《Python学习之路 -- Python基础之切片》

[原创]java WEB学习笔记61:Struts2学习之路--通用标签 property,uri,param,set,push,if-else,itertor,sort,date,a标签等(代码片段

python成长之路第三篇_正则表达式

python成长之路第三篇_正则表达式

机器学习之路: python 实践 word2vec 词向量技术

常用python日期日志获取内容循环的代码片段