SpringCloud项目:如何从消息中间件中获取消息

Posted Java知音_

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloud项目:如何从消息中间件中获取消息相关的知识,希望对你有一定的参考价值。


作者:杨桃桃

blog.csdn.net/yt812100/article/details/111874857

一、外部环境搭建

发送消息到MQ和外部环境的搭建见上一章:SpringCloud项目:实现推送消息到RabbitMQ消息中间件

注:RabbitMQ是安装在虚拟机上的

二、依赖注入

本文不仅导入了上文的amqp依赖坐标还有新的netty依赖坐标

三、编写配置文件(yaml)

和上文一样。不变的是这个。注意端口是5672,路径看rabbitMQ安装在本机还是虚拟机

四、业务层逻辑分析

首先声明本文的业务逻辑。各位读者可能遇到的业务逻辑不一样,所以写法会有些许不同。但是大致还是一样,本文在这先声明本文在处理消息发送时候的业务逻辑

业务场景:在用户已经关注了粉丝的情况下,RabbitMQ中已经有了用户的消息队列。那么我只需要在作者发布文章的时候或者点赞的时候,将存入进队列的消息立刻发送给已经登录的用户即可。

那么业务层的处理首先需要准备一下六个类:

那么接下来就详解每个类的作用。其中业务逻辑复杂的只有监听器类和业务逻辑类。

以下是重点(重重重)

工具类ApplicationContextProvider:返回一些需要的Bean实例以及上下文对象实例(无需改变)

package com.tensquare.notice.config;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class ApplicationContextProvider implements ApplicationContextAware {
    /**
     * 上下文对象实例
     */
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * 获取applicationContext
     *
     * @return
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * 通过name获取 Bean.
     *
     * @param name
     * @return
     */
    public Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    /**
     * 通过class获取Bean.
     *
     * @param clazz
     * @param <T>
     * @return
     */
    public <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }

    /**
     * 通过name,以及Clazz返回指定的Bean
     *
     * @param name
     * @param clazz
     * @param <T>
     * @return
     */
    public <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }
}

Nettt服务类NettyServer:实现NIO的传输模式 --固定写法,配置端口以及协议名即可(端口自定义,无需改变)

package com.tensquare.notice.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioserverSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

public class NettyServer {

    /**
     *  启动netty服务,传递一个端口号
     */
    public void start(int port){
        System.out.println("准备启动Netty......");
        //服务器引导程序
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //用来处理新的连接
        EventLoopGroup boos = new NioEventLoopGroup();
        //用来处理业务逻辑(读写)
        EventLoopGroup worker = new NioEventLoopGroup();
        serverBootstrap.group(boos,worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                       //请求消息解码器
                        ch.pipeline().addLast(new HttpServerCodec());
                        //将多个消息转为单一的request或者response对象
                        ch.pipeline().addLast(new HttpObjectAggregator(65536));
                        //处理websocket的消息事件(websocket服务器协议处理程序)
                        ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
                        //创建自己的webscoket处理器,自己用来编写业务逻辑
                        MyWebSocketHandler myWebSocketHandler = new MyWebSocketHandler();
                        ch.pipeline().addLast(myWebSocketHandler);
                    }
                }).bind(port);
    }
}

Netty配置类NettyConfig:NettyConfig是Springcloud项目中的一种配置文件。自动加载。所以会自动开启线程

因此需要configuration注解以及Bean注解

package com.tensquare.notice.config;

import com.tensquare.notice.netty.NettyServer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class NettyConfig {

    @Bean
    public NettyServer createNettyServer(){
        NettyServer nettyServer = new NettyServer();
        //启动netty服务,使用新的线程启动
        new Thread(){
            @Override
            public void run(){
                nettyServer.start(1234);
            }

        }.start();
        return nettyServer;
    }

}

消息容器配置类RabbitConfig类:声明出需要的消息容器,(注:与后续的消息监听器相呼应。名称不建议改变)

package com.tensquare.notice.config;

import com.tensquare.notice.listener.SysNoticeListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//配置类
@Configuration
public class RabbitConfig {

    @Bean("sysNoticeContainer")
    public SimpleMessageListenerContainer create(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        //使用Channel
        container.setExposeListenerChannel(true);
        //设置自己编写的监听器
        container.setMessageListener(new SysNoticeListener());
        return container;
    }
}

通讯处理类MyWebSocketHandler类:也就是MQ和WebSocket进行交互

重点较多…

  • MyWebSocketHandler是用来进行通讯处理的,也就是MQ和WebSocket进行交互(通讯处理类–核心业务类)

  • MyWebSocketHandler进行业务处理,获取消息数量(业务场景:获取到消息数量即可)

  • MyWebSocketHandler继承SimpleChannelInboundHandler< TextWebSocketFrame>,重写channelRead0(ChannelHandlerContext) 这个参数获取连接,TextWebSocketFrame 这个参数获取页面参数

那么废话不多说。先上代码后上解释:

package com.tensquare.notice.netty;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.tensquare.entity.Result;
import com.tensquare.entity.StatusCode;
import com.tensquare.notice.config.ApplicationContextProvider;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

//核心业务类,获取MQ的消息
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    /**
     * 创建对象监听器
     */
    private static ObjectMapper Mapper = new ObjectMapper();
    /**
     * 从Spring容器中获取消息监听器容器,处理订阅消息sysNotice
     */
    SimpleMessageListenerContainer sysNoticeContainer = (SimpleMessageListenerContainer) ApplicationContextProvider.getApplicationContext().getBean("sysNoticeContainer");
    /**
     * 从spring容器中获取RabbitTemplate
     *
     */
    RabbitTemplate rabbitTemplate = ApplicationContextProvider.getApplicationContext().getBean(RabbitTemplate.class);
   // @Autowired
   // private RabbitTemplate rabbitTemplate;
    /**
     * 存放WebScoket连接Map,根据用户ID存放
     */
    public static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap<>();
    /**
     *用户请求服务端,执行的方法
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        //约定用户第一次请求携带的数据:{"userid":"1"}
        //获取用户请求数据并解析
        String json = msg.text();
        //解析数据获取用户ID
        String userId = Mapper.readTree(json).get("userId").asText();
        //第一次请求的时候需要建立WebScoket连接
        Channel channel = userChannelMap.get(userId);
        if (channel==null){
            //获取WebScoket连接
            channel  = ctx.channel();
            //把连接放到容器中
            userChannelMap.put(userId,channel);
        }
        //只用完成新消息的提醒即可,只需要获取消息的数量
        //获取RabbitMQ的内容,并且发送给用户
        RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
        //拼接捕获队列的名称
        String queueName = "article_subscribe_"+userId;
        //获取Rabbit的properties容器 (获取rabbit的属性容器)
        Properties queueProperties = rabbitAdmin.getQueueProperties(queueName);
        //获取消息数量
        int noticeCount = 0;
        //判断properties是否不为空
        if (queueProperties!=null){
            //如果不为空,获取消息数量
            noticeCount = (int)queueProperties.get("QUEUE_MESSAGE_COUNT");
        }
        //----------------------------------
        //封装返回的数据
        HashMap countMap = new HashMap();
        countMap.put("sysNoticeCount",noticeCount);
        Result result = new Result(true, StatusCode.OK,"查询成功!!",countMap);
        //把数据发送给用户
        channel.writeAndFlush(new TextWebSocketFrame(Mapper.writeValueAsString(result)));
        //把消息从队列里清空,否则MQ消息监听器会再次消费一次
        if (noticeCount>0){
            rabbitAdmin.purgeQueue(queueName,true);
        }
        //为用户的消息队列通知注册监听器,便于用户在线的时候,
        //一旦有新消息,可以主动推送给用户,不需要用户请求服务器获取数据
        sysNoticeContainer.addQueueNames(queueName);
    }
}

接下来就是关于这个类的具体解释了。务必细看。截图都是从刚刚代码中截取的。和我发的源码是一样的

测试参数是自定义的,真实开发环境不会如此

这个其实就是将参数获取到,然后以id为标识将连接存入连接容器的过程

其中有一个Result类可以不用定义,本文是作测试用的所以定义了

通过管家获取到消息的数量

发送消息的代码

那么以上就是关于整个MyWebSocketHandler类的详解。

监听器SysNoticeListener类:判断用户是否在线,发送消息

package com.tensquare.notice.listener;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.tensquare.entity.Result;
import com.tensquare.entity.StatusCode;
import com.tensquare.notice.netty.MyWebSocketHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import java.util.HashMap;

//消息监听器
public class SysNoticeListener implements ChannelAwareMessageListener {

    private static ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //获取用户id,可以通过队列名称获取
        String queueName = message.getMessageProperties().getConsumerQueue();
        String userId = queueName.substring(queueName.lastIndexOf("_")+1);
        io.netty.channel.Channel wsChannel = MyWebSocketHandler.userChannelMap.get(userId);
        //判断用户是否在线
        if (wsChannel!=null){
            //如果连接不为空,代表用户在线
            //封装返回数据
            HashMap countMap = new HashMap();
            countMap.put("sysNoticeCount",1);
            Result result = new Result(true, StatusCode.OK,"查询成功",countMap);
            //把数据通过WebScoket连接主动推送给用户
            wsChannel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result)));
        }

    }

}

这里与RabbitConfig工具类中相对应

具体作用如注释所说。

测试:这里将一个静态html页面用作测试,加载服务的静态资源里面即可

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <title>测试 notice 微服务与页面 websocket 交互</title>
</head>
<body>
<h1>
    websocket连接服务器获取mq消息测试
</h1>
<form onSubmit="return false;">
    <table><tr>
        <td><span>服务器地址:</span></td>
        <td><input type="text" id="serverUrl" value="ws://127.0.0.1:1234/ws" /></td>
      </tr>
      <tr>
        <td><input type="button" id="action" value="连接服务器" onClick="connect()" /></td>
        <td><input type="text" id="connStatus" value="未连接 ......" /></td>
    </tr></table>
    <br />
    <hr color="blue" />
    <div>
        <div style="width: 50%;float:left;">
            <div>
                <table><tr>
                    <td><h3>发送给服务端的消息</h3></td>
                    <td><input type="button" value="发送" onClick="send(this.form.message.value)" /></td>
                </tr></table>
            </div>
            <div><textarea type="text" name="message" style="width:500px;height:300px;">
{
    "userId":"1"
}
                </textarea></div>
        </div>
        <div style="width: 50%;float:left;">
            <div><table>
                <tr>
                    <td><h3>服务端返回的应答消息</h3></td>
                </tr>
            </table></div>
            <div><textarea id="responseText" name="responseText" style="width: 500px;height: 300px;" onfocus="this.scrollTop = this.scrollHeight ">
这里显示服务器推送的信息
            </textarea></div>
        </div>
    </div>

</form>

<script type="text/javascript">
    var socket;
    var connStatus = document.getElementById('connStatus');;
    var respText = document.getElementById('responseText');
    var actionBtn = document.getElementById('action');
    var sysCount = 0;
    var userCount = 0;

    function connect() {
        connStatus.value = "正在连接 ......";

        if(!window.WebSocket){
            window.WebSocket = window.MozWebSocket;
        }
        if(window.WebSocket){

            socket = new WebSocket("ws://127.0.0.1:1234/ws");

            socket.onmessage = function(event){
                respText.scrollTop = respText.scrollHeight;
                respText.value += "\\r\\n" + event.data;
                var sysData = JSON.parse(event.data).data.sysNoticeCount;
                if(sysData){
                    sysCount = sysCount + parseInt(sysData)
                }
                var userData = JSON.parse(event.data).data.userNoticeCount;
                if(userData){
                    userCount = userCount + parseInt(sysData)
                }
                respText.value += "\\r\\n现在有" + sysCount + "条订阅新消息";
                respText.value += "\\r\\n现在有" + userCount + "条点赞新消息";
                respText.scrollTop = respText.scrollHeight;
            };
            socket.onopen = function(event){
                respText.value += "\\r\\nWebSocket 已连接";
                respText.scrollTop = respText.scrollHeight;

                connStatus.value = "已连接 O(∩_∩)O";

                actionBtn.value = "断开服务器";
                actionBtn.onclick =function(){
                    disconnect();
                };

            };
            socket.onclose = function(event){
                respText.value += "\\r\\n" + "WebSocket 已关闭";
                respText.scrollTop = respText.scrollHeight;

                connStatus.value = "已断开";

                actionBtn.value = "连接服务器";
                actionBtn.onclick = function() {
                    connect();
                };
            };

        } else {
            //alert("您的浏览器不支持WebSocket协议!");
            connStatus.value = "您的浏览器不支持WebSocket协议!";
        }
    }

    function disconnect() {
        socket.close();
    }

    function send(message){
        if(!window.WebSocket){return;}
        if(socket.readyState == WebSocket.OPEN){
            socket.send(message);
        }else{
            alert("WebSocket 连接没有建立成功!");
        }
    }
</script>
</body>
</html>

端口不需要改变。

下图为测试结果

可以看到,我多发送2条文章,由于关联了一个粉丝,所以又多了2条消息

而消息中间件中消息总数始终为01,因为都以及发送出去了

结语:此篇与上一篇相呼应。有不对的地方望大佬指出。最后,关注Java知音公众号,回复“后端面试”,送你一份面试题宝典!

推荐好文

>>【练手项目】基于SpringBoot的ERP系统,自带进销存+财务+生产功能>>分享一套基于SpringBoot和Vue的企业级中后台开源项目,代码很规范!
>>能挣钱的,开源 SpringBoot 商城系统,功能超全,超漂亮!


以上是关于SpringCloud项目:如何从消息中间件中获取消息的主要内容,如果未能解决你的问题,请参考以下文章

SpringCloud项目整合RabbitMQ

SpringCloud - Spring Cloud 之 Stream构建消息驱动微服务框架(十九)

springcloud-消息驱动Stream01

(十七)JAVA springcloud ssm b2b2c多用户商城系统-消息驱动 Spring Cloud Stream

SpringCloud学习第八篇:Stream学习(Greenwich.SR1版本)

SpringCloud学习之Stream消息驱动默认通道