Rabbitmq

Posted willwillie

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbitmq相关的知识,希望对你有一定的参考价值。

rabbitmq是一个消息通信的开源工具。本文自问自答,回答一些使用和理解上的难点。
提出大概10个左右的问题,

1.Connection和Channel的区别?

程序要使用rabbitmq,要先创建一个到rabbitmq的连接,指定ip,端口,vhost,验证用户和密码,以及相关的设置,比如timeout时间、心跳时间,以及最大的channel数目。

连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。

那么channel是什么呢?

信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。

channel已经和qos,ack,nack,messageCount,以及consumer count等概念关联到一起了。。

比如说declare一个队列,Actively declare a server-named or named queue using queue.declare AMQP method.

原理是用的APQP提供的方式。

2.默认的exchange和可编程的amqp?

比如说发送publish(对应的amqp的命令是basicPublish)一个消息,一般涉及到下面这些概念,参数有channel,exchange,routing-key,payload(包括headers和body),和opts(比如content-type和content-encoding等)

[^Channel channel ^String exchange ^String routing-key payload
:keys [^Boolean mandatory ^String content-type ^String ^String content-encoding ^Map headers
        ^Boolean persistent ^Integer priority ^String correlation-id ^String reply-to ^String expiration ^String message-id
        ^Date timestamp ^String type ^String user-id ^String app-id ^String cluster-id]
 :or mandatory false]

当content-type为”application/json”的时候,服务端如果支持json格式,那么数据的解析就会非常友好。

amqp主要是一些概念的理解:

  1. AMQP模型(AMQP Model):一个由关键实体和语义表示的逻辑框架,遵从AMQP规范的服务器必须提供这些实体和语义。为了实现本规范中定义的语义,客户端可以发送命令来控制AMQP服务器。
  2. 连接(Connection):一个网络连接,比如TCP/IP套接字连接。
  3. 会话(Session):端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。
  4. 信道(Channel):多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。
  5. 客户端(Client):AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。
  6. 服务器(Server):接受客户端连接,实现AMQP消息队列和路由功能的进程。也称为“消息代理”。
  7. 端点(Peer):AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。
  8. 搭档(Partner):当描述两个端点之间的交互过程时,使用术语“搭档”来表示“另一个”端点的简记法。比如我们定义端点A和端点B,当它们进行通信时,端点B是端点A的搭档,端点A是端点B的搭档。
  9. 片段集(Assembly):段的有序集合,形成一个逻辑工作单元。
  10. 段(Segment):帧的有序集合,形成片段集中一个完整子单元。
  11. 帧(Frame):AMQP传输的一个原子单元。一个帧是一个段中的任意分片。
  12. 控制(Control):单向指令,AMQP规范假设这些指令的传输是不可靠的。
  13. 命令(Command):需要确认的指令,AMQP规范规定这些指令的传输是可靠的。
  14. 异常(Exception):在执行一个或者多个命令时可能发生的错误状态。
  15. 类(Class):一批用来描述某种特定功能的AMQP命令或者控制。
    Connection.class
    Channel createChannel() throws IOException;

        Channel createChannel(int var1) throws IOException;
    
        void close() throws IOException;
    
        void close(int var1, String var2) throws IOException;
    
        void close(int var1) throws IOException;
    
        void close(int var1, String var2, int var3) throws IOException;
    

    比如下面这些命令都放在Channel这个类里面:
    void basicQos(int var1) throws IOException;

        void basicPublish(String var1, String var2, BasicProperties var3, byte[] var4) throws IOException;
    
        void basicPublish(String var1, String var2, boolean var3, BasicProperties var4, byte[] var5) throws IOException;
    
        void basicPublish(String var1, String var2, boolean var3, boolean var4, BasicProperties var5, byte[] var6) throws IOException;
    
        DeclareOk exchangeDeclare(String var1, String var2) throws IOException;
    
        DeclareOk exchangeDeclare(String var1, BuiltinExchangeType var2) throws IOException;
    
        DeclareOk exchangeDeclare(String var1, String var2, boolean var3) throws IOException;
    
        DeclareOk exchangeDeclare(String var1, BuiltinExchangeType var2, boolean var3) throws IOException;
    

    还有AMQP.class

  16. 消息头(Header):描述消息数据属性的一种特殊段。
  17. 消息体(Body):包含应用程序数据的一种特殊段。消息体段对于服务器来说完全不透明——服务器不能查看或者修改消息体。
  18. 消息内容(Content):包含在消息体段中的的消息数据。
  19. 交换器(Exchange):服务器中的实体,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  20. 交换器类型(Exchange Type):基于不同路由语义的交换器类。
  21. 消息队列(Message Queue):一个命名实体,用来保存消息直到发送给消费者。
  22. 绑定器(Binding):消息队列和交换器之间的关联。
  23. 绑定器关键字(Binding Key):绑定的名称。一些交换器类型可能使用这个名称作为定义绑定器路由行为的模式。
  24. 路由关键字(Routing Key):一个消息头,交换器可以用这个消息头决定如何路由某条消息。
  25. 持久存储(Durable):一种服务器资源,当服务器重启时,保存的消息数据不会丢失。
  26. 临时存储(Transient):一种服务器资源,当服务器重启时,保存的消息数据会丢失。
  27. 持久化(Persistent):服务器将消息保存在可靠磁盘存储中,当服务器重启时,消息不会丢失。
  28. 非持久化(Non-Persistent):服务器将消息保存在内存中,当服务器重启时,消息可能丢失。
  29. 消费者(Consumer):一个从消息队列中请求消息的客户端应用程序。
  30. 生产者(Producer):一个向交换器发布消息的客户端应用程序。
  31. 虚拟主机(Virtual Host):一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。客户端应用程序在登录到服务器之后,可以选择一个虚拟主机。
    vhost其实是一个虚拟概念,如果client没有设置的话,默认为”/”,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host

3.ack的作用?

在rabbitmq里面,有一个ack参数用来设置,默认是ack的,也就是说消息代理需要保证收到ack才讲那个message从队列里面移除。。。 另外还有一个就是auto-ack,如果auto-ack设置为true,那么只要server将消息分发给消费者,那么队列就认为收到ack了,就将这个消息移除;如果auto-ack设置为false,那么server将消息分发给消费者,要等到消费者自己说处理好了,才将这个消息从队列中删除。队列中的消息都是先到先处理的。

auto-ack为true好比排队等待服务的时候,轮到你的时候,不管你处理好没好,都不管你了;

auto-ack为false好比排队等待服务的时候,轮到你的时候,如果给你的服务处理不ok的话,你还是排在队列里,能再次处理;auto-ack设置为false的时候,需要消费者自己调用命令通知消息代理已经ok了。

做一个实验:

(defn ack-unless-exception
  "Wrapper for delivery handlers which auto-acks messages.

   This differs from `:auto-ack true', which tells the broker to
   consider messages acked upon delivery. This explicitly acks, as
   long as the consumer function doesn't throw an exception."
  [f]
  (fn [^Channel channel :keys [delivery-tag] :as metadata body]
    (f channel metadata body)
    (.basicAck channel delivery-tag false)))

对于包装了上面这个函数的handler,如果函数正常处理的话,会调用basicAck这个命令。那么在auto-ack=false的过程中抛出异常会怎么样?

这个消息会一直存在队列中(好像给没有正常ack的消息分配了一段缓存一样)直到被正确的处理。其他进来的消息被正常消费的就正常小峰,如果后续还存在异常的也会放一直保留到队列中。所以consumer在异常处理的时候抛出异常也是正确的做法。

所以对于所有需要正确处理的消息,设置auto-ack=false是明智之举。

但是,auto-ack=false的设置会导致rabbit-mq中的其他没有到consumer中的消息被阻塞。此时consumer只有在重启后才会重新处理这些没有ack的数据。
要解决这个问题,其实可以设置auto-ack=true,并且捕获处理异常,然后选择将消息重新加入到消息队列中去。可以参考https://felipeelias.github.io/rabbitmq/2016/02/22/rabbitmq-exponential-backoff.html

4.各种交换模式的应用场景?

topic:

Topic Exchange 转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。

在这种交换机模式下:

  • 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。
  • 路由模式必须包含一个 星号(),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。 井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是agreements.eu.berlin.#,那么,以agreements.eu.berlin开头的路由键都是可以的。

具体代码 coding 可以参考上述链接的代码,在@rabbitlistener中修改type = ExchangeTypes.TOPIC,并且在发送的时候还是一样,第一个参数表示交换机,第二个参数表示routing key,第三个参数即消息。如下:

rabbitTemplate.convertAndSend(“testTopicExchange”,”key1.a.c.key2”, abc + ” from RabbitMQ!”);

5.怎样自定义一个插件,用于exchange?
rabbitmq支持自定义的exchange插件,那么怎么写一个自己的exchange呢?

6.预取消息有什么作用?
实际使用RabbitMQ过程中,如果完全不配置QoS,这样Rabbit会尽可能快速地发送队列中的所有消息到client端。因为consumer在本地缓存所有的message,从而极有可能导致服务器内存不足影响其它进程的正常运行。所以我们需要通过设置Qos的prefetch count来控制consumer的流量。同时设置得当也会提高consumer的吞吐量。

prefetch

prefetch允许为每个consumer指定最大的unacked messages数目。简单来说就是用来指定一个consumer一次可以从Rabbit中获取多少条message并缓存在client中(RabbitMQ提供的各种语言的client library)。一旦缓冲区满了,Rabbit将会停止投递新的message到该consumer中直到它发出ack。

假设prefetch值设为10,共有两个consumer。意味着每个consumer每次会从queue中预抓取 10 条消息到本地缓存着等待消费。同时该channel的unacked数变为20。而Rabbit投递的顺序是,先为consumer1投递满10个message,再往consumer2投递10个message。如果这时有新message需要投递,先判断channel的unacked数是否等于20,如果是则不会将消息投递到consumer中,message继续呆在queue中。之后其中consumer对一条消息进行ack,unacked此时等于19,Rabbit就判断哪个consumer的unacked少于10,就投递到哪个consumer中。

总的来说,consumer负责不断处理消息,不断ack,然后只要unacked数少于prefetch * consumer数目,broker就不断将消息投递过去。

7.rabitmq和rpc?
们介绍了如何使用工作队列(work queue)在多个工作者(woker)中间分发耗时的任务。
可是如果我们需要将一个函数运行在远程计算机上并且等待从那儿获取结果时,该怎么办呢?这就是另外的故事了。这种模式通常被称为远程过程调用(Remote Procedure Call)或者 RPC。

8.rabbitmq和camel
著名的EIP实现框架Camel最早起源于ActiveMQ内的一些基于消息的集成需求,然后逐渐发展成为一个ActiveMQ的子项目,最后这一块的功能越来越完善,就成为了Apache的顶级项目。

—更新中

以上是关于Rabbitmq的主要内容,如果未能解决你的问题,请参考以下文章

开启springcloud全家桶8:配置中心的好搭档 Spring Cloud Bus 消息总线

Golang操作Rabbitmq

Golang操作Rabbitmq

「官宣」摹客XJira,项目管理星搭档!

rabbitmq结构

rabbitmq框架分析