How ChannelPipeline Ordering in Netty Affects Inbound and Outbound Handlers

Posted by dgutfly


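  Before running this experiment I read xbmchina's articles on Jianshu. Thanks to the author for these posts on ChannelPipeline and ChannelHandler:

  [Netty] ChannelPipeline and ChannelHandler (Part 1)

  [Netty] Adding and Removing a ChannelHandler (Part 2)

  [Netty] How inBound and outBound Events Propagate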

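  I originally wanted to write a Netty 4 chat room based on leonzm's websocket_demo project, but the Netty 4 API is different: messageReceived (which handled the handshake and incoming packets) and close (disconnect) cannot be overridden in a SimpleChannelInboundHandler. So I looked into the handler life cycle. In Netty 4, channelRead0 receives every inbound message: first the HTTP request that performs the WebSocket handshake, then the frames sent by clients that are already connected. When the channel reaches channelInactive, the connection has been closed and channelUnregistered follows, so per-connection data can be cleaned up at that point. The ChannelHandler itself also has a life cycle, and handlerRemoved is another place where similar cleanup can be done.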
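  The callback order described above can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: LifecycleSketch, ToyHandler, and SessionHandler are hypothetical names, not Netty classes, and the driver replays just a subset of the Netty 4 callbacks (it leaves out channelRegistered and channelReadComplete) in the same relative order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model (not Netty itself) of the callbacks one handler sees for one connection. */
class LifecycleSketch {

    /** Hypothetical stand-in for a Netty 4 inbound handler; every callback is optional. */
    interface ToyHandler {
        default void handlerAdded() {}
        default void channelActive() {}
        default void channelRead(String msg) {}
        default void channelInactive() {}
        default void channelUnregistered() {}
        default void handlerRemoved() {}
    }

    /** Invokes a subset of the callbacks in the relative order Netty 4 uses. */
    static void simulateConnection(ToyHandler h, List<String> messages) {
        h.handlerAdded();
        h.channelActive();
        for (String msg : messages) {
            h.channelRead(msg);
        }
        h.channelInactive();      // the connection has been closed
        h.channelUnregistered();  // the channel has left its event loop
        h.handlerRemoved();
    }

    /** Handler that creates per-connection state on read and clears it on channelInactive. */
    static class SessionHandler implements ToyHandler {
        final Set<String> onlineUsers = new HashSet<>();
        final List<String> trace = new ArrayList<>();

        @Override
        public void channelRead(String msg) {
            trace.add("channelRead:" + msg);
            onlineUsers.add(msg);
        }

        @Override
        public void channelInactive() {
            trace.add("channelInactive");
            onlineUsers.clear();
        }

        @Override
        public void handlerRemoved() {
            trace.add("handlerRemoved");
        }
    }

    public static void main(String[] args) {
        SessionHandler handler = new SessionHandler();
        simulateConnection(handler, Arrays.asList("alice", "bob"));
        System.out.println(handler.trace);
        System.out.println("users left: " + handler.onlineUsers.size());
    }
}
```

  In the real handler later in this post, the same role is played by channelInactive, which logs the session out once the connection is gone.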

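  The difference between inbound and outbound in Netty: inbound events are triggered passively, fired automatically when something happens on the channel, whereas outbound operations are triggered actively, when the application asks for something to be done. Beyond that, an outbound handler on its own cannot receive messages from a WebSocket client (it intercepts what the server sends out; receiving requires an inbound handler). An outbound handler is better suited to being inserted into the pipeline as an interceptor, in the style of AOP (aspect-oriented programming).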
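  The two directions can be sketched with a toy pipeline. This is a simplified model, not Netty code: DirectionSketch and the handler names "decoder", "encoder", "logger", and "business" are made up for illustration. It assumes only what Netty documents: inbound events travel from head to tail through inbound handlers, and outbound operations started on the channel travel from tail to head through outbound handlers.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Netty itself) of how a pipeline routes inbound and outbound events. */
class DirectionSketch {

    /** A named pipeline entry that is either an inbound or an outbound handler. */
    static final class Entry {
        final String name;
        final boolean inbound;

        Entry(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    /** Appends a handler at the tail, in the spirit of ChannelPipeline.addLast. */
    DirectionSketch addLast(String name, boolean inbound) {
        entries.add(new Entry(name, inbound));
        return this;
    }

    /** Inbound events travel head to tail and visit only inbound handlers. */
    List<String> fireRead() {
        List<String> visited = new ArrayList<>();
        for (Entry e : entries) {
            if (e.inbound) {
                visited.add(e.name);
            }
        }
        return visited;
    }

    /** Outbound operations started on the channel travel tail to head through outbound handlers. */
    List<String> write() {
        List<String> visited = new ArrayList<>();
        for (int i = entries.size() - 1; i >= 0; i--) {
            if (!entries.get(i).inbound) {
                visited.add(entries.get(i).name);
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        DirectionSketch p = new DirectionSketch()
                .addLast("decoder", true)
                .addLast("encoder", false)
                .addLast("logger", false)
                .addLast("business", true);
        System.out.println("read:  " + p.fireRead());
        System.out.println("write: " + p.write());
    }
}
```

  In this model a read visits decoder and then business, while a write visits logger and then encoder, which is why an outbound handler never sees incoming client data.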

  Now let's compare how the two pipeline configurations behave:

  Lanucher.java (main entry point that starts the Netty server):

package com.company.lanucher;

import com.company.server.ReversedWebSocketServer;
import com.company.server.WebSocketServer;

public class Lanucher {

    public static void main(String[] args) throws Exception {
        // Start the WebSocket server. To run the other server instead, comment out the
        // ReversedWebSocketServer line and uncomment the WebSocketServer line.
        //new WebSocketServer().run(WebSocketServer.WEBSOCKET_PORT);
        new ReversedWebSocketServer().run(ReversedWebSocketServer.WEBSOCKET_PORT);
    }

}
Lanucher.java

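  WebSocketServer.java (the outbound adapter is added before the inbound handler, so a request is processed by the inbound handler first and its response then passes through the outbound adapter):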

package com.company.server;

import org.apache.log4j.Logger;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * WebSocket server
 *
 */
public class WebSocketServer {
    private static final Logger LOG = Logger.getLogger(WebSocketServer.class);

    // WebSocket port
    public static final int WEBSOCKET_PORT = 9090;

    public void run(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast("http-codec", new HttpServerCodec()); // HTTP message encoding/decoding
                    pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // assembles HTTP message parts into full messages
                    pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // support for writing large data streams in chunks
                    pipeline.addLast("adapter", new FunWebSocketServerHandler()); // outbound interceptor placed before the WebSocket server handler
                    pipeline.addLast("handler", new BananaWebSocketServerHandler()); // WebSocket server handler
                }
            });

            Channel channel = b.bind(port).sync().channel();
            LOG.info("WebSocket server started on port " + port + ".");
            channel.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
WebSocketServer.java

  ReversedWebSocketServer.java (the reverse order: the inbound handler is added to the pipeline before the outbound adapter):

package com.company.server;

import org.apache.log4j.Logger;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

public class ReversedWebSocketServer {
    private static final Logger LOG = Logger.getLogger(ReversedWebSocketServer.class);

    // WebSocket ports
    public static final int WEBSOCKET_PORT = 9090;
    public static final int FUN_WEBSOCKET_PORT = 9091;

    public void run(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast("http-codec", new HttpServerCodec()); // HTTP message encoding/decoding
                    pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // assembles HTTP message parts into full messages
                    pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // support for writing large data streams in chunks
                    pipeline.addLast("handler", new BananaWebSocketServerHandler()); // WebSocket server handler
                    pipeline.addLast("adapter", new FunWebSocketServerHandler()); // outbound interceptor, here placed after the handler
                }
            });

            Channel channel = b.bind(port).sync().channel();
            LOG.info("WebSocket server started on port " + port + ".");
            channel.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}
ReversedWebSocketServer.java
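
  Why the position of "adapter" relative to "handler" matters can be sketched with a toy model. It is not Netty code: WriteStartSketch is a hypothetical class, and only the two handlers of interest are modelled. It relies on one documented Netty behaviour: an outbound call made on the Channel starts at the tail of the pipeline, while the same call made on a ChannelHandlerContext starts at the handler just before that context.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model (not Netty itself) of where an outbound operation starts in the pipeline. */
class WriteStartSketch {

    private final List<String> names;          // handler names, head first
    private final List<String> outboundNames;  // which of them are outbound handlers

    WriteStartSketch(List<String> names, List<String> outboundNames) {
        this.names = names;
        this.outboundNames = outboundNames;
    }

    /** Models ctx.channel().write(...): starts at the tail and walks to the head. */
    List<String> channelWrite() {
        return walkToHead(names.size() - 1);
    }

    /** Models ctx.write(...) called inside the named handler: starts just before it. */
    List<String> contextWrite(String fromHandler) {
        return walkToHead(names.indexOf(fromHandler) - 1);
    }

    /** Collects the outbound handlers met while walking from index start to the head. */
    private List<String> walkToHead(int start) {
        List<String> visited = new ArrayList<>();
        for (int i = start; i >= 0; i--) {
            if (outboundNames.contains(names.get(i))) {
                visited.add(names.get(i));
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        List<String> outbound = Arrays.asList("adapter");
        WriteStartSketch normal = new WriteStartSketch(Arrays.asList("adapter", "handler"), outbound);
        WriteStartSketch reversed = new WriteStartSketch(Arrays.asList("handler", "adapter"), outbound);

        System.out.println("adapter first, context write: " + normal.contextWrite("handler"));
        System.out.println("adapter first, channel write: " + normal.channelWrite());
        System.out.println("handler first, context write: " + reversed.contextWrite("handler"));
        System.out.println("handler first, channel write: " + reversed.channelWrite());
    }
}
```

  Under this model the adapter is skipped only in the third case, a context-level call made by a handler that sits before it. The handler in this post writes through ctx.channel().write(...) but flushes through ctx.flush(), so in the reversed order one would expect the adapter to still see the writes while missing those flushes.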

  BananaWebSocketServerHandler.java (the inbound handler, which processes requests received from the client):

package com.company.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;

import org.apache.log4j.Logger;

import com.company.serviceimpl.BananaService;
import com.company.util.BufToString;
import com.company.util.CODE;
import com.company.util.Request;
import com.company.util.Response;
import com.google.common.base.Strings;
import com.google.gson.JsonSyntaxException;


/**
 * WebSocket server handler
 *
 */
public class BananaWebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
    private static final Logger LOG = Logger.getLogger(BananaWebSocketServerHandler.class.getName());

    private WebSocketServerHandshaker handshaker;
    private ChannelHandlerContext ctx;
    private String sessionId;
    private boolean isLog = true;

    public BananaWebSocketServerHandler() {
        super();
    }

    public BananaWebSocketServerHandler(boolean isLog) {
        this();
        this.isLog = isLog;
    }

    // Overridable in Netty 5; in Netty 4 use channelRead0 instead
    public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(this.isLog) {
            System.out.print("channel MessageReceived = = " + ctx.name());
        }
        if (msg instanceof FullHttpRequest) { // plain HTTP request
            FullHttpRequest mymsg = (FullHttpRequest) msg;
            System.out.println(" with http request : " + BufToString.convertByteBufToString(mymsg.content()));
            handleHttpRequest(ctx, mymsg);
        } else if (msg instanceof WebSocketFrame) { // WebSocket frame
            WebSocketFrame mymsg = (WebSocketFrame) msg;
            System.out.println(" with socket request : " + BufToString.convertByteBufToString(mymsg.content()));
            handleWebSocketFrame(ctx, mymsg);
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
        System.out.println("channel handlerAdded = = " + ctx.name());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{
        System.out.println("channel handlerRemoved = = " + ctx.name());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(this.isLog) {
            System.out.print("channel Read0 = = " + ctx.name());
        }
        if (msg instanceof FullHttpRequest) { // plain HTTP request
            FullHttpRequest mymsg = (FullHttpRequest) msg;
            System.out.println(" with http request : " + BufToString.convertByteBufToString(mymsg.content()));
            handleHttpRequest(ctx, mymsg);
        } else if (msg instanceof WebSocketFrame) { // WebSocket frame
            WebSocketFrame mymsg = (WebSocketFrame) msg;
            System.out.println(" with socket request : " + BufToString.convertByteBufToString(mymsg.content()));
            handleWebSocketFrame(ctx, mymsg);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if(this.isLog) {
            System.out.println("channel Inactive = = " + ctx.name());
        }
        try {
            this.close(ctx, null);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        if(this.isLog) {
            System.out.println("channel Unregistered = = " + ctx.name());
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        System.out.println("channel Flush = = " + ctx.name());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        ctx.close();
        if(this.isLog) {
            System.err.println("channel exceptionCaught = = " + ctx.name());
            cause.printStackTrace();
        }
        BananaService.logout(sessionId); // log the session out
        BananaService.notifyDownline(sessionId); // notify others that someone went offline
    }

    // Overridable in Netty 5; in Netty 4 use channelInactive instead
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        if(this.isLog) {
            System.out.println("channel close = = " + ctx.name());
        }
        BananaService.logout(sessionId); // log the session out
        BananaService.notifyDownline(sessionId); // notify others that someone went offline
        ctx.close();
    }

    /**
     * Handles an HTTP request and completes the WebSocket handshake<br/>
     * Note: the first request of a WebSocket connection is sent over HTTP
     * @param ctx
     * @param request
     * @throws Exception
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        // If HTTP decoding failed or this is not a WebSocket upgrade, return an HTTP error
        if (!request.getDecoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }

        // A valid WebSocket upgrade request: build and return the handshake response
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + request.headers().get(HttpHeaders.Names.HOST), null, false);
        handshaker = wsFactory.newHandshaker(request);
        if (handshaker == null) { // unsupported WebSocket version
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else { // send the WebSocket handshake to the client to complete it
            handshaker.handshake(ctx.channel(), request);
            // Keep the handler context so the server can push data to the client later
            this.ctx = ctx;
        }
    }

    /**
     * Handles a WebSocket frame
     * @param ctx
     * @param frame
     * @throws Exception
     */
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        // Is this a close frame?
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // Is this a ping frame?
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // Only text frames are supported for now; binary frames are not
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException("Only text messages are supported; binary messages are not");
        }

        // Handle the WebSocket request from the client
        try {
            /*
            if(this.isLog) {
                System.out.println("handleWebSocketFrame-=-=-" + ((TextWebSocketFrame)frame).text());
            }
            */
            Request request = Request.create(((TextWebSocketFrame)frame).text());
            Response response = new Response();
            response.setServiceId(request.getServiceId());
            if (CODE.online.code.intValue() == request.getServiceId()) { // client registration
                String requestId = request.getRequestId();
                // Note: the early returns below drop the failure response without sending it
                if (Strings.isNullOrEmpty(requestId)) {
                    response.setIsSucc(false).setMessage("requestId must not be empty");
                    return;
                } else if (Strings.isNullOrEmpty(request.getName())) {
                    response.setIsSucc(false).setMessage("name must not be empty");
                    return;
                } else if (BananaService.bananaWatchMap.containsKey(requestId)) {
                    response.setIsSucc(false).setMessage("You are already registered; duplicate registration is not allowed");
                    return;
                }
                if (!BananaService.register(requestId, new BananaService(ctx, request.getName()))) {
                    response.setIsSucc(false).setMessage("Registration failed");
                } else {
                    response.setIsSucc(true).setMessage("Registration successful");

                    BananaService.bananaWatchMap.forEach((reqId, callBack) -> {
                        response.getHadOnline().put(reqId, ((BananaService)callBack).getName()); // return the users already online

                        if (!reqId.equals(requestId)) {
                            Request serviceRequest = new Request();
                            serviceRequest.setServiceId(CODE.online.code);
                            serviceRequest.setRequestId(requestId);
                            serviceRequest.setName(request.getName());
                            try {
                                callBack.send(serviceRequest); // notify others that someone came online
                            } catch (Exception e) {
                                LOG.warn("Error in callback while sending a message to the client", e);
                            }
                        }
                    });
                }
                sendWebSocket(response.toJson());
                this.sessionId = requestId; // remember the session id so it can be logged out when the page is refreshed or the browser closes
            } else if (CODE.send_message.code.intValue() == request.getServiceId()) { // client sends a message to the chat group
                String requestId = request.getRequestId();
                if (Strings.isNullOrEmpty(requestId)) {
                    response.setIsSucc(false).setMessage("requestId must not be empty");
                } else if (Strings.isNullOrEmpty(request.getName())) {
                    response.setIsSucc(false).setMessage("name must not be empty");
                } else if (Strings.isNullOrEmpty(request.getMessage())) {
                    response.setIsSucc(false).setMessage("message must not be empty");
                } else {
                    response.setIsSucc(true).setMessage("Message sent successfully");

                    BananaService.bananaWatchMap.forEach((reqId, callBack) -> { // send the message to every client
                        Request serviceRequest = new Request();
                        serviceRequest.setServiceId(CODE.receive_message.code);
                        serviceRequest.setRequestId(requestId);
                        serviceRequest.setName(request.getName());
                        serviceRequest.setMessage(request.getMessage());
                        try {
                            callBack.send(serviceRequest);
                        } catch (Exception e) {
                            LOG.warn("Error in callback while sending a message to the client", e);
                        }
                    });
                }
                sendWebSocket(response.toJson());
            } else if (CODE.downline.code.intValue() == request.getServiceId()) { // client goes offline
                String requestId = request.getRequestId();
                if (Strings.isNullOrEmpty(requestId)) {
                    sendWebSocket(response.setIsSucc(false).setMessage("requestId must not be empty").toJson());
                } else {
                    BananaService.logout(requestId);
                    response.setIsSucc(true).setMessage("Went offline successfully");

                    BananaService.notifyDownline(requestId); // notify others that someone went offline

                    sendWebSocket(response.toJson());
                }

            } else {
                sendWebSocket(response.setIsSucc(false).setMessage("Unknown request").toJson());
            }
        } catch (JsonSyntaxException e1) {
            LOG.warn("JSON parsing error", e1);
            System.err.println("JSON parsing error");
            e1.printStackTrace();
        } catch (Exception e2) {
            LOG.error("Error while handling the socket request", e2);
            System.err.println("Error while handling the socket request");
            e2.printStackTrace();
        }
    }

    /**
     * Sends an HTTP response
     * @param ctx
     * @param request
     * @param response
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) {
        // For a non-200 status, put the status text into the response body
        if (response.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), CharsetUtil.UTF_8);
            response.content().writeBytes(buf);
            buf.release();
            HttpHeaders.setContentLength(response, response.content().readableBytes());
        }

        // Send the response; close the connection if it is not keep-alive or the status is not 200
        ChannelFuture f = ctx.channel().writeAndFlush(response);
        if (!HttpHeaders.isKeepAlive(request) || response.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Sends a WebSocket text message
     * @param msg
     */
    public void sendWebSocket(String msg) throws Exception {
        if (this.handshaker == null || this.ctx == null || this.ctx.isRemoved()) {
            throw new Exception("Handshake not completed; cannot send a WebSocket message to the client");
        }
        this.ctx.channel().write(new TextWebSocketFrame(msg));
        this.ctx.flush();
    }

}
BananaWebSocketServerHandler.java

  FunWebSocketServerHandler.java (the outbound adapter, which processes responses sent by the server):

package com.company.server;

import java.net.SocketAddress;

import com.company.util.BufToString;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

// Each outbound operation is forwarded first and logged afterwards
public class FunWebSocketServerHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        ChannelHandlerContext readRes = ctx.read();
        System.out.println(ctx.name() + " is read in " + readRes);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.name() + " handlerAdded = = " + ctx.name());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.name() + " handlerRemoved = = " + ctx.name());
    }

    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
        System.out.println(ctx.name() + " is bind in " + localAddress + " in " + promise);
    }

    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
        // localAddress may be null, so rely on string concatenation instead of toString()
        System.out.println(ctx.name() + " is connect in " + localAddress + " in client " + remoteAddress + " in " + promise);
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.disconnect(promise);
        System.out.println(ctx.name() + " is disconnect in " + promise);
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.close(promise);
        System.out.println(ctx.name() + " is close in " + promise);
    }

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.deregister(promise);
        System.out.println(ctx.name() + " is deregister in " + promise);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
        System.out.print(ctx.name() + " is write in " + promise);
        if(msg instanceof DefaultFullHttpResponse) {
            System.out.println(" with message : " + BufToString.convertByteBufToString(((DefaultFullHttpResponse)msg).content()));
        }
        else if(msg instanceof TextWebSocketFrame) {
            System.out.println(" with socket message : " + ((TextWebSocketFrame)msg).text());
        }
        else if(msg instanceof CloseWebSocketFrame) {
            System.out.println(" close reason : " + ((CloseWebSocketFrame)msg).reasonText());
        }
        else {
            System.out.println(" with message : " + msg.getClass());
        }
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        System.out.println(ctx.name() + " is flush");
    }

}
FunWebSocketServerHandler.java

  banana.html (the chat room front end):

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Netty WebSocket Chat Example</title>
</head>
<script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" type="text/javascript"></script>
<script src="map.js" type="text/javascript"></script>
<script type="text/javascript">
$(document).ready(function() {
    var uuid = guid(); // unique within one session
    var nameOnline = ''; // name used when going online
    var onlineName = new Map(); // users already online, <requestId, name>

    $("#name").attr("disabled","disabled");
    $("#onlineBtn").attr("disabled","disabled");
    $("#downlineBtn").attr("disabled","disabled");

    $("#banana").hide();

    // Initialize the WebSocket
    var socket;
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:9090/");
        socket.onmessage = function(event) {
            console.log("Message from server: " + event.data);
            if (event.data.indexOf("isSucc") != -1) { // a response to a request this client sent to the server
                var response = JSON.parse(event.data);
                if (response != undefined && response != null) {
                    if (response.serviceId == 1001) { // go online
                        if (response.isSucc) {
                            // Went online successfully: initialize the list of online users
                            onlineName.clear();
                            $("#showOnlineNames").empty();
                            for (var reqId in response.hadOnline) {
                                onlineName.put(reqId, response.hadOnline[reqId]);
                            }
                            initOnline();

                            $("#name").attr("disabled","disabled");
                            $("#onlineBtn").attr("disabled","disabled");
                            $("#downlineBtn").removeAttr("disabled");
                            $("#banana").show();
                        } else {
                            alert("Failed to go online");
                        }
                    } else if (response.serviceId == 1004) {
                        if (response.isSucc) {
                            onlineName.clear();
                            $("#showBanana").empty();
                            $("#showOnlineNames").empty();
                            $("#name").removeAttr("disabled");
                            $("#onlineBtn").removeAttr("disabled");
                            $("#downlineBtn").attr("disabled","disabled");
                            $("#banana").hide();
                        } else {
                            alert("Failed to go offline");
                        }
                    }
                }
            } else { // otherwise a request pushed from the server to the client
                var request = JSON.parse(event.data);
                if (request != undefined && request != null) {
                    if (request.serviceId == 1001 || request.serviceId == 1004) { // someone went online/offline
                        if (request.serviceId == 1001) {
                            onlineName.put(request.requestId, request.name);
                        }
                        if (request.serviceId == 1004) {
                            onlineName.removeByKey(request.requestId);
                        }

                        initOnline();
                    } else if (request.serviceId == 1003) { // someone sent a message
                        appendBanana(request.name, request.message);
                    }
                }
            }
        };
        socket.onopen = function(event) {
            $("#name").removeAttr("disabled");
            $("#onlineBtn").removeAttr("disabled");
            console.log("Connected to server");
        };
        socket.onclose = function(event) { // WebSocket closed
            console.log("WebSocket closed!");
        };
        socket.onerror = function(event) {
            console.log("WebSocket error!");
        };
    } else {
        alert("Sorry, your browser does not support WebSocket!");
    }

    // Send a request over the WebSocket
    function send(message) {
        if (!window.WebSocket) { return; }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        } else {
            console.log("The WebSocket connection has not been established!");
            alert("You are not connected to the server yet; please refresh the page and try again");
        }
    }

    // Refresh the list of online users
    function initOnline() {
        $("#showOnlineNames").empty();
        for (var i=0;i<onlineName.size();i++) {
            $("#showOnlineNames").append('<tr><td>' + (i+1) + '</td>' +
            '<td>' + onlineName.element(i).value + '</td>' +
            '</tr>');
        }
    }
    // Append a chat message
    function appendBanana(name, message) {
        $("#showBanana").append('<tr><td>' + name + ': ' + message + '</td></tr>');
    }

    $("#onlineBtn").bind("click", function() {
        var name = $("#name").val();
        if (name == null || name == '') {
            alert("Please enter your name");
            return;
        }

        nameOnline = name;
        // Go online
        send(JSON.stringify({"requestId":uuid, "serviceId":1001, "name":name}));
    });

    $("#downlineBtn").bind("click", function() {
        // Go offline
        send(JSON.stringify({"requestId":uuid, "serviceId":1004}));
    });

    $("#sendBtn").bind("click", function() {
        var message = $("#messageInput").val();
        if (message == null || message == '') {
            alert("Please enter a chat message");
            return;
        }

        // Send the chat message
        send(JSON.stringify({"requestId":uuid, "serviceId":1002, "name":nameOnline, "message":message}));
        $("#messageInput").val("");
    });

});

function guid() {
    function S4() {
       return (((1+Math.random())*0x10000)|0).toString(16).substring(1);
    }
    return (S4()+S4()+"-"+S4()+"-"+S4()+"-"+S4()+"-"+S4()+S4()+S4());
}
</script>
<body>
  <h1>Netty WebSocket Chat Example</h1>
  <input type="text" id="name" value="Anonymous" placeholder="Name" />
  <input type="button" id="onlineBtn" value="Go online" />
  <input type="button" id="downlineBtn" value="Go offline" />
  <hr/>
  <table id="banana" border="1" >
    <tr>
      <td width="600" align="center">Chat</td>
      <td width="100" align="center">Online users</td>
    </tr>
    <tr height="200" valign="top">
      <td>
        <table id="showBanana" border="0" width="600">
            <!--
            <tr>
              <td>Zhang San: Hello everyone</td>
            </tr>
            <tr>
              <td>Li Si: Welcome to the group chat</td>
            </tr>
            -->
        </table>
      </td>
      <td>
        <table id="showOnlineNames" border="0">
            <!--
            <tr>
              <td>1</td>
              <td>Zhang San</td>
            </tr>
            <tr>
              <td>2</td>
              <td>Li Si</td>
            </tr>
            -->
        </table>
      </td>
    </tr>
    <tr height="40">
      <td></td>
      <td></td>
    </tr>
    <tr>
      <td>
        <input type="text" id="messageInput"  style="width:590px" placeholder="Say something..." />
      </td>
      <td>
        <input type="button" id="sendBtn" value="Send" />
      </td>
    </tr>
  </table>

</body>
</html>
banana.html

  Running WebSocketServer and ReversedWebSocketServer in turn produces the following logs:

============ adapter first, then handler ============
============ Connection opened ============

adapter handlerAdded = = adapter
channel handlerAdded = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
channel Read0 = = handler with http request : 
adapter is write in DefaultChannelPromise@3b536aab(incomplete) with message : 
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])

============ User goes online ============

channel Read0 = = handler with socket request : "requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1001,"name":"佚名"
adapter is write in DefaultChannelPromise@61ace442(incomplete) with socket message : "serviceId":1001,"isSucc":true,"message":"注册成功","hadOnline":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d":"佚名"
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])

============ Send message ============

channel Read0 = = handler with socket request : "requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1002,"name":"佚名","message":"queue"
adapter is write in DefaultChannelPromise@5a4c689a(incomplete) with socket message : "requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1003,"name":"佚名","message":"queue"
adapter is flush
adapter is write in DefaultChannelPromise@3cfa9e57(incomplete) with socket message : "serviceId":1002,"isSucc":true,"message":"发送消息成功","hadOnline":
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])

============ User goes offline ============

channel Read0 = = handler with socket request : "requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1004
adapter is write in DefaultChannelPromise@b6ce0dc(incomplete) with socket message : "serviceId":1004,"isSucc":true,"message":"下线成功","hadOnline":
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])

============ User disconnects ============

channel Read0 = = handler with socket request : ?
adapter is write in DefaultChannelPromise@567e3360(incomplete) close reason : 
adapter is flush
adapter is close in DefaultChannelPromise@1b6673fb(success)
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 ! R:/0:0:0:0:0:0:0:1:49245])
channel Inactive = = handler
channel close = = handler
adapter is close in DefaultChannelPromise@5b261d0a(success)
channel Unregistered = = handler
channel handlerRemoved = = handler
adapter handlerRemoved = = adapter
WebSocketServer run output

  and

============ adapter first, then handler ============
============ Connection opened ============

channel handlerAdded = = handler
adapter handlerAdded = = adapter
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
channel Read0 = = handler with http request : 
adapter is write in DefaultChannelPromise@171fb888(incomplete) with message : 
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])

============ User goes online ============

channel Read0 = = handler with socket request : "requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1001,"name":"佚名"
adapter is write in DefaultChannelPromise@54042c55(incomplete) with socket message : "serviceId":1001,"isSucc":true,"message":"注册成功","hadOnline":"70d182cf-b0ae-27ba-296d-33bd3ab5177b":"佚名"
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])

============ Send message ============

channel Read0 = = handler with socket request : "requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1002,"name":"佚名","message":"queue"
adapter is write in DefaultChannelPromise@324cb9a(incomplete) with socket message : "requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1003,"name":"佚名","message":"queue"
adapter is write in DefaultChannelPromise@269f3a70(incomplete) with socket message : "serviceId":1002,"isSucc":true,"message":"发送消息成功","hadOnline":
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])

============ User goes offline ============

channel Read0 = = handler with socket request : "requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1004
adapter is write in DefaultChannelPromise@2f6a67d7(incomplete) with socket message : "serviceId":1004,"isSucc":true,"message":"下线成功","hadOnline":
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])

============ User disconnects ============

channel Read0 = = handler with socket request : ?
adapter is write in DefaultChannelPromise@5dff633(incomplete) close reason : 
adapter is flush
adapter is close in DefaultChannelPromise@1e58e8f0(success)
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 ! R:/0:0:0:0:0:0:0:1:64671])
channel Inactive = = handler
channel close = = handler
channel Unregistered = = handler
adapter handlerRemoved = = adapter
channel handlerRemoved = = handler
ReversedWebSocketServer run output

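  Besides the different ordering of the handlerAdded and handlerRemoved callbacks, the two runs differ in how often the outBoundAdapter's flush is invoked. In the WebSocketServer log, "adapter is flush" appears two or three times per request, whereas in the ReversedWebSocketServer log it appears only once during the handshake and once on disconnect. The difference is largest when sending a chat message, because the server not only receives a packet but also sends packets back, so the adapter is flushed more often.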

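  As this shows, a Netty pipeline has to be planned carefully: let the server process the request first wherever possible, and place the outbound interceptor before the inbound interceptor.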
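  One possible reading of the flush counts is Netty's propagation rule: an outbound operation started on a handler's ChannelHandlerContext travels toward the head of the pipeline and only reaches outbound handlers added before that handler, while one started on the Channel begins at the tail and passes every outbound handler. The sketch below is a toy model in plain Java, not Netty code; Slot, fromContext and fromChannel are hypothetical names, and whether the two servers above flush through the context or through the channel is an assumption, not something the logs confirm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of outbound propagation in a pipeline (not the Netty API).
final class PipelineOrderSketch {

    /** One pipeline entry: a name plus whether it intercepts outbound operations. */
    static final class Slot {
        final String name;
        final boolean outbound;

        Slot(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    /** Outbound handlers visited when the operation starts on the channel, i.e. at the tail. */
    static List<String> fromChannel(List<Slot> pipeline) {
        return walkTowardsHead(pipeline, pipeline.size());
    }

    /** Outbound handlers visited when the operation starts on the named handler's context. */
    static List<String> fromContext(List<Slot> pipeline, String handlerName) {
        for (int i = 0; i < pipeline.size(); i++) {
            if (pipeline.get(i).name.equals(handlerName)) {
                return walkTowardsHead(pipeline, i);
            }
        }
        throw new IllegalArgumentException("no such handler: " + handlerName);
    }

    // Collect outbound entries strictly before startExclusive, nearest first.
    private static List<String> walkTowardsHead(List<Slot> pipeline, int startExclusive) {
        List<String> visited = new ArrayList<>();
        for (int i = startExclusive - 1; i >= 0; i--) {
            if (pipeline.get(i).outbound) {
                visited.add(pipeline.get(i).name);
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        // Order seen in the WebSocketServer log: adapter added before handler.
        List<Slot> adapterFirst = Arrays.asList(new Slot("adapter", true), new Slot("handler", false));
        // Order seen in the ReversedWebSocketServer log: handler added before adapter.
        List<Slot> handlerFirst = Arrays.asList(new Slot("handler", false), new Slot("adapter", true));

        System.out.println(fromContext(adapterFirst, "handler")); // [adapter]
        System.out.println(fromContext(handlerFirst, "handler")); // []
        System.out.println(fromChannel(adapterFirst));            // [adapter]
        System.out.println(fromChannel(handlerFirst));            // [adapter]
    }
}
```

  In this model, an operation started on the channel reaches the adapter in both orderings, while one started on the inbound handler's context reaches it only when the adapter was added first. That would be consistent with writes being logged by the adapter in both runs and the extra flushes showing up only in the WebSocketServer run.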
