RabbitMQ基本理论

Posted Jonathan1314

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ基本理论相关的知识,希望对你有一定的参考价值。

本节内容

一  RabbitMQ介绍

二  RabbitMQ安装配置

三  RabbitMQ的Python实现-pika

  1. 生产者消费者

  2. 工作队列 

  3. 持久化和公平分发

  4. 发布与订阅

  5. RPC

附:概念汇总

 

一  RabbitMQ介绍

 

 

1.  RabbitMQ使用场景

RabbitMQ是一个消息中间件,消息中间件【最主要的作用:信息的缓存区】从应用场景如下:

场景一:系统集成与分布式系统的设计

各个子系统通过消息来对接,这种解决方案也逐步发展成一种架构风格,即“通过消息传递的架构”

举个例子:现在医院有两个科“看病科”和“住院科”在之前他们之间是没有任何关系的,

如果你在“看病课”看完病后注册的信息和资料,

到住院科后还得重新注册一遍?那现在要改革,你看完病后可以直接去住院科那两个系统之间需要打通怎么办?

这里就可以使用我们的消息中间件了。

场景二:异步任务处理结果回调的设计

举个列子:记录日志,假如需要记录系统中所有的用户行为日志,如果通过同步的方式记录日志势必会影响系统的响应速度,

当我们将日志消息发送到消息队列,记录日志的子系统就会通过异步的方式去消费日志信息,这样就不需要同步的写入日志了

场景三:并发请求的压力高可用设计

举个列子:比如电商的秒杀场景。当某一时刻应用服务器或数据库服务器收到大量请求,将会出现系统宕机。

如果能够将请求转发到消息队列,再由服务去消费这些消息将会使得请求变得平稳,提高系统的可用性。

 

2.  消息队列介绍

 

ActiveMQ

RabbitMQ Kafka
所属社区/公司

Apache

Mozilla Public License Apache/LinkedIn
开发语言

Java

Erlang Java
支持的协议

OpenWire、STOMP

REST、XMPP、AMQP

AMQP 仿AMQP
事务 支持

不支持

不支持

集群

支持

支持 支持
负载均衡 支持 支持

支持

动态扩容 不支持 不支持

支持(zk)

 

 

 

 

 

 

 

 

 

 

 

 

 

 

常用的消息中间件来说,感觉Kafka更好些,没错Kafka是大数据时代诞生的消息中间件,但对于目前来说使用最广的还是RabbitMQ

 

Advanced Message Queuing Protocol(AMQP,高级消息队列协议)

是一个标准开放的应用层的消息中间件(Message Oriented Middleware)协议

AMQP定义了通过网络发送的字节流的数据格式,因此兼容性非常好,

任何实现AMQP协议的程序都可以和与AMQP协议兼容的其他程序交互,可以容易做到跨语言、跨平台

 

3.  RabbitMQ和一般的消息传输模式:队列模式&主题模式区别

队列模式:

一个发布者发布消息,下面的接收者按队列顺序接收,比如发布了10个消息,两个接收者A、B,那就是A、B总共会收到10条消息,不重复

 

主题模式:

对于Topic模式,一个发布者发布消息,有两个接收者A、B来订阅,那么发布了10条消息,A、B各收到10条消息

 

RabbitMQ的模式:

生产者生产消息后不直接发送到队列中,而是发到一个交换空间:Exchange,

Exchange会根据Exchange类型和Routing Key来决定发到哪个队列中,详情见发布订阅

 

二  RabbitMQ安装配置

 

 

测试环境

[root@localhost ~]# cat /etc/redhat-release
CentOS Linux release 7.3.1611 (Core)

1.  安装环境

# 安装配置epel源
yum install -y epel-release
# 安装erlang
yum -y install erlang
# 安装RabbitMQ
yum -y install rabbitmq-server

2.  用户配置

# 启动服务
service rabbitmq-server start
# 添加用户
rabbitmqctl add_user admin admin
# 添加管理员权限
rabbitmqctl set_user_tags admin administrator
# 修改密码 
abbitmqctl add_user admin youpassword
# 设置权限
rabbitmqctl  set_permissions  -p  \'/\'  admin \'.\' \'.\' \'.\'

3.  启动WEB管理

# 启动Web插件
rabbitmq-plugins enable rabbitmq_management
# 删除guest用户
rabbitmqctl delete_user guest
# 重启rabbitmq
service rabbitmq-server restart
# 配置文档
# vim /etc/rabbitmq/rabbitmq.config

4.  查看WEB后台

RabbitMQ默认启动的端口:5672
RabbitMQ默认的WEB端口:15672

http://localhost:15672

 

三  RabbitMQ的Python实现-pika

 

 

1.  生产者消费者

send.py

"""
生产者/发送消息方
"""
import pika

# 远程主机的RabbitMQ Server设置的用户名密码
credentials = pika.PlainCredentials(\'admin\', \'admin\')
connection = pika.BlockingConnection(pika.ConnectionParameters(\'192.168.254.143\', 5672, \'/\', credentials))

\'\'\'
ConnectionParameters 中的参数:virtual_host 注:
    相当于在rabbitmq层面又加了一层域名空间的限制,每个域名空间是独立的有自己的Echange/queues等
    举个好玩的例子Redis中的db0/1/2类似
\'\'\'

# 创建通道
channel = connection.channel()
# 声明队列hello,RabbitMQ的消息队列机制如果队列不存在那么数据将会被丢掉,下面我们声明一个队列如果不存在创建
channel.queue_declare(queue=\'hello\')

# 给队列中添加消息
channel.publish(exchange="",       # 默认时,向routing_key="hello",指定的队列发送消息
                routing_key="hello",
                body="Hello World")
print("向队列hello添加数据结束")
# 缓冲区已经flush而且消息已经确认发送到了RabbitMQ中,关闭通道
channel.close()

receive.py

"""
消费者/接收消息方
"""
import pika

# 远程主机的RabbitMQ Server设置的用户名密码
credentials = pika.PlainCredentials(\'admin\', \'admin\')
connection = pika.BlockingConnection(pika.ConnectionParameters(\'192.168.254.143\', 5672, \'/\', credentials))

# 创建通道
channel = connection.channel()
# 声明队列
channel.queue_declare(queue=\'hello\')
"""
你可能会问为什么我们还要声明队列呢? 我们在之前代码里就有了,但是前提是我们已经知道了我们已经声明了代码,
但是我们可能不太确定谁先启动~
所以如果你们100%确定也可以不用声明,但是在很多情况下生产者和消费者都是分离的.所以声明没有坏处
"""


# 订阅的回调函数这个订阅回调函数是由pika库来调用的
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

# 定义通道消费者参数
channel.basic_consume(callback,
                      queue=\'hello\',
                      no_ack=True)  # 相当于告诉发送者,我收到了

print(\' [*] Waiting for messages. To exit press CTRL+C\')
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。
channel.start_consuming()

 

2. 工作队列  

举个例子:

写日志同步慢、或者请求量写日志机器扛不住怎么办?异步、并且分担日志记录压力到多台服务器上

工作队列背后主要思想是避免立即执行资源密集型任务时,需要等待其他任务完成。

所以可以把要执行的任务拆分,部分任务封装成任务消息发送到队列。

send.py

"""
生产者/发送方
"""
import pika

# 远程主机的RabbitMQ Server设置的用户名密码
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters(\'192.168.254.143\', 5672, \'/\', credentials))

# 创建通道
channel = connection.channel()
# 声明队列task_queue,RabbitMQ的消息队列机制如果队列不存在那么数据将会被丢掉,下面我们声明一个队列如果不存在创建
channel.queue_declare(queue=\'task_queue\')

# 在队列中添加消息
for i in range(100):
    message = \'%s Meassage \' % i or "Hello World!"
    # 发送消息
    channel.basic_publish(exchange=\'\',
                          routing_key=\'task_queue\',
                          body=message,)
    # 发送消息结束
    print(" [x] Sent %r" % message)

# 关闭通道
channel.close()

receive1.py

"""
消费者/接收方
"""
import time
import pika

# 认证信息
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.254.143", 5672, "/", credentials))
# 建立通道
channel = connection.channel()
# 创建队列
channel.queue_declare("task_queue")


# 订阅的回调函数这个订阅回调函数是由pika库来调用的
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    print(body.count(b\'.\'))
    time.sleep(body.count(b\'.\'))
    print(" [x] Done")

# 定义通道消费者参数
channel.basic_consume(callback,
                      queue="task_queue",
                      no_ack=True)

print(\' [*] Waiting for messages. To exit press CTRL+C\')
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。
channel.start_consuming()

receive2.py

"""
消费者/接收方
"""
import time
import pika

# 认证信息
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.254.143", 5672, "/", credentials))
# 建立通道
channel = connection.channel()
# 创建队列
channel.queue_declare("task_queue")


# 订阅的回调函数这个订阅回调函数是由pika库来调用的
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    print(body.count(b\'.\'))
    time.sleep(body.count(b\'.\'))
    print(" [x] Done")

# 定义通道消费者参数
channel.basic_consume(callback,
                      queue="task_queue",
                      no_ack=True)

print(\' [*] Waiting for messages. To exit press CTRL+C\')
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。
channel.start_consuming()

默认RabbitMQ按照顺序发送每一个消息,每个消费者会获得相同的数量消息,这种分发消息的方式称之为循环。

 

3.  持久化和公平分发

3.1  消息持久化

在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完就宕机(或出现其他意外)的情况,这种情况下就可能导致消息丢失。

为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,

RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;

如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。

这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。

这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,

这将会导致严重的bug——Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑…

下面是因为我们在消费者端标记了ACK=True关闭了它们,如果你没有增加ACK=True或者没有回执就会出现这个问题

 

生产者需要在发送消息的时候标注属性为持久化

# 在队列中添加消息
for i in range(100):
    message = \'%s Meassage \'% i or "Hello World!"
    # 发送消息
    channel.basic_publish(exchange=\'\',
                          routing_key=\'task_queue\',
                          body=message,
                          properties=pika.BasicProperties(delivery_mode=2, ))  # 标记属性消息为持久化消息需要客户端应答

    # 发送消息结束,并关闭通道
    print(" [x] Sent %r" % message)

消费者需要发送消息回执

# 订阅回调函数,这个订阅回调函数是由pika库来调用
def callback(ch, method, properties, body):
    """
    :param ch: 通道对象
    :param method: 消息方法
    :param properties: 消息属性
    :param body: 消息内容
    :return: None
    """
    print(" [x] Received %r" % (body,))
    time.sleep(2)
    print(" [x] Done")
    # 发送消息确认,确认交易标识符
    ch.basic_ack(delivery_tag=method.delivery_tag)

我们可以通过命令查看那些消费者没有回复ack确认

# Linux
rabbitmqctl list_queues name messages_ready messages_unacknowledged
# Windows
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

3.2  队列持久化

如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),

这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器

已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,

那么我们要用到事务。由于这里仅为RabbitMQ的简单介绍,所以这里将不讲解RabbitMQ相关的事务。

 这里我们需要修改下生产者和消费者设置RabbitMQ消息的持久化  **[生产者/消费者]都需要配置**

channel.queue_declare(queue=\'task_queue\', durable=True)  # 队列持久化

 

3.3  公平分发

默认情况下RabitMQ会把队列里面的消息立即发送到消费者,无论该消费者有多少消息没有应答,也就是说即使发现消费者来不及处理,

新的消费者加入进来也没有办法处理已经堆积的消息,因为那些消息已经被发送给老消费者了

 

在消费者中增加: channel.basic_qos(prefetch_count=1) 

prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,

则该consumer将block掉,直到有消息ack。 这样做的好处是,如果系统处于高峰期,消费者来不及处理,

消息会堆积在队列中,新启动的消费者可以马上从队列中取到消息开始工作。

工作过程如下:

1. 消费者1接收到消息后处理完毕发送了ack并接收新的消息并处理

2. 消费者2接收到消息后处理完毕发送了ack并接收新的消息并处理

3. 消费者3接收到消息后一直处于消息中并没有发送ack不在接收消息一直等到消费者3处理完毕后发送ACK后再接收新消息

 

4. 发布与订阅

前面我们学工作队列主要任务是把每个任务分配给一个worker[工作者],而把消息发给多个消费者(不同队列中),这个模式称之为“发布订阅”

举个例子我们将创建一个简单的日志系统,包含两个程序第一个是用来发送日志,第二个是用来接收日志,

接收日志的程序每一个副本都将收到消息,**这样我们可以一个接收器用来写入磁盘,一个接收器用来输入到日志**

Exchanges可用的类型很少:**direct, topic, headers, fanout**---4种,我们先看最后一种

 

4.1 Fanout模式

模式特点:

  • 可以理解为广播模式

  • 不需要routing key,消息发送时通过exchange binding进行路由,在这个模式下routing key失去作用

  • 这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定

  • 如果接收到消息的Exchange没有与任何Queue绑定,则消息会被抛弃

send.py

import pika

# 远程主机的RabbitMQ Server设置的用户名密码
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.254.144", 5672, "/", credentials))

# 创建通道
channel = connection.channel()

# 声明Exchanges
channel.exchange_declare(exchange="logs",
                         exchange_type="fanout")
"""
这里可以看到我们建立连接后,就声明了Exchange,因为把消息发送到一个不存在的Exchange是不允许的,
如果没有消费者绑定这个,Exchange消息将会被丢弃这是可以的因为没有消费者
"""
# 添加消息
for i in range(100):
    message = \'Logs Number: %s\' % i
    channel.basic_publish(exchange="logs",  # 将消息发送至Exchange为logs绑定的队列中
                          routing_key="",
                          body=message,)
    print(" [x] Sent %r" % message)

# 关闭通道
connection.close()

receive.py

"""
1. 声明一个exchange,类型为fanout
2. 申明一个随机queue
3. 绑定exchang与queue
"""

# 连接RabbitMQ验证信息
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.254.144", 5672, "/", credentials))

# 建立通道
channel = connection.channel()

# Bindings Exchanges 接收消息
channel.exchange_declare(exchange=\'logs\',
                         exchange_type="fanout")

# 使用随机队列/并标注消费者断开连接后删除队列
# 不需要维护一个队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue


# Queue 与 Exchanges绑定
channel.queue_bind(exchange=\'logs\',
                   queue=queue_name)

print(\' [*] Waiting for logs. To exit press CTRL+C\')


# 定义回调函数这个回调函数将被pika库调用
def callback(ch, method, properties, body):
    print(" [x] %r" % body)

# 定义接收消息属性
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

# 开始接收消息
channel.start_consuming()

测试:现在可以把信息分开记录

日志存入文件

python3 receive.py > fanout_logs.txt

并且想再屏幕也输出,另启一个窗口

python3 receive.py

概念解释

binding

当我们创建了Exchanges和(QUEUE)队列后,我们需要告诉Exchange发送到们的Queue队列中,所需要需要把Exchange和队列(Queue)进行绑定