SpringBoot整合RabbitMQ之发送接收消息实战

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合RabbitMQ之发送接收消息实战相关的知识,希望对你有一定的参考价值。

实战前言
前几篇文章中,我们介绍了SpringBoot整合RabbitMQ的配置以及实战了Spring的事件驱动模型,这两篇文章对于我们后续实战RabbitMQ其他知识要点将起到奠基的作用的。特别是Spring的事件驱动模型,当我们全篇实战完毕RabbitMQ并大概了解一下RabbitMQ相关组件的源码时,会发现其中的ApplicationEvent、ApplicationListener、ApplicationEventPublisher跟RabbitMQ的Message、Listener、RabbitTemplate有“异曲同工之妙”,当然啦,其中更多有关联关系的是它们的底层源码,感兴趣的童鞋可以研究一番!

实战概要
从本篇文章将开始采用SpringBoot整合RabbitMQ的方式来实战相关知识要点、企业级应用业务模块以及微服务项目一些典型的问题。
本篇文章将介绍实战RabbitMQ在SpringBoot项目中的基本应用,即如何创建队列、交换机、路由及其绑定以及如何发送接收消息!

实战历程
前几篇文章我们已经实现了如何采用IDEA开发工具实现SpringBoot整合RabbitMQ的配置,其中有一个相当重要的配置类 RabbitmqConfig.java ,我们将在这里创建队列、交换机、路由及其绑定,下面我们就创建一个简单的消息模型吧:DirectExchange+RoutingKey 。以下为创建队列、交换机、路由及其绑定的相关信息。

1、首先是application.properties配置文件中配置的信息

mq.env=local
basic.info.mq.exchange.name=${mq.env}:basic:info:mq:exchange
basic.info.mq.routing.key.name=${mq.env}:basic:info:mq:routing:key
basic.info.mq.queue.name=${mq.env}:basic:info:mq:queue

2、RabbitmqConfig创建队列、交换机、路由及其绑定

    //TODO:基本消息模型构建
    @Bean
    public DirectExchange basicExchange(){
        return new DirectExchange(env.getProperty("basic.info.mq.exchange.name"), true,false);
    }

    @Bean(name = "basicQueue")
    public Queue basicQueue(){
        return new Queue(env.getProperty("basic.info.mq.queue.name"), true);
    }

    @Bean
    public Binding basicBinding(){
        return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(env.getProperty("basic.info.mq.routing.key.name"));
    }

3、当我们在上面创建好队列、交换机、路由及其绑定后,我们可以先把整个项目跑起来,然后打开http://localhost:15672/ 访问RabbitMQ后端控制台,点击 Queues、Exchanges 栏目,即可看到我们创建好的队列、交换机。如下所示
技术分享图片
技术分享图片

4、现在可以说是万事具备,只欠东风。创建好了队列,自然是要来使用的,下面我就采用这条队列来发送接收“简单的字符串信息” 以及 “对象实体信息”!在这里,我们在Controller执行发送逻辑,其中充当发送消息的组件是RabbitTemplate,充当消息的组件为Message。如下所示RabbitController.java

@RestController
    public class RabbitController {
    private static final Logger log= LoggerFactory.getLogger(HelloWorldController.class);
    private static final String Prefix="rabbit";

    @Autowired
    private Environment env;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * 发送简单消息
     * @param message
     * @return
     */
    @RequestMapping(value = Prefix+"/simple/message/send",method = RequestMethod.GET)
    public BaseResponse sendSimpleMessage(@RequestParam String message){
        BaseResponse response=new BaseResponse(StatusCode.Success);
        try {
            log.info("待发送的消息: {} ",message);

            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            rabbitTemplate.setExchange(env.getProperty("basic.info.mq.exchange.name"));
            rabbitTemplate.setRoutingKey(env.getProperty("basic.info.mq.routing.key.name"));

            Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(message)).build();
            rabbitTemplate.convertAndSend(msg);
        }catch (Exception e){
            log.error("发送简单消息发生异常: ",e.fillInStackTrace());
        }
        return response;
    }

    /**
     * 发送对象消息
     * @param user
     * @return
     */
    @RequestMapping(value = Prefix+"/object/message/send",method = RequestMethod.POST,consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public BaseResponse sendObjectMessage(@RequestBody User user){
        BaseResponse response=new BaseResponse(StatusCode.Success);
        try {
            log.info("待发送的消息: {} ",user);

            rabbitTemplate.setExchange(env.getProperty("basic.info.mq.exchange.name"));
            rabbitTemplate.setRoutingKey(env.getProperty("basic.info.mq.routing.key.name"));
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

            Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(user)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                    .build();
            rabbitTemplate.convertAndSend(msg);
        }catch (Exception e){
            log.error("发送对象消息发生异常: ",e.fillInStackTrace());
        }
        return response;
    }}

5、在上面可以看到我们的发送代码逻辑其实并不复杂,其思路主要是来源于第一阶段的介绍消息模型中的其中一种,如下图所示。即我们是将消息发送到exchange,然后由于exchange与某个routingKey绑定路由到某个队列queue,故而当消息到达exchange后,将自然而然的被路由到指定的queue中,等待被监听消费。
技术分享图片

6、下面我们需要创建一个listener用于监听消费此队列中的消息。代码逻辑如下CommonListener.java

@Component
    public class CommonListener {

    private static final Logger log= LoggerFactory.getLogger(CommonListener.class);

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * 监听消费消息
     * @param message
     */
    @RabbitListener(queues = "${basic.info.mq.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeMessage(@Payload byte[] message){
        try {
            //TODO:接收String
            String result=new String(message,"UTF-8");
            log.info("接收String消息: {} ",result);
        }catch (Exception e){
            log.error("监听消费消息 发生异常: ",e.fillInStackTrace());
        }
    }}

7、紧接着,我们将整个项目跑起来,然后首先访问 “http://127.0.0.1:9092/mq/rabbit/simple/message/send?message=简单消息模型2” ,然后即可看到listener接收到改消息,可以在控制台打印输出!
技术分享图片

8、然后我们改造一下 CommonListener的监听消费代码逻辑用于监听消费对象实体的信息,如下

    /**
     * 监听消费消息
     * @param message
     */
    @RabbitListener(queues = "${basic.info.mq.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeMessage(@Payload byte[] message){
        try {
            //TODO:接收对象
            User user=objectMapper.readValue(message, User.class);
            log.info("接收对象消息: {} ",user);
        }catch (Exception e){
            log.error("监听消费消息 发生异常: ",e.fillInStackTrace());
        }
    }}

然后再将整个项目跑起来,在postman如下发送一个对象实体信息
技术分享图片

可以在listener打断点啥的监听期执行流程,然后即可看到listener监听消费到了队列中该对象实体消息,如下
技术分享图片

至此我们的队列、交换机、路由的创建没啥问题了,而且我们在控制台观察会发现,CommonListener.java中的@RabbitListener 注解的方法确实以 不同于 主线程 的异步线程来执行的!正如上图所看到的 那个就是线程Thread的ID。

9、上面的对象实体User的实体字段信息包含下面三个字段

    public class User implements Serializable{
    private Integer id;
    private String userName;
    private String name;

    public User(Integer id, String userName, String name) {
        this.id = id;
        this.userName = userName;
        this.name = name;
    }

    public User() {
    }

    public Integer getId() {
        return id;
    }

    //TODO:省略了getter/setter方法--也可以在类上面 @Data 加入 Lombok注解 - 当然啦前提是要加入Lombok依赖即可

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", userName=‘" + userName + ‘‘‘ +
                ", name=‘" + name + ‘‘‘ +
                ‘}‘;
    }}

10、至此,我们的SpringBoot整合RabbitMQ实战发送接收消息 已经介绍完毕!在上面我们的RabbitTemplate充当了发送消息的组件,这个组件在SpringBoot搭建的微服务项目中至关重要,我们还可以在其中设置其他的相关属性,包括我们之前设置好的 “消息回调”、“消息确认”等追溯的属性!当然啦,在后面我们会发现 RabbitTemplate 发送的消息的方式有n多种,特别是在消息确认方面,写法也是有多种,在后面我们会慢慢以代码体现出来!但不管怎么写,我们还是要牢牢把握住上面最基本的写法以及上面介绍的消息模型图:因为那就是发送接收消息的核心所在!

实战总结
SpringBoot搭建的微服务项目目前也是越来越流行,而消息中间件RabbitMQ应用也是越来越广泛,本文我们介绍了如何在SpringBoot搭建的项目中利用SpringBoot提供的起步依赖、自动装配等先天优势来创建队列、交换机、路由及其绑定并实现消息的发送监听接收消费,其实这已经初步实现了业务服务模块之间的解耦。在下节我们将用此来实战企业级应用、微服务项目中常见的业务模块:异步写用户操作日志;异步发送邮件!

实战结语
最近博主将SpringBoot整合RabbitMQ这一系列的文章进行了精要抽取并在gitchat上进行了分享,感兴趣的童鞋可以关注并加入我发起的文章交流会!扫一扫下面的二维码即可哦
技术分享图片

另外,相关的文章也会同步发布在×××公众号哦,感兴趣的童鞋也可以关注关注。有相关问题可以加我QQ:1974544863进行交流或者加群进行讨论:java开源技术交流-583522159
技术分享图片

以上是关于SpringBoot整合RabbitMQ之发送接收消息实战的主要内容,如果未能解决你的问题,请参考以下文章

springboot系列-springboot整合RabbitMQ

springboot学习笔记-6 springboot整合RabbitMQ

SpringBoot 整合RabbitMQ

企业级 SpringBoot 教程 (十五)Springboot整合RabbitMQ

SpringBoot实战之RabbitMQ

Spring Boot 整合 RabbitMQ