WebSocket Java 应用

Posted sp42a

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了WebSocket Java 应用相关的知识,希望对你有一定的参考价值。

html5 浏览器一方面普及了 WebSocket 的应用,同时我们也感受到 WebSocket 所带来的好处。那么怎么在 Java 中实现 WebSocket 呢?在本文中我们为大家介绍一下,如有不足,敬请提出:)

WebSocket 的特点

WebSocket 的特点就是全双工,不仅浏览器可以发消息给服务端,而且可以反过来,服务器端也能发消息给浏览器,——此为最重要的一点。想想看没有 WebSocket 的日子,服务器端怎么主动发消息给浏览器?客户端轮询?长链接?——都是 Hack 的方法,而且并非服务端自己主动要求发消息给浏览器的。如今,有了 WebSocket,大家就可以互通有无,十分畅快的沟通。

WebSocket 与 Socket 的关系?

抱歉,没有半毛钱的关系哦。

WebSocket 服务端与客户端

毫无疑问,服务端与客户端对应有两种不同的逻辑,我们分别来看看。本文使用同一种语言 Java 去描述服务端、客户端。

WebSocket 服务端

一般较新的 Servlet 规范(Servlet > v3.0)已经支持 WebSocket,Tomcat 里面直接支持,不需要引入其他 jar 包。下面,我们创建一个基类提供基本的 WebSocket 服务端功能。

该源码在这里

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.Session;

import com.ajaxjs.util.logger.LogHelper;
import com.ajaxjs.util.map.JsonHelper;

/**
 * WebSocket 服务端基类
 */
public abstract class BaseWebsocketServer 
	private static final LogHelper LOGGER = LogHelper.getLog(BaseWebsocketServer.class);

	/**
	 * 已连接的客户端
	 */
	protected static final Set<WebSocketEntity> CONNECTIONS = new CopyOnWriteArraySet<>();

	/**
	 * 发送对象给客户端
	 * 
	 * @param obj
	 */
	public void sendMessageJson(Object obj) 
		sendMessage(JsonHelper.toJson(obj));
	

	/**
	 * 发送文本消息给客户端
	 * 
	 * @param msg
	 */
	public void sendMessage(String msg) 
		for (WebSocketEntity clients : CONNECTIONS) 
			clients.sendText(msg);
		
	

	/**
	 * 连接关闭后触发的方法
	 */
	@OnClose
	public void onClose(Session session, CloseReason reason) 
		LOGGER.info("WebSocket 关闭");
		WebSocketEntity toRemove = null;

		for (WebSocketEntity e : CONNECTIONS) 
			if (e.getSession().equals(session)) 
//				LOGGER.info("找到被移除的 ws");
				toRemove = e;
				break;
			
		

		if (toRemove != null)
			CONNECTIONS.remove(toRemove);
	

	/**
	 * 发生错误时触发的方法
	 */
	@OnError
	public void onError(Session session, Throwable e) 
		LOGGER.warning(session.getId() + " 连接发生错误 " + e.getMessage());
		e.printStackTrace();
	

WebSocket 通讯有几种事件,对应不同的 Java 注解(@OnOpen@OnClose 等),添加到 Java 方法上。然而这些方法的参数,如 Session sessionCloseReason reason,不是固定的,可以比较自由地配搭。

静态变量 CONNECTIONS = new CopyOnWriteArraySet<WebSocketEntity>() 是记住已连接的客户端所用。WebSocketEntity 是我们封装的客户端 Bean,当前比较简单,只保存的 session 对象。你可以根据业务增加相应的字段。

/**
 * 已连接的客户端
 */
public class WebSocketEntity 
	private Session session;

	public WebSocketEntity(Session session) 
		this.session = session;
	

	public void sendText(String message) 
		session.getAsyncRemote().sendText(message);
	

	public Session getSession() 
		return session;
	

	public void setSession(Session session) 
		this.session = session;
	

需要客户端 id 标识吗?其实 session.getId() 可返回 id。

这里为什么用 CopyOnWriteArraySet?原来这是一种不需要加锁的多并发机制,原先这点子是参考这博客的, CopyOnWriteArraySet 原理参考这里

子类

有基类自然有子类,下面以一个告警的通知为例:创建一个 WebSocket 服务端类WarningWebSocketServer,并在类前添加@ServerEndpoint(value = "/MessageCenter/warning")注解,该注释端点表示将 WebSocket 服务端运行在 ws://[Server 端 IP 或域名]:[Server 端口]/项目名/MessageCenter/warning 的访问端点。

import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

import org.springframework.stereotype.Component;

import com.ajaxjs.net.websocket.BaseWebsocketServer;
import com.ajaxjs.net.websocket.WebSocketEntity;
import com.ajaxjs.util.logger.LogHelper;

@ServerEndpoint("/MessageCenter/warning")
@Component
public class WarningWebSocketServer extends BaseWebsocketServer 
	private static final LogHelper LOGGER = LogHelper.getLog(WarningWebSocketServer.class);

	/**
	 * 连接建立后触发的方法
	 * 
	 * @param session
	 */
	@OnOpen
	public void onOpen(Session session) 
		LOGGER.info("已连接告警 WebSocket");

		CONNECTIONS.add(new WebSocketEntity(session));
	
	
   /**
   * 接收到客户端消息时触发的方法
   */
	@OnMessage
	public void onMessage(String message) 
		LOGGER.info("WebSocket.onMessage: " + message);
	

通过 @Component 注解,我们把 WarningWebSocketServer 作为 Spring 的一个 Component,即 Bean 去调用,当执行 WarningWebSocketServer.sendMessage()/sendMessageJson() 时候就可以给 WebSocket 客户端发送消息。

WebSocket 客户端

WebSocket 客户端源码在这里。创建客户端实例依赖一个 ws://服务端地址,通过 connect(String server) 连接 WebSocket 服务器,然后回调函数中BiConsumer<Session, String> onMessage处理客户端发过来的消息。

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
import javax.websocket.DeploymentException;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;

import com.ajaxjs.util.ThreadUtil;
import com.ajaxjs.util.logger.LogHelper;

/**
 * WebSocket 客户端
 * 
 * @author xinzhang
 *
 */
@ClientEndpoint
public class WebSocketClient 
	private static final LogHelper LOGGER = LogHelper.getLog(WebSocketClient.class);

	protected WebSocketContainer container;

	protected Session userSession;

	private String server;

	/**
	 * 创建 WebSocket 客户端
	 */
	public WebSocketClient() 
		container = ContainerProvider.getWebSocketContainer();
	

	/**
	 * 连接 WebSocket 服务器
	 * 
	 * @param server 服务器地址
	 */
	public void connect(String server) 
		this.server = server;
		connect();
	

	/**
	 * 连接 WebSocket 服务器
	 */
	public void connect() 
		try 
			userSession = container.connectToServer(this, new URI(server));
		 catch (DeploymentException | URISyntaxException | IOException e) 
			LOGGER.warning("WS 地址: " + server);
			LOGGER.warning(e);
		
	

	/**
	 * 发送信息
	 * 
	 * @param msg 信息
	 * @throws IOException
	 */
	public void sendMessage(String msg) 
		try 
			userSession.getBasicRemote().sendText(msg);

		 catch (IOException e) 
			LOGGER.warning(e);
		
	

	@OnOpen
	public void onOpen(Session session) 
		LOGGER.info("WebSocket Connected");
		tryReconnect.set(false);
		circlePing();
	

	@OnClose
	public void onClose(Session session, CloseReason closeReason) 
		LOGGER.info("WebSocket 连接断开!");
		
		if (end.get())
			return;

		needReconnect();
	

	private BiConsumer<Session, String> onMessage;

	/**
	 * 有消息推到的时候触发
	 * 
	 * @param session
	 * @param msg
	 */
	@OnMessage
	public void onMessage(Session session, String msg) 
		LOGGER.info(msg);

		if (onMessage != null)
			onMessage.accept(session, msg);
	

	/**
	 * 需要ping标识
	 */
	private AtomicBoolean needPing = new AtomicBoolean(true);

	/**
	 * 尝试重连标识
	 */
	private AtomicBoolean tryReconnect = new AtomicBoolean(false);

	/**
	 * 重连次数
	 */
	private AtomicInteger reConnectTimes = new AtomicInteger(0);

	/**
	 * 连接结束标识
	 */
	private AtomicBoolean end = new AtomicBoolean(false);

	private static ByteBuffer PING_PAYLOAD = null;

	public void circlePing() 
		if (PING_PAYLOAD == null)
			PING_PAYLOAD = ByteBuffer.wrap("Ping".getBytes());

		new Thread(() -> 
			while (needPing.get()) 
				if (userSession != null && userSession.isOpen())
					try 
						userSession.getBasicRemote().sendPing(PING_PAYLOAD);
					 catch (IllegalArgumentException | IOException e) 
						LOGGER.warning(e);
					

				ThreadUtil.sleep(5, TimeUnit.SECONDS);
			

			LOGGER.warning("[]Ping循环关闭");
		).start();
	

	/**
	 * 重新连接
	 */
	private void needReconnect() 
		ThreadUtil.sleep(3);
		int cul = reConnectTimes.incrementAndGet();

		if (cul > 3) 
			disconnect();// close("real stop");
			throw new NullPointerException("服务端断连,3次重连均失败");
		

		LOGGER.warning("[0]第[1]次断开重连", cul);

		if (tryReconnect.get()) 
			LOGGER.warning("第[0]次断开重连结果 -> 连接正在重连,本次重连请求放弃", cul);
			needReconnect();

			return;
		

		try 
			tryReconnect.set(true);

			if (userSession != null && userSession.isOpen()) 
				LOGGER.warning("[第[0]次断开重连,关闭旧连接", cul);
				disconnect();
			

			container = ContainerProvider.getWebSocketContainer();
			connect();
		 catch (Exception exception) 
			LOGGER.warning("[第[0]次断开重连结果 -> 连接正在重连,重连异常:[1]", cul, exception.getMessage());
			needReconnect();
		 finally 
			tryReconnect.set(false);
		
	

	/**
	 * 关闭链接
	 */
	public void disconnect() 
		try 
			userSession.close();
		 catch (IOException e) 
			LOGGER.warning(e);
		
	

	public BiConsumer<Session, String> getOnMessage() 
		return onMessage;
	

	public void setOnMessage(BiConsumer<Session, String> onMessage) 
		this.onMessage = onMessage;
	

发送消息给客户端是执行 sendMessage(String msg)发送信息。这里我们使用了 userSession.getBasicRemote().sendText(msg) 同步的方法,而系统还提供了异步的方法 session.getAsyncRemote().sendText(msg),它们的区别在于:

getAsyncRemote()getBasicRemote()确实是异步与同步的区别,大部分情况下,推荐使用getAsyncRemote()。……由于同步特性,第二行的消息必须等待第一行的发送完成才能进行,而第一行的剩余部分消息要等第二行发送完才能继续发送,所以在第二行会抛出IllegalStateException异常。如果要使用getBasicRemote()同步发送消息,则避免尽量一次发送全部消息,使用部分消息来发送。出处

需要注意的问题

心跳机制

开始时候,发现 WebSocket 每隔一定时间会自动断开连接,搜了很多博客都说设置一下 nginxproxy_read_timeout,的确修改可以解决此问题。但是这个时间过长会影响服务器性能,于是可以改用心跳的机制,告诉服务端此连接一直保持有效,不要断开我。

前端加入心跳的方法参见这里, Java 客户端的参见这里

所谓心跳,就是利用 WebSocket 协议中 Ping 的方法,隔一定时间发消息给服务端,保持住连接。

加上心跳后,如果仍然掉线,那么就要考虑不是 Nginx 问题所导致的,就要考虑具体是什么原因导致断线的。WebSocket 断开的原因有很多,最好在 WebSocket 断开时,将错误打印出来。典型的原因有网络波动,服务端断连的情况,会导致客户端被动断开连接。

@OnClose
public void onClose(Session session, CloseReason reason) 
	LOGGER.info("WebSocket 连接断开!code: 0, reson: 1", reason.getCloseCode(), reason.getReasonPhrase());

具体 code 的含义如下表。


心跳机制的另外一个含义就是告诉双方彼此是否还连接着,所以不但客户端可以对服务端进行心跳,而且反过来,服务端也可以对客户端发心跳。客户端定时发心跳检测,服务端收到心跳检测就回复一个数据包,如果客户端超时未收到回复的心跳包,就可以认为已经离线了。服务端的检测也是类似,只不过服务端可以直接发送 ping 帧,如果超时未收到 pong 帧就可以认为客户端已经断线了。

自动重连

WebSocket 当前貌似没有一种断开自动重新连接的机制,得自己写。其实之前介绍心跳的文章中就包含了自动重连逻辑,特别是 Java 的重连方法,考虑比较周到。

缓冲区问题

WebSocket定时发送 sendPing() 后,还会反复出现接收/发送几个请求就断开连接的情况

原因分析:

无论是作服务端还是客户端,发现每次都是接收到同一个请求的信息后连接就断开了,经过反复的摸索发现,是由于接收到的这个请求传输的数据量过大,超出了 WebSocket 会话接收信息的缓冲区的大小(可使用session.getMaxTextMessageBufferSize()查看缓冲大小,默认为8192),引起的 WebSocket 连接的异常断开。出处

解决方法:重新设置 WebSocket 缓冲区大小,

int maxSize = 200 * 1024;  // 200K
// 可以缓冲的传入二进制消息的最大长度
session.setMaxBinaryMessageBufferSize(maxSize);
// 可以缓冲的传入文本消息的最大长度
session.setMaxTextMessageBufferSize(maxSize);

用线程池解决大批量消息

服务端/客户端接收到客户端/服务端一次性发来的几百条或更多的请求,瞬间都堆积在会话的缓冲区,又没做多线程处理,并且每接收到一条请求还要查询阿里云服务器数据库,加上网络带宽过小,处理一条请求就要花费几十秒;导致线程队列严重堵塞,无法及时响应处理后续的其他请求。

解决方法:使用了线程池开启多条线程同时进行处理

private static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

@OnMessage
public void onMessage(String datas,Session session) 
    Runnable t = new Runnable() 
        @Override
        public void run() 
            // 业务代码
        
    ;
    
    //cachedThreadPool.submit(t);
    fixedThreadPool.submit(t);

注意,要给 session 加上同步锁,否则会出现多个线程同时往同一个 session 写数据,导致报错的情况。

public void send(String data) throws Exception 
    synchronized (session) 
        session.getBasicRemote().sendText(data);
    

最后推荐两篇关于 WebSocket 的优秀资源:

以上是关于WebSocket Java 应用的主要内容,如果未能解决你的问题,请参考以下文章

如何在 AWS Elastic Beanstalk 部署的 Java Web 应用程序中启用 WebSocket 请求

Java Socket 和 JS WebSocket

如何使用 SockJs 通过 STOMP Java 客户端验证 Spring 非 Web Websocket?

java中的websocket应用

Java web项目使用webSocket

Java Web高级编程