RabbitMQ笔记SpringBoot整合RabbitMQ之simple容器(生产者)

Posted 嘉禾嘉宁papa

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ笔记SpringBoot整合RabbitMQ之simple容器(生产者)相关的知识,希望对你有一定的参考价值。

一、简介

  本文主要用使用Spring Boot(2.5.2)来整合RabbitMQ(2.5.2),使用simple容器实现一个生产者。本文的前提是有一个安装好的RabbitMQ的环境,及我的上一篇文章里消费者服务:
  链接: RabbitMQ笔记(一)SpringBoot整合RabbitMQ之simple容器(消费者).

二、Maven依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.2</version>
        <relativePath/>
    </parent>
    <groupId>com.alian</groupId>
    <artifactId>publish</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>publish</name>
    <description>SpringBoot整合RabbitMQ之simple容器(生产者)</description>
    <properties>
        <java.version>1.8</java.version>
        <!--com.fasterxml.jackson 版本-->
        <jackson.version>2.9.10</jackson.version>
        <!--spring 版本-->
        <spring.version>5.3.8</spring.version>
        <!--junit 版本-->
        <junit.version>4.12</junit.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>${parent.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>${parent.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-test</artifactId>
            <version>${parent.version}</version>
        </dependency>

        <!--rabbitMq的版本 版本最好和springboot保持一致-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>${parent.version}</version>
        </dependency>

        <!--spring-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

        <!--单元测试-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>

        <!--用于序列化-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>

        <!--java 8时间序列化-->
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>${jackson.version}</version>
        </dependency>

        <!--自己打包上传到私服的,用于测试-->
        <dependency>
            <groupId>com.alian</groupId>
            <artifactId>common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

  这里需要注意的是下面这个包,是我本人打包到私服的,其实一个员工类,支付类,加上一个常量类,在我上一篇文章里也提过,就不多说了。

<dependency>
    <groupId>com.alian</groupId>
    <artifactId>common</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

三、消息确认

3.1 消息发布确认ConfirmCallback

  通过实现RabbitTemplate.ConfirmCallback接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是:消息正确到达Exchange(交换机)中就触发。这里你也可以可以利用CorrelationData和redis等实现相关的业务逻辑,比如你创建CorrelationData对象时定义一个唯一的key,并关联业务数据,然后回调时你可以取出数据继续其他的业务。

ConfirmCallBackService.java

package com.alian.publish.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class ConfirmCallBackService implements RabbitTemplate.ConfirmCallback{

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack){//消息发送失败
            System.out.println("消息发送失败,原因为:" + cause);
            //可以利用CorrelationData和redis等实现相关的业务逻辑
            //dosomething();
            return;
        }
        //消息发送成功
        System.out.println("消息发送成功");
    }
}

想要你的ConfirmCallback配置生效,还得在配置文件中增加如下配置

#发送消息的确认模式有三种 none,correlated,SIMPLE
spring.rabbitmq.publisher-confirm-type=correlated
  • NONE:禁用发布确认模式,默认值
  • CORRELATED:发布消息成功到交换机你后会触发回调方法
  • SIMPLE:发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果

3.2 消息到达确认ReturnsCallback

  通过实现RabbitTemplate.ReturnsCallback接口,启动消息失败返回,比如:消息路由不到队列时就会触发回调。这个时候你可能要重新发送或者丢弃消息,又或是配合死信队列继续业务处理。

ReturnCallBackService.java

package com.alian.publish.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class ReturnCallBackService implements RabbitTemplate.ReturnsCallback {

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("消息失败返回状态码:{}", returnedMessage.getReplyCode());
        log.info("消息失败返回消息文本:{}", returnedMessage.getReplyText());
        log.info("消息失败返回交换机:{}", returnedMessage.getExchange());
        log.info("消息失败返回路由:{}", returnedMessage.getRoutingKey());
        log.info("消息失败返回消息内容:{}", returnedMessage.getMessage());
        //dosomething();
    }
    
}

想要你的ReturnsCallback配置生效,还得在配置文件中增加如下配置

#启用消息发布失败回调,默认关闭(比如从交换机路由不到到队列后触发)
spring.rabbitmq.publisher-returns=true

怎么使用呢只需要通过注解@Autowired引入,具体的可以看下一小节。

    @Autowired
    private ConfirmCallBackService confirmCallBackService;

    @Autowired
    private ReturnCallBackService returnCallBackService;

四、核心配置类

SimpleRabbitMqConfig.java

package com.alian.publish.config;

import com.alian.publish.service.ConfirmCallBackService;
import com.alian.publish.service.ReturnCallBackService;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

@Configuration
public class SimpleRabbitMqConfig {

    @Autowired
    private ConfirmCallBackService confirmCallBackService;

    @Autowired
    private ReturnCallBackService returnCallBackService;

    @Bean(name = "simpleContainerFactory")
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory simpleContainerFactory = new SimpleRabbitListenerContainerFactory();
        //she设置工厂
        simpleContainerFactory.setConnectionFactory(connectionFactory);
        //发送消息使用Jackson2JsonMessageConverter序列化
        simpleContainerFactory.setMessageConverter(this.jackson2JsonMessageConverter());
        //消费者listener抛出异常,是否重回队列,默认true:重回队列, false为不重回队列(结合死信交换机)
        simpleContainerFactory.setDefaultRequeueRejected(false);
        return simpleContainerFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        //she设置工厂
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //发送消息使用Jackson2JsonMessageConverter序列化(支持java 8时间)
        rabbitTemplate.setMessageConverter(this.jackson2JsonMessageConverter());
        //设置receive()方法的超时时间
        rabbitTemplate.setReceiveTimeout(5000L);
        //sendAndReceive()方法的超时时间,默认5000毫秒
        rabbitTemplate.setReplyTimeout(30000L);
        //设置消息发布确认回调(消息发送到交换机就会触发)
        rabbitTemplate.setConfirmCallback(confirmCallBackService);
        //设置消息发布失败回调(比如从交换机路由不到到队列后触发)
        rabbitTemplate.setReturnsCallback(returnCallBackService);
        //Mandatory为true时,消息通过交换器无法匹配到队列会返回给生产者,为false时匹配不到会直接被丢弃
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }

    @Bean("jacksonMessageConverter")
    public MessageConverter jackson2JsonMessageConverter() {
        ObjectMapper mapper = getMapper();
        return new Jackson2JsonMessageConverter(mapper);
    }

    /**
     * 使用com.fasterxml.jackson.databind.ObjectMapper
     * 对数据进行处理包括java8里的时间
     *
     * @return
     */
    private ObjectMapper getMapper() {
        ObjectMapper mapper = new ObjectMapper();
        //设置可见性
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        //默认键入对象
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        //设置Java 8 时间序列化
        JavaTimeModule timeModule = new JavaTimeModule();
        timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        timeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
        timeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
        timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        timeModule.addDeserializer(LocalDate.class, new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
        timeModule.addDeserializer(LocalTime.class, new LocalTimeDeserializer(DateTimeFormatter以上是关于RabbitMQ笔记SpringBoot整合RabbitMQ之simple容器(生产者)的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ笔记SpringBoot整合RabbitMQ之simple容器(消费者)

springboot 整合 rabbitmq 转载https://www.cnblogs.com/hlhdidi/p/6535677.html

SpringBoot系列5SpringBoot整合RabbitMQ

RabbitMQ整合SpringBoot

RabbitMQ整合SpringBoot

RabbitMQ——springboot整合RabbitMQ