RabbitMQ基础

Posted toov5

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ基础相关的知识,希望对你有一定的参考价值。

RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。
RabbitMQ是由RabbitMQ Technologies Ltd开发并且提供商业支持的。该公司在2010年4月被SpringSource(VMWare的一个部门)收购。在2013年5月被并入Pivotal。其实VMWare,Pivotal和EMC本质上是一家的。不同的是VMWare是独立上市子公司,而Pivotal是整合了EMC的某些资源,现在并没有上市。
RabbitMQ的官网是http://www.rabbitmq.com
百度百科amqp协议介绍https://baike.baidu.com/item/AMQP/8354716?fr=aladdin
注意:RabbitMQ是采用erlang语言开发的,所以必须有Erlang环境才可以运行

Erlang  (高并发应用) 

Erlang编程语言最初目的是进行大型电信交换设备的软件开发,是一种适用于大规模并行处理环境的高可靠性编程语言。随着多核处理器技术的日渐普及,以及互联网、云计算等技术的发展,该语言的应用范围也有逐渐扩大之势。

百度百科介绍:https://baike.baidu.com/item/Erlang%E8%AF%AD%E8%A8%80/20864044?fr=aladdin

 

比较图示:  初衷理念实现抗高并发语言

技术图片

 

 

 

不同的项目 不同的 路径,独立的virtualhost,相互进行隔离:  客户端连接时候需要指定virtual host地址 。

更加解耦 相互进行隔离  类似于每个项目都有不同的数据库一样。

技术图片

 

 

 

添加virtual host

技术图片

指定某个用户的 Virtual Host

 技术图片

技术图片

 

AMQP(高级消息队列协议)是一个异步消息传递所使用应用层协议规范,为面向消息中间件设计,基于此协议的客户端与消息中间件可以无视消息来源传递消息,不受客户端、消息中间件、不同的开发语言环境等条件的限制;

涉及概念解释: 
Server(Broker):接收客户端连接,实现AMQP协议的消息队列和路由功能的进程;
Virtual Host:虚拟主机的概念,类似权限控制组,一个Virtual Host里可以有多个Exchange和Queue。   
Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue。
ExchangeType:交换机类型决定了路由消息行为,RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic;
Message Queue:消息队列,用于存储还未被消费者消费的消息;
Message:由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等;body是真正需要发送的数据内容;
BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来。

 

RabbitMQ几种工作模式:

  RabbitMQ点对点模式:

1、点对点模式  一对一模式。  一个生产者投递消息给队列 只能允许有一个消费者进行消费    如果集群的话 会进行均摊消费   服务器配置不一样 均摊就不优了 

     技术图片

长连接 不用三次握手之类的 提高传输效率     但是长连接占服务器带宽

推:  消费者已经启动了,建立长连接,一旦生产者向队列投递消息会立马推送给消费者

取: 生产者先投递消息队列进行缓存,这时候消费者再次启动时候 ,就会向队列获取消息。

点对点模式代码: RabbitMQ整合Spring Booot点对点模式

 

RabbitMQ手动模式自动应答模式

   1.了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。
   2. 如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。
   3. 没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。
   4. 消息应答是默认打开的。我们通过显示的设置autoAsk=true关闭这种机制。现即自动应答开,一旦我们完成任务,消费者会自动发送应答。通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概 念),则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。

 

应答模式:

自动应答: 不在乎消费者对消息处理是否成功,都会告诉队列删除消息。如果处理消息失败,实现自动补偿(队列投递过去 重新处理)。

手动应答: 消费者处理完业务逻辑,手动返回ack(通知)告诉队列处理完了,队列进而删除消息。

 

实现思路:

  • 生产者端代码不变,消费者端代码这部分就是用于开启手动应答模式的。

               channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
         注:第二个参数值为false代表关闭RabbitMQ的自动应答机制,改为手动应答。

  • 在处理完消息时,返回应答状态,true表示为自动应答模式。

              channel.basicAck(envelope.getDeliveryTag(), false);

 

自动应答: 不在乎消费者对消息处理是否成功,都会告诉队列删除消息。如果处理消息失败,实现自动补偿(队列投递过去 重新处理)。

手动应答: 消费者处理完业务逻辑,手动返回ack(通知)告诉队列处理完了,队列进而删除消息。

RabbitMQ整合Spring Booot【应答模式】

 

RabbitMQ队列形式

  公平队列

  •      目前消息转发机制是平均分配,这样就会出现俩个消费者,奇数的任务很耗时,偶数的任何工作量很小,造成的原因就是近当消息到达队列进行转发消息。并不在乎有多少任务消费者并未传递一个应答给RabbitMQ。仅仅盲目转发所有的奇数给一个消费者,偶数给另一个消费者。
  •      为了解决这样的问题,我们可以使用basicQos方法,传递参数为prefetchCount= 1。这样告诉RabbitMQ不要在同一时间给一个消费者超过一条消息。

 总结:

      只有在 消费者空闲的时候会发送下一条信息。调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息,也就是等待消费者处理完毕并自己对刚刚处理的消息进行确认之后,才发送下一条消息,防止消费者太过于忙碌,也防止它太过去清闲。

 

 公平队列原理:队列服务器向消费者发送消息的时候,消费者采用手动应答模式,队列服务器必须要收到消费者发送ack结果通知,才会发送下一个消息。(快的处理的多,消费的多)

 

使用背景:

  • 服务器能力不同,能者多劳。 均摊模式的话,都处理相同数量的
  •  消息队列 发出去的消息被消费完了 然后收到 ack包 才可以继续发给他

 

技术图片

 

 

 

 

通过 设置channel.basicQos(1); 开发:   RabbitMQ整合Spring Booot【公平队列】

 

 

Exchange

 这个可能是消息队列中最重要的队列了,其他的都是在它的基础上进行了扩展。

 

 关于交换机:

  1. 生产者发送消息不会向传统方式直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列在将消息以推送或者拉取方式给消费者进行消费,这和我们之前学习nginx有点类似。交换机的作用根据具体的路由策略分发到不同的队列中。

 

交换机有四种类型:

  1. Direct exchange(直连交换机):是根据消息携带的路由键(routing key)将消息投递给对应队列的。
  2. Fanout exchange(扇型交换机):将消息路由给绑定到它身上的所有队列。
  3. Topic exchange(主题交换机):队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列。
  4. Headers exchange(头交换机):类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。 

 

过程:

一个生产者发送消息  ----> 到交换机  ---->  到队列(每个队列绑定到交换机上)  ----> 到消费者(每个消费者有自己的队列)

 

功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息), 包括:

  •    一个生产者
  •   一个交换机
  •   多个队列
  •   多个消费者

   技术图片

Direct模式:

场景

  • 生产者发送消息到交换机并指定一个路由key,
  • 消费者队列绑定到交换机时要指定路由key(key匹配就能接受消息,key不匹配就不能接受消息)

例如

     我们可以把路由key设置为insert ,那么消费者队列key指定包含insert才可以接收消息,消费者队列key定义为update或者delete就不能接收消息。很好的控制了更新,插入和删除的操作。

注: 

    前面做的demo中RoutingKey设置的空。

 技术图片


代码实现:

RoutingKey有值的时候,那么 经过消息队列之后,需要在经过RoutingKey进行判断决定消费者。 

 

 

 

 

 

 

 

以上是关于RabbitMQ基础的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ基础组件和SpringBoot整合RabbitMQ简单示例

RabbitMQ基础理解

RabbitMQ-从基础到实战— Hello RabbitMQ

RabbitMQ—基础客户端开发

RabbitMQ基础教程之基本使用篇

RabbitMQ基础总结