利用netty开发webScoketClient(支持wss协议,客户端服务端心跳实现)
Posted 张子行的博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了利用netty开发webScoketClient(支持wss协议,客户端服务端心跳实现)相关的知识,希望对你有一定的参考价值。
这里写目录标题
前言
最近在使用 netty这个框架来开发 webScoketClient用来获取一些流式的数据,之后咱家的前端采用长连接和咱保持联系,咱们后端就是一个中转站,既要编写一个webScoketServer供咱家的前端有奶喝,也要编写一个webScoketClient去挤奶,同时为了保证这个奶是澳大利亚纯装牛奶,还需要用巴氏消毒法对奶做一个品质管控。这样咱家的公司才能有希望做大做强,我才有肉吃。而这也是我为什么写下本文的原因
题外话
话说 前端 不是可以通过new WebSocket(url)的方式创建长连接,继而获取流式数据的吗,那为什么前端能干的活要后端来插一脚呢?原因很简单,通过后端来实现webScoketClient可以对比数据做一些落库、过滤、校验…等一系列的操作,可以保证数据的安全性,如果项目后阶段需要版本的迭代,也比较好去扩展需求,当然对于引用一些安全性不是那么高的流式数据,比如说实时的天气信息,那么由前端来实现就够了,我想也没有哪个人吃饱了撑的来攻击这个网站吧,说了这么多下面开始介绍正文webScoketCllient 的几种实现方式吧
webScoketClient实现方式一(jacva_webscoket)
如果是wss请求则添加wss支持,如果是其他请求则正常创建 WebSocketClient
import org.java_websocket.client.DefaultSSLWebSocketClientFactory;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft;
import javax.net.ssl.*;
import java.net.URI;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.util.Map;
public abstract class WebScoketClientPlus extends WebSocketClient
public WebScoketClientPlus(URI serverURI, Draft draft, Map<String, String> headers, int connecttimeout)
super(serverURI, draft, headers, connecttimeout);
/**
* 如果url包含wss,添加wss协议支持
*/
if (serverURI.toString().contains("wss://"))
trustAllHosts(this);
/**
* 扩展方法:目前不做扩展
*/
public abstract void extendMethod();
public static void trustAllHosts(WebScoketClientPlus appClient)
System.out.println("start...");
TrustManager[] trustAllCerts = new TrustManager[]new X509TrustManager()
public X509Certificate[] getAcceptedIssuers()
return new X509Certificate[0];
public void checkClientTrusted(X509Certificate[] arg0, String arg1)
public void checkServerTrusted(X509Certificate[] arg0, String arg1)
;
try
/**
* 添加wss支持
*/
SSLContext sc = SSLContext.getInstance("TLS");
sc.init((KeyManager[]) null, trustAllCerts, new SecureRandom());
appClient.setWebSocketFactory(new DefaultSSLWebSocketClientFactory(sc));
catch (Exception var3)
var3.printStackTrace();
webScoketClient工具类
通过 WebScoketClientPlus 类(指定webScoket请求头等参数)创建WebSocketClient对象,继而进行连接,当接收到流式数据的时候 onMessage方法将会触发,在此我们可以编写自己的业务逻辑,下文的代码仅仅只是将接收到的 msg 进行打印了一遍而已。注意 onMessage 触发的条件是:方法参数数据类型与要接收数据类型一致,如果不一致可能会造成接收不到数据的问题
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.WebSocket.Role;
import org.java_websocket.drafts.Draft;
import org.java_websocket.drafts.Draft_17;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.stereotype.Component;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@Slf4j
@Data
@Component
public class WebScoketUtil
public static WebScoketClientPlus getConnect(String url, CountDownLatch countDownLatch) throws URISyntaxException
Map<String, String> headers = new HashMap();
/**
* 指定webScoket的请求头信息
*/
headers.put("Sec-WebSocket-Extensions", "permessage-deflate; client_max_window_bits");
headers.put("Sec-WebSocket-Version", "13");
headers.put("Connection", "Upgrade");
headers.put("Upgrade", "websocket");
headers.put("Accept-Encoding", "gzip, deflate, br");
headers.put("Accept-Language", "zh-CN,zh;q=0.9");
headers.put("Cache-Control", "no-cache");
/**
* 指定角色为客户端
*/
Draft draft = new Draft_17();
draft.setParseMode(Role.CLIENT);
WebScoketClientPlus connect = new WebScoketClientPlus(new URI(url), draft, headers, 10)
@Override
public void extendMethod()
System.err.println("extendMethod");
@SneakyThrows
public void onClose(int arg0, String arg1, boolean arg2)
System.err.println("onClose");
public void onError(Exception arg0)
System.err.println("onError");
public void onMessage(String arg0)
System.out.println("String 消息:" + arg0);
public void onOpen(ServerHandshake arg0)
countDownLatch.countDown();
System.out.println("onOpen:" + arg0);
public void onMessage(ByteBuffer bytes)
Charset charset = Charset.forName("UTF-8");
CharsetDecoder decoder = charset.newDecoder();
CharBuffer charBuffer = null;
String received = "";
try
charBuffer = decoder.decode(bytes);
catch (CharacterCodingException e)
e.printStackTrace();
bytes.flip();
received = charBuffer.toString();
System.out.println("接收到的流式数据:" + received);
;
connect.connect();
return connect;
简单编写测试
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws URISyntaxException, InterruptedException
WebScoketClientPlus connect = WebScoketUtil.getConnect("wss://127.0.0.1:5000/v1/api/ws", countDownLatch);
/**
* 阻塞线程,直至onOpen的回调触发代表连接成功,才会放行
*/
countDownLatch.await();
connect.send("\\"userId\\"=1");
webScoketClient实现方式二(netty)
emm用netty实现这个稍微有点复杂。我们通过对 ChannelFuture 对象添加监听器进行监听,如果通道正常开启,则准备握手升级协议(根据url中的scheme确定升级协议的类型,目前支持wss),否则递归的进行重连。如果重连次数达到最大值,将不会进行重连了
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NiosocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import lombok.Data;
import lombok.SneakyThrows;
import javax.net.ssl.SSLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 1、通道的开启:发送syn、ack数据包
* 2、三次握手:通道是可靠的,你可以和我交流了
*/
@Data
public class WebSocketClient
private String uri;
private CountDownLatch latch;
private ClientInitializer clientInitializer;
private SslContext sslCtx;
private String host;
private int port;
private String scheme;
private URI websocketURI;
private String type;
private String userId;
private Bootstrap bootstrap;
private Channel channel;
int repeatConnectCount = 0;
public WebSocketClient(String uri, String type, CountDownLatch latch) throws URISyntaxException
this.uri = uri;
this.websocketURI = new URI(uri);
this.host = websocketURI.getHost();
this.port = websocketURI.getPort();
this.scheme = websocketURI.getScheme();
this.latch = latch;
this.type = type;
if ("wss".equals(scheme))
//初始化SslContext,这个在wss协议升级的时候需要用到
try
this.sslCtx = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
catch (SSLException e)
e.printStackTrace();
else if ("ws".equals(scheme))
this.sslCtx = null;
this.clientInitializer = new ClientInitializer(latch, host, port, sslCtx, type, WebSocketClient.this);
public void connect()
EventLoopGroup group = new NioEventLoopGroup(4);
try
bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024 * 1024 * 10)
.group(group)
.handler(new LoggingHandler(LogLevel.INFO))
.channel(NioSocketChannel.class)
.handler(clientInitializer);
doConnect(null,10000);
catch (Exception e)
e.printStackTrace();
@SneakyThrows
protected void doConnect(ChannelHandlerContext ctx, Integer count)
//重连次数每次加一
repeatConnectCount++;
if (repeatConnectCount > count)
if (null != ctx)
System.out.println("通道关闭、重连失败");
ctx.channel().close();
return;
if (channel != null && channel.isActive())
System.err.println("通道正常");
return;
//建立HTTP连接
ChannelFuture future = bootstrap.connect(host, port).addListener(new ChannelFutureListener()
public void operationComplete(ChannelFuture futureListener) throws Exception
if (futureListener.isSuccess())
channel = futureListener.channel();
//连接成功重置重连次数为0
repeatConnectCount = 0;
System.out.println("通道开启成功~");
HttpHeaders httpHeaders = new DefaultHttpHeaders();
WebSocketClientHandshaker webSocketClientHandshaker = WebSocketClientHandshakerFactory.newHandshaker(websocketURI, WebSocketVersion.V13, (String) null, true, httpHeaders);
WebSocketClientHandler handler = (WebSocketClientHandler) channel.pipeline().get("websocketHandler");
//升级为ws协议
System.err.println("开始升级http协议~准备开始握手");
webSocketClientHandshaker.handshake(channel);
handler.setHandshaker(webSocketClientHandshaker);
else
futureListener.channel().eventLoop().schedule(new Runnable()
@Override
public void run()
System.err.println("重试开启通道~" + repeatConnectCount);
doConnect(ctx, count);
, 3, TimeUnit.SECONDS);
);
channel = future.channel();
客户端初始化配置
- 开启了心跳事件触发器支持的功能
- 配置了 http 编解码器
- 配置了 handler 处理器
- 开启了 wss 连接支持
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.AllArgsConstructor;
import java.util.concurrent.CountDownLatch;
/**
* netty 客户端初始化
*/
@AllArgsConstructor
public class ClientInitializer extends ChannelInitializer<SocketChannel>
private CountDownLatch latch;
private String host;
private int port;
private SslContext sslCtx;
private String type;
private SimpleChannelInboundHandler handler;
private WebSocketClient webSocketClient;
public ClientInitializer(CountDownLatch latch, String host, int port, SslContext sslCtx, String type, WebSocketClient webSocketClient)
this.latch = latch;
this.host = host;
this.port = port;
this利用netty开发webScoketClient(支持wss协议,客户端服务端心跳实现)