Rabbitmq 消息队列

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbitmq 消息队列相关的知识,希望对你有一定的参考价值。

RabbitMQ 消息队列介绍

RabbitMQ是一种消息队列,与线程queue和进程QUEUE作用是一样的。
RabbitMQ是一个中间程序,可以实现不同进程之间的通信(比如python和Java之间,QQ和Word之间等);
普通情况下A进程与B进程之间通信,两者之间需要建立很多连接和单独写一些代码,但是使用RabbitMQ的话就可以实现帮助不同进程之间的数据通信。
A进程交给RabbitMQ,RabbitMQ在交给B,同样B交给RabbitMQ,RabbitMQ在交给A,RabbitMQ可以实现A与B进程之间的连接和信息转换。
使用RabbitMQ可以实现很多个独立进程之间的交互,所有其他独立进程都可以用RabbitMQ作为中间程序。

py 消息队列:
线程 queue(同一进程下线程之间进行交互)
进程 Queue(父子进程进行交互 或者 同属于同一进程下的多个子进程进行交互)

如果是两个完全独立的python程序,也是不能用上面两个queue进行交互的,或者和其他语言交互有哪些实现方式呢。
【Disk、Socket、其他中间件】这里中间件不仅可以支持两个程序之间交互,可以支持多个程序,可以维护好多个程序的队列。
虽然可以通过硬盘的方式实现多个独立进程交互,但是硬盘速度比较慢,而RabbitMQ则能够很好的、快速的帮助两个独立进程实现交互。

像这种公共的中间件有好多成熟的产品:
RabbitMQ
ZeroMQ
ActiveMQ
……

RabbitMQ:erlang语言 开发的。
Python中连接RabbitMQ的模块:pika 、Celery(分布式任务队列) 、haigha
可以维护很多的队列
其中pika是RabbitMQ常用的模块

RabbitMQ 教程官网:http://www.rabbitmq.com/getstarted.html

几个概念说明:

Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

![image_1cfq381mj7341i1b4lnitk1h3t9.png-178.4kB][1]
RabbitMQ不像之前学的python Queue都在一个队列里实现交互,RabbitMQ有多个队列(图中红色部分代表队列),每个队列都可以将消息发给多个接收端(C是接收端,P是生产消息端)

RabbitMQ基本示例.

1、Rabbitmq 安装

Windos系统

pip install pika

ubuntu系统

install  rabbitmq-server  # 直接搞定

以下centos系统
1)Install Erlang

# For EL5:
rpm -Uvh http://download.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm
# For EL6:
rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
# For EL7:
rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm

yum install erlang

2)Install RabbitMQ Server

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum install rabbitmq-server-3.6.5-1.noarch.rpm

3)use RabbitMQ Server

chkconfig rabbitmq-server on
service rabbitmq-server stop/start
或者
rabbitmq-server start

技术图片
rabbitmq已经开启,等待传输

2、基本示例

发送端 producer


import pika

# 建立一个实例;相当于建立一个socket。
#通过ctrl+ConnectionParameters可以看到能传很多参数,如果远程还可以传用户名密码。
connection = pika.BlockingConnection(
    pika.ConnectionParameters(‘localhost‘,5672)  # 默认端口5672,可不写
    )
# 声明一个管道,在管道里发消息
channel = connection.channel()
# 在管道里声明queue
channel.queue_declare(queue=‘hello‘)
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange=‘‘,
                      routing_key=‘hello‘,  # queue名字,将消息发给hello这个queue
                      body=‘Hello World!‘)  # 消息内容
print(" [x] Sent ‘Hello World!‘")
connection.close()  # 发完消息后关闭队列 

执行结果:

[x] Sent ‘Hello World!‘

注意一定要开启rabbitmq,否则会报错

接收端 consumer

import pika
import time

# 建立实例
connection = pika.BlockingConnection(pika.ConnectionParameters(
               ‘localhost‘))
# 声明管道
channel = connection.channel()

# 为什么又声明了一个‘hello’队列?
# 如果这个queue确定已经声明了,可以不声明。但是你不知道是生产者还是消费者先运行,所以要声明两次。如果消费者没声明,且消费者先运行的话,就会报错。
# 生产者先声明queue,消费者不声明,但是消费者后运行就不会报错。
channel.queue_declare(queue=‘hello‘)

def callback(ch, method, properties, body):  # 四个参数为标准格式
    print(ch, method, properties)  # 打印看一下是什么
    # ch是管道内存对象地址;method是内容相关信息  properties后面讲  body消息内容
    print(" [x] Received %r" % body)
    #time.sleep(15)
    #ch.basic_ack(delivery_tag = method.delivery_tag)  

channel.basic_consume(  # 消费消息
        ‘hello‘,  # 你要从哪个队列里收消息 
        callback,  # 如果收到消息,就调用callback函数来处理消息  # 注意调用的函数以前模块是放在形参第一个位置的,后面修改到第二个位置了,如果放错位置会报错
        # no_ack=True  # 写的话,如果接收消息,机器宕机消息就丢了
        # 一般不写。宕机则生产者检测到发给其他消费者
        )

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()  # 开始消费消息(开始接收消息,一直收,如果没消息就卡主在这里了)

执行结果:
 [*] Waiting for messages. To exit press CTRL+C
<BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f715d76f128> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver([‘consumer_tag=ctag1.b728277178e746118699d5b4302a0314‘, ‘delivery_tag=1‘, ‘exchange=‘, ‘redelivered=False‘, ‘routing_key=hello‘])> <BasicProperties>
 [x] Received b‘Hello World!‘

收到了bytes格式的 Hello World!

技术图片
消费者(接收端)这边看到已经卡主了

技术图片
如果此时单独在运行一下生产者(发送端),直接可以从消费者看到新收到的消息


rabbitmq 消息分发轮询

技术图片
重新开启rabbitmq

技术图片
运行三个接收者(消费者)

技术图片
运行发送者,可以看到被第一个接收者给收到信息了

技术图片
第二次运行发送者,第二个接收者收到信息了

技术图片
第三次运行发送者,第三个接收者收到信息了

上面几次运行说明了,依次的将信息发送每一个接收者


接收端 consumer

import pika
import time

# 建立实例
connection = pika.BlockingConnection(pika.ConnectionParameters(
               ‘localhost‘))
# 声明管道
channel = connection.channel()

channel.queue_declare(queue=‘hello‘)

def callback(ch, method, properties, body):  
    print(ch, method, properties) 

    print(" [x] Received %r" % body)
    # 正常回调函数(callback)执行完成就表示信息接收完成,如果在还没执行完成时就出现异常就表示信息没有正常接收,比如断网、断电等,会导致信息不能正常接收。
    # 下面sleep 60秒,在60秒之前就将该模块终止执行来模拟异常情况。
    time.sleep(60)  
    #ch.basic_ack(delivery_tag = method.delivery_tag)  

channel.basic_consume(  
        ‘hello‘,  
        callback, 
        # no_ack=True 表示不管消息是否接收(处理)完成,都不会回复确认消息
        # 如果producer不关心 comsumer是否处理完,可以使用该参数
        # 但是一般都不会使用它
        # no_ack=True  

        )

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()  # 

技术图片
在centos中重新执行rabbitmq-server start来清空队列里的消息
然后在pycharm开启三个comsumer,在去运行等待接收消息
再去执行producer来发送消息,执行producer后,立即关闭第一个comsumer,这样消息就会因为第一个comsumer没接收成功跑到第二个comsumer去,以此类推。

技术图片
关闭第二个comsumer,第三个comsumer收到信息

技术图片
这张图是将三个comsumer同时都关闭了,这样三个comsumer都收不到消息,说明producer的消息没有被接收,此时再去开启第一个comsumer,这时第一个comsumer会将消息给接收过来。

我们将sleep注释掉,也是这种现象,这是因为comsumer并没有发送确认消息给producer


import pika
import time

# 建立实例
connection = pika.BlockingConnection(pika.ConnectionParameters(
               ‘localhost‘))
# 声明管道
channel = connection.channel()

channel.queue_declare(queue=‘hello‘)

def callback(ch, method, properties, body):  
    print(ch, method, properties) 

    print(" [x] Received %r" % body)
    time.sleep(30)  
    ch.basic_ack(delivery_tag = method.delivery_tag)   # 告诉生成者,消息处理完成

channel.basic_consume(  
        ‘hello‘,  
        callback, 
        # no_ack=True  

        )

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()  # 

此时的代码:当其中一个comsumer执行完成,并发送确认消息后再去中断,下一个comsumer就不会收到消息;反之,如果还没发送确认消息就中断了,那么消息就会被下一个comsumer接收到。

rabbitmq 消息持久化

如果producer端宕机,那么队列的数据也会消失;这样就需要让队列消息持久化

# durable=True 该代码只是将生成的队列持久化(不是消息),如果producer宕机,队列会存在,单消息会丢
# 要注意需要在producer端和 comsumer端都要 写durable=True
channel.queue_declare(queue=‘hello‘,durable=True) 
在centos重新开启 rabbitmq-server start

在producer端

将producer代码执行三次,将三个消息放入队列

import pika

。
connection = pika.BlockingConnection(
    pika.ConnectionParameters(‘localhost‘,5672)  
    )
channel = connection.channel()
channel.queue_declare(queue=‘hello‘,durable=True)

channel.basic_publish(exchange=‘‘,
                      routing_key=‘hello‘,  
                      body=‘Hello World!‘,
                      # 下面的代码是让消息持久化
                      properties = pika.BasicProperties(delivery_mode=2)
                      )  
print(" [x] Sent ‘Hello World!‘")
connection.close()  

将producer代码执行三次,将三个消息放入队列

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
               ‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘hello‘,durable=True)
def callback(ch, method, properties, body): 
    print(ch, method, properties) 

    print(" [x] Received %r" % body)
    # time.sleep(30) #注释掉
    ch.basic_ack(delivery_tag = method.delivery_tag)  

channel.basic_consume( 
        ‘hello‘, 
        callback
        )

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()  # 

以上是关于Rabbitmq 消息队列的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ学习笔记五:RabbitMQ之优先级消息队列

RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码

Rabbitmq 消息队列

微服务专题之.Net6下集成消息队列-RabbitMQ交换机模式代码演示(全)

RabbitMQ 消息队列学习

RabbitMQ确认机制问题处理