How ChannelPipeline ordering in Netty affects inbound and outbound handlers
Posted by dgutfly
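Before running this experiment I read xbmchina's articles on Jianshu. Thanks to the author for these articles on ChannelPipeline and ChannelHandler:
【Netty】ChannelPipeline和ChannelHandler(一) ([Netty] ChannelPipeline and ChannelHandler, Part 1)
【Netty】ChannelHandler的添加和删除(二) ([Netty] Adding and removing a ChannelHandler, Part 2)
【Netty】inBound和outBound事件的传播过程 ([Netty] How inbound and outbound events propagate)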
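I originally wanted to write a Netty 4 chat room based on leonzm's websocket_demo project, but the Netty 4 API turned out to be different: messageReceived (connection setup and receiving packets) and close (disconnecting) cannot be overridden there, so I looked into the handler lifecycle. channelRead0 handles the initial handshake request and receives packets from clients that are already connected. When the channel reaches channelInactive, the connection is going away; it then enters channelUnregistered, where connection-related data can be cleaned up. The ChannelHandler itself also has a lifecycle, and handlerRemoved can perform similar cleanup.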
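Since cleanup can be reached from more than one callback (channelInactive, channelUnregistered, handlerRemoved, or an exception path), it may help to make it idempotent. The following is a minimal plain-Java sketch with no Netty dependency; `SessionCleanup`, its registry map, and `cleanupOnce` are hypothetical names introduced for illustration and are not part of the demo project.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: performs per-connection cleanup at most once. */
class SessionCleanup {
    private final Map<String, String> onlineUsers; // stand-in for a session registry
    private final String sessionId;
    private final AtomicBoolean done = new AtomicBoolean(false);

    SessionCleanup(Map<String, String> onlineUsers, String sessionId) {
        this.onlineUsers = onlineUsers;
        this.sessionId = sessionId;
    }

    /** Returns true only for the call that actually performed the cleanup. */
    boolean cleanupOnce() {
        if (!done.compareAndSet(false, true)) {
            return false; // some other lifecycle callback already cleaned up
        }
        onlineUsers.remove(sessionId);
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> users = new ConcurrentHashMap<>();
        users.put("s1", "alice");
        SessionCleanup cleanup = new SessionCleanup(users, "s1");
        System.out.println(cleanup.cleanupOnce()); // first caller, e.g. channelInactive
        System.out.println(cleanup.cleanupOnce()); // later caller, e.g. handlerRemoved
        System.out.println(users.isEmpty());
    }
}
```

A handler could hold one such object per connection and call `cleanupOnce()` from every callback that may signal the end of the connection.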
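The difference between inbound and outbound in Netty: inbound events are triggered passively, fired automatically when something happens (for example, data arrives), while outbound operations are triggered actively, when the application asks for something to be done (for example, a write). In addition, an outbound handler on its own cannot receive messages from a WebSocket client: it handles what is sent out, and receiving requires an inbound handler. Outbound handlers are better suited to being embedded in the pipeline as interceptors, in the style of AOP (aspect-oriented programming).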
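The direction of travel can be illustrated with a simplified plain-Java model (this is not Netty code). It assumes the usual rule that inbound events travel from the head of the pipeline to the tail and outbound operations issued on the channel travel from the tail to the head, each visiting only handlers of the matching kind. `DirectionModel`, `Entry`, and `Kind` are hypothetical names; the handler names are borrowed from the pipelines in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of which handlers an event visits; not Netty's implementation. */
class DirectionModel {
    enum Kind { INBOUND, OUTBOUND, DUPLEX }

    static final class Entry {
        final String name;
        final Kind kind;
        Entry(String name, Kind kind) {
            this.name = name;
            this.kind = kind;
        }
    }

    /** Inbound events: head to tail, skipping outbound-only handlers. */
    static List<String> inboundPath(List<Entry> pipeline) {
        List<String> path = new ArrayList<>();
        for (Entry e : pipeline) {
            if (e.kind != Kind.OUTBOUND) {
                path.add(e.name);
            }
        }
        return path;
    }

    /** Outbound operations issued on the channel: tail to head, skipping inbound-only handlers. */
    static List<String> outboundPath(List<Entry> pipeline) {
        List<String> path = new ArrayList<>();
        for (int i = pipeline.size() - 1; i >= 0; i--) {
            Entry e = pipeline.get(i);
            if (e.kind != Kind.INBOUND) {
                path.add(e.name);
            }
        }
        return path;
    }

    /** A shortened version of the pipeline used in this post, in add order. */
    static List<Entry> sample() {
        return Arrays.asList(
                new Entry("http-codec", Kind.DUPLEX),
                new Entry("adapter", Kind.OUTBOUND),
                new Entry("handler", Kind.INBOUND));
    }

    public static void main(String[] args) {
        System.out.println(inboundPath(sample()));  // [http-codec, handler]
        System.out.println(outboundPath(sample())); // [adapter, http-codec]
    }
}
```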
Now compare the two pipeline configurations.
Lanucher.java (main class that starts the Netty server):

    package com.company.lanucher;

    import com.company.server.ReversedWebSocketServer;
    import com.company.server.WebSocketServer;

    public class Lanucher {

        public static void main(String[] args) throws Exception {
            // Start the WebSocket server. To run the other server instead,
            // comment out the Reversed line and uncomment the WebSocketServer line.
            //new WebSocketServer().run(WebSocketServer.WEBSOCKET_PORT);
            new ReversedWebSocketServer().run(ReversedWebSocketServer.WEBSOCKET_PORT);
        }
    }
WebSocketServer.java (the outbound adapter is added to the pipeline before the inbound handler):

    package com.company.server;

    import org.apache.log4j.Logger;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.stream.ChunkedWriteHandler;

    /**
     * WebSocket server
     */
    public class WebSocketServer {
        private static final Logger LOG = Logger.getLogger(WebSocketServer.class);

        // WebSocket port
        public static final int WEBSOCKET_PORT = 9090;

        public void run(int port) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {

                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast("http-codec", new HttpServerCodec()); // HTTP encoding and decoding
                        pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // aggregates HTTP message parts
                        pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // support for writing large data in chunks
                        pipeline.addLast("adapter", new FunWebSocketServerHandler()); // outbound interceptor, placed before the WebSocket server handler
                        pipeline.addLast("handler", new BananaWebSocketServerHandler()); // WebSocket server handler
                    }
                });

                Channel channel = b.bind(port).sync().channel();
                LOG.info("WebSocket 已经启动,端口:" + port + ".");
                channel.closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
ReversedWebSocketServer.java (the inbound handler is added to the pipeline before the outbound adapter):

    package com.company.server;

    import org.apache.log4j.Logger;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.stream.ChunkedWriteHandler;

    public class ReversedWebSocketServer {
        // Log under this class's own name (the original passed WebSocketServer.class)
        private static final Logger LOG = Logger.getLogger(ReversedWebSocketServer.class);

        // WebSocket ports
        public static final int WEBSOCKET_PORT = 9090;
        public static final int FUN_WEBSOCKET_PORT = 9091;

        public void run(int port) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {

                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast("http-codec", new HttpServerCodec()); // HTTP encoding and decoding
                        pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // aggregates HTTP message parts
                        pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // support for writing large data in chunks
                        pipeline.addLast("handler", new BananaWebSocketServerHandler()); // WebSocket server handler
                        pipeline.addLast("adapter", new FunWebSocketServerHandler()); // outbound interceptor, placed after the WebSocket server handler
                    }
                });

                Channel channel = b.bind(port).sync().channel();
                LOG.info("WebSocket 已经启动,端口:" + port + ".");
                channel.closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
BananaWebSocketServerHandler.java (inbound handler; processes the requests received from clients):

    package com.company.server;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelPromise;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.HttpHeaders;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.handler.codec.http.HttpVersion;
    import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
    import io.netty.util.CharsetUtil;

    import org.apache.log4j.Logger;

    import com.company.serviceimpl.BananaService;
    import com.company.util.BufToString;
    import com.company.util.CODE;
    import com.company.util.Request;
    import com.company.util.Response;
    import com.google.common.base.Strings;
    import com.google.gson.JsonSyntaxException;


    /**
     * WebSocket server handler
     */
    public class BananaWebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
        private static final Logger LOG = Logger.getLogger(BananaWebSocketServerHandler.class.getName());

        private WebSocketServerHandshaker handshaker;
        private ChannelHandlerContext ctx;
        private String sessionId;
        private boolean isLog = true;

        public BananaWebSocketServerHandler() {
            super();
        }

        public BananaWebSocketServerHandler(boolean isLog) {
            this();
            this.isLog = isLog;
        }

        // Override point in Netty 5; Netty 4 uses channelRead0 instead
        public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
            if(this.isLog) {
                System.out.print("channel MessageReceived = = " + ctx.name());
            }
            if (msg instanceof FullHttpRequest) { // plain HTTP request
                FullHttpRequest mymsg = (FullHttpRequest) msg;
                System.out.println(" with http request : " + BufToString.convertByteBufToString(mymsg.content()));
                handleHttpRequest(ctx, mymsg);
            } else if (msg instanceof WebSocketFrame) { // WebSocket frame
                WebSocketFrame mymsg = (WebSocketFrame) msg;
                System.out.println(" with socket request : " + BufToString.convertByteBufToString(mymsg.content()));
                handleWebSocketFrame(ctx, mymsg);
            }
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channel handlerAdded = = " + ctx.name());
        }

        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channel handlerRemoved = = " + ctx.name());
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            if(this.isLog) {
                System.out.print("channel Read0 = = " + ctx.name());
            }
            if (msg instanceof FullHttpRequest) { // plain HTTP request
                FullHttpRequest mymsg = (FullHttpRequest) msg;
                System.out.println(" with http request : " + BufToString.convertByteBufToString(mymsg.content()));
                handleHttpRequest(ctx, mymsg);
            } else if (msg instanceof WebSocketFrame) { // WebSocket frame
                WebSocketFrame mymsg = (WebSocketFrame) msg;
                System.out.println(" with socket request : " + BufToString.convertByteBufToString(mymsg.content()));
                handleWebSocketFrame(ctx, mymsg);
            }
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            if(this.isLog) {
                System.out.println("channel Inactive = = " + ctx.name());
            }
            try {
                this.close(ctx, null);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) {
            if(this.isLog) {
                System.out.println("channel Unregistered = = " + ctx.name());
            }
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
            System.out.println("channel Flush = = " + ctx.name());
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

            ctx.close();
            if(this.isLog) {
                System.err.println("channel exceptionCaught = = " + ctx.name());
                cause.printStackTrace();
            }
            BananaService.logout(sessionId); // log the session out
            BananaService.notifyDownline(sessionId); // notify the others that someone went offline
        }

        // Override point in Netty 5; Netty 4 uses channelInactive instead
        public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
            if(this.isLog) {
                System.out.println("channel close = = " + ctx.name());
            }
            BananaService.logout(sessionId); // log the session out
            BananaService.notifyDownline(sessionId); // notify the others that someone went offline
            ctx.close();
        }

        /**
         * Handles an HTTP request and completes the WebSocket handshake.<br/>
         * Note: the first request of a WebSocket connection is an HTTP request.
         * @param ctx
         * @param request
         * @throws Exception
         */
        private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
            // If HTTP decoding failed or this is not a WebSocket upgrade, return an HTTP error
            if (!request.getDecoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) {
                sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
                return;
            }

            // A valid WebSocket upgrade request: build and return the handshake response
            WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + request.headers().get(HttpHeaders.Names.HOST), null, false);
            handshaker = wsFactory.newHandshaker(request);
            if (handshaker == null) { // unsupported WebSocket version
                WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
            } else {
                // Send the WebSocket handshake to the client to complete it
                handshaker.handshake(ctx.channel(), request);
                // Keep the handler context so the server can push data to the client later
                this.ctx = ctx;
            }
        }

        /**
         * Handles a WebSocket frame.
         * @param ctx
         * @param frame
         * @throws Exception
         */
        private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
            // Is this a close frame?
            if (frame instanceof CloseWebSocketFrame) {
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
                return;
            }
            // Is this a ping?
            if (frame instanceof PingWebSocketFrame) {
                ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
                return;
            }
            // Only text frames are supported for now, not binary frames
            if (!(frame instanceof TextWebSocketFrame)) {
                throw new UnsupportedOperationException("当前只支持文本消息,不支持二进制消息");
            }

            // Handle the WebSocket request from the client
            try {
                /*
                if(this.isLog) {
                    System.out.println("handleWebSocketFrame-=-=-" + ((TextWebSocketFrame)frame).text());
                }
                */
                Request request = Request.create(((TextWebSocketFrame)frame).text());
                Response response = new Response();
                response.setServiceId(request.getServiceId());
                if (CODE.online.code.intValue() == request.getServiceId()) { // client registration
                    String requestId = request.getRequestId();
                    // The original returned here without sending the failure response,
                    // so the client never saw it; send it before returning.
                    if (Strings.isNullOrEmpty(requestId)) {
                        sendWebSocket(response.setIsSucc(false).setMessage("requestId不能为空").toJson());
                        return;
                    } else if (Strings.isNullOrEmpty(request.getName())) {
                        sendWebSocket(response.setIsSucc(false).setMessage("name不能为空").toJson());
                        return;
                    } else if (BananaService.bananaWatchMap.containsKey(requestId)) {
                        sendWebSocket(response.setIsSucc(false).setMessage("您已经注册了,不能重复注册").toJson());
                        return;
                    }
                    if (!BananaService.register(requestId, new BananaService(ctx, request.getName()))) {
                        response.setIsSucc(false).setMessage("注册失败");
                    } else {
                        response.setIsSucc(true).setMessage("注册成功");

                        BananaService.bananaWatchMap.forEach((reqId, callBack) -> {
                            response.getHadOnline().put(reqId, ((BananaService)callBack).getName()); // return the users already online

                            if (!reqId.equals(requestId)) {
                                Request serviceRequest = new Request();
                                serviceRequest.setServiceId(CODE.online.code);
                                serviceRequest.setRequestId(requestId);
                                serviceRequest.setName(request.getName());
                                try {
                                    callBack.send(serviceRequest); // notify the others that someone came online
                                } catch (Exception e) {
                                    LOG.warn("回调发送消息给客户端异常", e);
                                }
                            }
                        });
                    }
                    sendWebSocket(response.toJson());
                    this.sessionId = requestId; // remember the session id so the link can be logged out on page refresh or browser close
                } else if (CODE.send_message.code.intValue() == request.getServiceId()) { // client sends a message to the chat group
                    String requestId = request.getRequestId();
                    if (Strings.isNullOrEmpty(requestId)) {
                        response.setIsSucc(false).setMessage("requestId不能为空");
                    } else if (Strings.isNullOrEmpty(request.getName())) {
                        response.setIsSucc(false).setMessage("name不能为空");
                    } else if (Strings.isNullOrEmpty(request.getMessage())) {
                        response.setIsSucc(false).setMessage("message不能为空");
                    } else {
                        response.setIsSucc(true).setMessage("发送消息成功");

                        BananaService.bananaWatchMap.forEach((reqId, callBack) -> { // broadcast the message to every client
                            Request serviceRequest = new Request();
                            serviceRequest.setServiceId(CODE.receive_message.code);
                            serviceRequest.setRequestId(requestId);
                            serviceRequest.setName(request.getName());
                            serviceRequest.setMessage(request.getMessage());
                            try {
                                callBack.send(serviceRequest);
                            } catch (Exception e) {
                                LOG.warn("回调发送消息给客户端异常", e);
                            }
                        });
                    }
                    sendWebSocket(response.toJson());
                } else if (CODE.downline.code.intValue() == request.getServiceId()) { // client goes offline
                    String requestId = request.getRequestId();
                    if (Strings.isNullOrEmpty(requestId)) {
                        sendWebSocket(response.setIsSucc(false).setMessage("requestId不能为空").toJson());
                    } else {
                        BananaService.logout(requestId);
                        response.setIsSucc(true).setMessage("下线成功");

                        BananaService.notifyDownline(requestId); // notify the others that someone went offline

                        sendWebSocket(response.toJson());
                    }

                } else {
                    sendWebSocket(response.setIsSucc(false).setMessage("未知请求").toJson());
                }
            } catch (JsonSyntaxException e1) {
                LOG.warn("Json解析异常", e1);
                System.err.println("Json解析异常");
                e1.printStackTrace();
            } catch (Exception e2) {
                LOG.error("处理Socket请求异常", e2);
                System.err.println("处理Socket请求异常");
                e2.printStackTrace();
            }
        }

        /**
         * Sends an HTTP response.
         * @param ctx
         * @param request
         * @param response
         */
        private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) {
            // For non-200 responses, put the status text into the body
            if (response.getStatus().code() != 200) {
                ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), CharsetUtil.UTF_8);
                response.content().writeBytes(buf);
                buf.release();
                HttpHeaders.setContentLength(response, response.content().readableBytes());
            }

            // Close the connection if it is not keep-alive or the response is an error
            ChannelFuture f = ctx.channel().writeAndFlush(response);
            if (!HttpHeaders.isKeepAlive(request) || response.getStatus().code() != 200) {
                f.addListener(ChannelFutureListener.CLOSE);
            }
        }

        /**
         * Sends a WebSocket text message to the client.
         * @param msg the text to send
         */
        public void sendWebSocket(String msg) throws Exception {
            if (this.handshaker == null || this.ctx == null || this.ctx.isRemoved()) {
                throw new Exception("尚未握手成功,无法向客户端发送WebSocket消息");
            }
            this.ctx.channel().write(new TextWebSocketFrame(msg));
            this.ctx.flush();
        }
    }
FunWebSocketServerHandler.java (outbound adapter; intercepts the responses sent by the server):

    package com.company.server;

    import java.net.SocketAddress;

    import com.company.util.BufToString;

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelOutboundHandlerAdapter;
    import io.netty.channel.ChannelPromise;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

    public class FunWebSocketServerHandler extends ChannelOutboundHandlerAdapter {

        @Override
        public void read(ChannelHandlerContext ctx) throws Exception {
            ChannelHandlerContext readRes = ctx.read();
            System.out.println(ctx.name() + " is read in " + readRes.toString());
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println(ctx.name() + " handlerAdded = = " + ctx.name());
        }

        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            System.out.println(ctx.name() + " handlerRemoved = = " + ctx.name());
        }

        @Override
        public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
                ChannelPromise promise) throws Exception {
            ctx.bind(localAddress, promise);
            System.out.println(ctx.name() + " is bind in " + localAddress.toString() + " in " + promise.toString());
        }

        @Override
        public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
                SocketAddress localAddress, ChannelPromise promise) throws Exception {
            ctx.connect(remoteAddress, localAddress, promise);
            System.out.println(ctx.name() + " is connect in " + localAddress.toString() + " in client " + remoteAddress.toString() + " in " + promise.toString());
        }

        @Override
        public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
                throws Exception {
            ctx.disconnect(promise);
            System.out.println(ctx.name() + " is disconnect in " + promise.toString());
        }

        @Override
        public void close(ChannelHandlerContext ctx, ChannelPromise promise)
                throws Exception {
            ctx.close(promise);
            System.out.println(ctx.name() + " is close in " + promise.toString());
        }

        @Override
        public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
            ctx.deregister(promise);
            System.out.println(ctx.name() + " is deregister in " + promise.toString());
        }

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            // Log before forwarding: once the message has been passed on,
            // a downstream encoder may release its buffer. The printed output is the same.
            System.out.print(ctx.name() + " is write in " + promise.toString());
            if(msg instanceof DefaultFullHttpResponse) {
                System.out.println(" with message : " + BufToString.convertByteBufToString(((DefaultFullHttpResponse)msg).content()));
            }
            else if(msg instanceof TextWebSocketFrame) {
                System.out.println(" with socket message : " + ((TextWebSocketFrame)msg).text());
            }
            else if(msg instanceof CloseWebSocketFrame) {
                System.out.println(" close reason : " + ((CloseWebSocketFrame)msg).reasonText());
            }
            else {
                System.out.println(" with message : " + msg.getClass());
            }
            ctx.write(msg, promise);
        }

        @Override
        public void flush(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
            System.out.println(ctx.name() + " is flush");
        }
    }
banana.html (chat room front end):

    <!DOCTYPE html>
    <html>
    <head>
    <meta charset="UTF-8">
    <title>Netty WebSocket 聊天实例</title>
    <script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" type="text/javascript"></script>
    <script src="map.js" type="text/javascript"></script>
    <script type="text/javascript">
        $(document).ready(function() {
            var uuid = guid(); // unique within one session
            var nameOnline = ''; // name used when going online
            var onlineName = new Map(); // users currently online, <requestId, name>

            $("#name").attr("disabled","disabled");
            $("#onlineBtn").attr("disabled","disabled");
            $("#downlineBtn").attr("disabled","disabled");

            $("#banana").hide();

            // Initialize the WebSocket
            var socket;
            if (!window.WebSocket) {
                window.WebSocket = window.MozWebSocket;
            }
            if (window.WebSocket) {
                socket = new WebSocket("ws://localhost:9090/");
                socket.onmessage = function(event) {
                    console.log("收到服务器消息:" + event.data);
                    if (event.data.indexOf("isSucc") != -1) { // a response to a request this client sent
                        var response = JSON.parse(event.data);
                        if (response != undefined && response != null) {
                            if (response.serviceId == 1001) { // going online
                                if (response.isSucc) {
                                    // Online succeeded: initialize the list of online users
                                    onlineName.clear();
                                    $("#showOnlineNames").empty();
                                    for (var reqId in response.hadOnline) {
                                        onlineName.put(reqId, response.hadOnline[reqId]);
                                    }
                                    initOnline();

                                    $("#name").attr("disabled","disabled");
                                    $("#onlineBtn").attr("disabled","disabled");
                                    $("#downlineBtn").removeAttr("disabled");
                                    $("#banana").show();
                                } else {
                                    alert("上线失败");
                                }
                            } else if (response.serviceId == 1004) { // going offline
                                if (response.isSucc) {
                                    onlineName.clear();
                                    $("#showBanana").empty();
                                    $("#showOnlineNames").empty();
                                    $("#name").removeAttr("disabled");
                                    $("#onlineBtn").removeAttr("disabled");
                                    $("#downlineBtn").attr("disabled","disabled");
                                    $("#banana").hide();
                                } else {
                                    alert("下线失败");
                                }
                            }
                        }
                    } else { // a request pushed from the server to this client
                        var request = JSON.parse(event.data);
                        if (request != undefined && request != null) {
                            if (request.serviceId == 1001 || request.serviceId == 1004) { // someone came online or went offline
                                if (request.serviceId == 1001) {
                                    onlineName.put(request.requestId, request.name);
                                }
                                if (request.serviceId == 1004) {
                                    onlineName.removeByKey(request.requestId);
                                }

                                initOnline();
                            } else if (request.serviceId == 1003) { // someone sent a message
                                appendBanana(request.name, request.message);
                            }
                        }
                    }
                };
                socket.onopen = function(event) {
                    $("#name").removeAttr("disabled");
                    $("#onlineBtn").removeAttr("disabled");
                    console.log("已连接服务器");
                };
                socket.onclose = function(event) { // WebSocket closed
                    console.log("WebSocket已经关闭!");
                };
                socket.onerror = function(event) {
                    console.log("WebSocket异常!");
                };
            } else {
                alert("抱歉,您的浏览器不支持WebSocket协议!");
            }

            // Send a request over the WebSocket
            function send(message) {
                if (!window.WebSocket) { return; }
                if (socket.readyState == WebSocket.OPEN) {
                    socket.send(message);
                } else {
                    console.log("WebSocket连接没有建立成功!");
                    alert("您还未连接上服务器,请刷新页面重试");
                }
            }

            // Refresh the list of online users
            function initOnline() {
                $("#showOnlineNames").empty();
                for (var i=0;i<onlineName.size();i++) {
                    $("#showOnlineNames").append('<tr><td>' + (i+1) + '</td>' +
                            '<td>' + onlineName.element(i).value + '</td>' +
                            '</tr>');
                }
            }

            // Append a chat message
            function appendBanana(name, message) {
                $("#showBanana").append('<tr><td>' + name + ': ' + message + '</td></tr>');
            }

            $("#onlineBtn").bind("click", function() {
                var name = $("#name").val();
                if (name == null || name == '') {
                    alert("请输入您的尊姓大名");
                    return;
                }

                nameOnline = name;
                // Go online
                send(JSON.stringify({"requestId":uuid, "serviceId":1001, "name":name}));
            });

            $("#downlineBtn").bind("click", function() {
                // Go offline
                send(JSON.stringify({"requestId":uuid, "serviceId":1004}));
            });

            $("#sendBtn").bind("click", function() {
                var message = $("#messageInput").val();
                if (message == null || message == '') {
                    alert("请输入您的聊天信息");
                    return;
                }

                // Send a chat message
                send(JSON.stringify({"requestId":uuid, "serviceId":1002, "name":nameOnline, "message":message}));
                $("#messageInput").val("");
            });

        });

        function guid() {
            function S4() {
                return (((1+Math.random())*0x10000)|0).toString(16).substring(1);
            }
            return (S4()+S4()+"-"+S4()+"-"+S4()+"-"+S4()+"-"+S4()+S4()+S4());
        }
    </script>
    </head>
    <body>
        <h1>Netty WebSocket 聊天实例</h1>
        <input type="text" id="name" value="佚名" placeholder="姓名" />
        <input type="button" id="onlineBtn" value="上线" />
        <input type="button" id="downlineBtn" value="下线" />
        <hr/>
        <table id="banana" border="1" >
            <tr>
                <td width="600" align="center">聊天</td>
                <td width="100" align="center">上线人员</td>
            </tr>
            <tr height="200" valign="top">
                <td>
                    <table id="showBanana" border="0" width="600">
                        <!--
                        <tr>
                            <td>张三: 大家好</td>
                        </tr>
                        <tr>
                            <td>李四: 欢迎加入群聊</td>
                        </tr>
                        -->
                    </table>
                </td>
                <td>
                    <table id="showOnlineNames" border="0">
                        <!--
                        <tr>
                            <td>1</td>
                            <td>张三</td>
                        </tr>
                        <tr>
                            <td>2</td>
                            <td>李四</td>
                        </tr>
                        -->
                    </table>
                </td>
            </tr>
            <tr height="40">
                <td></td>
                <td></td>
            </tr>
            <tr>
                <td>
                    <input type="text" id="messageInput" style="width:590px" placeholder="巴拉巴拉点什么吧" />
                </td>
                <td>
                    <input type="button" id="sendBtn" value="发送" />
                </td>
            </tr>
        </table>

    </body>
    </html>
Running WebSocketServer and ReversedWebSocketServer separately produces the following logs:

    ============ adapter first, then handler ============
    ============ connection opened ============
    adapter handlerAdded = = adapter
    channel handlerAdded = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
    channel Read0 = = handler with http request : 
    adapter is write in DefaultChannelPromise@3b536aab(incomplete) with message : 
    adapter is flush
    adapter is flush
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
    ============ user goes online ============
    channel Read0 = = handler with socket request : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1001,"name":"佚名"}
    adapter is write in DefaultChannelPromise@61ace442(incomplete) with socket message : {"serviceId":1001,"isSucc":true,"message":"注册成功","hadOnline":{"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d":"佚名"}}
    adapter is flush
    adapter is flush
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
    ============ message sent ============
    channel Read0 = = handler with socket request : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1002,"name":"佚名","message":"queue"}
    adapter is write in DefaultChannelPromise@5a4c689a(incomplete) with socket message : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1003,"name":"佚名","message":"queue"}
    adapter is flush
    adapter is write in DefaultChannelPromise@3cfa9e57(incomplete) with socket message : {"serviceId":1002,"isSucc":true,"message":"发送消息成功","hadOnline":{}}
    adapter is flush
    adapter is flush
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
    ============ user goes offline ============
    channel Read0 = = handler with socket request : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1004}
    adapter is write in DefaultChannelPromise@b6ce0dc(incomplete) with socket message : {"serviceId":1004,"isSucc":true,"message":"下线成功","hadOnline":{}}
    adapter is flush
    adapter is flush
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
    ============ client disconnects ============
    channel Read0 = = handler with socket request : ?
    adapter is write in DefaultChannelPromise@567e3360(incomplete) close reason : 
    adapter is flush
    adapter is close in DefaultChannelPromise@1b6673fb(success)
    adapter is flush
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 ! R:/0:0:0:0:0:0:0:1:49245])
    channel Inactive = = handler
    channel close = = handler
    adapter is close in DefaultChannelPromise@5b261d0a(success)
    channel Unregistered = = handler
    channel handlerRemoved = = handler
    adapter handlerRemoved = = adapter
and

    ============ handler first, then adapter ============
    ============ connection opened ============
    channel handlerAdded = = handler
    adapter handlerAdded = = adapter
    adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
    channel Read0 = = handler with http request : 
    adapter is write in DefaultChannelPromise@171fb888(incomplete) with message : 
    adapter is flush
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
    ============ user goes online ============
    channel Read0 = = handler with socket request : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1001,"name":"佚名"}
    adapter is write in DefaultChannelPromise@54042c55(incomplete) with socket message : {"serviceId":1001,"isSucc":true,"message":"注册成功","hadOnline":{"70d182cf-b0ae-27ba-296d-33bd3ab5177b":"佚名"}}
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
    ============ message sent ============
    channel Read0 = = handler with socket request : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1002,"name":"佚名","message":"queue"}
    adapter is write in DefaultChannelPromise@324cb9a(incomplete) with socket message : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1003,"name":"佚名","message":"queue"}
    adapter is write in DefaultChannelPromise@269f3a70(incomplete) with socket message : {"serviceId":1002,"isSucc":true,"message":"发送消息成功","hadOnline":{}}
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
    ============ user goes offline ============
    channel Read0 = = handler with socket request : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1004}
    adapter is write in DefaultChannelPromise@2f6a67d7(incomplete) with socket message : {"serviceId":1004,"isSucc":true,"message":"下线成功","hadOnline":{}}
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
    ============ client disconnects ============
    channel Read0 = = handler with socket request : ?
    adapter is write in DefaultChannelPromise@5dff633(incomplete) close reason : 
    adapter is flush
    adapter is close in DefaultChannelPromise@1e58e8f0(success)
    channel Flush = = handler
    adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 ! R:/0:0:0:0:0:0:0:1:64671])
    channel Inactive = = handler
    channel close = = handler
    channel Unregistered = = handler
    adapter handlerRemoved = = adapter
    channel handlerRemoved = = handler
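Apart from the different order of the handlerAdded and handlerRemoved callbacks, the outbound adapter logs noticeably more flush calls when it is added before the handler, most visibly when a message is sent. The reason is where each outbound call starts. ctx.flush(), which the handler calls in sendWebSocket and in channelReadComplete, starts at the handler's own position and travels toward the head of the pipeline, so it only passes through outbound handlers that were added before the handler. ctx.channel().write(...) starts at the tail of the pipeline, so it passes through the adapter in both configurations. When the adapter is added after the handler, every ctx.flush() and the final ctx.close() bypass it; the second log therefore only shows the flushes issued on the channel itself (during the handshake and when the close frame is answered) and a single close.
So a Netty pipeline has to be planned carefully. If the server's outbound interceptor is supposed to see everything the inbound handler sends, add the outbound interceptor to the pipeline before the inbound handler.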
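One way to read the two logs: an operation started on a ChannelHandlerContext (such as ctx.flush()) begins at that handler and moves toward the head of the pipeline, while an operation started on the Channel begins at the tail. The sketch below is a simplified plain-Java model of that rule, not Netty code; `FlushStartPoint`, `fromContext`, and `fromChannel` are hypothetical names, and the pipelines are reduced to the two handlers compared in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Simplified model of where an outbound operation starts; not Netty's implementation. */
class FlushStartPoint {

    /** Outbound handlers visited when the operation starts on the context of `caller`. */
    static List<String> fromContext(List<String> pipeline, Set<String> outbound, String caller) {
        List<String> visited = new ArrayList<>();
        // Only handlers positioned before the caller (closer to the head) are reached.
        for (int i = pipeline.indexOf(caller) - 1; i >= 0; i--) {
            if (outbound.contains(pipeline.get(i))) {
                visited.add(pipeline.get(i));
            }
        }
        return visited;
    }

    /** Outbound handlers visited when the operation starts on the channel, i.e. at the tail. */
    static List<String> fromChannel(List<String> pipeline, Set<String> outbound) {
        List<String> visited = new ArrayList<>();
        for (int i = pipeline.size() - 1; i >= 0; i--) {
            if (outbound.contains(pipeline.get(i))) {
                visited.add(pipeline.get(i));
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        Set<String> outbound = new HashSet<>(Arrays.asList("adapter"));
        List<String> adapterFirst = Arrays.asList("adapter", "handler"); // as in WebSocketServer
        List<String> handlerFirst = Arrays.asList("handler", "adapter"); // as in ReversedWebSocketServer

        System.out.println(fromContext(adapterFirst, outbound, "handler")); // [adapter]
        System.out.println(fromContext(handlerFirst, outbound, "handler")); // []
        System.out.println(fromChannel(handlerFirst, outbound));            // [adapter]
    }
}
```

In this model a context-level call from "handler" reaches "adapter" only in the adapter-first pipeline, while a channel-level call reaches it in either order, which matches the flush counts in the two logs.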