(二十)ATP应用测试平台——websocket实现微服务版在线客服聊天室实战案例
Posted 北溟溟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(二十)ATP应用测试平台——websocket实现微服务版在线客服聊天室实战案例相关的知识,希望对你有一定的参考价值。
前言
在前面的博客内容中我们介绍了如何使用websocket实现一个网页版的在线客服聊天室。众所周知,websocket是一个长连接,需要和服务端保持会话连接,所以其本身并不适用于微服务环境:在微服务环境中,A、B两个客户端有可能分别连接到不同的服务实例A、B上,由于会话不在同一台服务器上,A、B无法感知到对方发送的消息,也就无法完成聊天功能。为了解决websocket单机的这个痛点,我们引入消息中间件RocketMQ的广播机制,实现消息的转发,从而实现微服务版的websocket聊天室功能。其架构如下:
本节内容使用的主要技术包含springboot、redis、rocketmq、vue等,关于中间件的搭建本节内容不再展开,请关注作者的往期博客内容。
正文
- 引入websocket、redis和rocketmq的pom依赖
①核心pom依赖
<!-- RocketMQ: Spring Boot starter -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

<!-- WebSocket: Spring Boot starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<!-- Redis: Spring Session backed by Redis -->
<dependency>
    <groupId>org.springframework.session</groupId>
    <artifactId>spring-session-data-redis</artifactId>
    <version>2.4.3</version>
</dependency>
PS:可以按需引入自己需要的依赖,作者这里只列出核心的pom依赖
-
配置application.yml
①配置文件
server:
  port: 8888

spring:
  # Data source configuration
  datasource:
    dynamic:
      primary: master # default data source (or data source group); the default value is "master"
      strict: false   # strict mode, off by default: when on, an unmatched data source raises an error; when off, the default data source is used
      datasource:
        master:
          url: jdbc:mysql://192.168.56.10:3306/atp
          username: root
          password: root
          driver-class-name: com.mysql.cj.jdbc.Driver # can be omitted from 3.2.0 on (SPI support)
  profiles:
    active: dev
  servlet:
    multipart:
      max-file-size: 52428800
      max-request-size: 52428800
  # Redis configuration
  redis:
    # Redisson configuration
    redisson:
      file: classpath:redisson.yaml
    # Default database index
    database: 0
    # Redis cluster nodes
    # NOTE(review): cluster and sentinel are both configured here; confirm which mode is actually intended.
    cluster:
      nodes:
        - 192.168.56.10:6379
        - 192.168.56.10:6380
        # Fixed: was "192.168.56.10.6381" (dot instead of colon before the port).
        - 192.168.56.10:6381
      max-redirects: 3
    # Timeout
    timeout: 10000
    # Sentinel nodes
    sentinel:
      master: mymaster
      nodes:
        - "192.168.56.10:26379"
        - "192.168.56.10:26380"
        - "192.168.56.10:26381"
    # Redis password
    password: root
    # Redis client (Lettuce)
    lettuce:
      pool:
        # Maximum number of pooled connections (negative = unlimited); default 8
        max-active: 8
        # Minimum number of idle connections; default 0
        min-idle: 1
        # Maximum blocking wait for a connection (negative = unlimited); default -1
        max-wait: 1000
        # Maximum number of idle connections; default 8
        max-idle: 8
  session:
    store-type: redis
    redis:
      flush-mode: on_save
      namespace: spring:session:atp
  thymeleaf:
    cache: false

# MyBatis-Plus configuration
mybatis-plus:
  mapper-locations: classpath*:/mapper/*/*Mapper.xml
  type-aliases-package: com.yundi.atp.platform.module.*.entity
  configuration:
    map-underscore-to-camel-case: true
  global-config:
    db-config:
      id-type: assign_id

# RocketMQ configuration
rocketmq:
  # Name server addresses
  name-server: 192.168.56.10:9876;192.168.56.10:9877
  producer:
    # Producer group name
    group: atp-producer
    # Namespace
    namespace: atp
    # Retries when an asynchronous send fails; default 2
    retry-times-when-send-async-failed: 2
    # Send timeout; default 2000 ms
    send-message-timeout: 2000
    # Maximum message size; the default is 1024 * 1024 * 4 (4M) - note the value below is far larger
    max-message-size: 40000000
    # Compress message bodies larger than 4k
    compress-message-body-threshold: 4096
    # Whether to retry on another broker when a send fails
    retry-next-server: false
    # Whether message tracing is enabled
    enable-msg-trace: false
    # Default trace topic
    customized-trace-topic: RMQ_SYS_TRACE_TOPIC
    # Retries when a send fails
    retry-times-when-send-failed: 2
-
创建websocket服务配置WebSocketConfig.java
package com.yundi.atp.platform.websocket; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; @Configuration public class WebSocketConfig /** * 注入ServerEndpointExporter, * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint */ @Bean public ServerEndpointExporter serverEndpointExporter() return new ServerEndpointExporter();
-
创建微服务版websocket服务
package com.yundi.atp.platform.websocket;

import com.alibaba.fastjson.JSON;
import com.yundi.atp.platform.common.Constant;
import com.yundi.atp.platform.enums.MessageType;
import com.yundi.atp.platform.module.test.entity.ChatMsg;
import com.yundi.atp.platform.module.test.service.ChatMsgService;
import com.yundi.atp.platform.rocketmq.RocketConstant;
import com.yundi.atp.platform.rocketmq.RocketProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * WebSocket endpoint of the MQ-backed chat room.
 *
 * <p>Connection events and chat messages are not delivered to clients directly. They are
 * published to RocketMQ, and whoever consumes the topic calls {@link #sendOneMessage} or
 * {@link #sendAllMessage} to push the payload to the sessions held by this JVM.
 *
 * <p>Collaborators are kept in static fields and filled in through
 * {@link #setWebSocketServer}. NOTE(review): presumably because the WebSocket container,
 * not Spring, creates the endpoint instances that serve connections - confirm.
 */
@Slf4j
@Component
// The "{userName}" URI template is required for @PathParam("userName") to be bound.
@ServerEndpoint(Constant.WEBSOCKET_MQ_URL + "{userName}")
public class WebSocketMqServer {

    /** Redis set key holding the names of the users that are currently online. */
    private static final String ONLINE_KEY = "online";

    /** Session of the connection served by this endpoint instance. */
    private Session session;

    /** All endpoint instances with an open connection on this JVM. */
    private static final CopyOnWriteArraySet<WebSocketMqServer> webSockets = new CopyOnWriteArraySet<>();

    /** Sessions on this JVM, keyed by user name. */
    private static final Map<String, Session> sessionPool = new ConcurrentHashMap<>();

    /** Message persistence. */
    private static ChatMsgService chatMsgService;

    /** Redis access (raw type kept so the injection point stays unchanged). */
    private static RedisTemplate redisTemplate;

    /** RocketMQ sending helper. */
    private static RocketProducer rocketProducer;

    /**
     * Receives the Spring-managed collaborators and stores them in the static fields.
     *
     * @param chatMsgService chat message persistence service
     * @param redisTemplate  Redis template used for the online-user set
     * @param rocketProducer RocketMQ producer helper
     */
    @Autowired
    public void setWebSocketServer(ChatMsgService chatMsgService, RedisTemplate redisTemplate, RocketProducer rocketProducer) {
        WebSocketMqServer.chatMsgService = chatMsgService;
        WebSocketMqServer.redisTemplate = redisTemplate;
        WebSocketMqServer.rocketProducer = rocketProducer;
    }

    /**
     * Handles a new connection: records the user as online, stores the session and
     * publishes a {@link MessageType#MESSAGE_OPEN} message.
     *
     * @param session  the new WebSocket session
     * @param userName user name taken from the endpoint path
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userName") String userName) {
        // 1. The super admin is not counted as an online user.
        if (!Constant.SUPER_ADMIN.equals(userName)) {
            redisTemplate.opsForSet().add(ONLINE_KEY, userName);
        }
        // 2. Remember the session so messages can be pushed to it later.
        this.session = session;
        webSockets.add(this);
        sessionPool.put(userName, session);
        Set online = redisTemplate.opsForSet().members(ONLINE_KEY);
        log.info("【websocket消息】有新的连接,总在线人数为:" + online.size());
        // 3./4. Build the presence message and publish it.
        publishPresence(MessageType.MESSAGE_OPEN, online);
    }

    /**
     * Handles a closed connection: removes the user from the online set, drops the
     * session and publishes a {@link MessageType#MESSAGE_CLOSE} message.
     *
     * @param userName user name taken from the endpoint path
     */
    @OnClose
    public void onClose(@PathParam(value = "userName") String userName) {
        // 1. Update the online-user set.
        redisTemplate.opsForSet().remove(ONLINE_KEY, userName);
        // 2. Forget the session.
        webSockets.remove(this);
        sessionPool.remove(userName);
        Set online = redisTemplate.opsForSet().members(ONLINE_KEY);
        log.info("【websocket消息】连接断开,总在线人数为:" + online.size());
        // 3./4. Build the presence message and publish it.
        publishPresence(MessageType.MESSAGE_CLOSE, online);
    }

    /**
     * Handles an incoming chat message: persists it and publishes it as a
     * {@link MessageType#MESSAGE_SEND} message.
     *
     * @param message JSON text that is parsed into a {@link ChatMsg}
     */
    @OnMessage
    public void onMessage(String message) {
        // 1. Persist the chat message.
        ChatMsg chatMsg = JSON.parseObject(message, ChatMsg.class);
        if (chatMsg == null) {
            // Nothing was parsed (e.g. empty text): there is nothing to store or forward.
            log.warn("Ignoring WebSocket message that could not be parsed into a ChatMsg: {}", message);
            return;
        }
        chatMsgService.save(chatMsg);
        // 2. Wrap it.
        WebSocketMqMsg webSocketMqMsg = new WebSocketMqMsg();
        webSocketMqMsg.setKey(MessageType.MESSAGE_SEND.getCode());
        webSocketMqMsg.setData(chatMsg);
        // 3. Publish asynchronously to RocketMQ.
        publish(webSocketMqMsg);
    }

    /**
     * Sends a text message to every open session held by this JVM.
     *
     * @param message text to send
     */
    public void sendAllMessage(String message) {
        log.info("【websocket消息】广播消息:" + message);
        for (WebSocketMqServer webSocket : webSockets) {
            try {
                Session target = webSocket.session;
                if (target != null && target.isOpen()) {
                    target.getAsyncRemote().sendText(message);
                }
            } catch (Exception e) {
                // One failing session must not stop delivery to the others.
                log.error("Failed to broadcast WebSocket message", e);
            }
        }
    }

    /**
     * Sends a text message to one user, if that user has an open session on this JVM.
     *
     * @param userName recipient user name
     * @param message  text to send
     */
    public void sendOneMessage(String userName, String message) {
        log.info("【websocket消息】单点消息:" + message);
        Session target = sessionPool.get(userName);
        if (target != null && target.isOpen()) {
            try {
                target.getAsyncRemote().sendText(message);
            } catch (Exception e) {
                log.error("Failed to send WebSocket message to {}", userName, e);
            }
        }
    }

    /** Builds an open/close message carrying the online set and the full user list, then publishes it. */
    private static void publishPresence(MessageType type, Set online) {
        WebSocketMqMsg webSocketMqMsg = new WebSocketMqMsg();
        // Message type.
        webSocketMqMsg.setKey(type.getCode());
        // Users currently online.
        webSocketMqMsg.setOnlineList(online);
        // All known users.
        webSocketMqMsg.setUserList(chatMsgService.getUserList());
        publish(webSocketMqMsg);
    }

    /** Serializes the message to JSON and sends it asynchronously to the chat topic/tag with a random key. */
    private static void publish(WebSocketMqMsg webSocketMqMsg) {
        rocketProducer.sendAsyncMsg(RocketConstant.ROCKET_TOPIC, RocketConstant.ROCKET_TAG_CHAT,
                UUID.randomUUID().toString(), JSON.toJSONString(webSocketMqMsg));
    }
}
ps:这里我们将会话的消息先推送给消息中间件RocketMQ,然后将消息通过广播的形式分发给每一台服务器去消费,如果能消费成功,就将消息推送给对应的客户端
- 常量定义
package com.yundi.atp.platform.common;

/**
 * Shared constants. This class only holds constants and is not meant to be instantiated.
 */
public final class Constant {

    /** Root path of the ZooKeeper distributed lock. */
    public static final String LOCK_ROOT_PATH = "/zookeeper/lock/";

    /** WebSocket protocol prefix. */
    public static final String WEBSOCKET_PROTOCOL = "ws://";

    /** Endpoint path prefix of the single-node chat room. */
    public static final String WEBSOCKET_SINGLE_URL = "/websocket/chat/";

    /** Endpoint path prefix of the microservice (MQ-backed) chat room. */
    public static final String WEBSOCKET_MQ_URL = "/websocket/mq/chat/";

    /** User name of the super administrator. */
    public static final String SUPER_ADMIN = "super_admin";

    private Constant() {
        // constants holder
    }
}
- 自定义消息类型:根据不同消息内容处理不同的消息业务逻辑
package com.yundi.atp.platform.enums; public enum MessageType MESSAGE_OPEN(1, "开启连接"), MESSAGE_CLOSE(2, "断开连接"), MESSAGE_SEND(3, "发送消息"), MESSAGE_RE_OPEN(4, "异地登录下线通知"); private Integer code; private String msg; MessageType(Integer code, String msg) this.code = code; this.msg = msg; public Integer getCode() return code; public String getMsg() return msg;
- RocketMQ消息发送的工具类
package com.yundi.atp.platform.rocketmq; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.*; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @Component @Slf4j public class RocketProducer @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送同步消息:消息响应后发送下一条消息 * * @param topic 消息主题 * @param tag 消息tag * @param key 业务号 * @param data 消息内容 */ public void sendSyncMsg(String topic, String tag, String key, String data) //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; SendResult sendResult = rocketMQTemplate.syncSend(destination, message); log.info("【RocketMQ】发送同步消息:", sendResult); /** * 发送异步消息:异步回调通知消息发送的状况 * * @param topic 消息主题 * @param tag 消息tag * @param key 业务号 * @param data 消息内容 */ public void sendAsyncMsg(String topic, String tag, String key, String data) //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; rocketMQTemplate.asyncSend(destination, message, new SendCallback() @Override public void onSuccess(SendResult sendResult) log.info("【RocketMQ】发送异步消息:", sendResult); @Override public void onException(Throwable e) log.info("【RocketMQ】发送异步消息异常:", e.getMessage()); ); /** * 发送单向消息:消息发送后无响应,可靠性差,效率高 * * @param topic 消息主题 * @param tag 消息tag * @param key 业务号 * @param data 消息内容 */ public void sendOneWayMsg(String topic, String tag, String key, String data) //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; rocketMQTemplate.sendOneWay(destination, message); /** * 同步延迟消息 * * 
@param topic 主题 * @param tag 标签 * @param key 业务号 * @param data 消息体 * @param timeout 发送消息的过期时间 * @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h */ public void sendSyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; SendResult sendResult = rocketMQTemplate.syncSend(destination, message, timeout, delayLevel); log.info("【RocketMQ】发送同步延迟消息:", sendResult); /** * 异步延迟消息 * * @param topic 主题 * @param tag 标签 * @param key 业务号 * @param data 消息体 * @param timeout 发送消息的过期时间 * @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h */ public void sendAsyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; rocketMQTemplate.asyncSend(destination, message, new SendCallback() @Override public void onSuccess(SendResult sendResult) log.info("【RocketMQ】发送异步延迟消息:", sendResult); @Override public void onException(Throwable e) log.info("【RocketMQ】发送异步延迟消息异常:", e.getMessage()); , timeout, delayLevel); /** * 同步顺序消息 * * @param topic 主题 * @param tag 标签 * @param key 业务号 * @param data 消息体 */ public void sendSyncOrderlyMsg(String topic, String tag, String key, String data) //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, message, key); log.info("【RocketMQ】发送同步顺序消息:", sendResult); /** * 异步顺序消息 * * @param topic 主题 * @param tag 标签 * @param key 业务号 * @param data 消息体 */ public void sendAsyncOrderlyMsg(String topic, String tag, String key, String data) //消息 Message message = 
MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; rocketMQTemplate.asyncSendOrderly(destination, message, key, new SendCallback() @Override public void onSuccess(SendResult sendResult) log.info("【RocketMQ】发送异步顺序消息:", sendResult); @Override public void onException(Throwable e) log.info("【RocketMQ】发送异步顺序消息异常:", e.getMessage()); ); /** * 分布式事务消息 * * @param topic 主题 * @param tag 标签 * @param key 业务号 * @param data 消息体 */ public void sendTransactionMsg(String topic, String tag, String key, String data) //消息 Message message = MessageBuilder.withPayload(data) .setHeader(RocketMQHeaders.KEYS, key) .setHeader(RocketMQHeaders.TRANSACTION_ID, key) .build(); //主题 String destination = topic + ":" + tag; TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, null); if (transactionSendResult.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE) && transactionSendResult.getSendStatus().equals(SendStatus.SEND_OK)) log.info("分布式事物消息发送成功"); log.info("分布式事物消息发送结果:", transactionSendResult);
- websocket服务的连接地址获取及历史消息获取
package com.yundi.atp.platform.module.test.controller;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yundi.atp.platform.common.Constant;
import com.yundi.atp.platform.common.Result;
import com.yundi.atp.platform.module.test.entity.ChatMsg;
import com.yundi.atp.platform.module.test.service.ChatMsgService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

/**
 * REST endpoints used by the MQ-backed chat room client: WebSocket address lookup,
 * chat history and user list.
 */
@Api(tags = "聊天室接口-mq版")
@RestController
@RequestMapping("/test/mq/chatMsg")
public class ChatMsgMqController {

    @Autowired
    private ChatMsgService chatMsgService;

    /**
     * Builds the WebSocket URL the given user should connect to.
     *
     * <p>The host part is the local host address of the instance that served this request.
     *
     * @param request  current request, used for the server port and context path
     * @param username user name appended to the endpoint path
     * @return the WebSocket URL wrapped in a success result
     * @throws UnknownHostException if the local host address cannot be resolved
     */
    @ApiOperation(value = "获取聊天室地址")
    @GetMapping(value = "/getWebSocketAddress/{username}")
    public Result getWebSocketAddress(HttpServletRequest request, @PathVariable(value = "username") String username) throws UnknownHostException {
        String address = Constant.WEBSOCKET_PROTOCOL
                + InetAddress.getLocalHost().getHostAddress()
                + ":" + request.getServerPort()
                + request.getContextPath()
                + Constant.WEBSOCKET_MQ_URL
                + username;
        return Result.success(address);
    }

    /**
     * Returns the chat history of a user, i.e. every message the user sent or received,
     * oldest first.
     *
     * @param username user whose history is requested
     * @return the messages wrapped in a success result
     */
    @ApiOperation(value = "获取历史聊天记录")
    @GetMapping(value = "/getHistoryChat/{username}")
    public Result getHistoryChat(@PathVariable(value = "username") String username) {
        // Sorted ascending by the database; no second in-memory sort is needed.
        List<ChatMsg> history = chatMsgService.list(new QueryWrapper<ChatMsg>()
                .and(wrapper -> wrapper.eq("sender", username).or().eq("receiver", username))
                .orderByAsc("create_time"));
        return Result.success(history);
    }

    /**
     * Returns the user list provided by {@link ChatMsgService#getUserList()}.
     *
     * @return the user names wrapped in a success result
     */
    @ApiOperation(value = "获取用户列表")
    @GetMapping(value = "/getUserList")
    public Result getUserList() {
        List<String> userList = chatMsgService.getUserList();
        return Result.success(userList);
    }
}
- 消息的广播分发
package com.yundi.atp.platform.websocket; import com.alibaba.fastjson.JSON; import com.yundi.atp.platform.common.Constant; import com.yundi.atp.platform.enums.MessageType; import com.yundi.atp.platform.module.test.entity.ChatMsg; import com.yundi.atp.platform.rocketmq.RocketConstant; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component @RocketMQMessageListener(consumerGroup = RocketConstant.ROCKET_CONSUMER_CHAT_GROUP, topic = RocketConstant.ROCKET_TOPIC, selectorExpression = RocketConstant.ROCKET_TAG_CHAT, namespace = RocketConstant.ROCKET_NAMESPACE, messageModel = MessageModel.BROADCASTING) public class WebSocketMqConsumer implements RocketMQListener<String> @Autowired WebSocketMqServer webSocketMqServer; @Override public void onMessage(String message) log.info("聊天室消息:", message); //1.解析消息 WebSocketMqMsg webSocketMqMsg = JSON.parseObject(message, WebSocketMqMsg.class); //2.根据消息类型解析消息 // 建立连接消息 if (webSocketMqMsg.getKey().equals(MessageType.MESSAGE_OPEN.getCode())) webSocketMqServer.sendOneMessage(Constant.SUPER_ADMIN, message); // 关闭连接消息 if (webSocketMqMsg.getKey().equals(MessageType.MESSAGE_CLOSE.getCode())) webSocketMqServer.sendOneMessage(Constant.SUPER_ADMIN, message); // 发送消息 if (webSocketMqMsg.getKey().equals(MessageType.MESSAGE_SEND.getCode())) ChatMsg data = webSocketMqMsg.getData(); webSocketMqServer.sendOneMessage(data.getSender(), message); webSocketMqServer.sendOneMessage(data.getReceiver(), message);
- 消息主题定义
package com.yundi.atp.platform.rocketmq;

/**
 * RocketMQ related constants. This class only holds constants and is not meant to be
 * instantiated.
 */
public final class RocketConstant {

    /** Consumer group. */
    public static final String ROCKET_CONSUMER_GROUP = "atp-consumer";

    /** Consumer group of the chat room. */
    public static final String ROCKET_CONSUMER_CHAT_GROUP = "atp-chat-consumer";

    /** Topic. */
    public static final String ROCKET_TOPIC = "atp";

    /** Tag. */
    public static final String ROCKET_TAG = "app";

    /** Tag of the chat room. */
    public static final String ROCKET_TAG_CHAT = "chat";

    /** Namespace. */
    public static final String ROCKET_NAMESPACE = "atp";

    private RocketConstant() {
        // constants holder
    }
}
- 客户端代码
<template>
  <div class="container">
    <el-card class="box-card">
      <div slot="header">
        <el-row type="flex">
          <el-col :span="1" style="margin: 15px 10px;">
            <img alt="ATP客服" src="@/assets/logo.png" style="width:40px;height:40px;"/>
          </el-col>
          <el-col :span="3" style="line-height: 74px;margin-left: 10px;">
            <span style="display: inline-block;color: white;">ATP客服</span>
          </el-col>
          <el-col :span="20" v-if="username==='super_admin'">
            <h5 style="color: #83ccd2;padding: 0;text-align: right;margin: 50px 20px 0 0;">当前在线人数: {{ online }}</h5>
          </el-col>
          <el-col :span="20" v-else>
            <h5 style="color: #83ccd2;padding: 0 0 2px 0;text-align: right;margin: 50px 20px 0 0;font-size: 18px;">
              {{ username }}
            </h5>
          </el-col>
        </el-row>
      </div>
      <div class="content" ref="content">
        <el-row type="flex">
          <!-- User lists: only shown to the super admin -->
          <el-col :span="6" style="background: #eee;min-height: 600px;" v-if="username==='super_admin'">
            <el-tabs v-model="activeName" @tab-click="handleClick" style="width: 190px;margin: 0 2px;">
              <el-tab-pane label="在线用户" name="online">
                <div v-for="item in friend" :key="item" @click="switchUser(item)" :class="item===active?'mark':''">
                  <el-badge :is-dot="msgNotify.includes(item)" class="item" type="success">
                    <li style="list-style-type:none;padding: 5px 8px;cursor: pointer;" class="active">
                      {{ item }}
                    </li>
                  </el-badge>
                  <el-divider></el-divider>
                </div>
              </el-tab-pane>
              <el-tab-pane label="全部用户" name="all">
                <div v-for="item in userList" :key="item" @click="switchUser(item)" :class="item===active?'mark':''">
                  <el-badge :is-dot="msgNotify.includes(item)" class="item" type="success">
                    <li style="list-style-type:none;padding: 5px 8px;cursor: pointer;"
                        :class="friend.includes(item)?'active':''">
                      {{ item }}
                    </li>
                  </el-badge>
                  <el-divider></el-divider>
                </div>
              </el-tab-pane>
            </el-tabs>
          </el-col>
          <!-- Conversation as seen by the super admin -->
          <el-col :span="18" v-if="username==='super_admin'">
            <div v-for="(item, index) in chatMsgList" :key="index">
              <el-row type="flex" style="margin-bottom: 20px;" v-if="username===item.sender">
                <el-col :span="2">
                  <img alt="ATP客服" src="@/assets/logo.png" style="width:30px;height:30px;margin: 10px 0px 0px 20px;"/>
                </el-col>
                <el-col :span="22">
                  <el-row type="flex" style="margin-top: 10px;margin-left: 5px;opacity: 0.2;">
                    <el-col :span="7"><span style="padding-left: 20px;">{{ item.sender }}</span></el-col>
                    <el-col :span="7"><span>{{ item.createTime | dataFormat('yyyy-MM-dd HH:mm') }}</span></el-col>
                  </el-row>
                  <el-row>
                    <el-col :span="14" style="margin-left: 8px;margin-top: 5px;">
                      <el-card style="padding: 8px 5px;">{{ item.msg }}</el-card>
                    </el-col>
                  </el-row>
                </el-col>
              </el-row>
              <el-row type="flex" style="margin-bottom: 20px;" v-else justify="end">
                <el-col :span="22">
                  <el-row type="flex" style="margin-top: 10px;margin-right: 5px;opacity: 0.2;" justify="end">
                    <el-col :span="6"><span>{{ item.sender }}</span></el-col>
                    <el-col :span="7"><span>{{ item.createTime | dataFormat('yyyy-MM-dd HH:mm') }}</span></el-col>
                  </el-row>
                  <el-row type="flex" justify="end" style="margin-right: 8px;margin-top: 5px;">
                    <el-col :span="14" style="margin-right: 8px;">
                      <el-card style="padding: 8px 5px;">{{ item.msg }}</el-card>
                    </el-col>
                  </el-row>
                </el-col>
                <el-col :span="2">
                  <el-avatar shape="square" size="medium" style="float: right;margin: 10px 20px 0px 0px;">客户</el-avatar>
                </el-col>
              </el-row>
            </div>
          </el-col>
          <!-- Conversation as seen by a customer -->
          <el-col :span="24" v-else>
            <div v-for="(item, index) in chatMsgList" :key="index">
              <el-row type="flex" style="margin-bottom: 20px;" v-if="username===item.sender">
                <el-col :span="2">
                  <el-avatar shape="square" size="medium" style="float: right;margin: 10px 20px 0px 0px;">客户</el-avatar>
                </el-col>
                <el-col :span="22">
                  <el-row type="flex" style="margin-top: 10px;opacity: 0.2;margin-left: 20px;">
                    <el-col :span="7"><span style="padding-left: 5px;">{{ item.sender }}</span></el-col>
                    <el-col :span="7"><span>{{ item.createTime | dataFormat('yyyy-MM-dd HH:mm') }}</span></el-col>
                  </el-row>
                  <el-row>
                    <el-col :span="14">
                      <el-card style="padding: 8px 5px;">{{ item.msg }}</el-card>
                    </el-col>
                  </el-row>
                </el-col>
              </el-row>
              <el-row type="flex" style="margin-bottom: 20px;" v-else justify="end">
                <el-col :span="22">
                  <el-row type="flex" style="margin-top: 10px;margin-right: 5px;opacity: 0.2;" justify="end">
                    <el-col :span="6"><span>{{ item.sender }}</span></el-col>
                    <el-col :span="7"><span>{{ item.createTime | dataFormat('yyyy-MM-dd HH:mm') }}</span></el-col>
                  </el-row>
                  <el-row type="flex" justify="end" style="margin-top: 5px;">
                    <el-col :span="14">
                      <el-card style="padding: 8px 5px;">{{ item.msg }}</el-card>
                    </el-col>
                  </el-row>
                </el-col>
                <el-col :span="2">
                  <img alt="ATP客服" src="@/assets/logo.png" style="width:30px;height:30px;margin: 10px 0px 0px 20px;"/>
                </el-col>
              </el-row>
            </div>
          </el-col>
        </el-row>
      </div>
      <!-- Input area: the super admin additionally needs a selected user before sending -->
      <div class="operate" v-if="username==='super_admin'">
        <el-input type="textarea" :autosize="{ minRows: 3, maxRows: 3 }"
                  placeholder="您好!这里是ATP客服部,我是客服小美,很高兴为您服务!" v-model="msg">
        </el-input>
        <el-button type="success" size="mini" style="float: right;margin-top: 5px;" @click="sendMsg"
                   :disabled="!(msg && active)">
          发送
        </el-button>
      </div>
      <div class="operate" v-else>
        <el-input type="textarea" :autosize="{ minRows: 3, maxRows: 3 }"
                  placeholder="您好!这里是ATP客服部,我是客服小美,很高兴为您服务!" v-model="msg">
        </el-input>
        <el-button type="success" size="mini" style="float: right;margin-top: 5px;" @click="sendMsg"
                   :disabled="!msg">
          发送
        </el-button>
      </div>
    </el-card>
  </div>
</template>

<script>
export default {
  name: "ClientMqChat",
  data() {
    return {
      // Text currently typed into the input box
      msg: '',
      // Messages of the conversation being displayed
      chatMsgList: [],
      // Logged-in user, read from session storage
      username: sessionStorage.getItem("username"),
      // Names of the users currently online
      friend: [],
      // Number of online users
      online: 0,
      // User whose conversation is selected (super admin only)
      active: '',
      // Receiver of outgoing messages
      receiver: 'super_admin',
      // Names of all users
      userList: [],
      // Selected tab: 'online' or 'all'
      activeName: 'online',
      // Senders with unread messages
      msgNotify: [],
    };
  },
  created() {
    this.getWebSocketAddress();
  },
  beforeDestroy() {
    // Release the connection when the component goes away; the close handler is detached
    // first so leaving the page does not show the "server closed" warning.
    if (this.websocket) {
      this.websocket.onclose = null;
      this.websocket.close();
    }
  },
  methods: {
    // Tab switch: both tabs pick a default user when none is selected yet
    handleClick(tab, event) {
      if (tab.name === 'online' || tab.name === 'all') {
        this.selectDefaultUser();
      }
    },
    // When no user is selected, select the first online user, or else the first known
    // user, and load that conversation
    selectDefaultUser() {
      if (this.active) {
        return;
      }
      if (this.online > 0) {
        this.active = this.friend[0];
        this.activeName = 'online';
      } else if (this.userList.length > 0) {
        this.active = this.userList[0];
        this.activeName = 'all';
      } else {
        return;
      }
      this.receiver = this.active;
      this.getHistoryChat(this.receiver);
    },
    // Switch to another user's conversation
    switchUser(data) {
      if (this.active === data) {
        return;
      }
      this.active = data;
      this.receiver = data;
      // Load the chat history
      this.getHistoryChat(this.receiver);
      // The conversation is now open, so clear its unread marker
      this.msgNotify = this.msgNotify.filter(item => item != this.active);
    },
    // Load the chat history of a user
    getHistoryChat(data) {
      this.$http.get('/test/mq/chatMsg/getHistoryChat/' + data).then(res => {
        if (res.data.code === 1) {
          this.chatMsgList = res.data.data;
          this.flushScroll();
        } else {
          this.$message.warning(res.data.msg);
        }
      }).catch(error => {
        this.$message.error(error);
      });
    },
    // Ask the server for the websocket address, then connect
    getWebSocketAddress() {
      this.$http.get('/test/mq/chatMsg/getWebSocketAddress/' + this.username).then(res => {
        if (res.data.code === 1) {
          if ('WebSocket' in window) {
            this.websocket = new WebSocket(res.data.data);
            this.initWebSocket();
            if (this.username != 'super_admin') {
              this.getHistoryChat(this.username);
            }
          } else {
            this.$message.warning('当前浏览器不支持websocket创建!');
          }
        } else {
          this.$message.warning(res.data.msg);
        }
      }).catch(error => {
        this.$message.error(error);
      });
    },
    // Register the websocket event handlers
    initWebSocket() {
      const _this = this;
      _this.websocket.onerror = function (event) {
        _this.$message.error('服务端连接错误!');
      };
      _this.websocket.onopen = function (event) {
        _this.$message.success("连接成功!");
      };
      _this.websocket.onmessage = function (event) {
        let res = JSON.parse(event.data);
        // key 1 = connection opened, key 2 = connection closed: refresh the user lists
        if (res.key === 1 || res.key === 2) {
          _this.userList = res.userList || [];
          _this.friend = res.onlineList || [];
          _this.online = _this.friend.length;
          _this.selectDefaultUser();
        }
        // key 3 = chat message
        if (res.key === 3) {
          const sender = res.data.sender;
          if (_this.username === sender || sender === 'super_admin' || sender === _this.active) {
            // Belongs to the conversation on screen
            _this.chatMsgList.push(res.data);
            _this.flushScroll();
          } else if (!_this.msgNotify.includes(sender)) {
            // Sent by another user: mark that user as having unread messages (once)
            _this.msgNotify.push(sender);
          }
        }
      };
      _this.websocket.onclose = function (event) {
        _this.$message.warning('服务端已关闭!');
      };
    },
    // Send the typed message
    sendMsg() {
      if (this.msg.trim().length === 0) {
        this.$message.warning('不能发送空消息!');
        return;
      }
      let chatMsg = {};
      chatMsg.msg = this.msg;
      chatMsg.sender = this.username;
      chatMsg.createTime = new Date();
      chatMsg.receiver = this.receiver;
      this.websocket.send(JSON.stringify(chatMsg));
      this.msg = '';
      this.flushScroll();
    },
    // Scroll the message area to the bottom
    flushScroll() {
      let content = this.$refs.content;
      setTimeout(() => {
        content.scrollTop = content.scrollHeight;
      }, 100);
    },
  },
};
</script>

<style scoped lang="scss">
/* NOTE(review): the nesting below is reconstructed; confirm against the original stylesheet. */
.container {
  padding-top: 50px;

  .box-card {
    margin: auto;
    width: 800px;
    height: 800px;
    max-height: 900px;

    ::v-deep .el-card__header {
      background: #867ba9 !important;
      border-bottom: none;
      padding: 0;
    }

    ::v-deep .el-card__body {
      padding: 0px !important;
      position: relative;

      .content {
        height: 600px;
        background: #ddd;
        overflow-y: auto;

        .el-divider--horizontal {
          margin: 0;
        }

        .active {
          color: #0080ff;
        }

        .mark {
          background: #deb068;
        }

        .item {
          margin-top: 10px;
          margin-right: 10px;
        }
      }

      .operate {
        padding: 5px 15px;
      }
    }
  }
}
</style>
- 启动前后端项目,分别使用客服账号和客户账号登录聊天室
- 聊天消息
结语
至此,关于实现微服务的websocket聊天室到这里就结束了,下期见。。。
以上是关于(二十)ATP应用测试平台——websocket实现微服务版在线客服聊天室实战案例的主要内容,如果未能解决你的问题,请参考以下文章
(二十四)ATP应用测试平台——springboot集成fastdfs上传与下载功能
(二十五)ATP应用测试平台——springboot集成knife4j实现API接口文档说明