Java RabbitMQ配置和使用,基于SpringBoot
Posted 深南大道
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java RabbitMQ配置和使用,基于SpringBoot相关的知识,希望对你有一定的参考价值。
package rabbitmq.demo; import com.rabbitmq.client.AMQP; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import java.util.UUID; @RunWith(SpringRunner.class) @SpringBootTest public class DemoApplicationTests { @Autowired RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法 @Autowired AmqpAdmin amqpAdmin; /** * 创建交换器 Exchange */ @Test public void createExchange() { // 创建Exchange 交换器 amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange")); System.out.println("创建Exchange完成"); // 创建Queue 队列,持久化 amqpAdmin.declareQueue(new Queue("amqpproduct", true)); System.out.println("创建Queue完成"); // 创建绑定规则 amqpAdmin.declareBinding(new Binding("amqpproduct", Binding.DestinationType.QUEUE, "amqpadmin.exchange", "amqpproduct", null)); System.out.println("创建绑定规则完成"); } /** * 发送消息给RabbitMQ消息队列 发送单播 点对点的消息 direct方式 */ @Test public void rabbitMqSendTest() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "test message, hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); //方式一 以默认序列化的方式发送,如用json的方式,写RabbitConfig文件 rabbitTemplate.convertAndSend("myexchange.direct", "myproduct", map); } /** * 发送消息给RabbitMQ消息队列 广播方式 fanout方式 */ @Test public void rabbitMqSendTest2() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "test message, hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); //fanout广播方式 不用填路由键 无效 rabbitTemplate.convertAndSend("myexchange.fanout", "", map); } /** * 从RabbitMQ接收消息 */ @Test public void rabbitMqGetTest() { //收取消息成功后,会从相对应的消息队列里删除该消息 Object o = rabbitTemplate.receiveAndConvert("myproduct"); System.out.println("接收的消息队列数据类型:" + o.getClass()); System.out.println("接收的消息队列数据:" + o.toString()); } }
package rabbitmq.demo.config; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { /** * 用json方式替换默认的序列化 * @return */ @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } }
package rabbitmq.demo.service; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class RabbitListenerService { /** * 添加监听器,自动获取消息队列信息 * @param o */ @RabbitListener(queues = {"myproduct","myproduct","myarticle.new"}) //可监听多个queue public void rabbitMqListenerReceive(Object o){ // 需要在启动入口添加 @EnableRabbit 开启基于注解的rabbitMQ模式 System.out.println("接收的消息队列数据类型:" + o.getClass()); System.out.println("接收的消息队列数据:" + o.toString()); } }
#application.yml server: port: 8080 spring: #给项目来个名字 application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 139.199.162.126 port: 5672 username: xxxx password: xxxx #虚拟host 可以不设置,使用server默认host #virtual-host: JCcccHost
<!--rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
以上是关于Java RabbitMQ配置和使用,基于SpringBoot的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)