RabbitMQ笔记SpringBoot整合RabbitMQ之simple容器(生产者)
Posted 嘉禾嘉宁papa
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ笔记SpringBoot整合RabbitMQ之simple容器(生产者)相关的知识,希望对你有一定的参考价值。
目录
一、简介
本文主要用使用Spring Boot(2.5.2)来整合RabbitMQ(2.5.2),使用simple容器实现一个生产者。本文的前提是有一个安装好的RabbitMQ的环境,及我的上一篇文章里消费者服务:
链接: RabbitMQ笔记(一)SpringBoot整合RabbitMQ之simple容器(消费者).
二、Maven依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.2</version>
<relativePath/>
</parent>
<groupId>com.alian</groupId>
<artifactId>publish</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>publish</name>
<description>SpringBoot整合RabbitMQ之simple容器(生产者)</description>
<properties>
<java.version>1.8</java.version>
<!--com.fasterxml.jackson 版本-->
<jackson.version>2.9.10</jackson.version>
<!--spring 版本-->
<spring.version>5.3.8</spring.version>
<!--junit 版本-->
<junit.version>4.12</junit.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${parent.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${parent.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
<version>${parent.version}</version>
</dependency>
<!--rabbitMq的版本 版本最好和springboot保持一致-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${parent.version}</version>
</dependency>
<!--spring-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<!--单元测试-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<!--用于序列化-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!--java 8时间序列化-->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>
<!--自己打包上传到私服的,用于测试-->
<dependency>
<groupId>com.alian</groupId>
<artifactId>common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
这里需要注意的是下面这个包,是我本人打包到私服的,其实一个员工类,支付类,加上一个常量类,在我上一篇文章里也提过,就不多说了。
<dependency>
<groupId>com.alian</groupId>
<artifactId>common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
三、消息确认
3.1 消息发布确认ConfirmCallback
通过实现RabbitTemplate.ConfirmCallback接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是:消息正确到达Exchange(交换机)中就触发。这里你也可以可以利用CorrelationData和redis等实现相关的业务逻辑,比如你创建CorrelationData对象时定义一个唯一的key,并关联业务数据,然后回调时你可以取出数据继续其他的业务。
ConfirmCallBackService.java
package com.alian.publish.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class ConfirmCallBackService implements RabbitTemplate.ConfirmCallback{
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack){//消息发送失败
System.out.println("消息发送失败,原因为:" + cause);
//可以利用CorrelationData和redis等实现相关的业务逻辑
//dosomething();
return;
}
//消息发送成功
System.out.println("消息发送成功");
}
}
想要你的ConfirmCallback配置生效,还得在配置文件中增加如下配置
#发送消息的确认模式有三种 none,correlated,SIMPLE
spring.rabbitmq.publisher-confirm-type=correlated
- NONE:禁用发布确认模式,默认值
- CORRELATED:发布消息成功到交换机你后会触发回调方法
- SIMPLE:发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果
3.2 消息到达确认ReturnsCallback
通过实现RabbitTemplate.ReturnsCallback接口,启动消息失败返回,比如:消息路由不到队列时就会触发回调。这个时候你可能要重新发送或者丢弃消息,又或是配合死信队列继续业务处理。
ReturnCallBackService.java
package com.alian.publish.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class ReturnCallBackService implements RabbitTemplate.ReturnsCallback {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("消息失败返回状态码:{}", returnedMessage.getReplyCode());
log.info("消息失败返回消息文本:{}", returnedMessage.getReplyText());
log.info("消息失败返回交换机:{}", returnedMessage.getExchange());
log.info("消息失败返回路由:{}", returnedMessage.getRoutingKey());
log.info("消息失败返回消息内容:{}", returnedMessage.getMessage());
//dosomething();
}
}
想要你的ReturnsCallback配置生效,还得在配置文件中增加如下配置
#启用消息发布失败回调,默认关闭(比如从交换机路由不到到队列后触发)
spring.rabbitmq.publisher-returns=true
怎么使用呢只需要通过注解@Autowired引入,具体的可以看下一小节。
@Autowired
private ConfirmCallBackService confirmCallBackService;
@Autowired
private ReturnCallBackService returnCallBackService;
四、核心配置类
SimpleRabbitMqConfig.java
package com.alian.publish.config;
import com.alian.publish.service.ConfirmCallBackService;
import com.alian.publish.service.ReturnCallBackService;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
@Configuration
public class SimpleRabbitMqConfig {
@Autowired
private ConfirmCallBackService confirmCallBackService;
@Autowired
private ReturnCallBackService returnCallBackService;
@Bean(name = "simpleContainerFactory")
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory simpleContainerFactory = new SimpleRabbitListenerContainerFactory();
//she设置工厂
simpleContainerFactory.setConnectionFactory(connectionFactory);
//发送消息使用Jackson2JsonMessageConverter序列化
simpleContainerFactory.setMessageConverter(this.jackson2JsonMessageConverter());
//消费者listener抛出异常,是否重回队列,默认true:重回队列, false为不重回队列(结合死信交换机)
simpleContainerFactory.setDefaultRequeueRejected(false);
return simpleContainerFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
//she设置工厂
rabbitTemplate.setConnectionFactory(connectionFactory);
//发送消息使用Jackson2JsonMessageConverter序列化(支持java 8时间)
rabbitTemplate.setMessageConverter(this.jackson2JsonMessageConverter());
//设置receive()方法的超时时间
rabbitTemplate.setReceiveTimeout(5000L);
//sendAndReceive()方法的超时时间,默认5000毫秒
rabbitTemplate.setReplyTimeout(30000L);
//设置消息发布确认回调(消息发送到交换机就会触发)
rabbitTemplate.setConfirmCallback(confirmCallBackService);
//设置消息发布失败回调(比如从交换机路由不到到队列后触发)
rabbitTemplate.setReturnsCallback(returnCallBackService);
//Mandatory为true时,消息通过交换器无法匹配到队列会返回给生产者,为false时匹配不到会直接被丢弃
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
@Bean("jacksonMessageConverter")
public MessageConverter jackson2JsonMessageConverter() {
ObjectMapper mapper = getMapper();
return new Jackson2JsonMessageConverter(mapper);
}
/**
* 使用com.fasterxml.jackson.databind.ObjectMapper
* 对数据进行处理包括java8里的时间
*
* @return
*/
private ObjectMapper getMapper() {
ObjectMapper mapper = new ObjectMapper();
//设置可见性
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
//默认键入对象
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
//设置Java 8 时间序列化
JavaTimeModule timeModule = new JavaTimeModule();
timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
timeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
timeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
timeModule.addDeserializer(LocalDate.class, new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
timeModule.addDeserializer(LocalTime.class, new LocalTimeDeserializer(DateTimeFormatter以上是关于RabbitMQ笔记SpringBoot整合RabbitMQ之simple容器(生产者)的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ笔记SpringBoot整合RabbitMQ之simple容器(消费者)
springboot 整合 rabbitmq 转载https://www.cnblogs.com/hlhdidi/p/6535677.html