Spring boot 消息服务

Posted 流星蝴蝶没有剑

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring boot 消息服务相关的知识,希望对你有一定的参考价值。

消息服务

RabbitMQ

安装:【安装路径最好默认】

  1. 首先需要下载Erlang语言包
  2. 下载Rabbitmq服务

安装好之后,可能会访问【http://127.0.0.1:15672】失败

请参考:https://www.jianshu.com/p/e041f5ed10c7

添加配置【application.propertise】

# 配置 消息服务 Rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

发布、订阅模式

基于API整合

手动申明交换器、队列、路由键等
组装消息队列供应用程序调用
实现消息服务

  1. 首先运行接受消息。
    分别使用三种整合方式对三种模式进行消息监听
    RabbitMQService.java

package wx0725.top.service;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import wx0725.top.domain.User;

/**
 * RabbitMQ消息接收处理的业务类
 * 注解
 */
@Service
public class RabbitMQService 

    /**
     * **使用基于注解
     * 1.1、Publish/Subscribe工作模式接收,处理邮件业务
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("文轩FaDing_queue_email"), exchange = @Exchange(value = "文轩FaDing_exchange", type = "文轩FaDing")))
    public void psubConsumerEmailAno(User user) 
        System.out.println("邮件业务接收到消息: " + user);
    

    /**
     * 1.2、Publish/Subscribe工作模式接收,处理短信业务
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("文轩FaDing_queue_sms"), exchange = @Exchange(value = "文轩FaDing_exchange", type = "文轩FaDing")))
    public void psubConsumerSmsAno(User user) 
        System.out.println("短信业务接收到消息: " + user);
    


    /**
     * 2.1、路由模式消息接收,处理error级别日志信息
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("文轩LuYou_queue_error"), exchange = @Exchange(value = "文轩LuYou_exchange", type = "direct"), key = "error_文轩LuYou_key"))
    public void routingConsumerError(String message) 
        System.out.println("接收到error级别日志消息: " + message);
    

    /**
     * 2.2、路由模式消息接收,处理info、error、warning级别日志信息
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("文轩LuYou_queue_all"), exchange = @Exchange(value = "文轩LuYou_exchange", type = "direct"), key = "error_文轩LuYou_key", "info_文轩LuYou_key", "warning_文轩LuYou_key"))
    public void routingConsumerAll(String message) 
        System.out.println("接收到info、error、warning等级别日志消息: " + message);
    


    /**
     * 3.1、通配符,邮件订阅
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("文轩TongPeiFu_queue_email"), exchange = @Exchange(value = "文轩TongPeiFu_exchange", type = "文轩TongPeiFu"), key = "info.#.email.#"))
    public void topicConsumerEmail(String message) 
        System.out.println("接收到邮件订阅需求处理消息: " + message);
    

    /**
     * 3.2、通配符,短信订阅
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("文轩TongPeiFu_queue_sms"), exchange = @Exchange(value = "文轩TongPeiFu_exchange", type = "文轩TongPeiFu"), key = "info.#.sms.#"))
    public void topicConsumerSms(String message) 
        System.out.println("接收到短信订阅需求处理消息: " + message);
    



  1. 测试代码:分别三种模式进行发送测试(发布订阅模式、路由模式、通配符模式)
    ApplicationTest .java
package wx0725.top;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import wx0725.top.domain.User;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest 


    /**
     * API整合
     */


//    发布订阅模式 发送消息
    @Autowired
    private AmqpAdmin amqpAdmin;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void amqpAdmin() 
        // 1、定义fanout类型的交换器
        amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));
        // 2、定义两个默认持久化队列,分别处理email和sms
        amqpAdmin.declareQueue(new Queue("fanout_queue_email"));
        amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));
        // 3、将队列分别与交换器进行绑定
        amqpAdmin.declareBinding(new Binding("fanout_queue_email", Binding.DestinationType.QUEUE, "fanout_exchange", "", null));
        amqpAdmin.declareBinding(new Binding("fanout_queue_sms", Binding.DestinationType.QUEUE, "fanout_exchange", "", null));
    

    @Test
    public void psubPublisher() 
        User user = new User();
        user.setId(725);
        user.setUsername("文轩");
        rabbitTemplate.convertAndSend("fanout_exchange", "", user);
    



    //    路由模式消息发送
    @Test
    public void routingPublisher() 
        rabbitTemplate.convertAndSend("routing_exchange", "error_routing_key", "路由发送消息错误");
    

    //通配符工作模式消息发送端
    @Test
    public void topicPublisher() 
        // 1、只发送邮件订阅用户消息
        rabbitTemplate.convertAndSend("topics_exchange", "info.email", "通配符整合发送邮箱");
        // 2、只发送短信订阅用户消息
        rabbitTemplate.convertAndSend("topics_exchange", "info.sms", "通配符整合发送sms消息");
        // 3、发送同时订阅邮件和短信的用户消息
        rabbitTemplate.convertAndSend("topics_exchange", "info.email.sms", "通配符整合发送邮箱和sms消息");
    



package wx0725.top.service;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import wx0725.top.domain.User;

/**
 * RabbitMQ消息接收处理的业务类
 * 注解
 */
@Service
public class RabbitMQService 

    /**
     * **使用基于注解
     * 1.1、Publish/Subscribe工作模式接收,处理邮件业务
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("fanout_queue_email"), exchange = @Exchange(value = "fanout_exchange", type = "fanout")))
    public void psubConsumerEmailAno(User user) 
        System.out.println("邮件业务接收到消息: " + user);
    

    /**
     * 1.2、Publish/Subscribe工作模式接收,处理短信业务
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("fanout_queue_sms"), exchange = @Exchange(value = "fanout_exchange", type = "fanout")))
    public void psubConsumerSmsAno(User user) 
        System.out.println("短信业务接收到消息: " + user);
    


    /**
     * 2.1、路由模式消息接收,处理error级别日志信息
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_error"), exchange = @Exchange(value = "routing_exchange", type = "direct"), key = "error_routing_key"))
    public void routingConsumerError(String message) 
        System.out.println("接收到error级别日志消息: " + message);
    

    /**
     * 2.2、路由模式消息接收,处理info、error、warning级别日志信息
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_all"), exchange = @Exchange(value = "routing_exchange", type = "direct"), key = "error_routing_key", "info_routing_key", "warning_routing_key"))
    public void routingConsumerAll(String message) 
        System.out.println("接收到info、error、warning等级别日志消息: " + message);
    


    /**
     * 3.1、通配符,邮件订阅
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("topic_queue_email"), exchange = @Exchange(value = "topic_exchange", type = "topic"), key = "info.#.email.#"))
    public void topicConsumerEmail(String message) 
        System.out.println("接收到邮件订阅需求处理消息: " + message);
    

    /**
     * 3.2、通配符,短信订阅
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue("topic_queue_sms"), exchange = @Exchange(value = "topic_exchange", type = "topic"), key = "info.#.sms.#"))
    public void topicConsumerSms(String message) 
        System.out.println("接收到短信订阅需求处理消息: " + message);
    






以上是关于Spring boot 消息服务的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot+STOMP解决消息乱序问题

Spring Boot (十三): Spring Boot 整合 RabbitMQ

译Spring官方教程:Spring Boot整合消息中间件RabbitMQ

Spring Boot消息服务

Spring Boot教程32——WebSocket

Spring Boot与消息