从最容易理解的缓存说起。IO中通信是直接端对端的,数据通信耦合非常大,时间比较浪费。在NIO中,通过缓存解耦了两端:发送数据时,数据首先进入缓存中,然后再将缓存中的数据全部写入输出通道;输入通道接收到数据后,首先将所有数据写入缓存中,然后交由相应程序处理。NIO提供了几种类型缓存:存储byte数据的缓存(ByteBuffer);存储char数据的缓存(CharBuffer);存储double数据的缓存(DoubleBuffer);存储float数据的缓存(FloatBuffer);存储int数据的缓存(IntBuffer);存储long数据的缓存(LongBuffer);存储short数据的缓存(ShortBuffer);文件映射数据缓存(MappedByteBuffer)。不同的缓存类型,只是存储的数据类型不同,本质都是二进制数据,只是定义的数据格式不同。所以可以通过ByteBuffer来存储各种类型数据,然后使用特定的编码方式读取。缓存内部本质是一个数组,有一个position标志标识当前的读写位置,有一个limit标识读写的范围。缓存都是线程不安全的。
方法 | 说明 |
clear | 将position标志设置为0;limit标识设置为容量大小,mark会被丢弃(缓存中的数据并不会被真正清除),常用在从通道读取数据或通过put向缓存写入数据之前。 |
flip | 先将limit标识设置为当前position,再将position标志设置为0,mark会被丢弃,常用在向缓存写入数据(从通道读取或put)之后、从缓存取出数据(写入通道或get)之前。 |
rewind | 将position标志设置为0,limit保持不变;mark会被丢弃,常用在重新从缓存取出数据(写入通道或get)之前(前提是limit已经设置正确)。 |
mark | 将mark点设置为当前的position。 |
reset | 将position标志设置成之前的mark点。 |
remaining | 返回position到limit的距离 |
hasRemaining | 是否还有剩余 |
方法 | 说明 |
allocate | 分配指定的空间 |
wrap | 将指定的数组包装成缓存对象,缓存的改变会影响原来的数组 |
slice | 创建新的缓存分片,内容从position到limit(新缓存的容量等于remaining),并且共享这段存储空间。 |
duplicate | 创建新的缓存,共享存储空间。 |
compact | 将position到limit的内容拷贝到缓存最前面,复制后,position = limit -position,limit = capacity |
isDirect | 是否是直接内存。 |
put | 写入数据 |
get | 读取数据 |
方法 | 参数 | 返回值 | 说明 |

1 static public class Factory extends Thread { 2 static { 3 //设置全局的异常处理 4 Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 5 public void uncaughtException(Thread t, Throwable e) { 6 LOG.error("Thread " + t + " died", e); 7 } 8 }); 9 /** 10 * jvm早期的nullpoint bug 11 * 12 */ 13 try { 14; 15 } catch(IOException ie) { 16 LOG.error("Selector failed to open", ie); 17 } 18 } 19 //服务器通道 20 final ServerSocketChannel ss; 21 //选择器 22 final Selector selector =; 23 ZooKeeperServer zks; 24 25 final ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024); 26 //所有的上下文环境 27 final HashSet<NIOServerCnxn> cnxns = new HashSet<NIOServerCnxn>(); 28 //ip对应的上下文环境 29 final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap = 30 new HashMap<InetAddress, Set<NIOServerCnxn>>( ); 31 int maxClientCnxns = 10; 32 33 public Factory(InetSocketAddress addr, int maxcc) throws IOException { 34 setDaemon(true); 35 //单个client链接最大数 36 maxClientCnxns = maxcc; 37 //创建服务器通道 38 =; 39 ss.socket().setReuseAddress(true); 40 //绑定端口 41 ss.socket().bind(addr); 42 //设置通道为非阻塞通道 43 ss.configureBlocking(false); 44 //把通道注册到选择器中 45 ss.register(selector, SelectionKey.OP_ACCEPT); 46 } 47 public void run() { 48 while (!ss.socket().isClosed()) { 49 try { 50 //选择一组键 51; 52 Set<SelectionKey> selected; 53 synchronized (this) { 54 selected = selector.selectedKeys(); 55 } 56 ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>( 57 selected); 58 Collections.shuffle(selectedList); 59 for (SelectionKey k : selectedList) { 60 //如果通道已经准备好接收套接字 61 if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { 62 SocketChannel sc = ((ServerSocketChannel) k 63 .channel()).accept(); 64 InetAddress ia = sc.socket().getInetAddress(); 65 //判断最大连接数 66 int cnxncount = getClientCnxnCount(ia); 67 if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){ 68 sc.close(); 69 } else { 70 // 配置为非阻塞 71 sc.configureBlocking(false); 72 //把通道注册到选择器中 73 SelectionKey sk = sc.register(selector, 74 SelectionKey.OP_READ); 75 
NIOServerCnxn cnxn = createConnection(sc, sk); 76 //给该通道附带一个上下文环境 77 sk.attach(cnxn); 78 addCnxn(cnxn); 79 } 80 } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { 81 //通过上线文件来进行读写 82 NIOServerCnxn c = (NIOServerCnxn) k.attachment(); 83 c.doIO(k); 84 } else { 85 if (LOG.isDebugEnabled()) { 86 LOG.debug("Unexpected ops in select " 87 + k.readyOps()); 88 } 89 } 90 } 91 selected.clear(); 92 catch (Exception e) { 93 LOG.warn("Ignoring exception", e); 94 } 95 } 96 clear(); 97 } 98 //关闭 99 public void shutdown() { 100 try { 101 ss.close(); 102 clear(); 103 this.interrupt(); 104 this.join(); 105 } catch (Exception e) { 106 LOG.warn("Ignoring unexpected exception during shutdown", e); 107 } 108 try { 109 selector.close(); 110 } catch (IOException e) { 111 LOG.warn("Selector closing", e); 112 } 113 if (zks != null) { 114 zks.shutdown(); 115 } 116 } 117 synchronized public void clear() { 118 selector.wakeup(); 119 HashSet<NIOServerCnxn> cnxns; 120 synchronized (this.cnxns) { 121 cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone(); 122 } 123 // got to clear all the connections that we have in the selector 124 for (NIOServerCnxn cnxn: cnxns) { 125 try { 126 // don‘t hold this.cnxns lock as deadlock may occur 127 cnxn.close(); 128 } catch (Exception e) { 129 LOG.warn("Ignoring exception closing cnxn sessionid 0x" 130 + Long.toHexString(cnxn.sessionId), e); 131 } 132 } 133 } 134 }

1 void doIO(SelectionKey k) throws InterruptedException { 2 try { 3 //如果通道可以读取数据 4 if (k.isReadable()) { 5 //读取数据到缓存中 6 int rc =; 7 //如果读满缓存 8 if (incomingBuffer.remaining() == 0) { 9 boolean isPayload; 10 //如果第一次读取,则先读取长度内容,相应分配缓存;否则读取指定长度的数据内容 11 if (incomingBuffer == lenBuffer) { 12 incomingBuffer.flip(); 13 isPayload = readLength(k); 14 incomingBuffer.clear(); 15 } else { 16 isPayload = true; 17 } 18 if (isPayload) { 19 //读取数据,初始化、读取请求数据封装成packet 20 readPayload(); 21 } 22 } 23 } 24 //如果通道可以读写数据 25 if (k.isWritable()) { 26 //缓存中有数据需要写入 27 if (outgoingBuffers.size() > 0) { 28 //创建bytebuffer 29 ByteBuffer directBuffer = factory.directBuffer; 30 directBuffer.clear(); 31 //从队列中读取数据知道缓存读满 32 for (ByteBuffer b : outgoingBuffers) { 33 if (directBuffer.remaining() < b.remaining()) { 34 b = (ByteBuffer) b.slice().limit( 35 directBuffer.remaining()); 36 } 37 int p = b.position(); 38 directBuffer.put(b); 39 b.position(p); 40 if (directBuffer.remaining() == 0) { 41 break; 42 } 43 } 44 //将数据写入通道 45 directBuffer.flip(); 46 int sent = sock.write(directBuffer); 47 ByteBuffer bb; 48 //从缓存中已经发送的删除数据 49 while (outgoingBuffers.size() > 0) { 50 bb = outgoingBuffers.peek(); 51 int left = bb.remaining() - sent; 52 if (left > 0) { 53 bb.position(bb.position() + sent); 54 break; 55 } 56 sent -= bb.remaining(); 57 outgoingBuffers.remove(); 58 } 59 } 60 61 synchronized(this.factory){ 62 if (outgoingBuffers.size() == 0) { 63 sk.interestOps(sk.interestOps() 64 & (~SelectionKey.OP_WRITE)); 65 } else { 66 sk.interestOps(sk.interestOps() 67 | SelectionKey.OP_WRITE); 68 } 69 } 70 } 71 }catch (IOException e) { 72 close(); 73 } 74 } 75 private void readPayload() throws IOException, InterruptedException { 76 if (incomingBuffer.remaining() != 0) { // have we read length bytes? 
77 int rc =; // sock is non-blocking, so ok 78 } 79 if (incomingBuffer.remaining() == 0) { 80 //重置缓存 81 incomingBuffer.flip(); 82 //如果没有进行初始化,首先要初始化;如果已经链接,则读取请求数据,封装成packet 83 if (!initialized) { 84 readConnectRequest(); 85 } else { 86 readRequest(); 87 } 88 //重置 89 lenBuffer.clear(); 90 incomingBuffer = lenBuffer; 91 } 92 }

1 private boolean readLength(SelectionKey k) throws IOException { 2 //如果是请求数据,根据长度分配缓存;如果是命令,执行相应命令。 3 int len = lenBuffer.getInt(); 4 if (!initialized && checkFourLetterWord(k, len)) { 5 return false; 6 } 7 if (len < 0 || len > BinaryInputArchive.maxBuffer) { 8 throw new IOException("Len error " + len); 9 } 10 if (zk == null) { 11 throw new IOException("ZooKeeperServer not running"); 12 } 13 incomingBuffer = ByteBuffer.allocate(len); 14 return true; 15 } 16 private boolean checkFourLetterWord(final SelectionKey k, final int len) 17 throws IOException 18 { 19 //获取命令 20 String cmd = cmd2String.get(len); 21 /** cancel the selection key to remove the socket handling 22 * from selector. This is to prevent netcat problem wherein 23 * netcat immediately closes the sending side after sending the 24 * commands and still keeps the receiving channel open. 25 * The idea is to remove the selectionkey from the selector 26 * so that the selector does not notice the closed read on the 27 * socket channel and keep the socket alive to write the data to 28 * and makes sure to close the socket after its done writing the data 29 */ 30 if (k != null) { 31 try { 32 k.cancel(); 33 } catch(Exception e) { 34 LOG.error("Error cancelling command selection key ", e); 35 } 36 } 37 //根据命令类型,执行相应内容 38 final PrintWriter pwriter = new PrintWriter( 39 new BufferedWriter(new SendBufferWriter())); 40 if (len == ruokCmd) { 41 RuokCommand ruok = new RuokCommand(pwriter); 42 ruok.start(); 43 return true; 44 } else if (len == getTraceMaskCmd) { 45 TraceMaskCommand tmask = new TraceMaskCommand(pwriter); 46 tmask.start(); 47 return true; 48 } else if (len == setTraceMaskCmd) { 49 int rc =; 50 if (rc < 0) { 51 throw new IOException("Read error"); 52 } 53 54 incomingBuffer.flip(); 55 long traceMask = incomingBuffer.getLong(); 56 ZooTrace.setTextTraceLevel(traceMask); 57 SetTraceMaskCommand setMask = new SetTraceMaskCommand(pwriter, traceMask); 58 setMask.start(); 59 return true; 60 } else if (len == enviCmd) 
{ 61 EnvCommand env = new EnvCommand(pwriter); 62 env.start(); 63 return true; 64 } else if (len == confCmd) { 65 ConfCommand ccmd = new ConfCommand(pwriter); 66 ccmd.start(); 67 return true; 68 } else if (len == srstCmd) { 69 StatResetCommand strst = new StatResetCommand(pwriter); 70 strst.start(); 71 return true; 72 } else if (len == crstCmd) { 73 CnxnStatResetCommand crst = new CnxnStatResetCommand(pwriter); 74 crst.start(); 75 return true; 76 } else if (len == dumpCmd) { 77 DumpCommand dump = new DumpCommand(pwriter); 78 dump.start(); 79 return true; 80 } else if (len == statCmd || len == srvrCmd) { 81 StatCommand stat = new StatCommand(pwriter, len); 82 stat.start(); 83 return true; 84 } else if (len == consCmd) { 85 ConsCommand cons = new ConsCommand(pwriter); 86 cons.start(); 87 return true; 88 } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) { 89 WatchCommand wcmd = new WatchCommand(pwriter, len); 90 wcmd.start(); 91 return true; 92 } 93 return false; 94 }

1 private void readRequest() throws IOException { 2 //反序列化请求数据 3 InputStream bais = new ByteBufferInputStream(incomingBuffer); 4 BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); 5 RequestHeader h = new RequestHeader(); 6 h.deserialize(bia, "header"); 7 incomingBuffer = incomingBuffer.slice(); 8 if (h.getType() == OpCode.auth) { 9 //如果是认证请求 10 AuthPacket authPacket = new AuthPacket(); 11 ZooKeeperServer.byteBuffer2Record(incomingBuffer, authPacket); 12 String scheme = authPacket.getScheme(); 13 //进行认证 14 AuthenticationProvider ap = ProviderRegistry.getProvider(scheme); 15 if (ap == null 16 || (ap.handleAuthentication(this, authPacket.getAuth()) 17 != KeeperException.Code.OK)) { 18 // 认证失败,返回失败内容 19 ReplyHeader rh = new ReplyHeader(h.getXid(), 0, 20 KeeperException.Code.AUTHFAILED.intValue()); 21 sendResponse(rh, null, null); 22 //关闭链接 23 sendCloseSession(); 24 disableRecv(); 25 } else { 26 //认证成功,返回成功内容 27 ReplyHeader rh = new ReplyHeader(h.getXid(), 0, 28 KeeperException.Code.OK.intValue()); 29 sendResponse(rh, null, null); 30 } 31 return; 32 } else { 33 //如果是请求,提交到zk处理 34 Request si = new Request(this, sessionId, h.getXid(), h.getType(), incomingBuffer, authInfo); 35 si.setOwner(; 36 zk.submitRequest(si); 37 } 38 } 39 40 private void readConnectRequest() throws IOException, InterruptedException { 41 //反序列化链接请求对象 42 BinaryInputArchive bia = BinaryInputArchive 43 .getArchive(new ByteBufferInputStream(incomingBuffer)); 44 ConnectRequest connReq = new ConnectRequest(); 45 connReq.deserialize(bia, "connect"); 46 if (connReq.getLastZxidSeen() > zk.getZKDatabase().getDataTreeLastProcessedZxid()) { 47 throw new CloseRequestException(msg); 48 } 49 sessionTimeout = connReq.getTimeOut(); 50 byte passwd[] = connReq.getPasswd(); 51 //初始化session 52 disableRecv(); 53 if (connReq.getSessionId() != 0) { 54 long clientSessionId = connReq.getSessionId(); 55 factory.closeSessionWithoutWakeup(clientSessionId); 56 setSessionId(clientSessionId); 57 zk.reopenSession(this, 
sessionId, passwd, sessionTimeout); 58 } else { 59 zk.createSession(this, passwd, sessionTimeout); 60 } 61 initialized = true; 62 }

1 synchronized public void sendResponse(ReplyHeader h, Record r, String tag) { 2 try { 3 //序列化返回结果, 4 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 5 BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); 6 try { 7 baos.write(fourBytes); 8 bos.writeRecord(h, "header"); 9 if (r != null) { 10 bos.writeRecord(r, tag); 11 } 12 baos.close(); 13 } catch (IOException e) { 14 LOG.error("Error serializing response"); 15 } 16 //写入数据长度 17 byte b[] = baos.toByteArray(); 18 ByteBuffer bb = ByteBuffer.wrap(b); 19 bb.putInt(b.length - 4).rewind(); 20 // 21 sendBuffer(bb); 22 } catch(Exception e) { 23 LOG.warn("Unexpected exception. Destruction averted.", e); 24 } 25 } 26 void sendBuffer(ByteBuffer bb) { 27 try { 28 if (bb != closeConn) { 29 //直接发送数据 30 if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) { 31 try { 32 sock.write(bb); 33 } catch (IOException e) { 34 // we are just doing best effort right now 35 } 36 } 37 if (bb.remaining() == 0) { 38 packetSent(); 39 return; 40 } 41 } 42 //写入缓存中。 43 synchronized(this.factory){ 44 sk.selector().wakeup(); 45 outgoingBuffers.add(bb); 46 if (sk.isValid()) { 47 sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); 48 } 49 } 50 51 } catch(Exception e) { 52 LOG.error("Unexpected Exception: ", e); 53 } 54 }

1 class EventThread extends Thread { 2 private final LinkedBlockingQueue<Object> waitingEvents = 3 new LinkedBlockingQueue<Object>(); 4 5 /** This is really the queued session state until the event 6 * thread actually processes the event and hands it to the watcher. 7 * But for all intents and purposes this is the state. 8 */ 9 private volatile KeeperState sessionState = KeeperState.Disconnected; 10 11 private volatile boolean wasKilled = false; 12 private volatile boolean isRunning = false; 13 14 //添加响应事件到队列中 15 public void queueEvent(WatchedEvent event) { 16 WatcherSetEventPair pair = new WatcherSetEventPair( 17 watcher.materialize(event.getState(), event.getType(), 18 event.getPath()), 19 event); 20 // queue the pair (watch set & event) for later processing 21 waitingEvents.add(pair); 22 } 23 //添加数据包到队列中 24 public void queuePacket(Packet packet) { 25 if (wasKilled) { 26 synchronized (waitingEvents) { 27 if (isRunning) waitingEvents.add(packet); 28 else processEvent(packet); 29 } 30 } else { 31 waitingEvents.add(packet); 32 } 33 } 34 //添加结束事件 35 public void queueEventOfDeath() { 36 waitingEvents.add(eventOfDeath); 37 } 38 39 @Override 40 public void run() { 41 try { 42 while (true) { 43 // 获取响应事件 44 Object event = waitingEvents.take(); 45 if (event == eventOfDeath) { 46 wasKilled = true; 47 } else { 48 //处理响应 49 processEvent(event); 50 } 51 if (wasKilled) 52 synchronized (waitingEvents) { 53 if (waitingEvents.isEmpty()) { 54 isRunning = false; 55 break; 56 } 57 } 58 } 59 } catch (InterruptedException e) { 60 LOG.error("Event thread exiting due to interruption", e); 61 } 62"EventThread shut down"); 63 } 64 65 private void processEvent(Object event) { 66 try { 67 if (event instanceof WatcherSetEventPair) { 68 // each watcher will process the event 69 WatcherSetEventPair pair = (WatcherSetEventPair) event; 70 for (Watcher watcher : pair.watchers) { 71 try { 72 watcher.process(pair.event); 73 } catch (Throwable t) { 74 LOG.error("Error while calling watcher ", t); 75 
} 76 } 77 } else { 78 Packet p = (Packet) event; 79 int rc = 0; 80 String clientPath = p.clientPath; 81 if (p.replyHeader.getErr() != 0) { 82 rc = p.replyHeader.getErr(); 83 } 84 if (p.cb == null) { 85 LOG.warn("Somehow a null cb got to EventThread!"); 86 } else if (p.response instanceof ExistsResponse 87 || p.response instanceof SetDataResponse 88 || p.response instanceof SetACLResponse) { 89 StatCallback cb = (StatCallback) p.cb; 90 if (rc == 0) { 91 if (p.response instanceof ExistsResponse) { 92 cb.processResult(rc, clientPath, p.ctx, 93 ((ExistsResponse) p.response) 94 .getStat()); 95 } else if (p.response instanceof SetDataResponse) { 96 cb.processResult(rc, clientPath, p.ctx, 97 ((SetDataResponse) p.response) 98 .getStat()); 99 } else if (p.response instanceof SetACLResponse) { 100 cb.processResult(rc, clientPath, p.ctx, 101 ((SetACLResponse) p.response) 102 .getStat()); 103 } 104 } else { 105 cb.processResult(rc, clientPath, p.ctx, null); 106 } 107 } else if (p.response instanceof GetDataResponse) { 108 DataCallback cb = (DataCallback) p.cb; 109 GetDataResponse rsp = (GetDataResponse) p.response; 110 if (rc == 0) { 111 cb.processResult(rc, clientPath, p.ctx, rsp 112 .getData(), rsp.getStat()); 113 } else { 114 cb.processResult(rc, clientPath, p.ctx, null, 115 null); 116 } 117 } else if (p.response instanceof GetACLResponse) { 118 ACLCallback cb = (ACLCallback) p.cb; 119 GetACLResponse rsp = (GetACLResponse) p.response; 120 if (rc == 0) { 121 cb.processResult(rc, clientPath, p.ctx, rsp 122 .getAcl(), rsp.getStat()); 123 } else { 124 cb.processResult(rc, clientPath, p.ctx, null, 125 null); 126 } 127 } else if (p.response instanceof GetChildrenResponse) { 128 ChildrenCallback cb = (ChildrenCallback) p.cb; 129 GetChildrenResponse rsp = (GetChildrenResponse) p.response; 130 if (rc == 0) { 131 cb.processResult(rc, clientPath, p.ctx, rsp 132 .getChildren()); 133 } else { 134 cb.processResult(rc, clientPath, p.ctx, null); 135 } 136 } else if (p.response instanceof 
GetChildren2Response) { 137 Children2Callback cb = (Children2Callback) p.cb; 138 GetChildren2Response rsp = (GetChildren2Response) p.response; 139 if (rc == 0) { 140 cb.processResult(rc, clientPath, p.ctx, rsp 141 .getChildren(), rsp.getStat()); 142 } else { 143 cb.processResult(rc, clientPath, p.ctx, null, null); 144 } 145 } else if (p.response instanceof CreateResponse) { 146 StringCallback cb = (StringCallback) p.cb; 147 CreateResponse rsp = (CreateResponse) p.response; 148 if (rc == 0) { 149 cb.processResult(rc, clientPath, p.ctx, 150 (chrootPath == null 151 ? rsp.getPath() 152 : rsp.getPath() 153 .substring(chrootPath.length()))); 154 } else { 155 cb.processResult(rc, clientPath, p.ctx, null); 156 } 157 } else if (p.cb instanceof VoidCallback) { 158 VoidCallback cb = (VoidCallback) p.cb; 159 cb.processResult(rc, clientPath, p.ctx); 160 } 161 } 162 } catch (Throwable t) { 163 LOG.error("Caught unexpected throwable", t); 164 } 165 } 166 }