工作中rabbitmq的使用

Posted 还是那个徐东强

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了工作中rabbitmq的使用相关的知识,希望对你有一定的参考价值。

拾人牙慧,将工作中rabbitmq那块的核心代码的枝干抽出,解析一下,主要涉及三个部分

1.生产消费普通模型

将消息发送到exchange中,exchange一边接收消息,一边负责将消息路由到指定的queue中 ,消费端消费

根据(消息发送时候过来的routingkey以及queue和exchange的bindingkey是否匹配)

2.延迟队列的实现 

当消息被发送以后,并不想让消费者立即拿到消息,而是等指定时间后,消费者才拿到这个消息进行消费。

3.服务发布双云,有时会有这样的需求,一个服务某方法执行的时要求另一个云也执行

场景1:非同步缓存的清理。当腾讯云缓存的内容发生改变时,除了清理腾讯云的缓存,还需要发布一个消息,指定清理aws云的缓存,否则轮询会有结果不一致的情况。

场景2:由于某些触发条件导致zk值改变,需要同步修改另一个云的zk的值,否则轮询会出现返回的结果不一致的情况。比如开关存在zk上,轮询到一个云,获取的结果是关闭,但是验证的时候,轮到到另一个云,此时结果是打开,那么就怎么样也无法通过校验了。


首先是代码,抽出了枝干,去掉敏感信息截图

spring的配置文件如下:

 生产端:(ProdudceTemplate代码如下)

工作中rabbitmq的使用

如何使用?只需要通过@Autowired引入ProduceTemplate对象,调用对应的send方法即可。如果希望消息延迟下消费,则指定延迟的时间(单位:分钟)

send方法做的事情比较简单,调用spring提供的amqpTemplate.send,参数包括了routingkey,消息body,消息的properties

 

消费端:(代码如下)

1.定义MessageConsumer接口,其中方法 指定消息的routingkey 以及如何处理消息的handleMessage

工作中rabbitmq的使用

各服务中只需要 实现MessageConsumer接口,指定routingkey,重写handleMessage。并且加入到ApplicationContext中即可轻松通过mq实现生产消费。


为什么能够做到?

那是因为定义了一个消息消费工厂类,实现ApplicationContextAware

通过 applicationContext.getBeansOfType可以拿到  项目中用到的所有实现了MessageConsumer接口的实现类,其中key=在applicationContext中的id,value=具体的实例

工作中rabbitmq的使用

遍历获取的MessageConsumer接口的实现类

工作中rabbitmq的使用

MessageListenerAdapter:消息监听适配器,消息监听器一共有三种:

MessageListener、SessionAwareMessageListener和MessageListenerAdapter

  • MessageListener是最原始的消息监听器,定义了一个onMessage方法,接收一个message参数

  • SessionAwareMessageListener是spring为我们提供的,不是标准的JMS的MessageListener,使用的场景,当我们接收到一个消息时,需要回复一个消息的场合使用。它提供了onMessage处理消息方法,接收一个message参数,以及session参数,sesion参数可以发送消息

  • MessageListenerAdapter类,主要把接收到的消息进行一个类型转换,然后利用反射把他交个真正的消息处理器。如果它真正的消息处理器是上面的二者任一,则直接调用onMessage方法,而不会去反射调用。

当我们自定义消息处理器的时候,可以在MessageListenerAdapter中指定两个参数

delegate:我们自定义的真正的消息处理器

defaultListenerMethod: 消息处理其中处理消息的方法,不指定默认是handleMessage

工作中rabbitmq的使用

SimpleMessageListenerContainer: 有了messageListener消息监听器,我们还需要定义消息监听适配器对应的监听容器。

指定以下的参数:

指定哪台rabbitmq主机? setConnectionFactory

指定消息转换器? setMessageConverter

指定消费端消息的并发能力? setConcurrentConsumers setPrefetchCount

指定消息监听器 setMessageListener(自定义的消息处理器)

指定监听哪个队列? setQueueNames


下面以服务A中发布一个消息demoMessage,服务B中消费消息画图

工作中rabbitmq的使用

1.不管是服务A还是服务B都指定 connectionFactory 指向同一个

2.服务A 直接通过@Autowired注入ProduceTemplate,掉用send方法,指定routingkey=mian.test, message=demoMessage对象

3.服务B 消费者实现接口MessageConsumer,通过getMessageTopic指定routingkey=mian.test,实现handleMessage方法=处理消息。下面是消息监听的queue的由来

工作中rabbitmq的使用

生产端,发送消息时指定的routingkey=mian.test。

消费端,queue的名称通过代码指定为test:服务B:mian.test,并且将名为amp.topic的exchange和queue通过bindingkey=mian.test绑定起来。

这样一条消息到了amp.topic会被路由到图中的queue1中,从而被服务B的消费者消费。


延迟队列

生产端:(代码如下)

工作中rabbitmq的使用

消费端:(代码如下)

1.定义DelayMessageConsumer接口,其中有三个方法,指定消息的routingkey,消息对象,延迟时间

工作中rabbitmq的使用

消息消费工厂类的实现

略有不同,消息监听的并非生产端exchage路由到的queue,生产端将消息路由到一个dead letter queue。

这个dead letter queue有啥特点呢?dead letter queue中的消息一旦过了指定的时间没有被消费,则成为dead letter,然后会转发到其他的exchange。

  • rabbitmq可以针对queue和message设置x-message-ttl来控制消息的生存时间,如果超时,则消息变成dead letter

  • rabbitmq的queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选),如果队列中出现dead letter,则按照这两个参数重新路由

x-dead-letter-exchange: 出现dead letter之后将dead letter发送到指定的exchange

x-dead-letter-routing-key: 指定routing-key发送


下面以延时2分钟,routingkey为mian.test的消息为例,画图


由上面的发送代码可以看出来,消息的routingkey=test_delay.2m.mian.test

消费者工厂类中定义了上面的两个队列并且指定了bindingkey, 消费端消息监听queue2中,当生产端一条消息到amp.topic的exchange中的时候,会被路由到queue1中,但是由于该队列没有消费者监听,所以一旦过了两分钟,该队列中的消息会成为dead letter,根据x-dead-letter-exchange会被转发到test.expire的exchang中,消息的routingkey=test_delay.2m.mian.test,根据bindingkey=test.delay.*.mian.test,消息会被路由到queue2中,从而会被服务B的消费者消费


指定被另一个云消费

之前没理解过来,同一个服务中相对于同一个routingkey,既有生产者代码,也有消费者代码,发送的消息怎么就能保证不被自己云的消费者消费掉呢。

问题的关键是rabbitmq也是双云的,如果下面的rabbitmq是同一个,那么queue就是同一个,那么消息可能被监听在其上的任意一个消费者消费掉。

但是由于rabbitmq也是双云,aws上的queue1仅仅被腾讯云上面的消费者监听着,腾讯云执行清理不同步缓存也好,执行写zk也好,自然执行的是腾讯云上的redis以及zk。



以上是关于工作中rabbitmq的使用的主要内容,如果未能解决你的问题,请参考以下文章

深入了解RabbitMQ工作原理及简单使用

深入解读RabbitMQ工作原理及简单使用

[奇思异想]使用RabbitMQ实现定时任务

RabbitMQ入门教程——工作队列

golang使用rabbitmq工作队列

golang使用rabbitmq工作队列