RabbitMQ——SpringBoot集成RabbitMQ
Posted 张起灵-小哥
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ——SpringBoot集成RabbitMQ相关的知识,希望对你有一定的参考价值。
文章目录:
3.4 RabbitMQ管控台中查看SpringBoot工程创建的交换机和消息队列
1.创建一个SpringBoot工程——消息发送者
前两步都是一样的,只不过在依赖项页面中,要勾选RabbitMQ这个选项。
在核心配置文件中,配置RabbitMQ的相关连接信息。
#配置RabbitMQ的相关连接信息
spring.rabbitmq.host=192.168.40.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
编写实现消息发送的接口和实现类。
接口中的三个方法分别对应 direct、fanout、topic三种类型的交换机,我这里测试这三种类型的交换机来发送接收消息。
package com.szh.springboot.rabbitmq.service;
/**
*
*/
public interface SendService {
void sendMessage(String message);
void sendFanout(String message);
void sendTopic(String message);
}
package com.szh.springboot.rabbitmq.service.impl;
import com.szh.springboot.rabbitmq.service.SendService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
*
*/
@Service("sendService")
public class SendServiceImpl implements SendService {
@Autowired
private AmqpTemplate amqpTemplate;
@Override
public void sendMessage(String message) {
/**
* 发送消息
* 参数1:交换机名称
* 参数2:RoutingKey
* 参数3:具体发送的消息内容
*/
amqpTemplate.convertAndSend("springbootDirectExchange","springbootDirectRouting",message);
}
@Override
public void sendFanout(String message) {
amqpTemplate.convertAndSend("fanoutExchange","",message);
}
@Override
public void sendTopic(String message) {
amqpTemplate.convertAndSend("topicExchange","aa.bb.cc",message);
}
}
然后写一个关于三种类型交换机的配置类。
package com.szh.springboot.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
*
*/
@Configuration
public class RabbitMQConfig {
//配置一个Direct类型的交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange("springbootDirectExchange");
}
//配置一个队列
@Bean
public Queue directQueue() {
return new Queue("springbootDirectQueue");
}
/**
* 配置一个队列和交换机的绑定
* @param directQueue : 需要绑定的队列对象,参数名必须和某个@Bean的方法名完全相同,这样就会进行自动注入,对应 .bind()
* @param directExchange : 需要绑定的交换机对象,参数名必须和某个@Bean的方法名完全相同,这样就会进行自动注入,对应 .to()
* .with() 方法对应的RoutingKey
* @return
*/
@Bean
public Binding directBinding(Queue directQueue,DirectExchange directExchange) {
return BindingBuilder.bind(directQueue).to(directExchange).with("springbootDirectRouting");
}
//配置一个Fanout类型的交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
//配置一个Topic类型的交换机
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
}
最后是SpringBoot项目的启动入口类。
这里首先是通过ApplicationContext获取到了Spring容器,然后从容器中拿到sendService这个对象,最后的三行代码分别对应的是测试这三种类型的交换机。
package com.szh.springboot.rabbitmq;
import com.szh.springboot.rabbitmq.service.SendService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
ApplicationContext context=SpringApplication.run(Application.class, args);
SendService service= (SendService) context.getBean("sendService");
service.sendMessage("SpringBoot集成RabbitMQ的测试数据");
//service.sendFanout("SpringBoot集成RabbitMQ的测试数据");
//service.sendTopic("SpringBoot集成RabbitMQ的测试数据");
}
}
1.创建一个SpringBoot工程——消息接收者
前两步都是一样的,只不过在依赖项页面中,要勾选RabbitMQ这个选项。
在核心配置文件中,配置RabbitMQ的相关连接信息。
#配置RabbitMQ的相关连接信息
spring.rabbitmq.host=192.168.40.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
编写实现消息接收的接口和实现类。
接口中的这些方法分别对应 direct、fanout、topic三种类型的交换机,我这里测试这三种类型的交换机来发送接收消息。
package com.szh.sprringboot.rabbitmq.service;
/**
*
*/
public interface ReceiveService {
void receiveMessage();
void directReceive(String message);
void fanoutReceive01(String message);
void fanoutReceive02(String message);
void topicReceive01(String message);
void topicReceive02(String message);
void topicReceive03(String message);
}
package com.szh.sprringboot.rabbitmq.service.impl;
import com.szh.sprringboot.rabbitmq.service.ReceiveService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
*
*/
@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService {
@Autowired
private AmqpTemplate amqpTemplate;
/**
* receiveAndConvert()这个方法,每执行一次只能接收一次消息
* 如果有消息进入,则不会自动接收消息(不建议使用)
*/
@Override
public void receiveMessage() {
// String message= (String) amqpTemplate.receiveAndConvert("springbootDirectQueue");
// System.out.println(message);
}
/**
* @RabbitListener : 用于标记当前方法是一个RabbitMQ的消息监听方法,可以持续性的自动接收消息
* @param message
* 该方法不需要手动调用,Spring会自动运行这个监听方法
*
* 注意:如果该监听方法正常结束,那么Spring会自动确认消息
* 如果出现异常,则Spring不会确认消息,该消息一直存在于消息队列中
*/
@Override
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name = "springbootDirectQueue"),
exchange = @Exchange(name = "springbootDirectExchange"),
key = {"springbootDirectRouting"}
)
})
public void directReceive(String message) {
System.out.println(message);
}
@Override
@RabbitListener(bindings = {
@QueueBinding( //完成队列和交换机的绑定
value = @Queue(), //创建一个队列,没有name属性,表示创建一个随即名称的消息队列
exchange = @Exchange(name = "fanoutExchange",type = "fanout") //创建一个交换机
)
})
public void fanoutReceive01(String message) {
System.out.println(message);
}
@Override
@RabbitListener(bindings = {
@QueueBinding( //完成队列和交换机的绑定
value = @Queue(), //创建一个队列,没有name属性,表示创建一个随即名称的消息队列
exchange = @Exchange(name = "fanoutExchange",type = "fanout") //创建一个交换机
)
})
public void fanoutReceive02(String message) {
System.out.println(message);
}
@Override
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue("topic01"),
exchange = @Exchange(name = "topicExchange",type = "topic"),
key = {"aa"}
)
})
public void topicReceive01(String message) {
System.out.println("topic01 接收到的数据:" + message);
}
@Override
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue("topic02"),
exchange = @Exchange(name = "topicExchange",type = "topic"),
key = {"aa.*"}
)
})
public void topicReceive02(String message) {
System.out.println("topic02 接收到的数据:" + message);
}
@Override
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue("topic03"),
exchange = @Exchange(name = "topicExchange",type = "topic"),
key = {"aa.#"}
)
})
public void topicReceive03(String message) {
System.out.println("topic03 接收到的数据:" + message);
}
}
最后是SpringBoot项目的启动入口类。
package com.szh.sprringboot.rabbitmq;
import com.szh.sprringboot.rabbitmq.service.ReceiveService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
ApplicationContext context=SpringApplication.run(Application.class, args);
ReceiveService service= (ReceiveService) context.getBean("receiveService");
//service.receiveMessage();
}
}
3.测试结果
3.1 direct
先启动消息发送者工程,再启动消息接收者工程。
3.2 fanout
先启动消息接收者工程,再启动消息发送者工程。
因为这里fanout交换机中定义了两个消息队列,它是一对多、不需要绑定RoutingKey的,所以这些消息队列都会接收到消息数据。
3.3 topic
先启动消息接收者工程,再启动消息发送者工程。
因为这里topic交换机中定义了三个消息队列,它是一对多、需要绑定RoutingKey的,根据RoutingKey的不同会限制哪些消息队列能够接收到消息、哪些不能。当绑定的RoutingKey为aa时,只有BingKey为 aa、aa.# 这两个消息队列可以接收到(aa顾名思义、而aa.#是因为#表示0个或多个单词,aa.*接收不到是因为*仅能表示1个单词)。
3.4 RabbitMQ管控台中查看SpringBoot工程创建的交换机和消息队列
这里的消息队列只有direct、topic的,至于为什么没有fanout的,是因为fanout类型的交换机在消息发送/接收服务停止之后,对应的交换机还在,但是消息队列会自动清除掉。
以上是关于RabbitMQ——SpringBoot集成RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ——SpringBoot集成RabbitMQ
SpringBoot - SpringBoot集成RabbitMQ
SpringBoot | 第十二章:RabbitMQ的集成和使用
rabbitmq入门springboot集成rabbitmq