SpringCloud项目:如何从消息中间件中获取消息
Posted Java知音_
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloud项目:如何从消息中间件中获取消息相关的知识,希望对你有一定的参考价值。
作者:杨桃桃
blog.csdn.net/yt812100/article/details/111874857
一、外部环境搭建
发送消息到MQ和外部环境的搭建见上一章:SpringCloud项目:实现推送消息到RabbitMQ消息中间件
注:RabbitMQ是安装在虚拟机上的
二、依赖注入
本文不仅导入了上文的amqp依赖坐标还有新的netty依赖坐标
三、编写配置文件(yaml)
和上文一样。不变的是这个。注意端口是5672,路径看rabbitMQ安装在本机还是虚拟机
四、业务层逻辑分析
首先声明本文的业务逻辑。各位读者可能遇到的业务逻辑不一样,所以写法会有些许不同。但是大致还是一样,本文在这先声明本文在处理消息发送时候的业务逻辑
业务场景:在用户已经关注了粉丝的情况下,RabbitMQ中已经有了用户的消息队列。那么我只需要在作者发布文章的时候或者点赞的时候,将存入进队列的消息立刻发送给已经登录的用户即可。
那么业务层的处理首先需要准备一下六个类:
那么接下来就详解每个类的作用。其中业务逻辑复杂的只有监听器类和业务逻辑类。
以下是重点(重重重)
工具类ApplicationContextProvider
:返回一些需要的Bean实例以及上下文对象实例(无需改变)
package com.tensquare.notice.config;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class ApplicationContextProvider implements ApplicationContextAware {
/**
* 上下文对象实例
*/
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
/**
* 获取applicationContext
*
* @return
*/
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
/**
* 通过name获取 Bean.
*
* @param name
* @return
*/
public Object getBean(String name) {
return getApplicationContext().getBean(name);
}
/**
* 通过class获取Bean.
*
* @param clazz
* @param <T>
* @return
*/
public <T> T getBean(Class<T> clazz) {
return getApplicationContext().getBean(clazz);
}
/**
* 通过name,以及Clazz返回指定的Bean
*
* @param name
* @param clazz
* @param <T>
* @return
*/
public <T> T getBean(String name, Class<T> clazz) {
return getApplicationContext().getBean(name, clazz);
}
}
Nettt服务类NettyServer
:实现NIO的传输模式 --固定写法,配置端口以及协议名即可(端口自定义,无需改变)
package com.tensquare.notice.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioserverSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
public class NettyServer {
/**
* 启动netty服务,传递一个端口号
*/
public void start(int port){
System.out.println("准备启动Netty......");
//服务器引导程序
ServerBootstrap serverBootstrap = new ServerBootstrap();
//用来处理新的连接
EventLoopGroup boos = new NioEventLoopGroup();
//用来处理业务逻辑(读写)
EventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap.group(boos,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
//请求消息解码器
ch.pipeline().addLast(new HttpServerCodec());
//将多个消息转为单一的request或者response对象
ch.pipeline().addLast(new HttpObjectAggregator(65536));
//处理websocket的消息事件(websocket服务器协议处理程序)
ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
//创建自己的webscoket处理器,自己用来编写业务逻辑
MyWebSocketHandler myWebSocketHandler = new MyWebSocketHandler();
ch.pipeline().addLast(myWebSocketHandler);
}
}).bind(port);
}
}
Netty配置类NettyConfig
:NettyConfig是Springcloud项目中的一种配置文件。自动加载。所以会自动开启线程
因此需要configuration注解以及Bean注解
package com.tensquare.notice.config;
import com.tensquare.notice.netty.NettyServer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class NettyConfig {
@Bean
public NettyServer createNettyServer(){
NettyServer nettyServer = new NettyServer();
//启动netty服务,使用新的线程启动
new Thread(){
@Override
public void run(){
nettyServer.start(1234);
}
}.start();
return nettyServer;
}
}
消息容器配置类RabbitConfig
类:声明出需要的消息容器,(注:与后续的消息监听器相呼应。名称不建议改变)
package com.tensquare.notice.config;
import com.tensquare.notice.listener.SysNoticeListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//配置类
@Configuration
public class RabbitConfig {
@Bean("sysNoticeContainer")
public SimpleMessageListenerContainer create(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//使用Channel
container.setExposeListenerChannel(true);
//设置自己编写的监听器
container.setMessageListener(new SysNoticeListener());
return container;
}
}
通讯处理类MyWebSocketHandler
类:也就是MQ和WebSocket进行交互
重点较多…
MyWebSocketHandler
是用来进行通讯处理的,也就是MQ和WebSocket进行交互(通讯处理类–核心业务类)MyWebSocketHandler
进行业务处理,获取消息数量(业务场景:获取到消息数量即可)MyWebSocketHandler
继承SimpleChannelInboundHandler< TextWebSocketFrame>
,重写channelRead0(ChannelHandlerContext)
这个参数获取连接,TextWebSocketFrame
这个参数获取页面参数
那么废话不多说。先上代码后上解释:
package com.tensquare.notice.netty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.tensquare.entity.Result;
import com.tensquare.entity.StatusCode;
import com.tensquare.notice.config.ApplicationContextProvider;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
//核心业务类,获取MQ的消息
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
/**
* 创建对象监听器
*/
private static ObjectMapper Mapper = new ObjectMapper();
/**
* 从Spring容器中获取消息监听器容器,处理订阅消息sysNotice
*/
SimpleMessageListenerContainer sysNoticeContainer = (SimpleMessageListenerContainer) ApplicationContextProvider.getApplicationContext().getBean("sysNoticeContainer");
/**
* 从spring容器中获取RabbitTemplate
*
*/
RabbitTemplate rabbitTemplate = ApplicationContextProvider.getApplicationContext().getBean(RabbitTemplate.class);
// @Autowired
// private RabbitTemplate rabbitTemplate;
/**
* 存放WebScoket连接Map,根据用户ID存放
*/
public static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap<>();
/**
*用户请求服务端,执行的方法
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//约定用户第一次请求携带的数据:{"userid":"1"}
//获取用户请求数据并解析
String json = msg.text();
//解析数据获取用户ID
String userId = Mapper.readTree(json).get("userId").asText();
//第一次请求的时候需要建立WebScoket连接
Channel channel = userChannelMap.get(userId);
if (channel==null){
//获取WebScoket连接
channel = ctx.channel();
//把连接放到容器中
userChannelMap.put(userId,channel);
}
//只用完成新消息的提醒即可,只需要获取消息的数量
//获取RabbitMQ的内容,并且发送给用户
RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
//拼接捕获队列的名称
String queueName = "article_subscribe_"+userId;
//获取Rabbit的properties容器 (获取rabbit的属性容器)
Properties queueProperties = rabbitAdmin.getQueueProperties(queueName);
//获取消息数量
int noticeCount = 0;
//判断properties是否不为空
if (queueProperties!=null){
//如果不为空,获取消息数量
noticeCount = (int)queueProperties.get("QUEUE_MESSAGE_COUNT");
}
//----------------------------------
//封装返回的数据
HashMap countMap = new HashMap();
countMap.put("sysNoticeCount",noticeCount);
Result result = new Result(true, StatusCode.OK,"查询成功!!",countMap);
//把数据发送给用户
channel.writeAndFlush(new TextWebSocketFrame(Mapper.writeValueAsString(result)));
//把消息从队列里清空,否则MQ消息监听器会再次消费一次
if (noticeCount>0){
rabbitAdmin.purgeQueue(queueName,true);
}
//为用户的消息队列通知注册监听器,便于用户在线的时候,
//一旦有新消息,可以主动推送给用户,不需要用户请求服务器获取数据
sysNoticeContainer.addQueueNames(queueName);
}
}
接下来就是关于这个类的具体解释了。务必细看。截图都是从刚刚代码中截取的。和我发的源码是一样的
测试参数是自定义的,真实开发环境不会如此
这个其实就是将参数获取到,然后以id为标识将连接存入连接容器的过程
其中有一个Result类可以不用定义,本文是作测试用的所以定义了
通过管家获取到消息的数量
发送消息的代码
那么以上就是关于整个MyWebSocketHandler类的详解。
监听器SysNoticeListener
类:判断用户是否在线,发送消息
package com.tensquare.notice.listener;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.tensquare.entity.Result;
import com.tensquare.entity.StatusCode;
import com.tensquare.notice.netty.MyWebSocketHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import java.util.HashMap;
//消息监听器
public class SysNoticeListener implements ChannelAwareMessageListener {
private static ObjectMapper MAPPER = new ObjectMapper();
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//获取用户id,可以通过队列名称获取
String queueName = message.getMessageProperties().getConsumerQueue();
String userId = queueName.substring(queueName.lastIndexOf("_")+1);
io.netty.channel.Channel wsChannel = MyWebSocketHandler.userChannelMap.get(userId);
//判断用户是否在线
if (wsChannel!=null){
//如果连接不为空,代表用户在线
//封装返回数据
HashMap countMap = new HashMap();
countMap.put("sysNoticeCount",1);
Result result = new Result(true, StatusCode.OK,"查询成功",countMap);
//把数据通过WebScoket连接主动推送给用户
wsChannel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result)));
}
}
}
这里与RabbitConfig工具类中相对应
具体作用如注释所说。
测试:这里将一个静态html页面用作测试,加载服务的静态资源里面即可
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>测试 notice 微服务与页面 websocket 交互</title>
</head>
<body>
<h1>
websocket连接服务器获取mq消息测试
</h1>
<form onSubmit="return false;">
<table><tr>
<td><span>服务器地址:</span></td>
<td><input type="text" id="serverUrl" value="ws://127.0.0.1:1234/ws" /></td>
</tr>
<tr>
<td><input type="button" id="action" value="连接服务器" onClick="connect()" /></td>
<td><input type="text" id="connStatus" value="未连接 ......" /></td>
</tr></table>
<br />
<hr color="blue" />
<div>
<div style="width: 50%;float:left;">
<div>
<table><tr>
<td><h3>发送给服务端的消息</h3></td>
<td><input type="button" value="发送" onClick="send(this.form.message.value)" /></td>
</tr></table>
</div>
<div><textarea type="text" name="message" style="width:500px;height:300px;">
{
"userId":"1"
}
</textarea></div>
</div>
<div style="width: 50%;float:left;">
<div><table>
<tr>
<td><h3>服务端返回的应答消息</h3></td>
</tr>
</table></div>
<div><textarea id="responseText" name="responseText" style="width: 500px;height: 300px;" onfocus="this.scrollTop = this.scrollHeight ">
这里显示服务器推送的信息
</textarea></div>
</div>
</div>
</form>
<script type="text/javascript">
var socket;
var connStatus = document.getElementById('connStatus');;
var respText = document.getElementById('responseText');
var actionBtn = document.getElementById('action');
var sysCount = 0;
var userCount = 0;
function connect() {
connStatus.value = "正在连接 ......";
if(!window.WebSocket){
window.WebSocket = window.MozWebSocket;
}
if(window.WebSocket){
socket = new WebSocket("ws://127.0.0.1:1234/ws");
socket.onmessage = function(event){
respText.scrollTop = respText.scrollHeight;
respText.value += "\\r\\n" + event.data;
var sysData = JSON.parse(event.data).data.sysNoticeCount;
if(sysData){
sysCount = sysCount + parseInt(sysData)
}
var userData = JSON.parse(event.data).data.userNoticeCount;
if(userData){
userCount = userCount + parseInt(sysData)
}
respText.value += "\\r\\n现在有" + sysCount + "条订阅新消息";
respText.value += "\\r\\n现在有" + userCount + "条点赞新消息";
respText.scrollTop = respText.scrollHeight;
};
socket.onopen = function(event){
respText.value += "\\r\\nWebSocket 已连接";
respText.scrollTop = respText.scrollHeight;
connStatus.value = "已连接 O(∩_∩)O";
actionBtn.value = "断开服务器";
actionBtn.onclick =function(){
disconnect();
};
};
socket.onclose = function(event){
respText.value += "\\r\\n" + "WebSocket 已关闭";
respText.scrollTop = respText.scrollHeight;
connStatus.value = "已断开";
actionBtn.value = "连接服务器";
actionBtn.onclick = function() {
connect();
};
};
} else {
//alert("您的浏览器不支持WebSocket协议!");
connStatus.value = "您的浏览器不支持WebSocket协议!";
}
}
function disconnect() {
socket.close();
}
function send(message){
if(!window.WebSocket){return;}
if(socket.readyState == WebSocket.OPEN){
socket.send(message);
}else{
alert("WebSocket 连接没有建立成功!");
}
}
</script>
</body>
</html>
端口不需要改变。
下图为测试结果
可以看到,我多发送2条文章,由于关联了一个粉丝,所以又多了2条消息
而消息中间件中消息总数始终为01,因为都以及发送出去了
结语:此篇与上一篇相呼应。有不对的地方望大佬指出。最后,关注Java知音公众号,回复“后端面试”,送你一份面试题宝典!
推荐好文
>>【练手项目】基于SpringBoot的ERP系统,自带进销存+财务+生产功能>>分享一套基于SpringBoot和Vue的企业级中后台开源项目,代码很规范!
>>能挣钱的,开源 SpringBoot 商城系统,功能超全,超漂亮!
以上是关于SpringCloud项目:如何从消息中间件中获取消息的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud - Spring Cloud 之 Stream构建消息驱动微服务框架(十九)
(十七)JAVA springcloud ssm b2b2c多用户商城系统-消息驱动 Spring Cloud Stream