TCP_AIO_Server_ZC_01
Posted JavaSkill
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了TCP_AIO_Server_ZC_01相关的知识,希望对你有一定的参考价值。
ZC: 这个例子演示的是:每个 socket(代码中简写为 skt)同一时刻只投递 1 个未决的(pending)接收操作。
1、package aio.Client;
1.1、这是一个用于测试的客户端代码:
package aio.Client;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Future;

/**
 * Test client for the asynchronous server: connects to localhost:9888,
 * sends the text "testAA", waits for one reply and prints it.
 *
 * <p>Every asynchronous call is turned into a blocking one by waiting on the
 * returned {@link Future}.
 */
public class TaioClient {
    // http://yunhaifeiwu.iteye.com/blog/1714664

    /**
     * Runs one connect / write / read round trip.
     *
     * @param args unused
     * @throws Exception if connecting, writing or reading fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources: the channel is closed even when a step fails.
        try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
            Future<Void> futureConn = client.connect(new InetSocketAddress("localhost", 9888));
            futureConn.get(); // Future.get() blocks until the asynchronous operation completes

            // Explicit charset so the bytes on the wire do not depend on the platform default.
            Future<Integer> futureWrite = client.write(
                    ByteBuffer.wrap("testAA".getBytes(java.nio.charset.StandardCharsets.UTF_8)));
            int iWritten = futureWrite.get();
            System.out.println("Client send ["+iWritten+"] bytes .");

            ByteBuffer bufRead = ByteBuffer.allocate(256);
            Future<Integer> futureRead = client.read(bufRead);
            int iRead = futureRead.get();
            System.out.println("Client recv ["+iRead+"] bytes : "+bufRead);
            System.out.println("\t "+bufRead.capacity());

            if (iRead < 0) {
                // read() reports -1 at end-of-stream; there is nothing to decode.
                System.out.println("\t <connection closed by server>");
                return;
            }

            // After the read the buffer's position sits just past the received bytes.
            // flip() sets limit = position and position = 0 so the data can be read back
            // (this is the step the original author found so easy to miss).
            bufRead.flip();

            // Way 1: copy exactly the received bytes out of the buffer.
            byte[] bytesRead = new byte[bufRead.remaining()];
            bufRead.get(bytesRead);
            System.out.println("\t "+new String(bytesRead, java.nio.charset.StandardCharsets.UTF_8));

            // Way 2: decode straight from the backing array, bounded by the received length.
            // No NUL terminator is needed: the length argument delimits the text.
            System.out.println("\t "+new String(bufRead.array(), bufRead.arrayOffset(), iRead,
                    java.nio.charset.StandardCharsets.UTF_8));

            Thread.sleep(1000*2);
        }
    }
}
2、package aio.Server;
2.1、
package aio.Server; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.SocketOption; import java.nio.*; import java.nio.channels.*; import java.util.concurrent.*; import aio.Server.CompletionHandler.TaioAcceptHandlerSrv; import aio.Server.CompletionHandler.TaioReadHandlerSrv; import aio.Server.CompletionHandler.TaioWriteHandlerSrv; import aio.Server.RemoteClients.TremoteClient; import aio.Server.RemoteClients.TremoteClients; import java.net.StandardSocketOptions; public class Taioserver { public final static int PORT = 9888; public boolean FbStarted = false; AsynchronousChannelGroup FasynchronousChannelGroup = null; int FiThreadPoolSize = 100; // ZC: 这个参数的含义,不太确定是什么意思... AsynchronousServerSocketChannel FserverSocketChannel = null; int FiBacklog = 100; // ZC: 这个参数的含义,不确定是否就是 等待accept的最大数量(相当于Windows中开的accpet的线程数) TremoteClients Fclients = null; public TaioServer() throws Exception { FasynchronousChannelGroup = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), FiThreadPoolSize); FserverSocketChannel = AsynchronousServerSocketChannel.open(FasynchronousChannelGroup); // 通过SocketOption类设置一些TCP选项 FserverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR,true); FserverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16*1024); FserverSocketChannel.bind(new InetSocketAddress("localhost", PORT), FiBacklog); FbStarted = true; // *** Fclients = new TremoteClients(); } public void PendingAccept() { if (FbStarted && FserverSocketChannel.isOpen()) { FserverSocketChannel.accept(Fclients, new TaioAcceptHandlerSrv(this)); } else { throw new IllegalStateException("Controller has been closed"); } } public void PendingRead(TremoteClient _client) { if (FbStarted && FserverSocketChannel.isOpen()) { _client.Fskt.read(ByteBuffer.wrap(_client.FbufRecv), _client, new TaioReadHandlerSrv(this)); } else { throw new IllegalStateException("Controller has been closed"); } } // 投递 带TimeOut的 接收操作 public void 
PendingRead_timeout(TremoteClient _client) { if (FbStarted && FserverSocketChannel.isOpen()) { _client.Fskt.read(ByteBuffer.wrap(_client.FbufRecv), 1, TimeUnit.MILLISECONDS, _client, new TaioReadHandlerSrv(this)); } else { throw new IllegalStateException("Controller has been closed"); } } public void PendingWrite(TremoteClient _client) { if (FbStarted && FserverSocketChannel.isOpen()) { int iLen = _client.FbufSend.length - _client.FiHasSendLen; _client.Fskt.write(ByteBuffer.wrap(_client.FbufSend, _client.FiHasSendLen, iLen), _client, new TaioWriteHandlerSrv(this)); } else { throw new IllegalStateException("Controller has been closed"); } } public void Send(byte[] _buf, TremoteClient _client) { _client.FbufSend = _buf; _client.FiHasSendLen = 0; PendingWrite(_client); } // *** 反射,提供函数调用 public static void PendingAccept_r(Object _obj) { try { Class<?> clazz = TaioServer.class; Method method = clazz.getMethod("PendingAccept"); method.invoke(_obj); } catch (Exception e) { e.printStackTrace(); } } // 投递接收操作的 2个函数的函数名 public static final String METHODNAME_PEND_READ = "PendingRead"; public static final String METHODNAME_PEND_READ_TIMEOUT = "PendingRead_timeout"; public static final String METHODNAME_PEND_WRITE = "PendingWrite"; public static void PendingReadWrite_r(String _strMethodName, Object _obj, TremoteClient _client) { Class<?> clazz = TaioServer.class; try { Method method = clazz.getMethod(_strMethodName, TremoteClient.class); method.invoke(_obj, _client); } catch (Exception e) { e.printStackTrace(); } } public static void Send_r(Object _obj, byte[] _buf, TremoteClient _client) { try { Class<?> clazz = TaioServer.class; Method method = clazz.getMethod("Send", byte[].class, TremoteClient.class); //method.invoke(_obj, new Object[]{_buf}, _client); method.invoke(_obj, _buf, _client); } catch (Exception e) { e.printStackTrace(); } } // *** *** *** *** *** *** *** *** *** *** *** public static void main(String args[]) throws Exception { System.out.println("main in 
<<=="); new TaioServer().PendingAccept(); while (true) { System.out.println("main thread"); Thread.sleep(1000); } //System.out.println("main out ==>>"); } }
3、package aio.Server.CompletionHandler;
3.1、
package aio.Server.CompletionHandler;

import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

import aio.Server.TaioServer;
import aio.Server.RemoteClients.TremoteClient;
import aio.Server.RemoteClients.TremoteClients;

/**
 * Completion handler for accept operations: registers the new connection,
 * posts its first receive, and always posts the next accept.
 */
public class TaioAcceptHandlerSrv implements CompletionHandler<AsynchronousSocketChannel, TremoteClients> {
    /** The server, held as Object and reached through the static TaioServer.*_r helpers. */
    Object FaioServer = null;
    /** Never assigned in this class. */
    Object Fclients = null;

    /**
     * @param _obj the TaioServer instance to call back into
     */
    public TaioAcceptHandlerSrv(Object _obj) {
        FaioServer = _obj;
    }

    /**
     * Called when a connection has been accepted.
     *
     * @param _rstNewSkt  the newly accepted channel
     * @param _attachment the client registry passed to accept()
     */
    @Override
    public void completed(AsynchronousSocketChannel _rstNewSkt, TremoteClients _attachment) {
        try {
            System.out.println("Accept connection from " + _rstNewSkt.getRemoteAddress());

            System.out.println("TaioAcceptHandler.completed - 1 : "+_attachment);
            TremoteClient client = _attachment.ClientNew();
            if (client == null) {
                // ClientNew() returns null when the client could not be added to the registry.
                // Without a client record nothing would ever read from this channel, so close it.
                System.out.println("TaioAcceptHandler.completed - client registration failed");
                closeQuietly(_rstNewSkt);
                return; // the finally block still posts the next accept
            }
            client.Fskt = _rstNewSkt;
            System.out.println("TaioAcceptHandler.completed - 2");
            TaioServer.PendingReadWrite_r(TaioServer.METHODNAME_PEND_READ, FaioServer, client);
            System.out.println("TaioAcceptHandler.completed - 3");
        } catch (Exception ex) {
            ex.printStackTrace();
            // Setup failed part-way: do not leave the accepted channel open and unattended.
            closeQuietly(_rstNewSkt);
        } finally {
            // Keep accepting regardless of what happened to this connection.
            TaioServer.PendingAccept_r(FaioServer);
        }
    }

    /**
     * Called when an accept fails; logs the error and posts the next accept.
     *
     * @param exc         the failure
     * @param _attachment the client registry passed to accept()
     */
    @Override
    public void failed(Throwable exc, TremoteClients _attachment) {
        System.out.println("Accept error" + exc);
        TaioServer.PendingAccept_r(FaioServer);
    }

    /** Closes the channel, printing (not propagating) any error from close(). */
    private static void closeQuietly(AsynchronousSocketChannel _skt) {
        try {
            _skt.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
3.2、
package aio.Server.CompletionHandler; import java.io.IOException; import java.lang.reflect.Method; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import aio.Server.TaioServer; import aio.Server.RemoteClients.TremoteClient; public class TaioReadHandlerSrv implements CompletionHandler<Integer, TremoteClient> { public static void main(String[] args) { } Object FaioServer = null; public TaioReadHandlerSrv(Object _obj) { FaioServer = _obj; } @Override public void completed(Integer _iRst, TremoteClient _attachment) { System.out.println("TaioReadHandler.completed - _iRst : "+_iRst); if (_iRst < 0) { System.out.println("对方关闭socket"); } else if (_iRst > 0) { if (_iRst < _attachment.FbufRecv.length) { // some 操作 ...(业务逻辑) byte[] bytes = new byte[1]; bytes[0] = ‘A‘; TaioServer.Send_r(FaioServer, bytes, _attachment); TaioServer.PendingReadWrite_r(TaioServer.METHODNAME_PEND_READ, FaioServer, _attachment); } else// if (_iRst == _attachment.FbytesRecvBuf.length) { // ZC: 这里,接收的数据长度正好等于缓冲区的长度==>无法判断是否接收完毕了==>就使用“投递超时接收操作”的方式来做后续处理和判断 // ZC: 我记得WindowsIOCP的处理方式就是这样的,具体以后再说 TaioServer.PendingReadWrite_r(TaioServer.METHODNAME_PEND_READ_TIMEOUT, FaioServer, _attachment); } } } @Override public void failed(Throwable _ex, TremoteClient _attachment) { try { Class<?> clazz = Class.forName("java.nio.channels.InterruptedByTimeoutException"); if (_ex.getClass() == clazz) { // some 操作 ...(业务逻辑) TaioServer.PendingReadWrite_r(TaioServer.METHODNAME_PEND_READ, FaioServer, _attachment); } else { System.out.println("TaioReadHandler.failed : "+_ex.getClass()); _ex.printStackTrace(); _attachment.Fskt.close(); } } catch (Exception e1) { e1.printStackTrace(); } } }
3.3、
package aio.Server.CompletionHandler;

import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;

import aio.Server.TaioServer;
import aio.Server.RemoteClients.TremoteClient;

/**
 * Completion handler for send operations: tracks how much of the client's send
 * buffer has gone out and posts another write until all of it has been sent.
 */
public class TaioWriteHandlerSrv implements CompletionHandler<Integer, TremoteClient> {
    /** The server, held as Object and reached through the static TaioServer.*_r helpers. */
    Object FaioServer = null;

    /**
     * @param _obj the TaioServer instance to call back into
     */
    public TaioWriteHandlerSrv(Object _obj) {
        FaioServer = _obj;
    }

    /**
     * Called when a send completes.
     *
     * @param _iRst       number of bytes written by this operation
     * @param _attachment the client the send was posted for
     */
    @Override
    public void completed(Integer _iRst, TremoteClient _attachment) {
        _attachment.FiHasSendLen += _iRst;
        int iTotal = _attachment.FbufSend.length;

        if (_attachment.FiHasSendLen > iTotal) {
            // Should not happen: more bytes accounted for than the buffer holds.
            System.out.println("TaioWriteHandlerSrv.completed FiSendLen > FbufSend.length");
        } else if (_attachment.FiHasSendLen == iTotal) {
            System.out.println("TaioWriteHandlerSrv.completed 发送完毕");
        } else {
            // Partial write: post another send for the remaining tail.
            TaioServer.PendingReadWrite_r(TaioServer.METHODNAME_PEND_WRITE, FaioServer, _attachment);
        }
    }

    /**
     * Called when a send fails. The failure is logged and the socket is closed,
     * mirroring how the read handler treats a failed receive.
     *
     * @param _ex         the failure
     * @param _attachment the client the send was posted for
     */
    @Override
    public void failed(Throwable _ex, TremoteClient _attachment) {
        System.out.println("TaioWriteHandlerSrv.failed : "+_ex.getClass());
        _ex.printStackTrace();
        try {
            _attachment.Fskt.close();
        } catch (java.io.IOException e) {
            e.printStackTrace();
        }
    }
}
4、package aio.Server.RemoteClients;
4.1、
package aio.Server.RemoteClients; import java.nio.channels.AsynchronousSocketChannel; public class TremoteClient { public static void main(String[] args) { } // *** // 当需要 投递多个接收操作的时候,可以将接收缓冲封装成类,然后再投递多个接收缓冲(模仿IOCP的做法) // ZC: 疑问:多个接收缓冲接收到数据后,如何确定它们在整个数据包中的先后顺序?? // ZC: ∴ 暂时,先 一个skt对应一个接收缓冲区吧... // ZC: 基本的解决思路:“虽然使用I/O完成端口的操作总会按照它们被提交的顺序完成,但是线程调度问题可能会导致关联到完成端口的实际工作不按正常顺序完成。...”(来自<<Windows网络与通信程序设计>>) // ZC: ∵IOCP的操作是顺序完成的,且会产生乱序问题的是线程调度,∴对投递多个"未决接收操作"进行同步编号就OK了。 // ZC: 对于上面书上的说法,2个顾虑: // ZC: (1)、IOCP的实现是否真是这样(IOCP顺序完成)?有MSDN的官方资料吗? // ZC: (2)、Linux中,一个skt投递多个未决接收操作,也可以使用同样的思路吗?现在查到资料Linux未实现真正的网络异步IO(用epoll模拟),以后实现了的话 会是怎么样的情况? public AsynchronousSocketChannel Fskt = null; public byte[] FbufRecv = null; // 每次最多发送多少byte的数据 // ZC: 这个值得取值,还有待商榷(评测),以后再说吧... //public final int SEND_LEN = 1024; public byte[] FbufSend = null; public int FiHasSendLen = 0; // 已经发送了多少byte的数据了 public TremoteClient(int _iRecvBufLen) { FbufRecv = new byte[_iRecvBufLen]; } // 接收 数据包 public synchronized void Recv() {} }
4.2、
package aio.Server.RemoteClients; import java.util.HashSet; import java.util.TreeSet; public class TremoteClients { // 该类用于 服务端 管理 远程客户端的信息 // *** *** *** //HashMap?HashTable? //Map<Object, TremoteClient> Fhash = new HashMap<Object, TremoteClient>(); HashSet<TremoteClient> Fset = new HashSet<TremoteClient>(); //TreeSet<TremoteClient> Fset = new TreeSet<TremoteClient>(); public final int RECV_BUF_LEN = 5; public TremoteClient ClientNew() { TremoteClient rst = new TremoteClient(RECV_BUF_LEN); boolean bNotAlreadyContain = ClientAdd(rst); if (bNotAlreadyContain) return rst; else { rst = null; return null; } } synchronized boolean ClientAdd(TremoteClient _client) { return Fset.add(_client); } synchronized boolean ClientDel(TremoteClient _client) { return Fset.remove(_client); } // 接收 数据包 public void Recv() { Object[] objects = null; synchronized(this) { objects = (TremoteClient[])Fset.toArray(); } for (int i=0; i<objects.length; i++) { TremoteClient client = (TremoteClient)(objects[i]); client.Recv(); } } }
5、
以上是关于TCP_AIO_Server_ZC_01的主要内容,如果未能解决你的问题,请参考以下文章
为啥 GraphQL 片段在查询中需要 __typename?