WebSocket Java 应用
Posted sp42a
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了WebSocket Java 应用相关的知识,希望对你有一定的参考价值。
html5 浏览器一方面普及了 WebSocket 的应用,同时我们也感受到 WebSocket 所带来的好处。那么怎么在 Java 中实现 WebSocket 呢?在本文中我们为大家介绍一下,如有不足,敬请提出:)
WebSocket 的特点
WebSocket 的特点就是全双工,不仅浏览器可以发消息给服务端,而且可以反过来,服务器端也能发消息给浏览器,——此为最重要的一点。想想看没有 WebSocket 的日子,服务器端怎么主动发消息给浏览器?客户端轮询?长链接?——都是 Hack 的方法,而且并非服务端自己主动要求发消息给浏览器的。如今,有了 WebSocket,大家就可以互通有无,十分畅快的沟通。
WebSocket 与 Socket 的关系?
抱歉,没有半毛钱的关系哦。
WebSocket 服务端与客户端
毫无疑问,服务端与客户端对应有两种不同的逻辑,我们分别来看看。本文使用同一种语言 Java 去描述服务端、客户端。
WebSocket 服务端
一般较新的 Servlet 规范(Servlet > v3.0)已经支持 WebSocket,Tomcat 里面直接支持,不需要引入其他 jar 包。下面,我们创建一个基类提供基本的 WebSocket 服务端功能。
该源码在这里。
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.Session;
import com.ajaxjs.util.logger.LogHelper;
import com.ajaxjs.util.map.JsonHelper;
/**
* WebSocket 服务端基类
*/
public abstract class BaseWebsocketServer
private static final LogHelper LOGGER = LogHelper.getLog(BaseWebsocketServer.class);
/**
* 已连接的客户端
*/
protected static final Set<WebSocketEntity> CONNECTIONS = new CopyOnWriteArraySet<>();
/**
* 发送对象给客户端
*
* @param obj
*/
public void sendMessageJson(Object obj)
sendMessage(JsonHelper.toJson(obj));
/**
* 发送文本消息给客户端
*
* @param msg
*/
public void sendMessage(String msg)
for (WebSocketEntity clients : CONNECTIONS)
clients.sendText(msg);
/**
* 连接关闭后触发的方法
*/
@OnClose
public void onClose(Session session, CloseReason reason)
LOGGER.info("WebSocket 关闭");
WebSocketEntity toRemove = null;
for (WebSocketEntity e : CONNECTIONS)
if (e.getSession().equals(session))
// LOGGER.info("找到被移除的 ws");
toRemove = e;
break;
if (toRemove != null)
CONNECTIONS.remove(toRemove);
/**
* 发生错误时触发的方法
*/
@OnError
public void onError(Session session, Throwable e)
LOGGER.warning(session.getId() + " 连接发生错误 " + e.getMessage());
e.printStackTrace();
WebSocket 通讯有几种事件,对应不同的 Java 注解(@OnOpen
、@OnClose
等),添加到 Java 方法上。然而这些方法的参数,如 Session session
、CloseReason reason
,不是固定的,可以比较自由地配搭。
静态变量 CONNECTIONS = new CopyOnWriteArraySet<WebSocketEntity>()
是记住已连接的客户端所用。WebSocketEntity
是我们封装的客户端 Bean,当前比较简单,只保存的 session 对象。你可以根据业务增加相应的字段。
/**
* 已连接的客户端
*/
public class WebSocketEntity
private Session session;
public WebSocketEntity(Session session)
this.session = session;
public void sendText(String message)
session.getAsyncRemote().sendText(message);
public Session getSession()
return session;
public void setSession(Session session)
this.session = session;
需要客户端 id 标识吗?其实 session.getId()
可返回 id。
这里为什么用 CopyOnWriteArraySet
?原来这是一种不需要加锁的多并发机制,原先这点子是参考这博客的, CopyOnWriteArraySet
原理参考这里。
子类
有基类自然有子类,下面以一个告警的通知为例:创建一个 WebSocket 服务端类WarningWebSocketServer
,并在类前添加@ServerEndpoint(value = "/MessageCenter/warning")
注解,该注释端点表示将 WebSocket 服务端运行在 ws://[Server 端 IP 或域名]:[Server 端口]/项目名/MessageCenter/warning
的访问端点。
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import org.springframework.stereotype.Component;
import com.ajaxjs.net.websocket.BaseWebsocketServer;
import com.ajaxjs.net.websocket.WebSocketEntity;
import com.ajaxjs.util.logger.LogHelper;
@ServerEndpoint("/MessageCenter/warning")
@Component
public class WarningWebSocketServer extends BaseWebsocketServer
private static final LogHelper LOGGER = LogHelper.getLog(WarningWebSocketServer.class);
/**
* 连接建立后触发的方法
*
* @param session
*/
@OnOpen
public void onOpen(Session session)
LOGGER.info("已连接告警 WebSocket");
CONNECTIONS.add(new WebSocketEntity(session));
/**
* 接收到客户端消息时触发的方法
*/
@OnMessage
public void onMessage(String message)
LOGGER.info("WebSocket.onMessage: " + message);
通过 @Component
注解,我们把 WarningWebSocketServer 作为 Spring 的一个 Component,即 Bean 去调用,当执行 WarningWebSocketServer.sendMessage()
/sendMessageJson()
时候就可以给 WebSocket 客户端发送消息。
WebSocket 客户端
WebSocket 客户端源码在这里。创建客户端实例依赖一个 ws://服务端地址
,通过 connect(String server)
连接 WebSocket 服务器,然后回调函数中BiConsumer<Session, String> onMessage
处理客户端发过来的消息。
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
import javax.websocket.DeploymentException;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import com.ajaxjs.util.ThreadUtil;
import com.ajaxjs.util.logger.LogHelper;
/**
* WebSocket 客户端
*
* @author xinzhang
*
*/
@ClientEndpoint
public class WebSocketClient
private static final LogHelper LOGGER = LogHelper.getLog(WebSocketClient.class);
protected WebSocketContainer container;
protected Session userSession;
private String server;
/**
* 创建 WebSocket 客户端
*/
public WebSocketClient()
container = ContainerProvider.getWebSocketContainer();
/**
* 连接 WebSocket 服务器
*
* @param server 服务器地址
*/
public void connect(String server)
this.server = server;
connect();
/**
* 连接 WebSocket 服务器
*/
public void connect()
try
userSession = container.connectToServer(this, new URI(server));
catch (DeploymentException | URISyntaxException | IOException e)
LOGGER.warning("WS 地址: " + server);
LOGGER.warning(e);
/**
* 发送信息
*
* @param msg 信息
* @throws IOException
*/
public void sendMessage(String msg)
try
userSession.getBasicRemote().sendText(msg);
catch (IOException e)
LOGGER.warning(e);
@OnOpen
public void onOpen(Session session)
LOGGER.info("WebSocket Connected");
tryReconnect.set(false);
circlePing();
@OnClose
public void onClose(Session session, CloseReason closeReason)
LOGGER.info("WebSocket 连接断开!");
if (end.get())
return;
needReconnect();
private BiConsumer<Session, String> onMessage;
/**
* 有消息推到的时候触发
*
* @param session
* @param msg
*/
@OnMessage
public void onMessage(Session session, String msg)
LOGGER.info(msg);
if (onMessage != null)
onMessage.accept(session, msg);
/**
* 需要ping标识
*/
private AtomicBoolean needPing = new AtomicBoolean(true);
/**
* 尝试重连标识
*/
private AtomicBoolean tryReconnect = new AtomicBoolean(false);
/**
* 重连次数
*/
private AtomicInteger reConnectTimes = new AtomicInteger(0);
/**
* 连接结束标识
*/
private AtomicBoolean end = new AtomicBoolean(false);
private static ByteBuffer PING_PAYLOAD = null;
public void circlePing()
if (PING_PAYLOAD == null)
PING_PAYLOAD = ByteBuffer.wrap("Ping".getBytes());
new Thread(() ->
while (needPing.get())
if (userSession != null && userSession.isOpen())
try
userSession.getBasicRemote().sendPing(PING_PAYLOAD);
catch (IllegalArgumentException | IOException e)
LOGGER.warning(e);
ThreadUtil.sleep(5, TimeUnit.SECONDS);
LOGGER.warning("[]Ping循环关闭");
).start();
/**
* 重新连接
*/
private void needReconnect()
ThreadUtil.sleep(3);
int cul = reConnectTimes.incrementAndGet();
if (cul > 3)
disconnect();// close("real stop");
throw new NullPointerException("服务端断连,3次重连均失败");
LOGGER.warning("[0]第[1]次断开重连", cul);
if (tryReconnect.get())
LOGGER.warning("第[0]次断开重连结果 -> 连接正在重连,本次重连请求放弃", cul);
needReconnect();
return;
try
tryReconnect.set(true);
if (userSession != null && userSession.isOpen())
LOGGER.warning("[第[0]次断开重连,关闭旧连接", cul);
disconnect();
container = ContainerProvider.getWebSocketContainer();
connect();
catch (Exception exception)
LOGGER.warning("[第[0]次断开重连结果 -> 连接正在重连,重连异常:[1]", cul, exception.getMessage());
needReconnect();
finally
tryReconnect.set(false);
/**
* 关闭链接
*/
public void disconnect()
try
userSession.close();
catch (IOException e)
LOGGER.warning(e);
public BiConsumer<Session, String> getOnMessage()
return onMessage;
public void setOnMessage(BiConsumer<Session, String> onMessage)
this.onMessage = onMessage;
发送消息给客户端是执行 sendMessage(String msg)
发送信息。这里我们使用了 userSession.getBasicRemote().sendText(msg)
同步的方法,而系统还提供了异步的方法 session.getAsyncRemote().sendText(msg)
,它们的区别在于:
getAsyncRemote()
和getBasicRemote()
确实是异步与同步的区别,大部分情况下,推荐使用getAsyncRemote()
。……由于同步特性,第二行的消息必须等待第一行的发送完成才能进行,而第一行的剩余部分消息要等第二行发送完才能继续发送,所以在第二行会抛出IllegalStateException
异常。如果要使用getBasicRemote()
同步发送消息,则避免尽量一次发送全部消息,使用部分消息来发送。出处
需要注意的问题
心跳机制
开始时候,发现 WebSocket 每隔一定时间会自动断开连接,搜了很多博客都说设置一下 nginx的 proxy_read_timeout
,的确修改可以解决此问题。但是这个时间过长会影响服务器性能,于是可以改用心跳的机制,告诉服务端此连接一直保持有效,不要断开我。
所谓心跳,就是利用 WebSocket 协议中 Ping 的方法,隔一定时间发消息给服务端,保持住连接。
加上心跳后,如果仍然掉线,那么就要考虑不是 Nginx 问题所导致的,就要考虑具体是什么原因导致断线的。WebSocket 断开的原因有很多,最好在 WebSocket 断开时,将错误打印出来。典型的原因有网络波动,服务端断连的情况,会导致客户端被动断开连接。
@OnClose
public void onClose(Session session, CloseReason reason)
LOGGER.info("WebSocket 连接断开!code: 0, reson: 1", reason.getCloseCode(), reason.getReasonPhrase());
具体 code 的含义如下表。
心跳机制的另外一个含义就是告诉双方彼此是否还连接着,所以不但客户端可以对服务端进行心跳,而且反过来,服务端也可以对客户端发心跳。客户端定时发心跳检测,服务端收到心跳检测就回复一个数据包,如果客户端超时未收到回复的心跳包,就可以认为已经离线了。服务端的检测也是类似,只不过服务端可以直接发送 ping 帧,如果超时未收到 pong 帧就可以认为客户端已经断线了。
自动重连
WebSocket 当前貌似没有一种断开自动重新连接的机制,得自己写。其实之前介绍心跳的文章中就包含了自动重连逻辑,特别是 Java 的重连方法,考虑比较周到。
缓冲区问题
WebSocket定时发送 sendPing()
后,还会反复出现接收/发送几个请求就断开连接的情况
原因分析:
无论是作服务端还是客户端,发现每次都是接收到同一个请求的信息后连接就断开了,经过反复的摸索发现,是由于接收到的这个请求传输的数据量过大,超出了 WebSocket 会话接收信息的缓冲区的大小(可使用
session.getMaxTextMessageBufferSize()
查看缓冲大小,默认为8192),引起的 WebSocket 连接的异常断开。出处
解决方法:重新设置 WebSocket 缓冲区大小,
int maxSize = 200 * 1024; // 200K
// 可以缓冲的传入二进制消息的最大长度
session.setMaxBinaryMessageBufferSize(maxSize);
// 可以缓冲的传入文本消息的最大长度
session.setMaxTextMessageBufferSize(maxSize);
用线程池解决大批量消息
服务端/客户端接收到客户端/服务端一次性发来的几百条或更多的请求,瞬间都堆积在会话的缓冲区,又没做多线程处理,并且每接收到一条请求还要查询阿里云服务器数据库,加上网络带宽过小,处理一条请求就要花费几十秒;导致线程队列严重堵塞,无法及时响应处理后续的其他请求。
解决方法:使用了线程池开启多条线程同时进行处理
private static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
@OnMessage
public void onMessage(String datas,Session session)
Runnable t = new Runnable()
@Override
public void run()
// 业务代码
;
//cachedThreadPool.submit(t);
fixedThreadPool.submit(t);
注意,要给 session 加上同步锁,否则会出现多个线程同时往同一个 session 写数据,导致报错的情况。
public void send(String data) throws Exception
synchronized (session)
session.getBasicRemote().sendText(data);
最后推荐两篇关于 WebSocket 的优秀资源:
- 一文吃透 WebSocket 原理 刚面试完,趁热赶紧整理
- https://stackoverflow.com/questions/55433294/cannot-receive-another-websocket-message-after-interruption-of-thread-with-webso
以上是关于WebSocket Java 应用的主要内容,如果未能解决你的问题,请参考以下文章
如何在 AWS Elastic Beanstalk 部署的 Java Web 应用程序中启用 WebSocket 请求