Java 网络编程 —— 实现非阻塞式的服务器

Posted YeeXang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 网络编程 —— 实现非阻塞式的服务器相关的知识,希望对你有一定的参考价值。

创建阻塞的服务器

ServerSocketChannelSockelChannel 采用默认的阻塞模式时,为了同时处理多个客户的连接,必须使用多线程

public class EchoServer 
    
	private int port = 8000;
    private ServerSocketChannel serverSocketChannel = null;
    private ExecutorService executorService; //线程池
    private static final int POOL_MULTIPLE = 4; //线程池中工作线程的数目
    
    public EchoServer() throws IOException 
        //创建一个线程池
        executorService = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE);
        //创建一个ServerSocketChannel对象
        serverSocketChannel = ServerSocketChannel.open();
        //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时,可以顺利绑定相同的端口
        serverSocketChannel.socket().setReuseAddress(true);
        //把服务器进程与一个本地端口绑定
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        System.out.println("服务器启动");
    
    
    public void service() 
        while (true) 
            SocketChannel socketChannel = null;
            try 
                socketChannel = serverSocketChannel.accept();
                //处理客户连接
                executorService.execute(new Handler(socketChannel));
             catch(IOException e) 
                e.printStackTrace();
            
        
    
    
    public static void main(String args[])throws IOException 
        new EchoServer().service();
    
    
    //处理客户连按
    class Handler implements Runnable 

        private SocketChannel socketChannel;
		
        public Handler(SocketChannel socketChannel) 
            this.socketChannel = socketChannel;
        
        
        public void run() 
            handle(socketChannel);
        
        
        public void handle(SocketChannel socketChannel) 
            try 
                //获得与socketChannel关联的Socket对象
                Socket socket = socketChannel.socket();
                System.out.println("接收到客户连接,来自:" + socket.getInetAddress() + ":" + socket.getPort());
                
                BufferedReader br = getReader(socket);
                PrintWriter pw = getWriter(socket);
                
                String msg = null;
                while ((msg = br.readLine()) != null) 
                    System.out.println(msg);
                    pw.println(echo(msg));
                    if (msg.equals("bye")) 
                        break;
                    
                
             catch (IOException e) 
                e.printStackTrace();
             finally 
                try 
                    if(socketChannel != null) 
                        socketChannel.close();
                     catch (IOException e) 
                        e.printStackTrace();
                    
                
            
        
     
    
    private PrintWriter getWriter(Socket socket) throws IOException 
        OutputStream socketOut = socket.getOutputStream();
        return new PrintWriter(socketOut,true);
    
    
    private BufferedReader getReader(Socket socket) throws IOException 
        InputStream socketIn = socket.getInputStream();
        return new BufferedReader(new InputStreamReader(socketIn));
    
    
    public String echo(String msg) 
        return "echo:" + msg;
    


创建非阻塞的服务器

在非阻塞模式下,EchoServer 只需要启动一个主线程,就能同时处理三件事:

  • 接收客户的连接
  • 接收客户发送的数据
  • 向客户发回响应数据

EchoServer 委托 Selector 来负责监控接收连接就绪事件、读就绪事件和写就绪事件如果有特定事件发生,就处理该事件

// 创建一个Selector对象
selector = Selector.open();
//创建一个ServerSocketChannel对象
serverSocketChannel = ServerSocketChannel.open();
//使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时
//可以顺利绑定到相同的端口
serverSocketChannel.socket().setReuseAddress(true);
//使ServerSocketChannel工作于非阻塞模式
serverSocketChannel.configureBlocking(false):
//把服务器进程与一个本地端口绑定
serverSocketChannelsocket().bind(new InetSocketAddress(port));

EchoServer 类的 service() 方法负责处理本节开头所说的三件事,体现其主要流程的代码如下:

public void service() throws IOException 
    serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);
    //第1层while循环
    while(selector.select() > 0) 
        //获得Selector的selected-keys集合
        Set readyKeys = selector.selectedKeys();
        Iterator it = readyKeys.iterator();
        //第2层while循环
        while (it.hasNext()) 
            SelectionKey key = null;
            //处理SelectionKey
            try 
                //取出一个SelectionKey
                key = (SelectionKey) it.next();
                //把 SelectionKey从Selector 的selected-key 集合中删除
                it.remove();
                1f (key.isAcceptable())  处理接收连接就绪事件; 
                if (key.isReadable())  处理读就绪水件; 
                if (key.isWritable())  处理写就绪事件; 
             catch(IOException e) 
                e.printStackTrace();
                try 
                    if(key != null) 
                        //使这个SelectionKey失效
                        key.cancel();
                        //关闭与这个SelectionKey关联的SocketChannel
                        key.channel().close();
                    
                 catch(Exception ex)  
                    e.printStackTrace();
                
            
        
    

  • 首先由 ServerSocketChannelSelector 注册接收连接就绪事件,如果 Selector 监控到该事件发生,就会把相应的 SelectionKey 对象加入 selected-keys 集合
  • 第一层 while 循环,不断询问 Selector 已经发生的事件,select() 方法返回当前相关事件已经发生的 SelectionKey 的个数,如果当前没有任何事件发生,该方法会阻塞下去,直到至少有一个事件发生。SelectorselectedKeys() 方法返回 selected-keys 集合,它存放了相关事件已经发生的 SelectionKey 对象
  • 第二层 while 循环,从 selected-keys 集合中依次取出每个 SelectionKey 对象并从集合中删除,,然后调用 isAcceptable()isReadable()isWritable() 方法判断到底是哪种事件发生了,从而做出相应的处理

1. 处理接收连接就绪事件

if (key.isAcceptable()) 
    //获得与SelectionKey关联的ServerSocketChannel
    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    //获得与客户连接的SocketChannel
    SocketChannel socketChannel = (SocketChannel) ssc.accept();
    //把Socketchannel设置为非阻塞模式
    socketChannel.configureBlocking(false);
    //创建一个用于存放用户发送来的数据的级冲区
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    //Socketchannel向Selector注册读就绪事件和写就绪事件
    socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);

2. 处理读就绪事件

public void receive(SelectionKey key) throws IOException 
    //获得与SelectionKey关联的附件
    ByteBuffer buffer = (ByteBuffer) key.attachment();
    //获得与SelectionKey关联的Socketchannel
    SocketChannel socketChannel = (SocketChannel)key.channel();
    //创建一个ByteBuffer用于存放读到的数据
    ByteBuffer readBuff = ByteBuffer.allocate(32);
    socketChannel.read(readBuff);
    readBuff.flip();
    //把buffer的极限设为容量
    buffer.limit(buffer.capacity());
    //把readBuff中的内容拷贝到buffer
    buffer.put(readBuff);

3. 处理写就绪事件

public void send(SelectionKey key) throws IOException 
    //获得与SelectionKey关联的ByteBuffer
    ByteBuffer buffer = (ByteBuffer) key.attachment();
    //获得与SelectionKey关联的SocketChannel
    SocketChannel socketChannel = (SocketChannel) key.channel();
    buffer.flip();
    //按照GBK编码把buffer中的字节转换为字符串
    String data = decode(buffer);
    //如果还没有读到一行数据就返回
    if(data.indexOf("\\r\\n") == -1)
        return;
    //截取一行数据
    String outputData = data.substring(0, data.indexOf("\\n") + 1);
    //把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中
    ByteBuffer outputBuffer = encode("echo:" + outputData);
    //输出outputBuffer的所有字节
    while(outputBuffer,hasRemaining())
        socketChannel.write(outputBuffer);
    //把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer
    ByteBuffer temp = encode(outputData);
    //把buffer的位置设为temp的极限
    buffer.position(temp.limit()):
    //删除buffer已经处理的数据
    buffer.compact();
    //如果已经输出了字符串“bye\\r\\n”,就使SelectionKey失效,并关闭SocketChannel
    if(outputData.equals("bye\\r\\n")) 
        key.cancel();
        socketChannel.close();
    

完整代码如下:

public class EchoServer 
    
	private int port = 8000;
    private ServerSocketChannel serverSocketChannel = null;
    private Selector selector;
    private Charset charset = Charset.forName("GBK");

	public EchoServer() throws IOException 
        // 创建一个Selector对象
        selector = Selector.open();
        //创建一个ServerSocketChannel对象
        serverSocketChannel = ServerSocketChannel.open();
        //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时
        //可以顺利绑定到相同的端口
        serverSocketChannel.socket().setReuseAddress(true);
        //使ServerSocketChannel工作于非阻塞模式
        serverSocketChannel.configureBlocking(false):
        //把服务器进程与一个本地端口绑定
        serverSocketChannelsocket().bind(new InetSocketAddress(port));
    
    
    public void service() throws IOException 
        serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);
        //第1层while循环
        while(selector.select() > 0) 
            //获得Selector的selected-keys集合
            Set readyKeys = selector.selectedKeys();
            Iterator it = readyKeys.iterator();
            //第2层while循环
            while (it.hasNext()) 
                SelectionKey key = null;
                //处理SelectionKey
                try 
                    //取出一个SelectionKey
                    key = (SelectionKey) it.next();
                    //把 SelectionKey从Selector 的selected-key 集合中删除
                    it.remove();
                    1f (key.isAcceptable()) 
                         //获得与SelectionKey关联的ServerSocketChannel
                        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                        //获得与客户连接的SocketChannel
                        SocketChannel socketChannel = (SocketChannel) ssc.accept();
                        //把Socketchannel设置为非阻塞模式
                        socketChannel.configureBlocking(false);
                        //创建一个用于存放用户发送来的数据的级冲区
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        //Socketchannel向Selector注册读就绪事件和写就绪事件
                        socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
                    
                    if (key.isReadable())  receive(key); 
                    if (key.isWritable())  send(key); 
                 catch(IOException e) 
                    e.printStackTrace();
                    try 
                        if(key != null) 
                            //使这个SelectionKey失效
                            key.cancel();
                            //关闭与这个SelectionKey关联的SocketChannel
                            key.channel().close();
                        
                     catch(Exception ex)  
                        e.printStackTrace();
                    
                
            
        
    
    
    public void receive(SelectionKey key) throws IOException 
        //获得与SelectionKey关联的附件
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        //获得与SelectionKey关联的Socketchannel
        SocketChannel socketChannel = (SocketChannel)key.channel();
        //创建一个ByteBuffer用于存放读到的数据
        ByteBuffer readBuff = ByteBuffer.allocate(32);
        socketChannel.read(readBuff);
        readBuff.flip();
        //把buffer的极限设为容量
        buffer.limit(buffer.capacity());
        //把readBuff中的内容拷贝到buffer
        buffer.put(readBuff);
    
    
    public void send(SelectionKey key) throws IOException 
        //获得与SelectionKey关联的ByteBuffer
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        //获得与SelectionKey关联的SocketChannel
        SocketChannel socketChannel = (SocketChannel) key.channel();
        buffer.flip();
        //按照GBK编码把buffer中的字节转换为字符串
        String data = decode(buffer);
        //如果还没有读到一行数据就返回
        if(data.indexOf("\\r\\n") == -1)
            return;
        //截取一行数据
        String outputData = data.substring(0, data.indexOf("\\n") + 1);
        //把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中
        ByteBuffer outputBuffer = encode("echo:" + outputData);
        //输出outputBuffer的所有字节
        while(outputBuffer,hasRemaining())
            socketChannel.write(outputBuffer);
        //把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer
        ByteBuffer temp = encode(outputData);
        //把buffer的位置设为temp的极限
        buffer.position(temp.limit()):
        //删除buffer已经处理的数据
        buffer.compact();
        //如果已经输出了字符串“bye\\r\\n”,就使SelectionKey失效,并关闭SocketChannel
        if(outputData.equals("bye\\r\\n")) 
            key.cancel();
            socketChannel.close();
        
    
    
    //解码
    public String decode(ByteBuffer buffer) 
        CharBuffer charBuffer = charset.decode(buffer);
        return charBuffer.toStrinq();
    
    
    //编码
    public ByteBuffer encode(String str) 
        return charset.encode(str);
    
    
    public static void main(String args[])throws Exception 
        EchoServer server = new EchoServer();
        server.service();
    


阻塞模式与非阻塞模式混合使用

使用非阻塞模式时,ServerSocketChannel 以及 SocketChannel 都被设置为非阻塞模式,这使得接收连接、接收数据和发送数据的操作都采用非阻塞模式,EchoServer 采用一个线程同时完成这些操作

假如有许多客户请求连接,可以把接收客户连接的操作单独由一个线程完成,把接收数据和发送数据的操作由另一个线程完成,这可以提高服务器的并发性能

负责接收客户连接的线程按照阻塞模式工作,如果收到客户连接,就向 Selector 注册读就绪和写就绪事件,否则进入阻塞状态,直到接收到了客户的连接。负责接收数据和发送数据的线程按照非阻塞模式工作,只有在读就绪或写就绪事件发生时,才执行相应的接收数据和发送数据操作

public class EchoServer 
    
	private int port = 8000;
    private ServerSocketChannel serverSocketChannel = null;
    private Selector selector = null;
    private Charset charset = Charset.forName("GBK");

	public EchoServer() throws IOException 
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().setReuseAddress(true);
        serverSocketChannelsocket().bind(new InetSocketAddress(port));
    
    
    public void accept() 
        while(true) 
            try 
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                synchronized(gate) 
                    selector.wakeup();
                    socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
                
             catch(IOException e) 
                e.printStackTrace();
            
        
    
    
    private Object gate=new Object();
    
    public void service() throws IOException 
        while(true) 
            synchronized(gate)
            int n = selector.select();
            if(n == 0) continue;
            Set readyKeys = selector.selectedKeys();
            Iterator it = readyKeys.iterator();
            while (it.hasNext()) 
                SelectionKey key = null;
                try 
    				it.remove();
                    if (key.isReadable()) 
                        receive(key);
                    
                    if (key.isWritable()) 
                        send(key);
                    
                 catch(IOException e) 
                    e.printStackTrace();
                    try 
                        if(key != null) 
                            key.cancel();
                            key.channel().close();
                        
                     catch(Exception ex)  e.printStackTrace(); 
                
            
        
    
    
    public void receive(SelectionKey key) throws IOException 
        ...
    
    
    public void send(SelectionKey key) throws IOException 
        ...
    
    
    public String decode(ByteBuffer buffer) 
        ...
    
    
    public ByteBuffer encode(String str) 
        ...
    
    
    public static void main(String args[])throws Exception 
        final EchoServer server = new EchoServer();
        Thread accept = new Thread() 
            public void run() 
                server.accept();
            
        ;
        accept.start();
		server.service();
    

注意一点:主线程的 selector select() 方法和 Accept 线程的 register(...) 方法都会造成阻塞,因为他们都会操作 Selector 对象的共享资源 all-keys 集合,这有可能会导致死锁

导致死锁的具体情形是:Selector 中尚没有任何注册的事件,即 all-keys 集合为空,主线程执行 selector.select() 方法时将进入阻塞状态,只有当 Accept 线程向 Selector 注册了事件,并且该事件发生后,主线程才会从 selector.select() 方法返回。然而,由于主线程正在 selector.select() 方法中阻塞,这使得 Acccept 线程也在 register() 方法中阻塞。Accept 线程无法向 Selector 注册事件,而主线程没有任何事件可以监控,所以这两个线程将永远阻塞下去

为了避免对共享资源的竞争,同步机制使得一个线程执行 register() 时,不允许另一个线程同时执行 select() 方法,反之亦然


什么是socket网络编程

参考技术A 使用socket套接字,利用TCP/IP或者UDP协议,实现几个机器之间的通信。一般使用C/S结构。
以TCP/IP为例:首先建立一个服务器,步骤如下:socket()创建一个socket,bind()绑定socket到一个端口,listen()监听端口,accept()等待客户端的连接。客户端程序:socket()创建一个socket,可以绑定也可以不绑定,然后connect()连接到服务器端。socket又分为阻塞式的和非阻塞式的。阻塞式的就是服务器端等待连接直到连接上,不然一直挂起。

以上是关于Java 网络编程 —— 实现非阻塞式的服务器的主要内容,如果未能解决你的问题,请参考以下文章

java网络编程实现客户端连接服务器端,并发送消息例子。(阻塞式的方法)

java nio学习三:NIO 的非阻塞式网络通信

Java NIO实现非阻塞式socket通信

为何现在响应式编程在业务开发微服务开发不普及

什么是socket网络编程

socket 客户端编程:非阻塞式连接,错误判断及退出重连