消息队列rabbitmq

Posted bubu99

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列rabbitmq相关的知识,希望对你有一定的参考价值。

一、什么是消息队列(MQ)

MQ全称为Message Queue 消息队列(MQ)是一种应用程序对应用程序的通信方法。MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息。这样发布者和使用者都不用知道对方的存在。

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

我们先不管消息(Message)这个词,来看看队列(Queue)。这一看,队列大家应该都熟悉吧。

队列是一种先进先出的数据结构。

技术图片

消息队列可以简单理解为:把要传输的数据放在队列中。

场景

在程序系统中,例如外卖系统,订单系统,库存系统,优先级较高
发红包,发邮件,发短信,app消息推送等任务优先级很低,很适合交给消息队列去处理,以便于程序系统更快的处理其他请求。

消息队列工作流程

消息队列一般有三个角色:
队列服务端
队列生产者
队列消费者
消息队列工作流程就如同一个流水线,有产品加工,一个输送带,一个打包产品
输送带就是 不停运转的消息队列服务端
加工产品的就是 队列生产者
在传输带结尾打包产品的 就是队列消费者

队列产品

RabbitMQ
Erlang编写的消息队列产品,企业级消息队列软件,支持消息负载均衡,数据持久化等。

ZeroMQ 
saltstack软件使用此消息,速度最快。

Redis
key-value的系统,也支持队列数据结构,轻量级消息队列

Kafka
由Scala编写,目标是为处理实时数据提供一个统一、高通量、低等待的平台

一个app系统消息队列工作流程

消费者,一个后台进程,不断的去检测消息队列中是否有消息,有消息就取走,开启新线程去处理业务,如果没有一会再来

技术图片

二、为什么要用消息队列?

1)程序解耦

允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

2)冗余:

消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。

许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

3)峰值处理能力:

(大白话,就是本来公司业务只需要5台机器,但是临时的秒杀活动,5台机器肯定受不了这个压力,我们又不可能将整体服务器架构提升到10台,那在秒杀活动后,机器不就浪费了吗?因此引入消息队列)

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。

如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。

使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

4)可恢复性:

系统的一部分组件失效时,不会影响到整个系统。

消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

5)顺序保证:

在大多使用场景下,数据处理的顺序都很重要。

大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka保证一个Partition内的消息的有序性)

6)缓冲:

有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

7)异步通信:

很多时候,用户不想也不需要立即处理消息。比如发红包,发短信等流程。

消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。

接下来利用一个外卖系统的消息推送给大家解释下MQ的意义。

技术图片

三、什么情况下会用消息队列?

1、电商订单

想必同学们都点过外卖,点击下单后的业务逻辑可能包括:检查库存、生成单据、发红包、短信通知等,如果这些业务同步执行,完成下单率会非常低,如发红包,短信通知等不必要的流程,异步执行即可

此时使用MQ,可以在核心流程(扣减库存、生成订单记录)等完成后发送消息到MQ,快速结束本次流程。消费者拉取MQ消息时,发现红包、短信等消息时,再进行处理

场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口

技术图片

这种做法有一个缺点:

  • 当库存系统出现故障时,订单就会失败。(这样马云将少赚好多好多钱钱。。。。)

  • 订单系统和库存系统高耦合.

引入消息队列

技术图片

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

  • 库存系统:订阅下单的消息,获取下单消息,进行库操作 就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失(马云这下高兴了,钞票快快的来呀~~).

2、秒杀活动

流量削峰一般在秒杀活动中应用广泛 场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。 作用: 1.可以控制活动人数,超过此一定阀值的订单直接丢弃(怪不得我一次秒杀都没抢到过。。。。。wtf???)

2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

技术图片

3.用户的请求,服务器接收到之后,写入消息队列,超过定义的阈值就直接丢弃请求,或者跳转错误页面

4.业务系统取出队列中的消息,再做后续处理。

四、RabbitMQ

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

rabbitmq官方推荐的python客户端pika模块 pip3 install pika

rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性上表现优秀。使用消息中间件利于应用之间的解耦,生产者(客户端)无需知道消费者(服务端)的存在。而且两端可以使用不同的语言编写,大大提供了灵活性。

中文文档

1、rabbitmq的安装

rabbitmq-server服务端

1.下载centos源
wget -O /etc/yum.repos.d/CentOS-Base.repo   http://mirrors.cloud.tencent.com/repo/centos7_base.repo
2.下载epel源
wget -O /etc/yum.repos.d/epel.repo http://mirrors.cloud.tencent.com/repo/epel-7.repo
3.清空yum缓存并且生成新的yum缓存
yum clean all
yum makecache
4.安装erlang
   $ yum -y install erlang
5.安装RabbitMQ
   $ yum -y install rabbitmq-server
6.启动(无用户名密码):
    systemctl start/stop/restart/status rabbitmq-server
?
设置rabbitmq账号密码,以及角色权限设置

# 设置新用户yugo 密码123
sudo rabbitmqctl add_user yugo 123
?
# 设置用户为administrator角色
sudo rabbitmqctl set_user_tags yugo administrator
?
# 设置权限,允许对所有的队列都有权限
对何种资源具有配置、写、读的权限通过正则表达式来匹配,具体命令如下:
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
sudo rabbitmqctl set_permissions
-p "/" yugo ".*" ".*" ".*" ? #重启服务生效设置 service rabbitmq-server start/stop/restart rabbitmq相关命令 // 新建用户 rabbitmqctl add_user {用户名} {密码} ? // 设置权限 rabbitmqctl set_user_tags {用户名} {权限} ? // 查看用户列表 rabbitmqctl list_users ? // 为用户授权 添加 Virtual Hosts : rabbitmqctl add_vhost <vhost> ? // 删除用户 rabbitmqctl delete_user Username ? // 修改用户的密码 rabbitmqctl change_password Username Newpassword // 删除 Virtual Hosts : rabbitmqctl delete_vhost <vhost> // 添加 Users : rabbitmqctl add_user <username> <password> rabbitmqctl set_user_tags <username> <tag> ... rabbitmqctl set_permissions [-p <vhost>] <user> <conf> <write> <read> // 删除 Users : delete_user <username> ? // 使用户user1具有vhost1这个virtual host中所有资源的配置、写、读权限以便管理其中的资源 rabbitmqctl set_permissions -p vhost1 user1 .* .* .* ? // 查看权限 rabbitmqctl list_user_permissions user1 ? rabbitmqctl list_permissions -p vhost1 ? // 清除权限 rabbitmqctl clear_permissions [-p VHostPath] User ? //清空队列步骤 rabbitmqctl reset 需要提前关闭应用rabbitmqctl stop_app , 然后再清空队列,启动应用 rabbitmqctl start_app 此时查看队列rabbitmqctl list_queues ? 查看所有的exchange: rabbitmqctl list_exchanges 查看所有的queue: rabbitmqctl list_queues 查看所有的用户: rabbitmqctl list_users 查看所有的绑定(exchange和queue的绑定信息): rabbitmqctl list_bindings 查看消息确认信息: rabbitmqctl list_queues name messages_ready messages_unacknowledged 查看RabbitMQ状态,包括版本号等信息:rabbitmqctl status

#开启web界面rabbitmq
rabbitmq-plugins enable rabbitmq_management

#访问web界面
http://server-name:15672/

2、RabbitMQ组件解释

AMQP

AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现。它主要包括以下组件:

1.Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程。


2.Virtual Host:其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host


3.Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。


4.Message Queue:消息队列,用于存储还未被消费者消费的消息。


5.Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。


6.Binding:Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。 


7.Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。


8.Channel:信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。


9.Command:AMQP的命令,客户端通过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端可以通过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。

3、rabbitMQ工作模型

简单模式(应答参数,持久化参数,分发参数)

# 生产者

import pika

# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters(‘123.206.16.61‘))
# 有密码
credentials = pika.PlainCredentials("s14", "123")
connection = pika.BlockingConnection(pika.ConnectionParameters(192.168.119.10, credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)
# 默认此队列不支持持久化,如果服务挂掉,数据丢失
# durable=True 开启持久化,必须新开启一个队列,原本的队列已经不支持持久化了
‘‘‘
实现rabbitmq持久化条件
 delivery_mode=2
使用durable=True声明queue是持久化

‘‘‘
channel.queue_declare(queue=LOL, durable=True)
channel.basic_publish(exchange=‘‘,
                      routing_key=LOL,  # 消息队列名称
                      body=德玛西亚万岁,
                      # 支持数据持久化
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 代表消息是持久的  2
                      )
                      )
connection.close()



# 消费者
import pika

# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters(‘123.206.16.61‘))
# 有密码
credentials = pika.PlainCredentials("s14", "123")
connection = pika.BlockingConnection(pika.ConnectionParameters(192.168.119.10, credentials=credentials))
channel = connection.channel()
# 确保队列持久化
channel.queue_declare(queue=LOL, durable=True)

‘‘‘
必须确保给与服务端消息回复,代表我已经消费了数据,否则数据一直持久化,不会消失
‘‘‘


def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body.decode("utf-8"))
    # 模拟代码报错
    # int(‘asdfasdf‘)    # 此处报错,没有给予回复,保证客户端挂掉,数据不丢失

    # 告诉服务端,我已经取走了数据,否则数据一直存在
    ch.basic_ack(delivery_tag=method.delivery_tag)


# 关闭no_ack,代表给与回复确认
channel.basic_consume(callback, queue=LOL, no_ack=False)
channel.start_consuming()
技术图片
#######################简单模式#######################
### 生产者

import pika

# 1 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(127.0.0.1))
channel = connection.channel()

# 2 创建队列
channel.queue_declare(queue=hello)

# 3 向指定队列插入数据
channel.basic_publish(exchange=‘‘, # 简单模式
                      routing_key=hello, # 指定队列
                      body=Hello Yuan!)

print(" [x] Sent ‘Hello Yuan!‘")


### 消费者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(localhost))
channel = connection.channel()

#  创建队列
channel.queue_declare(queue=hello)

# 确定回调函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

# 确定监听队列参数
channel.basic_consume(queue=hello,
                      auto_ack=True, # 默认应答
                      on_message_callback=callback)

print( [*] Waiting for messages. To exit press CTRL+C)

# 正式监听
channel.start_consuming()




#######################应答参数#######################
"""
### 消费者
auto_ack=False
ch.basic_ack(delivery_tag=method.delivery_tag)
"""
### 消费者
import pika


connection = pika.BlockingConnection(pika.ConnectionParameters(localhost))
channel = connection.channel()

#  创建队列
channel.queue_declare(queue=hello)


# 确定回调函数
def callback(ch, method, properties, body):

    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)  #######2.#######

# 确定监听队列参数
channel.basic_consume(queue=hello,
                      auto_ack=False, ####### 1.默认应答改为手动应答
                      on_message_callback=callback)


print( [*] Waiting for messages. To exit press CTRL+C)
# 正式监听
channel.start_consuming()



#######################持久化参数#######################
"""
#声明queue
channel.queue_declare(queue=‘hello2‘, durable=True)  # 若声明过,则换一个名字
 
channel.basic_publish(exchange=‘‘,
                      routing_key=‘hello2‘,
                      body=‘Hello World!‘,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                          )
                      )
"""
### 生产者

import pika

# 1 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(127.0.0.1))
channel = connection.channel()

# 2 创可持久化队列
channel.queue_declare(queue=hello3,durable=True)   ####1.若声明过,则换一个名字

# 3 向指定队列插入数据
channel.basic_publish(exchange=‘‘, # 简单模式
                      routing_key=hello3, # 指定队列
                      body=Hello ALEX!,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  #2 make message persistent
                      )
                      )

print(" [x] Sent ‘Hello ALEX!‘")

### 消费者
import pika


connection = pika.BlockingConnection(pika.ConnectionParameters(localhost))
channel = connection.channel()

#  创建队列
channel.queue_declare(queue=hello3,durable=True)  #### 1.


# 确定回调函数
def callback(ch, method, properties, body):

    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 确定监听队列参数
channel.basic_consume(queue=hello3,
                      auto_ack=False, # 默认应答改为手动应答
                      on_message_callback=callback)


print( [*] Waiting for messages. To exit press CTRL+C)
# 正式监听
channel.start_consuming()


#######################分发参数#######################
"""
channel.basic_qos(prefetch_count=1)

有两个消费者同时监听一个的队列。其中一个线程sleep2秒,另一个消费者线程sleep1秒,
但是处理的消息是一样多。这种方式叫轮询分发(round-robin)不管谁忙,都不会多给消息,
总是你一个我一个。想要做到公平分发(fair dispatch),必须关闭自动应答ack,改成手动应答。
使用basicQos(perfetch=1)限制每次只发送不超过1条消息到同一个消费者,消费者必须手动反馈告知队列,才会发送下一个。
"""

### 消费者

import pika


connection = pika.BlockingConnection(pika.ConnectionParameters(localhost))
channel = connection.channel()

#  创建队列
channel.queue_declare(queue=hello4)


# 确定回调函数
def callback(ch, method, properties, body):
    import time
    time.sleep(5)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

#  公平分发
channel.basic_qos(prefetch_count=1)

# 确定监听队列参数
channel.basic_consume(queue=hello4,
                      auto_ack=False, # 默认应答改为手动应答
                      on_message_callback=callback)


print( [*] Waiting for messages. To exit press CTRL+C)
# 正式监听
channel.start_consuming()
简单模式

交换机模式(exchange)

rabbitmq发送消息首先是发给exchange,然后再通过exchange发送消息给队列(queue)

exchange有四种模式

fanout

exchange将消息发送给和该exchange连接的所有queue;也就是所谓的广播模式;此模式下忽略routing_key;

direct

路由模式,通过routing_key将消息发送给对应的queue; 如下面这句即可设置exchange为direct模式,只有routing_key为“black”时才将其发送到队列queue_name; channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key=‘black‘)

技术图片

 

 

在上图中,Q1和Q2可以绑定同一个key,如绑定routing_key=‘KeySame’,那么收到routing_key为KeySame的消息时将会同时发送给Q1和Q2,退化为广播模式;

top

topic模式类似于direct模式,只是其中的routing_key变成了一个有“.”分隔的字符串,“.”将字符串分割成几个单词,每个单词代表一个条件;

headers

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。

官方教程:http://www.rabbitmq.com/tutorials/tutorial-three-python.html

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

交换机之发布订阅

技术图片

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

交换机之关键字

技术图片

交换机之通配符

通配符交换机”与之前的路由模式相比,它将信息的传输类型的key更加细化,以“key1.key2.keyN....”的模式来指定信息传输的key的大类型和大类型下面的小类型,让消费者可以更加精细的确认自己想要获取的信息类型。而在消费者一段,不用精确的指定具体到哪一个大类型下的小类型的key,而是可以使用类似正则表达式(但与正则表达式规则完全不同)的通配符在指定一定范围或符合某一个字符串匹配规则的key,来获取想要的信息。

“通配符交换机”(Topic Exchange)将路由键和某模式进行匹配。此时队列需要绑定在一个模式上。符号“#”匹配一个或多个词,符号“*”仅匹配一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”只会匹配到“audit.irs”。(这里与我们一般的正则表达式的“*”和“#”刚好相反,这里我们需要注意一下。)
下面是一个解释通配符模式交换机工作的一个样例
技术图片
上面的交换机制类似于一个国际新闻讯息网站的机制。

技术图片
#######################交换机之发布订阅#######################
###生产者
import pika

# 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
channel = connection.channel()


# 声明一个名为logs类型为fanout的交换机
channel.exchange_declare(exchange=logs,
                         exchange_type=fanout) # fanout:发布订阅模式参数

# 向logs交换机插入数据"info: Hello World!"
message = "info: Hello World!"
channel.basic_publish(exchange=logs,
                      routing_key=‘‘,
                      body=message)
print(" [x] Sent %r" % message)
connection.close()


###消费者
import pika

# 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
channel = connection.channel()

# 声明一个名为logs类型为fanout的交换机
channel.exchange_declare(exchange=logs,
                         exchange_type=fanout)

# 创建队列 (随机生成一个队列)
result = channel.queue_declare("",exclusive=True)
queue_name = result.method.queue
print(queue_name)

# 将指定队列绑定到交换机上
channel.queue_bind(exchange=logs,
                   queue=queue_name)

print( [*] Waiting for logs. To exit press CTRL+C)

def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

channel.start_consuming()


#######################交换机之关键字#######################
###生产者

import pika

# 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
channel = connection.channel()


# 声明一个名为logs类型为fanout的交换机
channel.exchange_declare(exchange=logs2,
                         exchange_type=direct) # fanout:发布订阅模式参数

# 向logs交换机插入数据"info: Hello World!"
message = "error: Hello World!"
channel.basic_publish(exchange=logs2,
                      routing_key=error,
                      body=message)
print(" [x] Sent %r" % message)
connection.close()


###消费者
import pika

# 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
channel = connection.channel()


# 声明一个名为logs类型为fanout的交换机
channel.exchange_declare(exchange=logs2,
                         exchange_type=direct)

# 创建队列(随机生成一个队列)
result = channel.queue_declare("",exclusive=True)
queue_name = result.method.queue
print(queue_name)

# 将指定队列绑定到交换机上
channel.queue_bind(exchange=logs2,
                   queue=queue_name,
                   routing_key="info"
                   )

channel.queue_bind(exchange=logs2,
                   queue=queue_name,
                   routing_key="error"
                   )


print( [*] Waiting for logs. To exit press CTRL+C)

def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

channel.start_consuming()



#######################交换机之通配符#######################
###生产者
import pika

# 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=localhost))
channel = connection.channel()


# 声明一个名为logs类型为fanout的交换机
channel.exchange_declare(exchange=logs3,
                         exchange_type=topic) # fanout:发布订阅模式参数

# 向logs交换机插入数据"info: Hello World!"
message = "usa.weather......."
channel.basic_publish(exchange=logs3,
                      routing_key=usa.weather,
                      body=message)
print(" [x] Sent %r" % message)
connection.close()



###消费者
import pika

# 链接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
channel = connection.channel()

# 声明一个名为logs类型为fanout的交换机
channel.exchange_declare(exchange=logs3,
                         exchange_type=topic)

# 创建队列(随机生成一个队列)
result = channel.queue_declare("",exclusive=True)
queue_name = result.method.queue
print(queue_name)

# 将指定队列绑定到交换机上
channel.queue_bind(exchange=logs3,
                   queue=queue_name,
                   routing_key="#.weather"
                   )


print( [*] Waiting for logs. To exit press CTRL+C)

def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)

channel.start_consuming()
交换机

四、基于rabbitmq的RPC实现

将一个函数运行在远程计算机上并且等待获取那里的结果,这个称作远程过程调用(Remote Procedure Call)或者 RPC。

RPC是一个计算机通信协议。

比喻

将计算机服务运行理解为厨师做饭,厨师想做一个小葱拌豆腐,厨师需要洗小葱、切豆腐、调汁、凉拌。他一个人完成所有的事,如同古老的集中式应用,一台计算机做所有的事。
?
制作小葱拌豆腐{
  厨师>洗小葱>切豆腐>凉拌
}

rpc应用场景

而如今,饭店做大了,有钱了,专职分工来干活,不再是厨师单打独斗,备菜师傅准备小葱、豆腐,切菜师傅切小葱、豆腐,厨师只负责调味,完成食品。
?
制作小葱拌豆腐{
  备菜师>洗菜
  切菜师>切菜
  厨师>调味
}

此时一件事好多人在做,厨师就得和其他人沟通,通知备菜、洗菜师傅的这个动作就是远程过程调用(RPC)。

这个过程在计算机系统中,一个电商的下单过程,涉及物流、支付、库存、红包等多个系统,多个系统又在多个服务器上,由不同的技术团队负责,整个下单过程,需要所有团队进行远程调用。

下单{
  库存>减少库存
  支付>扣款
  红包>减免红包
  物流>生成订单
}

到底什么是rpc

rpc指的是在计算机A上的进程,调用另外一台计算机B的进程,A上的进程被挂起,B上的被调用进程开始执行后,产生返回值给A,A继续执行。
调用方可以通过参数将信息传递给被调用方,而后通过返回结果得到信息,这个过程对于开发人员来说是透明的。
?
如同厨师一样,服务员把菜单给后厨,厨师告诉洗菜人,备菜人,开始工作,完成工作后,整个过程对于服务员是透明的,他完全不用管后厨是怎么把菜做好的。

由于服务在不同的机器上,远程调用必经网络通信,调用服务必须写一坨网络通信代码,很容易出错且很复杂,因此就出现了RPC框架。

阿里巴巴的 Dubbo     java
新浪的 Motan java
谷歌的 gRPC 多语言
Apache thrift 多语言

rpc封装了数据的序列化,反序列化,以及传输协议

1、python实现RPC

利用RabbitMQ构建一个RPC系统,包含了客户端和RPC服务器,依旧使用pika模块

Callback queue 回调队列

一个客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to

Correlation id 关联标识

一个客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。

客户端发送请求:某个应用将请求信息交给客户端,然后客户端发送RPC请求,在发送RPC请求到RPC请求队列时,客户端至少发送带有reply_to以及correlation_id两个属性的信息
?
服务器端工作流: 等待接受客户端发来RPC请求,当请求出现的时候,服务器从RPC请求队列中取出请求,然后处理后,将响应发送到reply_to指定的回调队列中
?
客户端接受处理结果: 客户端等待回调队列中出现响应,当响应出现时,它会根据响应中correlation_id字段的值,将其返回给对应的应用

过程
1.启动rpc客户端,等待接收数据到来,来了之后就进行处理,再将结果丢进队列
2.启动rpc服务端,发起请求

2、rpc的实现

技术图片

如图我们可以看出生产端client向消费端server请求处理数据,他会经历如下几次来完成交互。
  • 1.生产端 生成rpc_queue队列,这个队列负责帮消费者 接收数据并把消息发给消费端。
  • 2.生产端 生成另外一个随机队列,这个队列是发给消费端,消费这个用这个队列把处理好的数据发送给生产端。
  • 3.生产端 生成一组唯一字符串UUID,发送给消费者,消费者会把这串字符作为验证在发给生产者。
  • 4.当消费端处理完数据,发给生产端,时会把处理数据与UUID一起通过随机生产的队列发回给生产端。
  • 5.生产端,会使用while循环 不断检测是否有数据,并以这种形式来实现阻塞等待数据,来监听消费端。
  • 6.生产端获取数据调用回调函数,回调函数判断本机的UUID与消费端发回UID是否匹配,由于消费端,可能有多个,且处理时间不等所以需要判断,判断成功赋值数据,while循环就会捕获到,完成交互。
技术图片
import pika
import uuid
import time

# 斐波那契数列 前两个数相加依次排列
class FibonacciRpcClient(object):
    def __init__(self):
        # 赋值变量,一个循环值
        self.response = None
        # 客户端启动时,创建回调队列,会开启会话用于发送RPC请求以及接受响应
        # 建立连接,指定服务器的ip地址
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=localhost))
        self.channel = self.connection.channel()

        # 声明回调队列,再次声明的原因是,服务器和客户端可能先后开启,该声明是幂等的,多次声明,但只生效一次
        # exclusive=True 参数是指只对首次声明它的连接可见
        # exclusive=True 会在连接断开的时候,自动删除

        # 生成随机queue
        result = self.channel.queue_declare("",exclusive=True)
        # 随机取queue名字,发给消费端
        # 将次队列指定为当前客户端的回调队列
        self.callback_queue = result.method.queue

        # self.on_response 回调函数:只要收到消息就调用这个函数。
        # 声明收到消息后就 收queue=self.callback_queue内的消息

        # 客户端订阅回调队列,当回调队列中有响应时,调用`on_response`方法对响应进行处理;
        self.channel.basic_consume(queue=self.callback_queue,
                                   auto_ack=True,
                              on_message_callback=self.on_response)

    # 对回调队列中的响应进行处理的函数

    # 收到消息就调用
    # ch 管道内存对象地址
    # method 消息发给哪个queue
    # body数据对象
    def on_response(self, ch, method, props, body):
        # 判断本机生成的ID 与 生产端发过来的ID是否相等
        if self.corr_id == props.correlation_id:
            # 将body值 赋值给self.response
            self.response = body

    # 发出RPC请求
    # 例如这里服务端就是一个切菜师傅,菜切好了,需要传递给洗菜师傅,这个过程是发送rpc请求

    def call(self, n):

        # 生成correlation_id 关联标识,通过python的uuid库,生成全局唯一标识ID,保证时间空间唯一性
        # 随机一次唯一的字符串
        self.corr_id = str(uuid.uuid4())

        # 发送RPC请求内容到RPC请求队列`s14rpc`,同时发送的还有`reply_to`和`correlation_id`
        # routing_key=‘rpc_queue‘ 发一个消息到rpc_queue内
        self.channel.basic_publish(exchange=‘‘,
                                   routing_key=rpc_queue,
                                   properties=pika.BasicProperties(

                                         # 执行命令之后结果返回给self.callaback_queue这个队列中
                                         reply_to = self.callback_queue,
                                         # 生成UUID 发送给消费端
                                         correlation_id = self.corr_id,
                                         ),
                                   # 发的消息,必须传入字符串,不能传数字
                                   body=str(n))
        # 没有数据就循环收
        while self.response is None:
            # 非阻塞版的start_consuming()
            # 没有消息不阻塞
            self.connection.process_data_events()
            print("no msg...")
            time.sleep(0.5)
        return int(self.response)

# 实例化  建立客户端
fibonacci_rpc = FibonacciRpcClient()

# 发送RPC请求,丢进rpc队列,等待客户端处理完毕,给与响应
print("发送了请求sum(50)")
response = fibonacci_rpc.call(50)
print("得到远程结果响应%r" % response)
rpc_server.py
技术图片
import pika
import time
# 链接socket
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=localhost))
# 建立会话
channel = connection.channel()

# 声明RPC请求队列
channel.queue_declare(queue=rpc_queue)

# 斐波那契数列
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

# 模拟一个进程,例如切菜师傅,等着洗菜师傅传递数据
def sum(n):
    n+=100
    return n
# 对RPC请求队列中的请求进行处理


# 收到消息就调用
# ch 管道内存对象地址
# method 消息发给哪个queue
# props 返回给消费的返回参数
# body数据对象
def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    # 调用斐波那契函数 传入结果
    response = fib(n)
    
    # 调用数据处理方法
    # response = sum(n)

    ch.basic_publish(exchange=‘‘,
                     # reply_to代表回复目标
                     # 生产端随机生成的queue
                     routing_key=props.reply_to,
                     # 获取UUID唯一 字符串数值
                     # correlation_id(关联标识):用来将RPC的响应和请求关联起来。

                     properties=pika.BasicProperties(correlation_id =                                                    props.correlation_id),
                     # 消息返回给生产端
                     body=str(response))
    # 确保任务完成
    ch.basic_ack(delivery_tag = method.delivery_tag)

# 负载均衡,同一时刻发送给该服务器的请求不超过一个
# channel.basic_qos(prefetch_count=1)

# rpc_queue收到消息:调用on_request回调函数
# queue=‘rpc_queue‘从rpc内收
channel.basic_consume(queue="rpc_queue",
                      auto_ack=True,
                      on_message_callback=on_request)
print(" [x] Awaiting RPC requests")

#开始消费
channel.start_consuming()
rpc_client.py

 

以上是关于消息队列rabbitmq的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ学习笔记五:RabbitMQ之优先级消息队列

RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码

Rabbitmq 消息队列

微服务专题之.Net6下集成消息队列-RabbitMQ交换机模式代码演示(全)

RabbitMQ 消息队列学习

RabbitMQ确认机制问题处理