Spring boot 消息服务
Posted 流星蝴蝶没有剑
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring boot 消息服务相关的知识,希望对你有一定的参考价值。
消息服务
RabbitMQ
安装:【安装路径最好默认】
安装好之后,可能会访问【http://127.0.0.1:15672】失败
请参考:https://www.jianshu.com/p/e041f5ed10c7
添加配置【application.propertise】
# 配置 消息服务 Rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
发布、订阅模式
基于API整合
手动申明交换器、队列、路由键等
组装消息队列供应用程序调用
实现消息服务
- 首先运行接受消息。
分别使用三种整合方式对三种模式进行消息监听
RabbitMQService.java
package wx0725.top.service;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import wx0725.top.domain.User;
/**
* RabbitMQ消息接收处理的业务类
* 注解
*/
@Service
public class RabbitMQService {
/**
* **使用基于注解
* 1.1、Publish/Subscribe工作模式接收,处理邮件业务
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("文轩FaDing_queue_email"), exchange = @Exchange(value = "文轩FaDing_exchange", type = "文轩FaDing")))
public void psubConsumerEmailAno(User user) {
System.out.println("邮件业务接收到消息: " + user);
}
/**
* 1.2、Publish/Subscribe工作模式接收,处理短信业务
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("文轩FaDing_queue_sms"), exchange = @Exchange(value = "文轩FaDing_exchange", type = "文轩FaDing")))
public void psubConsumerSmsAno(User user) {
System.out.println("短信业务接收到消息: " + user);
}
/**
* 2.1、路由模式消息接收,处理error级别日志信息
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("文轩LuYou_queue_error"), exchange = @Exchange(value = "文轩LuYou_exchange", type = "direct"), key = "error_文轩LuYou_key"))
public void routingConsumerError(String message) {
System.out.println("接收到error级别日志消息: " + message);
}
/**
* 2.2、路由模式消息接收,处理info、error、warning级别日志信息
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("文轩LuYou_queue_all"), exchange = @Exchange(value = "文轩LuYou_exchange", type = "direct"), key = {"error_文轩LuYou_key", "info_文轩LuYou_key", "warning_文轩LuYou_key"}))
public void routingConsumerAll(String message) {
System.out.println("接收到info、error、warning等级别日志消息: " + message);
}
/**
* 3.1、通配符,邮件订阅
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("文轩TongPeiFu_queue_email"), exchange = @Exchange(value = "文轩TongPeiFu_exchange", type = "文轩TongPeiFu"), key = "info.#.email.#"))
public void topicConsumerEmail(String message) {
System.out.println("接收到邮件订阅需求处理消息: " + message);
}
/**
* 3.2、通配符,短信订阅
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("文轩TongPeiFu_queue_sms"), exchange = @Exchange(value = "文轩TongPeiFu_exchange", type = "文轩TongPeiFu"), key = "info.#.sms.#"))
public void topicConsumerSms(String message) {
System.out.println("接收到短信订阅需求处理消息: " + message);
}
}
- 测试代码:分别三种模式进行发送测试(发布订阅模式、路由模式、通配符模式)
ApplicationTest .java
package wx0725.top;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import wx0725.top.domain.User;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {
/**
* API整合
*/
// 发布订阅模式 发送消息
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void amqpAdmin() {
// 1、定义fanout类型的交换器
amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));
// 2、定义两个默认持久化队列,分别处理email和sms
amqpAdmin.declareQueue(new Queue("fanout_queue_email"));
amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));
// 3、将队列分别与交换器进行绑定
amqpAdmin.declareBinding(new Binding("fanout_queue_email", Binding.DestinationType.QUEUE, "fanout_exchange", "", null));
amqpAdmin.declareBinding(new Binding("fanout_queue_sms", Binding.DestinationType.QUEUE, "fanout_exchange", "", null));
}
@Test
public void psubPublisher() {
User user = new User();
user.setId(725);
user.setUsername("文轩");
rabbitTemplate.convertAndSend("fanout_exchange", "", user);
}
// 路由模式消息发送
@Test
public void routingPublisher() {
rabbitTemplate.convertAndSend("routing_exchange", "error_routing_key", "路由发送消息错误");
}
//通配符工作模式消息发送端
@Test
public void topicPublisher() {
// 1、只发送邮件订阅用户消息
rabbitTemplate.convertAndSend("topics_exchange", "info.email", "通配符整合发送邮箱");
// 2、只发送短信订阅用户消息
rabbitTemplate.convertAndSend("topics_exchange", "info.sms", "通配符整合发送sms消息");
// 3、发送同时订阅邮件和短信的用户消息
rabbitTemplate.convertAndSend("topics_exchange", "info.email.sms", "通配符整合发送邮箱和sms消息");
}
}
package wx0725.top.service;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import wx0725.top.domain.User;
/**
* RabbitMQ消息接收处理的业务类
* 注解
*/
@Service
public class RabbitMQService {
/**
* **使用基于注解
* 1.1、Publish/Subscribe工作模式接收,处理邮件业务
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("fanout_queue_email"), exchange = @Exchange(value = "fanout_exchange", type = "fanout")))
public void psubConsumerEmailAno(User user) {
System.out.println("邮件业务接收到消息: " + user);
}
/**
* 1.2、Publish/Subscribe工作模式接收,处理短信业务
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("fanout_queue_sms"), exchange = @Exchange(value = "fanout_exchange", type = "fanout")))
public void psubConsumerSmsAno(User user) {
System.out.println("短信业务接收到消息: " + user);
}
/**
* 2.1、路由模式消息接收,处理error级别日志信息
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_error"), exchange = @Exchange(value = "routing_exchange", type = "direct"), key = "error_routing_key"))
public void routingConsumerError(String message) {
System.out.println("接收到error级别日志消息: " + message);
}
/**
* 2.2、路由模式消息接收,处理info、error、warning级别日志信息
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_all"), exchange = @Exchange(value = "routing_exchange", type = "direct"), key = {"error_routing_key", "info_routing_key", "warning_routing_key"}))
public void routingConsumerAll(String message) {
System.out.println("接收到info、error、warning等级别日志消息: " + message);
}
/**
* 3.1、通配符,邮件订阅
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("topic_queue_email"), exchange = @Exchange(value = "topic_exchange", type = "topic"), key = "info.#.email.#"))
public void topicConsumerEmail(String message) {
System.out.println("接收到邮件订阅需求处理消息: " + message);
}
/**
* 3.2、通配符,短信订阅
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue("topic_queue_sms"), exchange = @Exchange(value = "topic_exchange", type = "topic"), key = "info.#.sms.#"))
public void topicConsumerSms(String message) {
System.out.println("接收到短信订阅需求处理消息: " + message);
}
}
以上是关于Spring boot 消息服务的主要内容,如果未能解决你的问题,请参考以下文章