RabbitMQ

Posted aprilnn

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ相关的知识,希望对你有一定的参考价值。

  • RabbitMQ就是消息队列(Message Queue)
  • 在Python中使用pika库和RabbitMQ相连,再通过RabbitMQ查看队列和消息
  • RabbitMQ消息发送采用轮询方式,即一个接收完再到下一个接收,最后回到第一个consumer,依次循环

一、简单生产者和消费者实现

1、生产者

import pika
#RabbitMQ 采用轮询方式
connection=pika.BlockingConnection(pika.ConnectionParameters(localhost))#建立连接
channel=connection.channel()#声明一个管道
channel.queue_declare(queue=hello)#声明一个队列并命名

channel.basic_publish(exchange=‘‘,
                      routing_key=hello,#队列名
                      body=hello there~,
                      )
                      )#发送的消息
print(send message...)

connection.close()

  首先使用pika库建立连接,声明一个管道,再声明一个队列,发送消息,最后关闭连接

2、消费者

import pika
connection=pika.BlockingConnectison(pika.ConnectionParameters(localhost,5672))#RabbitMQ默认端口
channel=connection.channel()
channel.queue_declare(queue=hello,durable=True)#队列名,和produser的一致
def callback(ch,method,propotities,body):
    print("receive >>",body)
    ch.basic_ack(delivery_tag=method.delivery_tag)#no_ack=False的话要手动确认

channel.basic_consume(callback,#如果收到消息就调用回调函数
                queue=hello,
                # no_ack=True
                     )
print( [*] Waiting for messages. To exit press CTRL+C)
channel.start_consuming()#是channel开始不是connection开始,不用关闭连接

 二、RabbitMQ交换模式

RabbitMQ交换模式有fanout、direct和topic

  注:三种交换模式中信息的发送都是即时的,即错过了消息就接收不到了

1、fanout广播

  • fanout即广播模式,也是发布订阅

publisher实现

  • fanout模式中publisher和一般publisher差不多,但不需要声明queue即routing_key为空,但要指明exchange_type
技术分享图片
import pika
#不需要声明queue,广播是实时的即订阅发布
connection=pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
channel=connection.channel()
channel.exchange_declare(exchange=logs,exchange_type=fanout)
message=message...
channel.basic_publish(exchange=logs,
                      routing_key=‘‘,
                      body=message)

print(send...)
connection.close()
View Code

consumer实现

  • consumer中也不需要指明queue名,rabbitmq会随机分配,但要进行对分配的queue绑定的动作
  • consumer中的exchange_type要与publisher中的一致,否则接收不到消息
  • consumer可以接收publisher广播的任何消息
技术分享图片
import pika
#发送端和接收端均不用声明queue
connection=pika.BlockingConnection(pika.ConnectionParameters(localhost))
channel=connection.channel()
channel.exchange_declare(exchange=logs,
                         exchange_type=fanout)

result=channel.queue_declare(exclusive=True)
#不指定queue名字,rabbitmq会随机指定queue,exclusive=True会在消息接收完后删除
queue_name=result.method.queue
def callback(ch,method,properties,body):
    print(receive>>,body)

channel.queue_bind(exchange=logs,queue=queue_name)#绑定queue
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()
View Code

 

2、direct过滤特定消息

  • direct模式可以进行简单过滤,客户端可以选择接收什么类型的消息,如info、warning、error等

publisher实现

  • direct模式与fanout不同的是direct中routing_key是必须的,即消息的级别
  • direct中要定义消息级别(通过命令行实现不同消息级别的定义),默认级别为info
  • 生产者消费者exchange名字要一致
技术分享图片
import pika,sys
#不需要声明queue,广播是实时的即订阅发布
connection=pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
channel=connection.channel()
channel.exchange_declare(exchange=direct_logs,exchange_type=direct)
severity=sys.argv[1] if len(sys.argv)>1 else info#定义消息级别,默认为info  #sys.argv为获取命令行参数
message=‘‘.join(sys.argv[2:])or HELLO!#获取消息
channel.basic_publish(exchange=direct_logs,#exchange名字可以随便起,但消费端exchange名字一定要和生产端一致,否则收不到消息!!!
                      routing_key=severity,#direct模式中routing_key是必须的,生成和消费端都是
                      body=message)

print(send...)
connection.close()
View Code

技术分享图片

 

 

consumer实现

  • 消费者要在命令行模式下指明接收的消息级别(可以接收多个级别的消息),否则会报错
  • 指明routing_key即消息级别
技术分享图片
#direct可以实现接收特定消息或指定消息
import pika,sys
#发送端和接收端均不用声明queue
connection=pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
channel=connection.channel()
channel.exchange_declare(exchange=direct_logs,
                         exchange_type=direct)

result=channel.queue_declare(exclusive=True)
queue_name=result.method.queue
#定义级别,不指定级别会报错
severities=sys.argv[1:]
if not severities:
    sys.stderr.write(Usage:%s [info] [warning] [error]%sys.argv[0])
    sys.exit(1)
for s in severities:
    channel.queue_bind(exchange=direct_logs, queue=queue_name,routing_key=s)  
    # 绑定queue,要指定routing_key!!
def callback(ch,method,properties,body):
    print(receive>>,body)

channel.basic_consume(callback,
                      queue=queue_name,
                      #no_ack=True
                      )
channel.start_consuming()
View Code

 技术分享图片

技术分享图片

3、topic细致的过滤

  • topic模式可以实现接收不同应用程序的所有消息,如mysql.info,*.info等

publisher实现

  • publisher只需要在direct的基础上更改exchange和exchange_type
  • 将默认消息级别改为‘anonymous.info‘
技术分享图片
import pika,sys
#不需要声明queue,广播是实时的即订阅发布
connection=pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
channel=connection.channel()
channel.exchange_declare(exchange=topic_logs,exchange_type=topic)
severity=sys.argv[1] if len(sys.argv)>1 else anonymous.info#定义消息级别,默认为info  #sys.argv为获取命令行参数
message=‘‘.join(sys.argv[2:])or HELLO!#获取消息
channel.basic_publish(exchange=topic_logs,#exchange名字可以随便起,但消费端exchange名字一定要和生产端一致,否则收不到消息!!!
                      routing_key=severity,#direct模式中routing_key是必须的,生成和消费端都是
                      body=message)

print(send...)
connection.close()
View Code

技术分享图片

 

consumer实现

  • 消费端接收的消息类型可以以*.开头,如*.info,这样可以接收不同应用的消息
  • #表示接收所有消息
技术分享图片
#direct可以实现接收特定消息或指定消息
import pika,sys
#发送端和接收端均不用声明queue
connection=pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
channel=connection.channel()
channel.exchange_declare(exchange=topic_logs,
                         exchange_type=topic)

result=channel.queue_declare(exclusive=True)
queue_name=result.method.queue
#定义级别,不指定级别会报错
severities=sys.argv[1:]
if not severities:
    sys.stderr.write(Usage:%s [指定接收消息的类型]%sys.argv[0])
    #消费端接收的消息类型可以以*.开头,如*.info,这样可以接收不同应用的消息
    sys.exit(1)
for s in severities:
    channel.queue_bind(exchange=topic_logs, queue=queue_name,routing_key=s)  # 绑定queue,要指定routing_key!!
def callback(ch,method,properties,body):
    print(receive>>,body)

channel.basic_consume(callback,
                      queue=queue_name,
                      #no_ack=True
                      )
channel.start_consuming()
View Code

 技术分享图片

技术分享图片

  • rpc
  • rpc可以实现客户-服务器进行双向通信,客户和服务器都将消息发送到rpc中,rpc用不同的队列支持它们通信
  • 客户端
技术分享图片
import pika
import uuid
class FibonacciRpcClient(object):
    def __init__(self):
        self.connection=pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
        self.channel=self.connection.channel()
        result=self.channel.queue_declare(exclusive=True)
        self.callback_queue=result.method.queue
        self.channel.basic_consume(self.callback,no_ack=True,queue=self.callback_queue)

    def callback(self,ch,method,props,body):
        if self.corr_id==props.correlation_id:
            #判断从服务器端接收的UUID和之前发送的是否相等,借此判断是否是同一个消息队列
            self.response=body#从服务器端收到的消息

    def call(self,n):
        self.response=None
        self.corr_id=str(uuid.uuid4())#客户端发送请求时生成一个唯一的UUID

        self.channel.basic_publish(exchange=‘‘,
                                   routing_key=rpc_queue,
                                   properties=pika.BasicProperties(#消息持久化
                                       reply_to=self.callback_queue,
                                   correlation_id=self.corr_id),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
            #一般情况下channel.start_consuming表示阻塞模式下接收消息
            #这里不阻塞收消息,且隔一段时间检查有没有消息
            #即非阻塞版的start_consuming
            print(no message...)#进行到这一步代表没有消息
        return int(self.response)



fibonacci_rpc=FibonacciRpcClient()
response=fibonacci_rpc.call(6)
print(get,response)
View Code

 

  • 服务器端
技术分享图片
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
channel=connection.channel()
channel.queue_declare(queue=rpc_queue)

def callback(ch,method,props,body):
    ‘‘‘接收消息并返回结果‘‘‘
    n=int(body)
    print(n)
    response=fib(n)
    ch.basic_publish(exchange=‘‘,
                     routing_key=props.reply_to,#要返回的队列名
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),#使correlation_id等于客户端的并发送回去
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)#确认收到

def fib(n):
    ‘‘‘斐波那契实现‘‘‘
    if n==0 or n==1:
        return n
    else:
        return fib(n-1)+fib(n-2)



channel.basic_consume(callback,queue=rpc_queue)
channel.start_consuming()
View Code

 

以上是关于RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

带着新人学springboot的应用07(springboot+RabbitMQ 下)

RabbitMQ入门:Hello RabbitMQ 代码实例

rabbitmq演示代码

SpringBoot RabbitMQ 延迟队列代码实现

RabbitMQ代码第一步

RabbitMQ代码第一步