(填坑系列) 用aio写server与client进行通信的坑
Posted zhaohuaxishi
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(填坑系列) 用aio写server与client进行通信的坑相关的知识,希望对你有一定的参考价值。
最近闲来无事,就估摸着自己写个“服务注册中心”来玩,当然因为是个人写的,所以一般都是简洁版本。
代码地址在:https://gitee.com/zhxs_code/my-service-register.git
由于在处理与网络数据时,为了性能,想到用AIO来实验,结果发现AIO整个思路与之前的BIO,NIO都不一样。导致出现一些深坑,在此记录一下。
(一) AIO写的server端与client端,只能通信一次。
上代码:
server端部分:
1 public class RegisterServer { 2 3 4 // 静态初始化 eventProcess 5 private EventProcess eventProcess=new EventProcess(); 6 7 public AsynchronousServerSocketChannel serverSocketChannel=null; 8 9 public AsynchronousServerSocketChannel getSocketChannel(){ 10 return serverSocketChannel; 11 } 12 13 RegisterServer() throws InterruptedException, ExecutionException, IOException { 14 start(); 15 } 16 17 public static void main(String[] args) throws InterruptedException, IOException, ExecutionException { 18 new RegisterServer(); 19 20 } 21 22 public void start() throws ExecutionException, InterruptedException, IOException { 23 // 创建一个对象 24 serverSocketChannel = AsynchronousServerSocketChannel.open(); 25 // 绑定端口 26 serverSocketChannel.bind(new InetSocketAddress(RegisterServerConfig.getInstance().getPort())); 27 System.out.println("--------- Register Server Started ! --------- "); 28 System.out.println("--------- Register Server bind port : [" + RegisterServerConfig.getInstance().getPort() + "] --------------"); 29 30 // 心跳检测 31 ServerSchedule.checkClientHeartBeat(); 32 serverSocketChannel.accept(this,new AcceptCompletionHandler()); 33 34 } 35 36 }
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,RegisterServer> { @Override public void completed(AsynchronousSocketChannel socketChannel, RegisterServer attachment) { // 处理下一次链接,类似链式调用 attachment.getSocketChannel().accept(attachment,this); ByteBuffer buffer=ByteBuffer.allocate(1024); socketChannel.read(buffer,buffer,new ReadCompletionHandler(socketChannel,new AnalyticalMsg())); } @Override public void failed(Throwable throwable, RegisterServer registerServer) { throwable.printStackTrace(); } }
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel socketChannel; // 业务处理函数 private BusinessFun businessFun; ReadCompletionHandler(AsynchronousSocketChannel socketChannel) { if (this.socketChannel == null) { this.socketChannel = socketChannel; } } ReadCompletionHandler(AsynchronousSocketChannel socketChannel,BusinessFun businessFun) { if (this.socketChannel == null) { this.socketChannel = socketChannel; } if(this.businessFun==null){ this.businessFun=businessFun; } } @Override public void completed(Integer result, ByteBuffer attachment) { if(result<0){ System.err.println(" ReadCompletionHandler completed() result < 0 !!!! "); return; } attachment.flip(); byte[] buffer = new byte[attachment.remaining()]; attachment.get(buffer); String content = new String(buffer, StandardCharsets.UTF_8); System.out.println("received : " + content); // 处理业务 businessFun.doSomeThing(socketChannel,content); attachment.clear(); // 处理完之后,要继续监听read,否则同一个socket只能通信一次,无法接收到之后通过socket发送的消息 // ------------- 重要 ------------------- socketChannel.read(attachment,attachment,this); } @Override public void failed(Throwable throwable, ByteBuffer attachment) { try { throwable.printStackTrace(); System.err.println(" socket cloesd : "+socketChannel.getRemoteAddress()); socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }
以上就是server端的大部分核心代码。aio提供了两种处理各种操作的方式:future , handler., 拿accept操作为例:
AsynchronousServerSocketChannel 提供了accept两种的两种api
一种返回值类型为future,另一种形参列表里传入了一个CompletionHandler。
官方的注释其实说明了两种api的特点,我归纳一下:
用future的方式,意思是实际什么时候触发并不清楚,要拿到操作结果,就要调用future.get( ) 方法,但是get( ) 是阻塞的,所以和我们的初衷“异步非阻塞”其实就相违背了,所以我没采用future的方式。
handler方式其实是一个接口,将接口当做参数传入进去,实际是要自己实现该接口,然后覆写里面的complete () , faile( ) 方法的。complete方法是当底层对accpet的准备工作做完并且成功以后就会调用complete(),反之调用fail(), 那么我们可以在覆写complete()方式时实现自己的逻辑。handler是完全异步非阻塞的,不需要像future方式那样通过调用get()方式来触发。举例来说明一下,假如有个client发送了一个accpet的请求到server端,当server端接收到这个请求之后,会自动调用handler中覆写的complete和fail来实现业务逻辑。看着好像挺神奇,其实原理是底层创建了一个默认的线程池来处理这些操作。因为有另一个线程来处理,所以handler的处理方式是异步非阻塞的,因为不会阻塞当前线程。
那么为什么client与server只能通信一次了?
只能通信一次的场景我就不贴图了,有兴趣的同学可以百度aio写server的代码,试着运行一下就知道了。
解决方案是在上面的 ReadCompletionHandler 中的 completed()方法中最后一句
@Override public void completed(Integer result, ByteBuffer attachment) { if(result<0){ System.err.println(" ReadCompletionHandler completed() result < 0 !!!! "); return; } attachment.flip(); byte[] buffer = new byte[attachment.remaining()]; attachment.get(buffer); String content = new String(buffer, StandardCharsets.UTF_8); System.out.println("received : " + content); // 处理业务 businessFun.doSomeThing(socketChannel,content); attachment.clear(); // 处理完之后,要继续监听read,否则同一个socket只能通信一次,无法接收到之后通过socket发送的消息 // ------------- 重要 ------------------- socketChannel.read(attachment,attachment,this); }
我的注释已经写的很清楚了,那么为什么要这么写的原因了,这个就和aio的底层实现有关。之前已经说过aio其实是创建了一个默认线程池来处理所有操作
看源码:
AsynchronousServerSocketChannel.open()
public static AsynchronousServerSocketChannel open() throws IOException { return open(null); }
public static AsynchronousServerSocketChannel open(AsynchronousChannelGroup group) throws IOException { AsynchronousChannelProvider provider = (group == null) ? AsynchronousChannelProvider.provider() : group.provider(); return provider.openAsynchronousServerSocketChannel(group); }
public AsynchronousServerSocketChannel openAsynchronousServerSocketChannel(AsynchronousChannelGroup var1) throws IOException { return new UnixAsynchronousServerSocketChannelImpl(this.toPort(var1)); }
private Port toPort(AsynchronousChannelGroup var1) throws IOException { if (var1 == null) { return this.defaultEventPort(); } else if (!(var1 instanceof KQueuePort)) { throw new IllegalChannelGroupException(); } else { return (Port)var1; } }
private KQueuePort defaultEventPort() throws IOException { if (defaultPort == null) { Class var1 = BsdAsynchronousChannelProvider.class; synchronized(BsdAsynchronousChannelProvider.class) { if (defaultPort == null) { defaultPort = (new KQueuePort(this, ThreadPool.getDefault())).start(); } } } return defaultPort; }
创建一个KQueuePort对象,KQueuePort是继承的Port . KQueuePort就是 aio中对端口抽象的一种具体实现, 并且还传入了一个默认的线程池。
final class KQueuePort extends Port { private static final int MAX_KEVENTS_TO_POLL = 512; private final int kqfd = KQueue.kqueue(); private boolean closed; private final int[] sp; private final AtomicInteger wakeupCount = new AtomicInteger(); private final long address; private final ArrayBlockingQueue<KQueuePort.Event> queue; private final KQueuePort.Event NEED_TO_POLL = new KQueuePort.Event((PollableChannel)null, 0); private final KQueuePort.Event EXECUTE_TASK_OR_SHUTDOWN = new KQueuePort.Event((PollableChannel)null, 0); KQueuePort(AsynchronousChannelProvider var1, ThreadPool var2) throws IOException { super(var1, var2); int[] var3 = new int[2]; try { socketpair(var3); KQueue.keventRegister(this.kqfd, var3[0], -1, 1); } catch (IOException var5) { close0(this.kqfd); throw var5; } this.sp = var3; this.address = KQueue.allocatePollArray(512); this.queue = new ArrayBlockingQueue(512); this.queue.offer(this.NEED_TO_POLL); }
KQueuePort的构造方法 创建了一个 阻塞队列 queue, 这个队列里面存储的就是通过端口传递过来的各种事件(Event),然后会进入 start方法:
KQueuePort start() { this.startThreads(new KQueuePort.EventHandlerTask()); return this; }
protected final void startThreads(Runnable var1) {
int var2;
if (!this.isFixedThreadPool()) {
for(var2 = 0; var2 < internalThreadCount; ++var2) {
this.startInternalThread(var1);
this.threadCount.incrementAndGet();
}
}
if (this.pool.poolSize() > 0) {
var1 = this.bindToGroup(var1);
try {
for(var2 = 0; var2 < this.pool.poolSize(); ++var2) {
this.pool.executor().execute(var1);
this.threadCount.incrementAndGet();
}
} catch (RejectedExecutionException var3) {
;
}
}
上面就是判断线程池的大小,然后用线程池里的线程来处理传入的runnable,而这个runnable实际是上面生成的 EventHandlerTask , 这个EventHandlerTask 实现了Runnable接口:
private class EventHandlerTask implements Runnable { private EventHandlerTask() { } private KQueuePort.Event poll() throws IOException { try { while(true) { int var1 = KQueue.keventPoll(KQueuePort.this.kqfd, KQueuePort.this.address, 512); KQueuePort.this.fdToChannelLock.readLock().lock(); try { while(var1-- > 0) { long var2 = KQueue.getEvent(KQueuePort.this.address, var1); int var4 = KQueue.getDescriptor(var2); Object var5; if (var4 == KQueuePort.this.sp[0]) { if (KQueuePort.this.wakeupCount.decrementAndGet() == 0) { KQueuePort.drain1(KQueuePort.this.sp[0]); } if (var1 <= 0) { var5 = KQueuePort.this.EXECUTE_TASK_OR_SHUTDOWN; return (KQueuePort.Event)var5; } KQueuePort.this.queue.offer(KQueuePort.this.EXECUTE_TASK_OR_SHUTDOWN); } else { var5 = (PollableChannel)KQueuePort.this.fdToChannel.get(var4); if (var5 != null) { int var6 = KQueue.getFilter(var2); short var7 = 0; if (var6 == -1) { var7 = Net.POLLIN; } else if (var6 == -2) { var7 = Net.POLLOUT; } KQueuePort.Event var8 = new KQueuePort.Event((PollableChannel)var5, var7); if (var1 <= 0) { KQueuePort.Event var9 = var8; return var9; } KQueuePort.this.queue.offer(var8); } } } } finally { KQueuePort.this.fdToChannelLock.readLock().unlock(); } } } finally { KQueuePort.this.queue.offer(KQueuePort.this.NEED_TO_POLL); } } public void run() { GroupAndInvokeCount var1 = Invoker.getGroupAndInvokeCount(); boolean var2 = var1 != null; boolean var3 = false; int var6; while(true) { boolean var14 = false; try { label151: { var14 = true; if (var2) { var1.resetInvokeCount(); } KQueuePort.Event var4; try { var3 = false; var4 = (KQueuePort.Event)KQueuePort.this.queue.take(); if (var4 == KQueuePort.this.NEED_TO_POLL) { try { var4 = this.poll(); } catch (IOException var17) { var17.printStackTrace(); var14 = false; break label151; } } } catch (InterruptedException var18) { continue; } if (var4 == KQueuePort.this.EXECUTE_TASK_OR_SHUTDOWN) { Runnable var5 = KQueuePort.this.pollTask(); if (var5 == null) { var14 = false; break; } var3 = true; var5.run(); continue; } try { var4.channel().onEvent(var4.events(), var2); continue; } catch (Error var15) { var3 = true; throw var15; } catch (RuntimeException var16) { var3 = true; throw var16; } } } finally { if (var14) { int var8 = KQueuePort.this.threadExit(this, var3); if (var8 == 0 && KQueuePort.this.isShutdown()) { KQueuePort.this.implClose(); } } } var6 = KQueuePort.this.threadExit(this, var3); if (var6 == 0 && KQueuePort.this.isShutdown()) { KQueuePort.this.implClose(); } return; } var6 = KQueuePort.this.threadExit(this, var3); if (var6 == 0 && KQueuePort.this.isShutdown()) { KQueuePort.this.implClose(); } } }
到这里,我们终于接近了 server端与client端只通信一次的真相:就是上面的run方法与poll方法。主要就是不停的从queue从获取端口各种的event,然后从通过poll方法生成各种event的task,然后上面的线程池里线程来处理各个task,而task的实际处理逻辑是先由操作系统和AIO底层完成一些准备工作,如收发包、事件分类(accept、write、read等),然后调用CompletionHandler中的completed与fail方法来处理。
但是要注意的事,所有处理event的都是通过上面的队列queue来处理,所以,当client与server第一次通信时,queue中有我们自己定义的handler来处理task,但是每一次task处理完成之后,队列中的这个task就取出去了,下一次同样的事件触发时,是无法通过queue找到对应的handler来处理的。所以为了一直能与client通信,需要在read的handler处理完之后再一次进行read.
// 处理完之后,要继续监听read,否则同一个socket只能通信一次,无法接收到之后通过socket发送的消息 // ------------- 重要 ------------------- socketChannel.read(attachment,attachment,this);
注意: 本地测试的时候用aio写的client,如果开启太多线程模拟client与server通信。所有client的线程都会阻塞,具体是什么原因还确定。
以上是关于(填坑系列) 用aio写server与client进行通信的坑的主要内容,如果未能解决你的问题,请参考以下文章