SpringBoot - SpringBoot集成RabbitMQ
Posted MinggeQingchun
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot - SpringBoot集成RabbitMQ相关的知识,希望对你有一定的参考价值。
一、SpringBoot集成RabbitMQ
创建两个模块,一个命名springboot-send,一个命名springboot-receive
在两个工程的 pom.xml配置文件中引入AMQP依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--添加AMQP的起步依赖,添加成功后就会自动引入RabbitMQ的依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
核心配置文件application.properties文件中
#配置RabbitMQ相关连接信息(单机版)
spring.rabbitmq.host=192.168.133.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
#连接超时,单位毫秒,0表示无穷大,不超时
spring.rabbitmq.connection-timeout=0
#配置RabbitMQ相关连接信息(集群版)
#spring.rabbitmq.addresses=192.168.133.129:5672,192.168.133.130:5672
#spring.rabbitmq.username=root
#spring.rabbitmq.password=root
1、direct交换机
消息发送方
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig
//配置Direct类型一个队列
@Bean
public Queue directQueue()
return new Queue("bootDirectQueue");
//配置一个Direct类型的交换
@Bean
public DirectExchange directExchange()
return new DirectExchange("bootDirectExchange");
/**
* 配置Direct类型一个队列和交换机的绑定
* @param directQueue 需要绑定的队列的对象,参数名必须要与某个@Bean的方法名完全相同(自动进行注入)
* @param directExchange 需要绑定的交换机的对象,参数名必须要与某个@Bean的方法名完全相同(自动进行注入)
* @return
*/
@Bean
public Binding directBinding(Queue directQueue,DirectExchange directExchange)
/*
参数1 需要绑定的队列
参数2 需要绑定的交换机
参数3 绑定时的RoutingKey
* */
return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRoutingKey");
@Service("sendService")
public class SendServiceImpl implements SendService
//注入Amqp的模板类,利用这个对象来发送和接收消息
@Autowired
private AmqpTemplate amqpTemplate;
/**
* direct 交换机
* @param message
*/
public void sendDirectMessage(String message)
/**
* 发送消息
* 参数 1 为交换机名
* 参数 2 为RoutingKey
* 参数 3 为我们的具体发送的消息数据
*/
amqpTemplate.convertAndSend("bootDirectExchange","bootDirectRoutingKey",message);
amqpTemplate.convertAndSend("topicExchange","aa.bb.cc",message);
消息接收方
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig
//配置一个Direct类型的交换
@Bean
public DirectExchange directExchange()
return new DirectExchange("bootDirectExchange");
//配置一个队列
@Bean
public Queue directQueue()
return new Queue("bootDirectQueue");
/**
* 配置一个队列和交换机的绑定
* @param directQueue 需要绑定的队列的对象,参数名必须要与某个@Bean的方法名完全相同(自动进行注入)
* @param directExchange 需要绑定的交换机的对象,参数名必须要与某个@Bean的方法名完全相同(自动进行注入)
* @return
*/
@Bean
public Binding directBinding(Queue directQueue,DirectExchange directExchange)
/*
参数1 需要绑定的队列
参数2 需要绑定的交换机
参数3 绑定时的RoutingKey
* */
return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRoutingKey");
2、fanout交换机
消息发送方
@Configuration
public class RabbitMQConfig
//配置一个 Fanout类型的交换
@Bean
public FanoutExchange fanoutExchange()
return new FanoutExchange("fanoutExchange");
@Service("sendService")
public class SendServiceImpl implements SendService
//注入Amqp的模板类,利用这个对象来发送和接收消息
@Autowired
private AmqpTemplate amqpTemplate;
/**
* fanout 交换机
* @param message
*/
public void sendFanoutMessage(String message)
amqpTemplate.convertAndSend("fanoutExchange","",message);
消息接收方
@Configuration
public class RabbitMQConfig
//创建一个名字为 fanoutQueue的队列
@Bean
public Queue fanoutQueue()
return new Queue("fanoutQueue");
//创建一个名字为 BootFanoutExchange的交换机
@Bean
public FanoutExchange fanoutExchange()
return new FanoutExchange("BootFanoutExchange");
@Bean
public Binding fanoutBinding(Queue fanoutQueue,FanoutExchange fanoutExchange)
//将队列绑定到指定的交换机上
//参数1 为指定的队列对象
//参数2 为指定的交换机对象
return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService
//注入Amqp的模板类,利用这个对象来发送和接收消息
@Resource
private AmqpTemplate amqpTemplate;
@RabbitListener(bindings=
@QueueBinding(//@QueueBinding注解要完成队列和交换机的绑定
value = @Queue(),//@Queue创建一个队列(没有指定参数则表示创建一个随机队列)
exchange=@Exchange(name="fanoutExchange",type="fanout")//创建一个交换机
)
)
public void fanoutReceive01(String message)
System.out.println("fanoutReceive01监听器接收的消息----"+message);
@RabbitListener(bindings=
@QueueBinding(//@QueueBinding注解要完成队列和交换机的绑定
value = @Queue(),//@Queue创建一个队列(没有指定参数则表示创建一个随机队列)
exchange=@Exchange(name="fanoutExchange",type="fanout")//创建一个交换机
)
)
public void fanoutReceive02(String message)
System.out.println("fanoutReceive02监听器接收的消息----"+message);
3、topic交换机
消息发送方
@Configuration
public class RabbitMQConfig
//配置一个 Topic 类型的交换
@Bean
public TopicExchange topicExchange()
return new TopicExchange("topicExchange");
@Service("sendService")
public class SendServiceImpl implements SendService
//注入Amqp的模板类,利用这个对象来发送和接收消息
@Autowired
private AmqpTemplate amqpTemplate;
/**
* topic 交换机
* @param message
*/
public void sendTopicMessage(String message)
amqpTemplate.convertAndSend("topicExchange","aa.bb.cc",message);
消息接收方
@Configuration
public class RabbitMQConfig
@Bean
public Queue topicQueue()
return new Queue("bootTopicQueue");
//创建队列
@Bean
public Queue topicQueue2()
return new Queue("topicQueue2");
@Bean
public TopicExchange topicExchange()
return new TopicExchange("bootTopicExchange");
@Bean
public Binding topicBinding(Queue topicQueue,TopicExchange topicExchange)
/*
参数1 需要绑定的队列
参数2 需要绑定的交换机
参数3 绑定时的RoutingKey
* */
return BindingBuilder.bind(topicQueue).to(topicExchange).with("boot");
@Bean
public Binding topicBinding2(Queue topicQueue2,TopicExchange topicExchange)
//将队列绑定到指定交换机
//参数1 为指定队列对象
//参数2 为指定的交换机对象
//参数3 为RoutingKey的匹配规则,#.test表示 可以接收以任意路径靠头的但是必须以test结尾的队列
return BindingBuilder.bind(topicQueue2).to(topicExchange).with("#.text");
@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService
//注入Amqp的模板类,利用这个对象来发送和接收消息
@Resource
private AmqpTemplate amqpTemplate;
@RabbitListener(bindings = @QueueBinding(value=@Queue("topic01"),key = "aa",exchange =@Exchange(name = "topicExchange",type = "topic")))
public void topicReceive01(String message)
System.out.println("topic01消费者 ---aa---"+message );
@RabbitListener(bindings = @QueueBinding(value=@Queue("topic02"),key = "aa.*",exchange =@Exchange(name = "topicExchange",type = "topic")))
public void topicReceive02(String message)
System.out.println("topic02消费者 ---aa.*---"+message );
@RabbitListener(bindings = @QueueBinding(value=@Queue("topic03"),key = "aa.#",exchange =@Exchange(name = "topicExchange",type = "topic")))
public void topicReceive03(String message)
System.out.println("topic03消费者 ---aa.#---"+message );
运行测试Send消息发送,编写Application.java类
@SpringBootApplication
public class RabbitmqSpringbootSendApplication
public static void main(String[] args)
ApplicationContext applicationContext = SpringApplication.run(RabbitmqSpringbootSendApplication.class, args);
SendService sendService = (SendService) applicationContext.getBean("sendService");
// sendService.sendDirectMessage("Boot的direct测试数据");
// sendService.sendFanoutMessage("Boot的Fanout测试数据");
sendService.sendTopicMessage("Boot的Topic测试数据,key为aa.bb.cc");
运行测试Receive消息接收,编写Application.java类
@SpringBootApplication
public class RabbitmqSpringbootReceiveApplication
public static void main(String[] args)
ApplicationContext applicationContext = SpringApplication.run(RabbitmqSpringbootReceiveApplication.class, args);
ReceiveService service = (ReceiveService) applicationContext.getBean("receiveService");
//使用了消息监听器接收消息那么就不需要调用接收方法来接收消息
// service.receive();
以上是关于SpringBoot - SpringBoot集成RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章