RabbitMQ RabbitMQ高级整合应用
Posted niugang0920
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ RabbitMQ高级整合应用相关的知识,希望对你有一定的参考价值。
RabbitMQ整合Spring AMQP实战
常用组件介绍
-
RabbitAdmin
-
Spring AMQP声明 通过@Bean注解进行声明
-
RabbitTemplate
-
SimpleMessageListenerContainer 对消息消费进行详细配置和优化
-
MessageListenerAdapter 消息监听适配器,建立在监听器基础之上
-
MessageConverter
RabbitAdmin
-
RabbitAdmin类可以很好的操作RabbitMQ,在Spring中直接进行注入即可
-
注意:autoSatrtup必须设置为true,否则spring容器不会加载RabbitAdmin类
-
RabbitAdmin底层实现就是从Spring容器中获取Exchange、Bingding、RoutingKey以及Queue的@Bean声明;
-
底层使用RabbitTemplate的execute方法执行对应的声明、修改、删除等一系列RabbitMQ基础功能操作;
RabbitMQ简单使用
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置
@Configuration
public class RabbitMqConfig1 {
/**
* 设置连接
* @return ConnectionFactory
*/
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
/**
* 创建RabbitAdmin
* @return RabbitAdmin
*/
@Bean
public RabbitAdmin rabbitAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
//默认就是true
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
测试
@Autowired
private RabbitAdmin rabbitAdmin;
/**
* RabbitAdmin api应用
*/
@Test
public void testAdmin() {
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
//绑定
rabbitAdmin.declareBinding(new Binding("test.direct.queue",
Binding.DestinationType.QUEUE,
"test.direct", "direct", new HashMap<>()));
//使用 BindingBuilder 创建绑定
// https://docs.spring.io/spring-amqp/docs/2.1.16.BUILD-SNAPSHOT/reference/html/#builder-api
rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("test.topic.queue", false)) //直接创建队列
.to(new TopicExchange("test.topic", false, false)) //直接创建交换机 建立关联关系
.with("user.#")); //指定路由Key
//FanoutExchange 类型exchange不走路由键
rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("test.fanout.queue", false))
.to(new FanoutExchange("test.fanout", false, false)));
//清空队列数据
// rabbitAdmin.purgeQueue("test.topic.queue", false);
}
SpringAMQP声明(Exchange、Queue、Binding)
在RabbitMQ基础AP里面声明一个Exchange、声明一个绑定、一个队列
//基础API声明一个exchange
channel.exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments)
//基础API 声明一个队列
channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments)
//基础API 声明binding
channel.queueBind(String queue, String exchange, String routingKey)
使用SpringAMQP去声明,就需要使用SpringAMQP的如下模式,即声明@Bean方式
//声明Topic 类型的exchange
@Bean
public TopicExchange topicExchange() {
//exchange 持久化
// Exchange springEchange = ExchangeBuilder.topicExchange("spring_amqp_test_echange").durable(true).build();
return new TopicExchange("spring_amqp_test_echange", true, false);
}
//声明队列
@Bean
public Queue queue() {
// Queue spring_amqp_test_echange = QueueBuilder.durable("spring_amqp_test_echange").build();
return new Queue("spring_amqp_test_queue");
}
//建立绑定
@Bean
public Binding binding(TopicExchange topicExchange, Queue queue) {
return BindingBuilder.bind(queue).to(topicExchange).with("spring.*");
}
消息模板 RabbitTemplate
-
RabbitTemplate,即消息模板。
-
在与SpringAMQP整合的时候进行发送消息的关键类
-
该类提供了丰富的发送消息的方法,包括可靠性投递消息方法、回调监听消息接口ConfirmCallback、返回值确认接口ReturnCallback等等。同样我们需要进入注入到Spring容器中,然后直接使用;
-
在与Spring整合时需要实例化,但是在与SpringBoot整合时,在配置文件里添加配置即可;
RabbitTemplate简单使用
配置
@Configuration
public class RabbitMqConfig3 {
/**
* 设置连接
*
* @return ConnectionFactory
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
/**
* 创建RabbitAdmin
*
* @return RabbitAdmin
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
//默认就是true
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
/**
* 消息模板
*
* @param connectionFactory connectionFactory
* @return RabbitTemplate
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
/**
* 针对消费者配置
* 1. 设置交换机类型
* 2. 将队列绑定到交换机
* FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
* HeadersExchange :通过添加属性key-value匹配
* DirectExchange:按照routingkey分发到指定队列
* TopicExchange:多关键字匹配
*/
@Bean
public TopicExchange exchange001() {
return new TopicExchange("topic001", true, false);
}
@Bean
public Queue queue001() {
return new Queue("queue001", true); //队列持久
}
@Bean
public Binding binding001(TopicExchange exchange001, Queue queue001) {
return BindingBuilder.bind(queue001).to(exchange001).with("spring.*");
}
}
测试
@Test
public void testSendMessage() {
//1 创建消息
//AMQP消息的消息属性
//MessageBuilder(也可以构建Message) 使用流利的API从byte[]主体或其他消息构建Spring AMQP消息。
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "信息描述..");
messageProperties.getHeaders().put("type", "自定义消息类型..");
Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.err.println("------添加额外的设置---------");
message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
return message;
}
});
}
队列queue001
@Test
public void testSendMessage2() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq 消息1234".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.abc", message);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
}
队列queue001
队列queue002
简单消息监听容器:SimpleMessageListenerContainer
- 这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足
- 监听队列(多个队列)、自动启动、自动声明功能
- 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等
- 设置消费者数量、最小最大数量、批量消费
- 设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数
- 设置消费者标签生成策略、是否独占模式、消费者属性等
- 设置具体的监听器、消息转换器等等
注意:SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等。很多基于RabbitMQ的自制定化后端管控台在进行动态设置的时候,也是根据这一特性去实现的。所以可以看出SpringAMQP非常的强大;
思考一下:SimpleMessageListenerContainer为什么可以动态感知配置变更?
配置
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// container.setQueueNames(); 接收字符串的队列名
//
container.setQueues(queue001(), queue002(), queue003());
//当前消费者数量
container.setConcurrentConsumers(1);
//最大消费者数量
container.setMaxConcurrentConsumers(5);
//是否使用重队列
container.setDefaultRequeueRejected(false);
//自动签收
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setExposeListenerChannel(true);
//消费端的标签策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//设置消息监听
//必须设置消息监听 否则 报 No message listener specified - see property ‘messageListener‘
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.err.println("----------消费者: " + msg);
//做消息处理....
}
});
return container;
}
消息监听适配器:MessageListenerAdapter
通过MessageListenerAdapter的代码我们可以看出如下核心属性:
- defaultListenerMethod默认监听方法名称:用于设置监听方法名称
- Delegate委托对象:实际真实的委托对象,用于处理消息、
- queueOrTagToMethodName: 队列标识与方法名称组成的集合
- 可以一一进行队列与方法名称的匹配;
- 队列和方法名称绑定,即指定队列里的消息会被绑定的方法所接收处理;
配置
public class MessageDelegate1 {
public void handleMessage(byte[] messageBody) {
System.err.println("默认方法, 消息内容:" + new String(messageBody));
}
public void consumeMessage(byte[] messageBody) {
System.err.println("字节数组方法, 消息内容:" + new String(messageBody));
}
public void consumeMessage(String messageBody) {
System.err.println("字符串方法, 消息内容:" + messageBody);
}
public void method1(String messageBody) {
System.err.println("method1 收到消息内容:" + new String(messageBody));
}
public void method2(String messageBody) {
System.err.println("method2 收到消息内容:" + new String(messageBody));
}
public void consumeMessage(Map messageBody) {
System.err.println("map方法, 消息内容:" + messageBody);
}
}
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// container.setQueueNames(); 接收字符串的队列名
//
container.setQueues(queue001(), queue002(), queue003());
//当前消费者数量
container.setConcurrentConsumers(1);
//最大消费者数量
container.setMaxConcurrentConsumers(5);
//是否使用重队列
container.setDefaultRequeueRejected(false);
//自动签收
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setExposeListenerChannel(true);
//消费端的标签策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//1 适配器方式. 默认是有自己的方法名字的:handleMessage
// 可以自己指定一个方法的名字: consumeMessage
// 也可以添加一个转换器: 从字节数组转换为String
//MessageDelegate1如何写 MessageListenerAdapter 源码里面也给出了一些建议
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate1());
//默认的方法是 public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";
adapter.setDefaultListenerMethod("consumeMessage");
//TextMessageConverter 自定义的消息转换器
//new TextMessageConverter()-->consumeMessage(byte[] messageBody))->MessageProperties.setContentType("text/plian")
//new Jackson2JsonMessageConverter()--->consumeMessage(Map messageBody))->MessageProperties.setContentType("application/json")
// adapter.setMessageConverter(new Jackson2JsonMessageConverter());
container.setMessageListener(adapter);
return container;
}
MessageConverter消息转换器
-
我们在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter;
-
自定义常用转换器:MessageConverter,一般来讲都需要实现这个接口
-
重写下面两个方法:
- toMessage:java对象转换为Message
- fromMessage:Message对象转换为java对象
-
MessageConverter消息转换器:
-
Json转换器:Jackson2JsonMessageConverter:可以进行java对象的转换功能;
-
DefaultJackson2JavaTypeMapper映射器:可以进行java对象的映射关系;
-
自定义二进制转换器:比如图片类型、PDF、PPT、流媒体等
使用转换器的目的是当传入不同的类型的数据(如json,类,PDF,图片等)时,在消息的接收方接收到时也总是以传入的类型接收结果对象;我们通过写入不同的转换器以达到此种效果。具体可百度。
JSON格式转换
默认监听方法的参数为Map
public class Order {
private String id;
private String name;
private String content;
public Order() {
}
public Order(String id, String name, String content) {
this.id = id;
this.name = name;
this.content = content;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
配置
// 1.1 支持json格式的转换器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
// public void consumeMessage(Map messageBody) {
// System.err.println("map方法, 消息内容:" + messageBody);
// }
//对应map参数方法
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
测试
@Test
public void testSendJsonMessage() throws Exception {
Order order = new Order();
order.setId("001");
order.setName("消息订单");
order.setContent("描述信息");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.err.println("order 4 json: " + json);
MessageProperties messageProperties = new MessageProperties();
//这里注意一定要修改contentType为 application/json
messageProperties.setContentType("application/json");
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.order", message);
}
JSON格式转换支持Java对象
默认监听方法的参数为Java对象
委托对象方法
public void consumeMessage(Order order) {
System.err.println("order对象, 消息内容, id: " + order.getId() +
", name: " + order.getName() +
", content: "+ order.getContent());
}
配置
// 1.2 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象转换
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
//信任所有的包,否则会报 报不信任
javaTypeMapper.setTrustedPackages("*");
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
测试
@Test
public void testSendJavaMessage() throws Exception {
Order order = new Order();
order.setId("001");
order.setName("订单消息");
order.setContent("订单描述信息");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.err.println("order 4 json: " + json);
MessageProperties messageProperties = new MessageProperties();
//这里注意一定要修改contentType为 application/json
messageProperties.setContentType("application/json");
//__TypeId__ 这个是固定写法
messageProperties.getHeaders().put("__TypeId__", "com.niugang.spring.entity.Order");
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.order", message);
}
输出
order对象, 消息内容, id: 001, name: 订单消息, content: 订单描述信息
JSON格式转换支持Java对象(二)
委托对象方法
public void consumeMessage(Order order) {
System.err.println("order对象, 消息内容, id: " + order.getId() +
", name: " + order.getName() +
", content: "+ order.getContent());
}
public void consumeMessage(Packaged pack) {
System.err.println("package对象, 消息内容, id: " + pack.getId() +
", name: " + pack.getName() +
", content: "+ pack.getDescription());
}
配置
//1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象多映射转换
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
idClassMapping.put("order", com.niugang.spring.entity.Order.class);
idClassMapping.put("packaged", com.niugang.spring.entity.Packaged.class);
javaTypeMapper.setIdClassMapping(idClassMapping);
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
测试
@Test
public void testSendMappingMessage() throws Exception {
ObjectMapper mapper = new ObjectMapper();
Order order = new Order();
order.setId("001");
order.setName("订单消息");
order.setContent("订单描述信息");
String json1 = mapper.writeValueAsString(order);
System.err.println("order 4 json: " + json1);
MessageProperties messageProperties1 = new MessageProperties();
//这里注意一定要修改contentType为 application/json
messageProperties1.setContentType("application/json");
messageProperties1.getHeaders().put("__TypeId__", "order");
Message message1 = new Message(json1.getBytes(), messageProperties1);
rabbitTemplate.send("topic001", "spring.order", message1);
Packaged pack = new Packaged();
pack.setId("002");
pack.setName("包裹消息");
pack.setDescription("包裹描述信息");
String json2 = mapper.writeValueAsString(pack);
System.err.println("pack 4 json: " + json2);
MessageProperties messageProperties2 = new MessageProperties();
//这里注意一定要修改contentType为 application/json
messageProperties2.setContentType("application/json");
messageProperties2.getHeaders().put("__TypeId__", "packaged");
Message message2 = new Message(json2.getBytes(), messageProperties2);
rabbitTemplate.send("topic001", "spring.pack", message2);
}
全局消息转化器与自定义转化器
自定义文本转化器
public class TextMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return new Message(object.toString().getBytes(), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if(null != contentType && contentType.contains("text")) {
return new String(message.getBody());
}
return message.getBody();
}
}
自定义图片转化器
/**
* 图片转化器
*/
public class ImageMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.err.println("-----------Image MessageConverter----------");
Object _extName = message.getMessageProperties().getHeaders().get("extName");
String extName = _extName == null ? "png" : _extName.toString();
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
//目录必须存在
String path = "d:/springbootlog/" + fileName + "." + extName;
File f = new File(path);
try {
//拷贝到指定路径
Files.copy(new ByteArrayInputStream(body), f.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return f;
}
}
自定义pdf转化器
public class PDFMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.err.println("-----------PDF MessageConverter----------");
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "d:/springbootlog/" + fileName + ".pdf";
File f = new File(path);
try {
Files.copy(new ByteArrayInputStream(body), f.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return f;
}
}
委托对象
public void consumeMessage(File file) {
System.err.println("文件对象 方法, 消息内容:" + file.getName());
}
配置
//1.4 ext convert
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
//全局的转换器:
ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
TextMessageConverter textConvert = new TextMessageConverter();
convert.addDelegate("text", textConvert);
convert.addDelegate("html/text", textConvert);
convert.addDelegate("xml/text", textConvert);
convert.addDelegate("text/plain", textConvert);
Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
convert.addDelegate("json", jsonConvert);
convert.addDelegate("application/json", jsonConvert);
ImageMessageConverter imageConverter = new ImageMessageConverter();
convert.addDelegate("image/png", imageConverter);
convert.addDelegate("image", imageConverter);
PDFMessageConverter pdfConverter = new PDFMessageConverter();
convert.addDelegate("application/pdf", pdfConverter);
adapter.setMessageConverter(convert);
container.setMessageListener(adapter);
测试
@Test
public void testSendExtConverterMessage() throws Exception {
byte[] body = Files.readAllBytes(Paths.get("C:\Users\Administrator\Desktop\公众号", "spring.png"));
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("image/png");
messageProperties.getHeaders().put("extName", "png");
Message message = new Message(body, messageProperties);
rabbitTemplate.send("", "image_queue", message);
byte[] body1 = Files.readAllBytes(Paths.get("D:\Documents\技术书籍", "Java huashan-2019-06-20.pdf"));
MessageProperties messageProperties1 = new MessageProperties();
messageProperties.setContentType("application/pdf");
Message message1 = new Message(body1, messageProperties);
rabbitTemplate.send("", "pdf_queue", message1);
}
SpringBoot整合配置详解(生产端)
- publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求:RabbitTemplate.ConfirmCallback
- publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功: RabbitTemplate.ReturnCallback
注意一点,在发送消息的时候对template进行配置mandatory=true保证监听有效;生产端还可以配置其他属性,比如发送重试,超时时间、次数、间隔等。
生产端代码示例
application.properties
spring.rabbitmq.addresses=localhost:5672
#spring.rabbitmq.host=localhost
#spring.rabbitmq.port=5762
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
# 消息确认模式
spring.rabbitmq.publisher-confirms=true
# 消息返回模式
spring.rabbitmq.publisher-returns=true
# 为true 消息返回模式才生效
spring.rabbitmq.template.mandatory=true
配置
/**
* springboot 消息生产者
*
* @author niugang
*/
@Configuration
public class RabbitMqConfig {
/**
* 自动注入RabbitTemplate模板类
*/
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 回调函数: confirm确认
*/
final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: " + correlationData);
System.err.println("ack: " + ack);
if (!ack) {
System.err.println("异常处理....");
}
}
};
/**
* 回调函数: return返回
*/
final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
String exchange, String routingKey) {
System.err.println("return exchange: " + exchange + ", routingKey: "
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
}
};
/*
队列监听在消费者端配置,没有将会自动创建
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("exchange-1");
}
@Bean
public Queue queue() {
return new Queue("queue-1");
}
@Bean
public Binding binding(Queue queue, TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with("springboot.#");
}*/
/**
* 发送消息方法调用: 构建Message消息
*
* @param message 消息体
* @param properties 消息属性
*/
public void send(Object message, Map<String, Object> properties) {
MessageProperties messageProperties = new MessageProperties();
if (properties != null && properties.size() > 0) {
Set<Map.Entry<String, Object>> entries = properties.entrySet();
for (Map.Entry<String, Object> entry : entries) {
String key = entry.getKey();
Object value = entry.getValue();
messageProperties.setHeader(key, value);
}
}
//org.springframework.amqp.core
Message msg = MessageBuilder.withBody(message.toString().getBytes()).andProperties(messageProperties).build();
//id + 时间戳 全局唯一
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//routingKey修改 为 spring.abc 消息将走 returnCallback
rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
}
}
测试
在rabbitmq控制台新建,Exchange名为exchange-1,新建队列queue-1,并建立两者之间的绑定,routingKey为springboot.#
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
@Test
public void contextLoads() {
}
@Autowired
private RabbitMqConfig rabbitMqConfig ;
private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Test
public void testSender1() throws Exception {
Map<String, Object> properties = new HashMap<>();
properties.put("number", "12345");
properties.put("send_time", simpleDateFormat.format(new Date()));
rabbitMqConfig.send("Hello RabbitMQ For Spring Boot!"+System.currentTimeMillis(), properties);
}
}
注意:进行单元测试,ack一直是false;改为url请求,ack就正常了
SpringBoot整合配置详解(消费端)
消费端核心配置
# NONE, MANUAL, AUTO; 手工消息消息确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#监听器调用程序线程的最小数量。
spring.rabbitmq.listener.simple.concurrency=5
#监听器调用程序线程的最大数量。
spring.rabbitmq.listener.simple.max-concurrency=10
# spring.rabbitmq.listener.type=simple 默认为 SimpleContainer 模式对应 spring.rabbitmq.listener.simple 前缀相关的
注意点
- 首先配置手工确认模式,用于ACK的手工处理,这样我们可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列、根据业务记录日志等处理
- 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况。
@RabbitListener注解的使用
- 消费端监听@RabbitMQListener注解,这个对于在实际工作中非常的好用。
- @RabbitListener是一个组合注解,里面可以注解配置
- @QueueBinding、@Queue、@Exchange直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等。
消费者端代码示例
类配置写在代码里非常不友好,所以强烈建议大家使用配置文件配置。
properties
#spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
# NONE, MANUAL, AUTO; 手工消息消息确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#监听器调用程序线程的最小数量。
spring.rabbitmq.listener.simple.concurrency=5
#监听器调用程序线程的最大数量。
spring.rabbitmq.listener.simple.max-concurrency=10
# spring.rabbitmq.listener.type=simple 默认为 SimpleContainer 模式对应 spring.rabbitmq.listener.simple 前缀相关的
spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*
配置
public class Order implements Serializable {
private String id;
private String name;
public Order() {
}
public Order(String id, String name) {
super();
this.id = id;
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
/**
* 消费者类
*
* @author niugang
*/
@Configuration
public class RabbitMQReceiver {
/**
* 从1.5.0版开始,您可以在类级别指定@RabbitListener注释。
* 与新的@RabbitHandler批注一起,这使单个侦听器可以根据传入消息的有效负载类型调用不同的方法。
*
* @RabbitListener(id="multi", queues = "someQueue")
* @SendTo("my.reply.queue") public class MultiListenerBean {
* @RabbitHandler public String thing2(Thing2 thing2) {
* ...
* }
* @RabbitHandler public String cat(Cat cat) {
* ...
* }
* @RabbitHandler public String hat(@Header("amqp_receivedRoutingKey") String rk, @Payload Hat hat) {
* ...
* }
* @RabbitHandler(isDefault = true)
* public String defaultMethod(Object object) {
* ...
* }
* }
* 在这种情况下,如果转换后的有效负载是Thing2,Cat或Hat,则会调用各个@RabbitHandler方法。
* 您应该了解,系统必须能够根据有效负载类型识别唯一方法。
* 检查该类型是否可分配给没有注释或带有@Payload注释的单个参数。
* 请注意,如方法级别@RabbitListener(前面所述)中所述,应用了相同的方法签名。
*/
//队列 exchange 绑定 没有 自动创建
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue-1",
durable = "true"),
exchange = @Exchange(value = "exchange-1",
durable = "true",
type = ExchangeTypes.TOPIC,
ignoreDeclarationExceptions = "true"),
key = "springboot.*" //routing key
)
)
@RabbitHandler
//@RabbitListener 提供了很多灵活的签名 如Message Channel @Payload @Header 等 具体可查看源码
// org.springframework.amqp.core.Message
// org.springframework.messaging.Message
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消费端Payload: " + new String(message.getBody()));
System.err.println("消费端MessageProperties.: " + message.getMessageProperties());
//AmqpHeaders header属性封装
//手工ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
/**
* spring.rabbitmq.listener.order.queue.name=queue-2
* spring.rabbitmq.listener.order.queue.durable=true
* spring.rabbitmq.listener.order.exchange.name=exchange-1
* spring.rabbitmq.listener.order.exchange.durable=true
* spring.rabbitmq.listener.order.exchange.type=topic
* spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
* spring.rabbitmq.listener.order.key=springboot.*
*
* @param order order
* @param channel channel
* @param headers headers
* @throws Exception Exception
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
durable = "${spring.rabbitmq.listener.order.queue.durable}"),
exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
durable = "${spring.rabbitmq.listener.order.exchange.durable}",
type = "${spring.rabbitmq.listener.order.exchange.type}",
ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
key = "${spring.rabbitmq.listener.order.key}"
)
)
@RabbitHandler
//@Headers 必须通过Map接收
//@Header("amqp_receivedRoutingKey") String rk 直接获取header中某一个key
//默认前缀为amqp_
/**
* {amqp_receivedDeliveryMode=PERSISTENT,
* amqp_receivedExchange=exchange-2,
* amqp_deliveryTag=1,
* amqp_consumerQueue=queue-2,
* amqp_redelivered=false,
amqp_receivedRoutingKey=springboot.def,
spring_listener_return_correlation=175a21c4-ffd5-4a3e-ac3a-2f63d60c18a5,
spring_returned_message_correlation=0987654321,
id=53443ced-0b23-3079-71c2-09997897a553,
amqp_consumerTag=amq.ctag-V0hqyVObrHXJeC60MwPSVQ,
contentType=application/x-java-serialized-object,
timestamp=1591240122842}
*/
public void onOrderMessage(@Payload Order order,
Channel channel,
@Headers Map<String, Object> headers) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消费端order: " + order.getId());
System.err.println("消费端headers: " + headers);
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
}
以上是关于RabbitMQ RabbitMQ高级整合应用的主要内容,如果未能解决你的问题,请参考以下文章
Springboot 整合 RabbitMQ高级特性 & 真实业务应用