SpringBoot整合RabbitMQ,实现消息发送和消费
Posted rhyme
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合RabbitMQ,实现消息发送和消费相关的知识,希望对你有一定的参考价值。
下载安装Erlang和RabbitMQ
Erlang和RabbitMQ:https://www.cnblogs.com/theRhyme/p/10069611.html
AMQP协议详解与RABBITMQ,MQ消息队列的应用场景,如何避免消息丢失
https://www.cnblogs.com/theRhyme/p/9578675.html
项目创建和依赖
推荐SpringCloud项目在线创建:https://start.spring.io/
不用上面这个也行,下面有代码和依赖;
gradle的依赖,和maven差不多:
buildscript { ext { springBootVersion = \'2.1.1.RELEASE\' } repositories { mavenCentral() } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") } } apply plugin: \'java\' apply plugin: \'eclipse\' apply plugin: \'org.springframework.boot\' apply plugin: \'io.spring.dependency-management\' group = \'xy.study\' version = \'0.0.1-SNAPSHOT\' sourceCompatibility = 1.8 repositories { mavenCentral() } dependencies { implementation(\'org.springframework.boot:spring-boot-starter-amqp\') implementation(\'org.projectlombok:lombok:1.16.+\') runtimeOnly(\'org.springframework.boot:spring-boot-devtools\') testImplementation(\'org.springframework.boot:spring-boot-starter-test\') }
代码
配置文件application.properties
spring.application.name=spring-boot-rabbitmq spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest server.port = 5678
RabbitMQ配置文件类(注释的代码可以直接删掉):
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列 * 首先对topic规则配置 */ //@Configuration public class TopicRabbitConfig { final public static String QUEUE_NAME = "queue.name"; final public static String TEST_TOPIC_ROUTINGKEY = "test.topic.routingKey"; final public static String TEST_EXCHANGE_HAHA = "test.exchange.haha"; /** * 设置交换器的名称 * @return *//* @Bean TopicExchange exchange() { return new TopicExchange(TopicRabbitConfig.TEST_EXCHANGE_HAHA); } *//** * 队列名称 * @return *//* @Bean public Queue queueMessage() { return new Queue(TopicRabbitConfig.QUEUE_NAME); } *//** * 将指定routing key的名称绑定交换器的队列 * @param queueMessage * @param exchange * @return *//* @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with(TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY); }*/ /** * 匹配以topic开头的路由键routing key * 交换机绑定多个队列 */ /*@Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); }*/ }
生产者,这里根据Exchange和Routing Key,直接发送一个字符串:
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import xy.study.rabbitmq.conf.TopicRabbitConfig; @Component @Slf4j public class HelloSender { @Autowired private RabbitTemplate rabbitTemplate; /** * 通过exchange和routingKey的方式 * rabbitTemplate.convertAndSend(String exchange, String routingKey, Object object) * @param i */ public void send(int i) { String context = "hello " + i; log.info("Sender : {}", context); this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TEST_EXCHANGE_HAHA,TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY, context); } }
消费者,绑定对应的Exchange,Queue和Routing Key,直接打印获取的信息:
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import xy.study.rabbitmq.conf.TopicRabbitConfig; @Component @Slf4j public class HelloReceiver { @RabbitListener(bindings = @QueueBinding( value = @Queue(value = TopicRabbitConfig.QUEUE_NAME, durable = "true"), exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC), key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY) ) public void onOrgDeleted(@Payload String msg) { log.info("HelloReceiver msg : {}",msg); } }
测试类,调用生产者发送信息的函数send,消费者会监听消费:
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import xy.study.rabbitmq.producer.HelloSender; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqApplicationTests { @Autowired private HelloSender sender; @Test public void testSend() { sender.send(666); } }
如图,控制台日志,能生成消息,并且能被对应的消费者消费。
topic exchange 通配路由中多个消费者的情况
修改消费者的代码如下:
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import xy.study.rabbitmq.conf.TopicRabbitConfig; @Component @Slf4j public class HelloReceiver { /** * 下面四个消费者,exchange和RoutingKey都相同,最后两个消费者队列名都相同 * @param msg */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = TopicRabbitConfig.QUEUE_NAME, durable = "true"), exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC), key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY) ) public void queueName(@Payload String msg) { log.info("{}-----HelloReceiver msg : {}",TopicRabbitConfig.QUEUE_NAME,msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue(value = TopicRabbitConfig.QUEUE_NAME+".test", durable = "true"), exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC), key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY) ) public void queueNameTest(@Payload String msg) { log.info("{}-----HelloReceiver msg : {}",TopicRabbitConfig.QUEUE_NAME+".test",msg); } /** * 这里我的消费者队列名"123445",是乱写的,也能够接受 * @param msg */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = 123445+"", durable = "true"), exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC), key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY) ) public void queueNameNumber(@Payload String msg) { log.info("{}-----HelloReceiver msg : {}",123445+""+".test",msg); } /** * 由于这个和上面的Exchange、RoutingKey、queue完全相同,所以这两个消费者,一条消息,只有一个能消费(随机) * @param msg */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = 123445+"", durable = "true"), exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC), key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY) ) public void queueNameNumberSame(@Payload String msg) { log.info("same+{}-----HelloReceiver msg : {}",123445+""+".test",msg); } }
再次执行测试,测试结果如下:
上面四个消费者代码,Exchange和RoutingKey都相同,最后两个消费者队列名都相同。
根据结果可知,当Exchange和RoutingKey相同、queue不同时,所有消费者都能消费同样的信息;
当Exchange和RoutingKey、queue都相同时(最后两个消费者),这时候就是round robin, 消费者中只有一个能消费信息,其他消费者都不能消费该信息。
以上是关于SpringBoot整合RabbitMQ,实现消息发送和消费的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot整合RabbitMQ,实现消息发送和消费
SpringBoot整合RabbitMQ--重试/消息序列化--方法/实例