javaNIO通信
Posted zhangwanhua
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了javaNIO通信相关的知识,希望对你有一定的参考价值。
传统BIO模式
服务端ServerSocket负责绑定IP地址,启动监听端口。客户端Socket负责发起连接操作,服务端接受到连接请求后为每个客户端创建一个新的线程进行链路处理,连路处理通过输入和输出流进行同步阻塞式通信。
该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1的正比关系。可以通过线程池来解决这个问题,使用线程池后,线程池会控制最大的线程数量以及线程的重复使用率。但本质上每个线程在处理访问时并没有大负荷允许,通信过程中每个通信都需要占一个线程,但其实线程使用率并不高。所以更理想的方式是,线程的连接与处理解藕,有一个统一的对象来管理所有连接,请求的处理交给独立的线程处理,这样连接的数量就会不再受限制线程的数量。
NIO模式
服务端ServerSocketChannel负责绑定IP地址,启动监听端口。然后将该ServerSocketChannel注册到一个Selector中。创建一个线程,轮询Selector来获取响应的事件。如果是ACCEPT事件,则有请求访问,获取响应的ServerSocketChannel,然后获取想要的SocketChannel,完成连接。然后再将SocketChannel注册到Selector中。如果是WRITE事件,则获取想要的SocketChannel,通过SocketChannel把一个缓存的数据写入;如果是READ事件,则获取想要的SocketChannel,通过SocketChannel读取数据到一个缓存中。实际上即使把ServerSocketChannel和SocketChannel绑定到Selector,通过Selector单线程去处理请求。
客户端SocketChannel首先注册到一个Selector中。然后连接服务器。轮询Selector来获取响应的事件,如果是CONNECT事件,则获取想要的SocketChannel,完成连接,然后再将SocketChannel注册到Selector;如果是READ事件,则获取想要的SocketChannel,通过SocketChannel读取数据到一个缓存中;如果是WRITE事件,则获取想要的SocketChannel,通过SocketChannel把一个缓存的数据写入;
断包和粘包问题
NIO读取和写入都是通过缓存实现的,由于存在缓存池的大小限制和网速的不均匀会造成一次读写的操作放入缓存池中的数据不完整,这就是断包问题。同理,如果一次性读入两个及两个以上的数据,则无法分辨两个数据包的界限问题,也就造成了粘包。总结就是NIO在读取和写入缓存时数据长度是不确定的,我们可以定制一个固定长度的缓存,通过判断缓存是否剩余来控制读取和写入数据的长度。那么缓存的长度定多少合适呢,一般会在读写数据时,定义一个头部数据来指定数据的长度。
NIO体系
NIO中有三个核心概念。第一个是通道,有点类似与IO中的流的概念;第二个是缓存,NIO中数据都是先存储在缓存中,然后统一输出的或者统一输入到缓存;第三个是选折器,选择器像是一个消息监听器,我们可以把相应的通道的相应事件注册到选择器,然后通过选择器获取相应事件的发生。
从最容易理解的缓存说起, IO中通信是直接端对端的,数据通信耦合非常大,时间比较浪费。在NIO中,通过缓存解藕了两端,数据首先进入缓存中,然后再将缓存中的数据全部写入输入通道;输出通道接收到数据后,首先将所有数据写入缓存中,然后交由相应程序处理。NIO提供了几种类型缓存:存储byte数据的缓存(ByteBuffer);存储char数据的缓存(CharBuffer);存储double数据的缓存(DoubleBuffer);存储float数据的缓存(FloatBuffer);存储int数据的缓存(IntBuffer);存储long数据的缓存(LongBuffer);存储short数据的缓存(ShortBuffer);文件数据缓存(MappedByteBuffer)不同的的缓存类型,只是存储的数据类型不同,本质都是二进制数据,只是定义的数据格式不同。所以可以通过ByteBuffer来存储各种类型数据,然后使用特定的编码方式读取。缓存内部本质是一个数组,有一个position标志标识当前的读写位置,有一个limit标识读写的范围。缓存都是线程不安全的。
方法 | 说明 |
clear | 将position标志设置为0;limit标识设置为容量大小,mark会被丢弃,常用在从管道中读写数据前。 |
flip | 将position标志设置为0;limit标识设置为当前位置,mark会被丢弃,常用在从管道中读写数据后。 |
rewind | 将position标志设置为0;mark会被丢弃,常用在从管道中读写数据后(如果limit已经设置正确)。 |
mark | 将position标志设置mark点。 |
reset | 将position标志设置成之前的mark点。 |
remaining | 返回position到limit的距离 |
hasRemaining | 是否还有剩余 |
方法 | 说明 |
allocate | 分配指定的空间 |
wrap | 将指定的数组包装成缓存对象,缓存的改变会影响原来的数组 |
slice | 创建新的缓存分片,内容从position到capacity,并且共享这段存储空间。 |
duplicate | 创建新的缓存,共享存储空间。 |
compact | 将position到limit的内容拷贝到缓存最前面,复制后,position = limit -position,limit = capacity |
isDirect | 是否是直接内存。 |
put | 写入数据 |
get | 读取数据 |
通道
按操作对象
网络通道(SocketChannel、ServerSocketChannel):
ServerSocketChannel内部本质封装了一个ServerSocket;
SocketChannel
方法 | 参数 | 返回值 | 说明 |
ZooKeeper代码实例:
服务端通过NIO接收请求,反序列化为相应对象,交给zookeeper服务器进行处理,处理结果写入缓存队列力,然后交给nio发送到客户端。
服务端NIO服务器,通过NIO建立通道(NioserverCnxn.Factory):
1 static public class Factory extends Thread { 2 static { 3 //设置全局的异常处理 4 Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 5 public void uncaughtException(Thread t, Throwable e) { 6 LOG.error("Thread " + t + " died", e); 7 } 8 }); 9 /** 10 * jvm早期的nullpoint bug 11 * http://bugs.sun.com/view_bug.do?bug_id=6427854 12 */ 13 try { 14 Selector.open().close(); 15 } catch(IOException ie) { 16 LOG.error("Selector failed to open", ie); 17 } 18 } 19 //服务器通道 20 final ServerSocketChannel ss; 21 //选择器 22 final Selector selector = Selector.open(); 23 ZooKeeperServer zks; 24 25 final ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024); 26 //所有的上下文环境 27 final HashSet<NIOServerCnxn> cnxns = new HashSet<NIOServerCnxn>(); 28 //ip对应的上下文环境 29 final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap = 30 new HashMap<InetAddress, Set<NIOServerCnxn>>( ); 31 int maxClientCnxns = 10; 32 33 public Factory(InetSocketAddress addr, int maxcc) throws IOException { 34 setDaemon(true); 35 //单个client链接最大数 36 maxClientCnxns = maxcc; 37 //创建服务器通道 38 this.ss = ServerSocketChannel.open(); 39 ss.socket().setReuseAddress(true); 40 //绑定端口 41 ss.socket().bind(addr); 42 //设置通道为非阻塞通道 43 ss.configureBlocking(false); 44 //把通道注册到选择器中 45 ss.register(selector, SelectionKey.OP_ACCEPT); 46 } 47 public void run() { 48 while (!ss.socket().isClosed()) { 49 try { 50 //选择一组键 51 selector.select(1000); 52 Set<SelectionKey> selected; 53 synchronized (this) { 54 selected = selector.selectedKeys(); 55 } 56 ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>( 57 selected); 58 Collections.shuffle(selectedList); 59 for (SelectionKey k : selectedList) { 60 //如果通道已经准备好接收套接字 61 if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { 62 SocketChannel sc = ((ServerSocketChannel) k 63 .channel()).accept(); 64 InetAddress ia = sc.socket().getInetAddress(); 65 //判断最大连接数 66 int cnxncount = getClientCnxnCount(ia); 67 if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){ 68 sc.close(); 69 } else { 70 // 配置为非阻塞 71 sc.configureBlocking(false); 72 //把通道注册到选择器中 73 SelectionKey sk = sc.register(selector, 74 SelectionKey.OP_READ); 75 NIOServerCnxn cnxn = createConnection(sc, sk); 76 //给该通道附带一个上下文环境 77 sk.attach(cnxn); 78 addCnxn(cnxn); 79 } 80 } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { 81 //通过上线文件来进行读写 82 NIOServerCnxn c = (NIOServerCnxn) k.attachment(); 83 c.doIO(k); 84 } else { 85 if (LOG.isDebugEnabled()) { 86 LOG.debug("Unexpected ops in select " 87 + k.readyOps()); 88 } 89 } 90 } 91 selected.clear(); 92 catch (Exception e) { 93 LOG.warn("Ignoring exception", e); 94 } 95 } 96 clear(); 97 } 98 //关闭 99 public void shutdown() { 100 try { 101 ss.close(); 102 clear(); 103 this.interrupt(); 104 this.join(); 105 } catch (Exception e) { 106 LOG.warn("Ignoring unexpected exception during shutdown", e); 107 } 108 try { 109 selector.close(); 110 } catch (IOException e) { 111 LOG.warn("Selector closing", e); 112 } 113 if (zks != null) { 114 zks.shutdown(); 115 } 116 } 117 synchronized public void clear() { 118 selector.wakeup(); 119 HashSet<NIOServerCnxn> cnxns; 120 synchronized (this.cnxns) { 121 cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone(); 122 } 123 // got to clear all the connections that we have in the selector 124 for (NIOServerCnxn cnxn: cnxns) { 125 try { 126 // don‘t hold this.cnxns lock as deadlock may occur 127 cnxn.close(); 128 } catch (Exception e) { 129 LOG.warn("Ignoring exception closing cnxn sessionid 0x" 130 + Long.toHexString(cnxn.sessionId), e); 131 } 132 } 133 } 134 }
服务端通道上下文,通过NIO进行读写(NIOServerCnxn.doIO):
1 void doIO(SelectionKey k) throws InterruptedException { 2 try { 3 //如果通道可以读取数据 4 if (k.isReadable()) { 5 //读取数据到缓存中 6 int rc = sock.read(incomingBuffer); 7 //如果读满缓存 8 if (incomingBuffer.remaining() == 0) { 9 boolean isPayload; 10 //如果第一次读取,则先读取长度内容,相应分配缓存;否则读取指定长度的数据内容 11 if (incomingBuffer == lenBuffer) { 12 incomingBuffer.flip(); 13 isPayload = readLength(k); 14 incomingBuffer.clear(); 15 } else { 16 isPayload = true; 17 } 18 if (isPayload) { 19 //读取数据,初始化、读取请求数据封装成packet 20 readPayload(); 21 } 22 } 23 } 24 //如果通道可以读写数据 25 if (k.isWritable()) { 26 //缓存中有数据需要写入 27 if (outgoingBuffers.size() > 0) { 28 //创建bytebuffer 29 ByteBuffer directBuffer = factory.directBuffer; 30 directBuffer.clear(); 31 //从队列中读取数据知道缓存读满 32 for (ByteBuffer b : outgoingBuffers) { 33 if (directBuffer.remaining() < b.remaining()) { 34 b = (ByteBuffer) b.slice().limit( 35 directBuffer.remaining()); 36 } 37 int p = b.position(); 38 directBuffer.put(b); 39 b.position(p); 40 if (directBuffer.remaining() == 0) { 41 break; 42 } 43 } 44 //将数据写入通道 45 directBuffer.flip(); 46 int sent = sock.write(directBuffer); 47 ByteBuffer bb; 48 //从缓存中已经发送的删除数据 49 while (outgoingBuffers.size() > 0) { 50 bb = outgoingBuffers.peek(); 51 int left = bb.remaining() - sent; 52 if (left > 0) { 53 bb.position(bb.position() + sent); 54 break; 55 } 56 sent -= bb.remaining(); 57 outgoingBuffers.remove(); 58 } 59 } 60 61 synchronized(this.factory){ 62 if (outgoingBuffers.size() == 0) { 63 sk.interestOps(sk.interestOps() 64 & (~SelectionKey.OP_WRITE)); 65 } else { 66 sk.interestOps(sk.interestOps() 67 | SelectionKey.OP_WRITE); 68 } 69 } 70 } 71 }catch (IOException e) { 72 close(); 73 } 74 } 75 private void readPayload() throws IOException, InterruptedException { 76 if (incomingBuffer.remaining() != 0) { // have we read length bytes? 77 int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok 78 } 79 if (incomingBuffer.remaining() == 0) { 80 //重置缓存 81 incomingBuffer.flip(); 82 //如果没有进行初始化,首先要初始化;如果已经链接,则读取请求数据,封装成packet 83 if (!initialized) { 84 readConnectRequest(); 85 } else { 86 readRequest(); 87 } 88 //重置 89 lenBuffer.clear(); 90 incomingBuffer = lenBuffer; 91 } 92 }
服务端通道上下文,处理命令(NIOServerCnxn.readLength)
1 private boolean readLength(SelectionKey k) throws IOException { 2 //如果是请求数据,根据长度分配缓存;如果是命令,执行相应命令。 3 int len = lenBuffer.getInt(); 4 if (!initialized && checkFourLetterWord(k, len)) { 5 return false; 6 } 7 if (len < 0 || len > BinaryInputArchive.maxBuffer) { 8 throw new IOException("Len error " + len); 9 } 10 if (zk == null) { 11 throw new IOException("ZooKeeperServer not running"); 12 } 13 incomingBuffer = ByteBuffer.allocate(len); 14 return true; 15 } 16 private boolean checkFourLetterWord(final SelectionKey k, final int len) 17 throws IOException 18 { 19 //获取命令 20 String cmd = cmd2String.get(len); 21 /** cancel the selection key to remove the socket handling 22 * from selector. This is to prevent netcat problem wherein 23 * netcat immediately closes the sending side after sending the 24 * commands and still keeps the receiving channel open. 25 * The idea is to remove the selectionkey from the selector 26 * so that the selector does not notice the closed read on the 27 * socket channel and keep the socket alive to write the data to 28 * and makes sure to close the socket after its done writing the data 29 */ 30 if (k != null) { 31 try { 32 k.cancel(); 33 } catch(Exception e) { 34 LOG.error("Error cancelling command selection key ", e); 35 } 36 } 37 //根据命令类型,执行相应内容 38 final PrintWriter pwriter = new PrintWriter( 39 new BufferedWriter(new SendBufferWriter())); 40 if (len == ruokCmd) { 41 RuokCommand ruok = new RuokCommand(pwriter); 42 ruok.start(); 43 return true; 44 } else if (len == getTraceMaskCmd) { 45 TraceMaskCommand tmask = new TraceMaskCommand(pwriter); 46 tmask.start(); 47 return true; 48 } else if (len == setTraceMaskCmd) { 49 int rc = sock.read(incomingBuffer); 50 if (rc < 0) { 51 throw new IOException("Read error"); 52 } 53 54 incomingBuffer.flip(); 55 long traceMask = incomingBuffer.getLong(); 56 ZooTrace.setTextTraceLevel(traceMask); 57 SetTraceMaskCommand setMask = new SetTraceMaskCommand(pwriter, traceMask); 58 setMask.start(); 59 return true; 60 } else if (len == enviCmd) { 61 EnvCommand env = new EnvCommand(pwriter); 62 env.start(); 63 return true; 64 } else if (len == confCmd) { 65 ConfCommand ccmd = new ConfCommand(pwriter); 66 ccmd.start(); 67 return true; 68 } else if (len == srstCmd) { 69 StatResetCommand strst = new StatResetCommand(pwriter); 70 strst.start(); 71 return true; 72 } else if (len == crstCmd) { 73 CnxnStatResetCommand crst = new CnxnStatResetCommand(pwriter); 74 crst.start(); 75 return true; 76 } else if (len == dumpCmd) { 77 DumpCommand dump = new DumpCommand(pwriter); 78 dump.start(); 79 return true; 80 } else if (len == statCmd || len == srvrCmd) { 81 StatCommand stat = new StatCommand(pwriter, len); 82 stat.start(); 83 return true; 84 } else if (len == consCmd) { 85 ConsCommand cons = new ConsCommand(pwriter); 86 cons.start(); 87 return true; 88 } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) { 89 WatchCommand wcmd = new WatchCommand(pwriter, len); 90 wcmd.start(); 91 return true; 92 } 93 return false; 94 }
服务端通道上下文,处理请求数据(NIOServerCnxn.readRequestNIOServerCnxn.readConnectRequest):
1 private void readRequest() throws IOException { 2 //反序列化请求数据 3 InputStream bais = new ByteBufferInputStream(incomingBuffer); 4 BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); 5 RequestHeader h = new RequestHeader(); 6 h.deserialize(bia, "header"); 7 incomingBuffer = incomingBuffer.slice(); 8 if (h.getType() == OpCode.auth) { 9 //如果是认证请求 10 AuthPacket authPacket = new AuthPacket(); 11 ZooKeeperServer.byteBuffer2Record(incomingBuffer, authPacket); 12 String scheme = authPacket.getScheme(); 13 //进行认证 14 AuthenticationProvider ap = ProviderRegistry.getProvider(scheme); 15 if (ap == null 16 || (ap.handleAuthentication(this, authPacket.getAuth()) 17 != KeeperException.Code.OK)) { 18 // 认证失败,返回失败内容 19 ReplyHeader rh = new ReplyHeader(h.getXid(), 0, 20 KeeperException.Code.AUTHFAILED.intValue()); 21 sendResponse(rh, null, null); 22 //关闭链接 23 sendCloseSession(); 24 disableRecv(); 25 } else { 26 //认证成功,返回成功内容 27 ReplyHeader rh = new ReplyHeader(h.getXid(), 0, 28 KeeperException.Code.OK.intValue()); 29 sendResponse(rh, null, null); 30 } 31 return; 32 } else { 33 //如果是请求,提交到zk处理 34 Request si = new Request(this, sessionId, h.getXid(), h.getType(), incomingBuffer, authInfo); 35 si.setOwner(ServerCnxn.me); 36 zk.submitRequest(si); 37 } 38 } 39 40 private void readConnectRequest() throws IOException, InterruptedException { 41 //反序列化链接请求对象 42 BinaryInputArchive bia = BinaryInputArchive 43 .getArchive(new ByteBufferInputStream(incomingBuffer)); 44 ConnectRequest connReq = new ConnectRequest(); 45 connReq.deserialize(bia, "connect"); 46 if (connReq.getLastZxidSeen() > zk.getZKDatabase().getDataTreeLastProcessedZxid()) { 47 throw new CloseRequestException(msg); 48 } 49 sessionTimeout = connReq.getTimeOut(); 50 byte passwd[] = connReq.getPasswd(); 51 //初始化session 52 disableRecv(); 53 if (connReq.getSessionId() != 0) { 54 long clientSessionId = connReq.getSessionId(); 55 factory.closeSessionWithoutWakeup(clientSessionId); 56 setSessionId(clientSessionId); 57 zk.reopenSession(this, sessionId, passwd, sessionTimeout); 58 } else { 59 zk.createSession(this, passwd, sessionTimeout); 60 } 61 initialized = true; 62 }
服务端通道上下文,写返回数据(NIOServerCnxn.sendResponse)
1 synchronized public void sendResponse(ReplyHeader h, Record r, String tag) { 2 try { 3 //序列化返回结果, 4 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 5 BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); 6 try { 7 baos.write(fourBytes); 8 bos.writeRecord(h, "header"); 9 if (r != null) { 10 bos.writeRecord(r, tag); 11 } 12 baos.close(); 13 } catch (IOException e) { 14 LOG.error("Error serializing response"); 15 } 16 //写入数据长度 17 byte b[] = baos.toByteArray(); 18 ByteBuffer bb = ByteBuffer.wrap(b); 19 bb.putInt(b.length - 4).rewind(); 20 // 21 sendBuffer(bb); 22 } catch(Exception e) { 23 LOG.warn("Unexpected exception. Destruction averted.", e); 24 } 25 } 26 void sendBuffer(ByteBuffer bb) { 27 try { 28 if (bb != closeConn) { 29 //直接发送数据 30 if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) { 31 try { 32 sock.write(bb); 33 } catch (IOException e) { 34 // we are just doing best effort right now 35 } 36 } 37 if (bb.remaining() == 0) { 38 packetSent(); 39 return; 40 } 41 } 42 //写入缓存中。 43 synchronized(this.factory){ 44 sk.selector().wakeup(); 45 outgoingBuffers.add(bb); 46 if (sk.isValid()) { 47 sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); 48 } 49 } 50 51 } catch(Exception e) { 52 LOG.error("Unexpected Exception: ", e); 53 } 54 }
客户端响应处理,处理返回事件(ClientCnxn.EventThread):
1 class EventThread extends Thread { 2 private final LinkedBlockingQueue<Object> waitingEvents = 3 new LinkedBlockingQueue<Object>(); 4 5 /** This is really the queued session state until the event 6 * thread actually processes the event and hands it to the watcher. 7 * But for all intents and purposes this is the state. 8 */ 9 private volatile KeeperState sessionState = KeeperState.Disconnected; 10 11 private volatile boolean wasKilled = false; 12 private volatile boolean isRunning = false; 13 14 //添加响应事件到队列中 15 public void queueEvent(WatchedEvent event) { 16 WatcherSetEventPair pair = new WatcherSetEventPair( 17 watcher.materialize(event.getState(), event.getType(), 18 event.getPath()), 19 event); 20 // queue the pair (watch set & event) for later processing 21 waitingEvents.add(pair); 22 } 23 //添加数据包到队列中 24 public void queuePacket(Packet packet) { 25 if (wasKilled) { 26 synchronized (waitingEvents) { 27 if (isRunning) waitingEvents.add(packet); 28 else processEvent(packet); 29 } 30 } else { 31 waitingEvents.add(packet); 32 } 33 } 34 //添加结束事件 35 public void queueEventOfDeath() { 36 waitingEvents.add(eventOfDeath); 37 } 38 39 @Override 40 public void run() { 41 try { 42 while (true) { 43 // 获取响应事件 44 Object event = waitingEvents.take(); 45 if (event == eventOfDeath) { 46 wasKilled = true; 47 } else { 48 //处理响应 49 processEvent(event); 50 } 51 if (wasKilled) 52 synchronized (waitingEvents) { 53 if (waitingEvents.isEmpty()) { 54 isRunning = false; 55 break; 56 } 57 } 58 } 59 } catch (InterruptedException e) { 60 LOG.error("Event thread exiting due to interruption", e); 61 } 62 LOG.info("EventThread shut down"); 63 } 64 65 private void processEvent(Object event) { 66 try { 67 if (event instanceof WatcherSetEventPair) { 68 // each watcher will process the event 69 WatcherSetEventPair pair = (WatcherSetEventPair) event; 70 for (Watcher watcher : pair.watchers) { 71 try { 72 watcher.process(pair.event); 73 } catch (Throwable t) { 74 LOG.error("Error while calling watcher ", t); 75 } 76 } 77 } else { 78 Packet p = (Packet) event; 79 int rc = 0; 80 String clientPath = p.clientPath; 81 if (p.replyHeader.getErr() != 0) { 82 rc = p.replyHeader.getErr(); 83 } 84 if (p.cb == null) { 85 LOG.warn("Somehow a null cb got to EventThread!"); 86 } else if (p.response instanceof ExistsResponse 87 || p.response instanceof SetDataResponse 88 || p.response instanceof SetACLResponse) { 89 StatCallback cb = (StatCallback) p.cb; 90 if (rc == 0) { 91 if (p.response instanceof ExistsResponse) { 92 cb.processResult(rc, clientPath, p.ctx, 93 ((ExistsResponse) p.response) 94 .getStat()); 95 } else if (p.response instanceof SetDataResponse) { 96 cb.processResult(rc, clientPath, p.ctx, 97 ((SetDataResponse) p.response) 98 .getStat()); 99 } else if (p.response instanceof SetACLResponse) { 100 cb.processResult(rc, clientPath, p.ctx, 101 ((SetACLResponse) p.response) 102 .getStat()); 103 } 104 } else { 105 cb.processResult(rc, clientPath, p.ctx, null); 106 } 107 } else if (p.response instanceof GetDataResponse) { 108 DataCallback cb = (DataCallback) p.cb; 109 GetDataResponse rsp = (GetDataResponse) p.response; 110 if (rc == 0) { 111 cb.processResult(rc, clientPath, p.ctx, rsp 112 .getData(), rsp.getStat()); 113 } else { 114 cb.processResult(rc, clientPath, p.ctx, null, 115 null); 116 } 117 } else if (p.response instanceof GetACLResponse) { 118 ACLCallback cb = (ACLCallback) p.cb; 119 GetACLResponse rsp = (GetACLResponse) p.response; 120 if (rc == 0) { 121 cb.processResult(rc, clientPath, p.ctx, rsp 122 .getAcl(), rsp.getStat()); 123 } else { 124 cb.processResult(rc, clientPath, p.ctx, null, 125 null); 126 } 127 } else if (p.response instanceof GetChildrenResponse) { 128 ChildrenCallback cb = (ChildrenCallback) p.cb; 129 GetChildrenResponse rsp = (GetChildrenResponse) p.response; 130 if (rc == 0) { 131 cb.processResult(rc, clientPath, p.ctx, rsp 132 .getChildren()); 133 } else { 134 cb.processResult(rc, clientPath, p.ctx, null); 135 } 136 } else if (p.response instanceof GetChildren2Response) { 137 Children2Callback cb = (Children2Callback) p.cb; 138 GetChildren2Response rsp = (GetChildren2Response) p.response; 139 if (rc == 0) { 140 cb.processResult(rc, clientPath, p.ctx, rsp 141 .getChildren(), rsp.getStat()); 142 } else { 143 cb.processResult(rc, clientPath, p.ctx, null, null); 144 } 145 } else if (p.response instanceof CreateResponse) { 146 StringCallback cb = (StringCallback) p.cb; 147 CreateResponse rsp = (CreateResponse) p.response; 148 if (rc == 0) { 149 cb.processResult(rc, clientPath, p.ctx, 150 (chrootPath == null 151 ? rsp.getPath() 152 : rsp.getPath() 153 .substring(chrootPath.length()))); 154 } else { 155 cb.processResult(rc, clientPath, p.ctx, null); 156 } 157 } else if (p.cb instanceof VoidCallback) { 158 VoidCallback cb = (VoidCallback) p.cb; 159 cb.processResult(rc, clientPath, p.ctx); 160 } 161 } 162 } catch (Throwable t) { 163 LOG.error("Caught unexpected throwable", t); 164 } 165 } 166 }
以上是关于javaNIO通信的主要内容,如果未能解决你的问题,请参考以下文章