网络I/O模型--04非阻塞模式(解除accept() read()方法阻塞)的基础上加入多线程技术

Posted ParamousGIS

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了网络I/O模型--04非阻塞模式(解除accept() read()方法阻塞)的基础上加入多线程技术相关的知识,希望对你有一定的参考价值。


      由于应用程序级别并没有使用多线程技术,这就导致了应用程序只能一个一个地对Socket 套接字进行处理。这个 Socket 套接宇没有处理完,就没法处理下一个 Socket 套接字 。针对这个 问题还是可以进行改进的:让应用程序层面上各个 Socket 套接字的处理相互不影响 。

服务端代码

package testBlockSocket;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

//通过加入线程的概念,让 Socket Server 能够在应用层面
//通过非阻塞的方式同时处理多个 Socket 套接字
public class SocketServer4AllTimeoutAndMultiThread {
    private final static Logger LOGGER = LoggerFactory.getLogger(SocketServer4AllTimeoutAndMultiThread.class);
    private static Object xWait = new Object();

    public static void main(String[] args) throws Exception {
        ServerSocket serverSocket = new ServerSocket(8888);
        serverSocket.setSoTimeout(100);
        try {
            while (true) {
                Socket socket = null;
                try {
                    socket = serverSocket.accept();
                } catch (SocketTimeoutException el) {
                    // ===================
                    // 执行到这里,说明本次 accept()方法没有接收到任何 TCP 连接主线程在这里就可以做一些事情,记为 x
                    // ==================
                    synchronized (SocketServer4AllTimeoutAndMultiThread.xWait) {
                        LOGGER.info("这次没有接收到 TCP 连接,等待10 毫秒,模拟事件x 的处理时间 ");
                        SocketServer4AllTimeoutAndMultiThread.xWait.wait(10);
                    }
                    continue;
                }
                // 业务处理过程可以交给一个线程(这里可以使用线程池)
                // 注意 , 线程的创建过程是很耗资源和时间的

                // 最终改变不了 accept ( )方法只能一个一个地接收 Socket 连接的情况
                SocketServerThreadWithTimeOut socketServerThread = new SocketServerThreadWithTimeOut(socket);
                new Thread(socketServerThread).start();
            }
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        } finally {
            if (serverSocket != null) {
                serverSocket.close();
            }
        }
    }
}

// 当然,接收到客户端的 Socket 后,业务的处理过程可以交给一个线程来做
class SocketServerThreadWithTimeOut implements Runnable {

    private final static Logger LOGGER = LoggerFactory.getLogger(SocketServerThreadWithTimeOut.class);
    private Socket socket;

    public SocketServerThreadWithTimeOut(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        InputStream inputStream = null;
        OutputStream outputStream = null;
        try {
            inputStream = socket.getInputStream();
            outputStream = socket.getOutputStream();
            Integer sourcePort = socket.getPort();
            int maxLen = 2048;
            byte[] contextBytes = new byte[maxLen];
            int realLen;
            StringBuffer message = new StringBuffer();
            // 下面我们收取信息
            this.socket.setSoTimeout(10);
            BIORead: while (true) {
                try {
                    while ((realLen = inputStream.read(contextBytes, 0, maxLen)) != -1) {
                        message.append(new String(contextBytes, 0, realLen));
                        // 我们同样假设读取到"over"关键字,表示业务内容传输完成
                        if (message.indexOf("over") != -1) {
                            break BIORead;
                        }
                    }
                } catch (SocketTimeoutException e2) {
                    // =================
                    // 执行到这里,说明本次 read ()方法没有接收到任何数据流主线程在这里又可以做一些事情,记为 Y
                    // =================
                    LOGGER.info("这次没有接收到任务数据报文,等待 10 ~盖秒 ,模拟事件 Y 的处理时间 ");
                    continue;
                }
            }
            // 下面打印信息
            Long threadId = Thread.currentThread().getId();
            SocketServerThreadWithTimeOut.LOGGER.info("服务器(线程 :" + threadId + ")收到来自于端口 :" + sourcePort + "的信息: " + message);
            // 下面开始发送信息
            outputStream.write("回发响应信息:".getBytes());
            // 关闭 in 和 out 对象
            inputStream.close();
            outputStream.close();
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
}


引入了多线程技术后, I/O的处理吞吐量有了提高(实际上并不显著) ,因为 acceptO方法为“同步”工作的情况依然存在 。

以上是关于网络I/O模型--04非阻塞模式(解除accept() read()方法阻塞)的基础上加入多线程技术的主要内容,如果未能解决你的问题,请参考以下文章

I/O模型

网络I/O模型---同步异步阻塞非阻塞之惑

聊聊阻塞与非阻塞同步与异步I/O模型

简明网络I/O模型---同步异步阻塞非阻塞之惑

简明网络I/O模型---同步异步阻塞非阻塞之惑

五种网络IO模型-阻塞I/O非阻塞I/OI/O多路复用信号驱动I/O异步I/O