NIO 源码分析(02-2) BIO 源码分析 Socket

Posted binarylei

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了NIO 源码分析(02-2) BIO 源码分析 Socket相关的知识,希望对你有一定的参考价值。

NIO 源码分析(02-2) BIO 源码分析 Socket

Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html)

在上一篇文章中详细分析了 ServerSocket 的源码,Socket 和 ServerSocket 一样也只是一个门面模式,真正的实现也是 SocksSocketImpl,所以关于 setImpl、createImpl、new、bind、listen 都是类似的,本文重点关注其 connect 和 IO 流的读取方法。

一、BIO 最简使用姿势

//1. 连接服务器
Socket socket = new Socket();
socket.connect(new InetSocketAddress(HOST, PORT), 0);
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriterout = new PrintWriter(socket.getOutputStream(), true);

//2. 发送请求数据
out.println("客户端发送请求数据...");

//3. 接收服务端数据
String response = in.readLine();
System.out.println("Client: " + response);

ok,代码已经完成!!!下面的源码分析都会基于这个 demo。

二、connect 方法

技术图片

2.1 Socket.connect 方法

// timeout=0 表示永久阻塞,timeout>0 则指定超时时间
public void connect(SocketAddress endpoint, int timeout) throws IOException 
  
    InetSocketAddress epoint = (InetSocketAddress) endpoint;
    InetAddress addr = epoint.getAddress ();
    int port = epoint.getPort();

    // 1. 创建底层 socket 套接字
    if (!created)
        createImpl(true);

    // 2. oldImpl 默认为 false,也就是进入第一个 if 条件
    //    checkOldImpl 会判断 impl 中有没有 connect(SocketAddress address, int port) 方法
    //    来设置 oldImpl 的值
    if (!oldImpl)
        impl.connect(epoint, timeout);
    else if (timeout == 0) 
        if (epoint.isUnresolved())
            impl.connect(addr.getHostName(), port);
        else
            impl.connect(addr, port);
     else
        throw new UnsupportedOperationException("SocketImpl.connect(addr, timeout)");
    connected = true;
    bound = true;

总结: Socket 首先和 ServerSocket 一样调用 createImpl 创建底层 socket 对象,然后委托给 impl 完成连接操作

2.2 AbstractPlainSocketImpl.connect 方法

protected void connect(SocketAddress address, int timeout) throws IOException 
    boolean connected = false;
    try 
        InetSocketAddress addr = (InetSocketAddress) address;
        this.port = addr.getPort();
        this.address = addr.getAddress();

        connectToAddress(this.address, port, timeout);
        connected = true;
     finally 
        if (!connected) 
            close();
        
    


private void connectToAddress(InetAddress address, int port, int timeout) throws IOException 
    if (address.isAnyLocalAddress()) 
        doConnect(InetAddress.getLocalHost(), port, timeout);
     else 
        doConnect(address, port, timeout);
    

总结: connect 将连接具体由 doConnect 完成

synchronized void doConnect(InetAddress address, int port, int timeout) throws IOException 
    synchronized (fdLock) 
        if (!closePending && (socket == null || !socket.isBound())) 
            NetHooks.beforeTcpConnect(fd, address, port);
        
    
    try 
        acquireFD();
        try 
            socketConnect(address, port, timeout);
            /* socket may have been closed during poll/select */
            synchronized (fdLock) 
                if (closePending) 
                    throw new SocketException ("Socket closed");
                
            
            if (socket != null) 
                socket.setBound();
                socket.setConnected();
            
         finally 
            releaseFD();
        
     catch (IOException e) 
        close();
        throw e;
    

2.3 DualStackPlainSocketImpl.socketConnect 方法

void socketConnect(InetAddress address, int port, int timeout)
    throws IOException 
    int nativefd = checkAndReturnNativeFD();

    if (address == null)
        throw new NullPointerException("inet address argument is null.");

    int connectResult;
    if (timeout <= 0) 
        connectResult = connect0(nativefd, address, port);
     else 
        configureBlocking(nativefd, false);
        try 
            connectResult = connect0(nativefd, address, port);
            if (connectResult == WOULDBLOCK) 
                waitForConnect(nativefd, timeout);
            
         finally 
            configureBlocking(nativefd, true);
        
    
   
    if (localport == 0)
        localport = localPort0(nativefd);
补充1:connect0 在 JVM 中的实现
JNIEXPORT jint JNICALL Java_java_net_DualStackPlainSocketImpl_connect0
  (JNIEnv *env, jclass clazz, jint fd, jobject iaObj, jint port) 
    SOCKETADDRESS sa;
    int rv;
    int sa_len = sizeof(sa);

    if (NET_InetAddressToSockaddr(env, iaObj, port, (struct sockaddr *)&sa,
                                 &sa_len, JNI_TRUE) != 0) 
      return -1;
    

    rv = connect(fd, (struct sockaddr *)&sa, sa_len);
    if (rv == SOCKET_ERROR) 
        int err = WSAGetLastError();
        if (err == WSAEWOULDBLOCK) 
            return java_net_DualStackPlainSocketImpl_WOULDBLOCK;
         else if (err == WSAEADDRNOTAVAIL) 
            JNU_ThrowByName(env, JNU_JAVANETPKG "ConnectException",
                "connect: Address is invalid on local machine, or port is not valid on remote machine");
         else 
            NET_ThrowNew(env, err, "connect");
        
        return -1;  // return value not important.
    
    return rv;

总结: rv = connect(fd, (struct sockaddr *)&sa, sa_len) 建立远程连接。

补充2:waitForConnect 在 JVM 中的实现

和 ServerSocket.waitForNewConnection 一样,也是通过 Winsock 库的 select 函数来实现超时的功能。

JNIEXPORT void JNICALL Java_java_net_DualStackPlainSocketImpl_waitForConnect
  (JNIEnv *env, jclass clazz, jint fd, jint timeout) 
    int rv, retry;
    int optlen = sizeof(rv);
    fd_set wr, ex;
    struct timeval t;

    FD_ZERO(&wr);
    FD_ZERO(&ex);
    FD_SET(fd, &wr);
    FD_SET(fd, &ex);
    t.tv_sec = timeout / 1000;
    t.tv_usec = (timeout % 1000) * 1000;

    rv = select(fd+1, 0, &wr, &ex, &t);
    if (rv == 0) 
        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
                        "connect timed out");
        shutdown( fd, SD_BOTH );
        return;
    
    if (!FD_ISSET(fd, &ex)) 
        return;         /* connection established */
    

    for (retry=0; retry<3; retry++) 
        NET_GetSockOpt(fd, SOL_SOCKET, SO_ERROR,
                       (char*)&rv, &optlen);
        if (rv) 
            break;
        
        Sleep(0);
    

    if (rv == 0) 
        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
                        "Unable to establish connection");
     else 
        NET_ThrowNew(env, rv, "connect");
    

总结: rv = select(fd+1, 0, &wr, &ex, &t) 轮询会阻塞程序。

三、SocketInputStream

3.1 构造方法

SocketInputStream(AbstractPlainSocketImpl impl) throws IOException 
    super(impl.getFileDescriptor());
    this.impl = impl;
    socket = impl.getSocket();

总结: SocketInputStream 内部实现上也是对 impl 的封装。SocketInputStream.read 其实也是调用底层 socket 的 read 方法。

3.2 read 方法

int read(byte b[], int off, int length, int timeout) throws IOException 
    int n;

    // EOF already encountered
    if (eof) 
        return -1;
    

    // connection reset
    if (impl.isConnectionReset()) 
        throw new SocketException("Connection reset");
    

    // bounds check
    if (length <= 0 || off < 0 || off + length > b.length) 
        if (length == 0) 
            return 0;
        
        throw new ArrayIndexOutOfBoundsException();
    

    boolean gotReset = false;

    // acquire file descriptor and do the read
    FileDescriptor fd = impl.acquireFD();
    try 
        n = socketRead(fd, b, off, length, timeout);
        if (n > 0) 
            return n;
        
     catch (ConnectionResetException rstExc) 
        gotReset = true;
     finally 
        impl.releaseFD();
    

    /*
     * We receive a "connection reset" but there may be bytes still
     * buffered on the socket
     */
    if (gotReset) 
        impl.setConnectionResetPending();
        impl.acquireFD();
        try 
            n = socketRead(fd, b, off, length, timeout);
            if (n > 0) 
                return n;
            
         catch (ConnectionResetException rstExc) 
         finally 
            impl.releaseFD();
        
    

    /*
     * If we get here we are at EOF, the socket has been closed,
     * or the connection has been reset.
     */
    if (impl.isClosedOrPending()) 
        throw new SocketException("Socket closed");
    
    if (impl.isConnectionResetPending()) 
        impl.setConnectionReset();
    
    if (impl.isConnectionReset()) 
        throw new SocketException("Connection reset");
    
    eof = true;
    return -1;


private int socketRead(FileDescriptor fd, byte b[], int off, int len,
        int timeout) throws IOException 
    return socketRead0(fd, b, off, len, timeout);
补充2:socketRead0 在 JVM 中的实现
// src/windows/native/java/net/SocketInputStream.c
JNIEXPORT jint JNICALL Java_java_net_SocketInputStream_socketRead0(JNIEnv *env, jobject this,
        jobject fdObj, jbyteArray data, jint off, jint len, jint timeout) 
    char *bufP;
    char BUF[MAX_BUFFER_LEN];
    jint fd, newfd;
    jint nread;

    if (IS_NULL(fdObj)) 
        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
        return -1;
    
    fd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
    if (fd == -1) 
        NET_ThrowSocketException(env, "Socket closed");
        return -1;
    

    /*
     * If the caller buffer is large than our stack buffer then we allocate
     * from the heap (up to a limit). If memory is exhausted we always use
     * the stack buffer.
     */
    if (len <= MAX_BUFFER_LEN) 
        bufP = BUF;
     else 
        if (len > MAX_HEAP_BUFFER_LEN) 
            len = MAX_HEAP_BUFFER_LEN;
        
        bufP = (char *)malloc((size_t)len);
        if (bufP == NULL) 
            /* allocation failed so use stack buffer */
            bufP = BUF;
            len = MAX_BUFFER_LEN;
        
    

    if (timeout) 
        if (timeout <= 5000 || !isRcvTimeoutSupported) 
            int ret = NET_Timeout (fd, timeout);

            if (ret <= 0) 
                if (ret == 0) 
                    JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
                                    "Read timed out");
                 else if (ret == JVM_IO_ERR) 
                    JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
                 else if (ret == JVM_IO_INTR) 
                    JNU_ThrowByName(env, JNU_JAVAIOPKG "InterruptedIOException",
                                    "Operation interrupted");
                
                if (bufP != BUF) 
                    free(bufP);
                
                return -1;
            

            /*check if the socket has been closed while we were in timeout*/
            newfd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
            if (newfd == -1) 
                NET_ThrowSocketException(env, "Socket Closed");
                if (bufP != BUF) 
                    free(bufP);
                
                return -1;
            
        
    

    // 最关键的代码,recv 从 socketfd 中读取数据
    nread = recv(fd, bufP, len, 0);

    if (nread > 0) 
        (*env)->SetByteArrayRegion(env, data, off, nread, (jbyte *)bufP);
     else 
        if (nread < 0) 
            // Check if the socket has been closed since we last checked.
            // This could be a reason for recv failing.
            if ((*env)->GetIntField(env, fdObj, IO_fd_fdID) == -1) 
                NET_ThrowSocketException(env, "Socket closed");
             else 
                switch (WSAGetLastError()) 
                    case WSAEINTR:
                        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
                            "socket closed");
                        break;
                    case WSAECONNRESET:
                    case WSAESHUTDOWN:
                        /*
                         * Connection has been reset - Windows sometimes reports
                         * the reset as a shutdown error.
                         */
                        JNU_ThrowByName(env, "sun/net/ConnectionResetException", "");
                        break;
                    case WSAETIMEDOUT :
                        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException", "Read timed out");
                        break;
                    default:
                        NET_ThrowCurrent(env, "recv failed");
                
            
        
    
    if (bufP != BUF) 
        free(bufP);
    
    return nread;

总结: socketRead0 实现很长,其实我们只用关注核心的实现 nread = recv(fd, bufP, len, 0); 即可,毕竟我们不是专门做 c++。

四、SocketInputStream

和 SocketInputStream 类似,就不继续分析了。


每天用心记录一点点。内容也许不重要,但习惯很重要!

以上是关于NIO 源码分析(02-2) BIO 源码分析 Socket的主要内容,如果未能解决你的问题,请参考以下文章

Netty学习(源码分析)

Netty学习(源码分析)

Netty学习(源码分析)

Spring GateWay 路由源码分析

Zuul2源码分析

NIO-FileChannel源码分析