NIO 源码分析(02-2) BIO 源码分析 Socket
Posted binarylei
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了NIO 源码分析(02-2) BIO 源码分析 Socket相关的知识,希望对你有一定的参考价值。
目录
NIO 源码分析(02-2) BIO 源码分析 Socket
Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html)
在上一篇文章中详细分析了 ServerSocket 的源码,Socket 和 ServerSocket 一样也只是一个门面模式,真正的实现也是 SocksSocketImpl,所以关于 setImpl、createImpl、new、bind、listen 都是类似的,本文重点关注其 connect 和 IO 流的读取方法。
一、BIO 最简使用姿势
// 1. Connect to the server; a timeout of 0 blocks until the connection is established.
Socket socket = new Socket();
socket.connect(new InetSocketAddress(HOST, PORT), 0);
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
// The second argument (true) turns on auto-flush, so println sends the request immediately.
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
// 2. Send the request data.
out.println("客户端发送请求数据...");
// 3. Receive the server's response (blocks until a full line arrives).
String response = in.readLine();
System.out.println("Client: " + response);
ok,代码已经完成!!!下面的源码分析都会基于这个 demo。
二、connect 方法
2.1 Socket.connect 方法
// Connects this socket to the given endpoint by delegating to the underlying impl.
// timeout == 0 blocks indefinitely; timeout > 0 sets the connect timeout.
public void connect(SocketAddress endpoint, int timeout) throws IOException
InetSocketAddress epoint = (InetSocketAddress) endpoint;
InetAddress addr = epoint.getAddress ();
int port = epoint.getPort();
// 1. Create the underlying socket if it has not been created yet.
if (!created)
createImpl(true);
// 2. oldImpl defaults to false, so the first branch is the one normally taken.
// checkOldImpl sets oldImpl by checking whether impl declares a
// connect(SocketAddress address, int port) method.
if (!oldImpl)
impl.connect(epoint, timeout);
// Old impl: only the connect variants without a timeout are available.
else if (timeout == 0)
if (epoint.isUnresolved())
impl.connect(addr.getHostName(), port);
else
impl.connect(addr, port);
else
throw new UnsupportedOperationException("SocketImpl.connect(addr, timeout)");
// Record on this Socket that it is now connected and bound.
connected = true;
bound = true;
总结: Socket 首先和 ServerSocket 一样调用 createImpl 创建底层 socket 对象,然后委托给 impl 完成连接操作
2.2 AbstractPlainSocketImpl.connect 方法
// Connects to the given address via connectToAddress and closes this impl
// if the attempt does not complete.
protected void connect(SocketAddress address, int timeout) throws IOException
boolean connected = false;
try
InetSocketAddress addr = (InetSocketAddress) address;
// Remember the remote port and address on this impl.
this.port = addr.getPort();
this.address = addr.getAddress();
connectToAddress(this.address, port, timeout);
connected = true;
finally
// connected is still false if anything above threw, so clean up.
if (!connected)
close();
// Delegates to doConnect, substituting InetAddress.getLocalHost() when the
// target is the wildcard (any-local) address.
private void connectToAddress(InetAddress address, int port, int timeout) throws IOException
if (address.isAnyLocalAddress())
doConnect(InetAddress.getLocalHost(), port, timeout);
else
doConnect(address, port, timeout);
总结: connect 的连接操作最终由 doConnect 完成
// Performs the actual connect through socketConnect. On IOException the
// impl is closed and the exception is rethrown.
synchronized void doConnect(InetAddress address, int port, int timeout) throws IOException
synchronized (fdLock)
// Run the pre-connect hook unless a close is pending or the socket is already bound.
if (!closePending && (socket == null || !socket.isBound()))
NetHooks.beforeTcpConnect(fd, address, port);
try
acquireFD();
try
socketConnect(address, port, timeout);
/* socket may have been closed during poll/select */
synchronized (fdLock)
if (closePending)
throw new SocketException ("Socket closed");
// Propagate the new state to the owning Socket, if there is one.
if (socket != null)
socket.setBound();
socket.setConnected();
finally
// Pairs with the acquireFD() call above.
releaseFD();
catch (IOException e)
close();
throw e;
2.3 DualStackPlainSocketImpl.socketConnect 方法
// Connects the native socket. With timeout <= 0 it calls connect0 directly;
// otherwise it switches the fd to non-blocking, calls connect0 and, when that
// reports WOULDBLOCK, waits in waitForConnect.
void socketConnect(InetAddress address, int port, int timeout)
throws IOException
int nativefd = checkAndReturnNativeFD();
if (address == null)
throw new NullPointerException("inet address argument is null.");
int connectResult;
if (timeout <= 0)
connectResult = connect0(nativefd, address, port);
else
// Timed connect: make the fd non-blocking for the duration of the attempt.
configureBlocking(nativefd, false);
try
connectResult = connect0(nativefd, address, port);
if (connectResult == WOULDBLOCK)
waitForConnect(nativefd, timeout);
finally
// Restore blocking mode whether or not the connect succeeded.
configureBlocking(nativefd, true);
// Look up the local port if it has not been recorded yet.
if (localport == 0)
localport = localPort0(nativefd);
补充1:connect0 在 JVM 中的实现
/*
 * Native connect0: converts the InetAddress object into a sockaddr and calls
 * connect() on fd. Returns the connect() result on success, the WOULDBLOCK
 * constant when Winsock reports WSAEWOULDBLOCK, or -1 after an address
 * conversion failure or after throwing a Java exception.
 */
JNIEXPORT jint JNICALL Java_java_net_DualStackPlainSocketImpl_connect0
(JNIEnv *env, jclass clazz, jint fd, jobject iaObj, jint port)
SOCKETADDRESS sa;
int rv;
int sa_len = sizeof(sa);
/* Build the native socket address from the Java InetAddress and port. */
if (NET_InetAddressToSockaddr(env, iaObj, port, (struct sockaddr *)&sa,
&sa_len, JNI_TRUE) != 0)
return -1;
/* The actual connection attempt. */
rv = connect(fd, (struct sockaddr *)&sa, sa_len);
if (rv == SOCKET_ERROR)
int err = WSAGetLastError();
/* WSAEWOULDBLOCK is reported to the caller as a status, not as an exception. */
if (err == WSAEWOULDBLOCK)
return java_net_DualStackPlainSocketImpl_WOULDBLOCK;
else if (err == WSAEADDRNOTAVAIL)
JNU_ThrowByName(env, JNU_JAVANETPKG "ConnectException",
"connect: Address is invalid on local machine, or port is not valid on remote machine");
else
NET_ThrowNew(env, err, "connect");
return -1; // return value not important.
return rv;
总结: 核心代码是 rv = connect(fd, (struct sockaddr *)&sa, sa_len),即通过系统调用 connect 建立远程连接。
补充2:waitForConnect 在 JVM 中的实现
和 ServerSocket.waitForNewConnection 一样,也是通过 Winsock 库的 select 函数来实现超时的功能。
/*
 * Native waitForConnect: waits with select() until fd becomes writable or is
 * flagged in the exception set, for at most `timeout` (converted below as
 * milliseconds). Throws SocketTimeoutException on timeout and a socket
 * exception when the connection attempt failed.
 */
JNIEXPORT void JNICALL Java_java_net_DualStackPlainSocketImpl_waitForConnect
(JNIEnv *env, jclass clazz, jint fd, jint timeout)
int rv, retry;
int optlen = sizeof(rv);
fd_set wr, ex;
struct timeval t;
/* Watch fd in both the write set and the exception set. */
FD_ZERO(&wr);
FD_ZERO(&ex);
FD_SET(fd, &wr);
FD_SET(fd, &ex);
/* Split the timeout into whole seconds plus remaining microseconds. */
t.tv_sec = timeout / 1000;
t.tv_usec = (timeout % 1000) * 1000;
rv = select(fd+1, 0, &wr, &ex, &t);
/* select returned 0: nothing became ready before the timeout expired. */
if (rv == 0)
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
"connect timed out");
shutdown( fd, SD_BOTH );
return;
if (!FD_ISSET(fd, &ex))
return; /* connection established */
/* fd is in the exception set: read SO_ERROR, retrying up to 3 times until it is non-zero. */
for (retry=0; retry<3; retry++)
NET_GetSockOpt(fd, SOL_SOCKET, SO_ERROR,
(char*)&rv, &optlen);
if (rv)
break;
Sleep(0);
/* No error code could be obtained: throw a generic failure instead. */
if (rv == 0)
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
"Unable to establish connection");
else
NET_ThrowNew(env, rv, "connect");
总结: rv = select(fd+1, 0, &wr, &ex, &t) 会阻塞当前线程,直到连接建立(fd 可写)、连接出错或超时为止。
三、SocketInputStream
3.1 构造方法
// Creates an input stream over the impl's file descriptor and keeps
// references to the impl and to the Socket it returns from getSocket().
SocketInputStream(AbstractPlainSocketImpl impl) throws IOException
super(impl.getFileDescriptor());
this.impl = impl;
socket = impl.getSocket();
总结: SocketInputStream 内部实现上也是对 impl 的封装。SocketInputStream.read 其实也是调用底层 socket 的 read 方法。
3.2 read 方法
// Reads up to `length` bytes into b starting at `off` through socketRead.
// Returns the number of bytes read, 0 when length == 0, or -1 at EOF.
// Throws SocketException when the socket is closed or the connection was reset,
// and ArrayIndexOutOfBoundsException when off/length do not fit inside b.
int read(byte b[], int off, int length, int timeout) throws IOException
int n;
// EOF already encountered
if (eof)
return -1;
// connection reset
if (impl.isConnectionReset())
throw new SocketException("Connection reset");
// bounds check
if (length <= 0 || off < 0 || off + length > b.length)
if (length == 0)
return 0;
throw new ArrayIndexOutOfBoundsException();
boolean gotReset = false;
// acquire file descriptor and do the read
FileDescriptor fd = impl.acquireFD();
try
n = socketRead(fd, b, off, length, timeout);
if (n > 0)
return n;
catch (ConnectionResetException rstExc)
// Remember the reset; it is handled after the fd has been released.
gotReset = true;
finally
impl.releaseFD();
/*
* We receive a "connection reset" but there may be bytes still
* buffered on the socket
*/
if (gotReset)
impl.setConnectionResetPending();
impl.acquireFD();
try
n = socketRead(fd, b, off, length, timeout);
if (n > 0)
return n;
// A second reset is deliberately ignored here; the state checks below report it.
catch (ConnectionResetException rstExc)
finally
impl.releaseFD();
/*
* If we get here we are at EOF, the socket has been closed,
* or the connection has been reset.
*/
if (impl.isClosedOrPending())
throw new SocketException("Socket closed");
// Turn a pending reset into a definitive one before checking it.
if (impl.isConnectionResetPending())
impl.setConnectionReset();
if (impl.isConnectionReset())
throw new SocketException("Connection reset");
eof = true;
return -1;
// Thin wrapper that forwards all arguments unchanged to socketRead0.
private int socketRead(FileDescriptor fd, byte b[], int off, int len,
int timeout) throws IOException
return socketRead0(fd, b, off, len, timeout);
补充3:socketRead0 在 JVM 中的实现
// src/windows/native/java/net/SocketInputStream.c
/*
 * Native socketRead0: reads up to len bytes from the socket behind fdObj with
 * recv() into a temporary buffer and copies them into the Java byte array
 * `data` at offset `off`. Returns the recv() result, or -1 after throwing a
 * Java exception in the early-exit paths. When timeout is non-zero it may
 * first wait with NET_Timeout.
 */
JNIEXPORT jint JNICALL Java_java_net_SocketInputStream_socketRead0(JNIEnv *env, jobject this,
jobject fdObj, jbyteArray data, jint off, jint len, jint timeout)
char *bufP;
char BUF[MAX_BUFFER_LEN];
jint fd, newfd;
jint nread;
if (IS_NULL(fdObj))
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
return -1;
/* An fd value of -1 is treated as an already-closed socket. */
fd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
if (fd == -1)
NET_ThrowSocketException(env, "Socket closed");
return -1;
/*
* If the caller buffer is larger than our stack buffer then we allocate
* from the heap (up to a limit). If memory is exhausted we always use
* the stack buffer.
*/
if (len <= MAX_BUFFER_LEN)
bufP = BUF;
else
if (len > MAX_HEAP_BUFFER_LEN)
len = MAX_HEAP_BUFFER_LEN;
bufP = (char *)malloc((size_t)len);
if (bufP == NULL)
/* allocation failed so use stack buffer */
bufP = BUF;
len = MAX_BUFFER_LEN;
/* A timeout was requested: wait with NET_Timeout when it is at most 5000 or isRcvTimeoutSupported is false. */
if (timeout)
if (timeout <= 5000 || !isRcvTimeoutSupported)
int ret = NET_Timeout (fd, timeout);
if (ret <= 0)
if (ret == 0)
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
"Read timed out");
else if (ret == JVM_IO_ERR)
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
else if (ret == JVM_IO_INTR)
JNU_ThrowByName(env, JNU_JAVAIOPKG "InterruptedIOException",
"Operation interrupted");
/* Free the heap buffer (if one was allocated) before bailing out. */
if (bufP != BUF)
free(bufP);
return -1;
/*check if the socket has been closed while we were in timeout*/
newfd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
if (newfd == -1)
NET_ThrowSocketException(env, "Socket Closed");
if (bufP != BUF)
free(bufP);
return -1;
// The key call: recv reads data from the socket fd into the temporary buffer.
nread = recv(fd, bufP, len, 0);
if (nread > 0)
/* Copy the bytes that were read into the caller's Java array. */
(*env)->SetByteArrayRegion(env, data, off, nread, (jbyte *)bufP);
else
if (nread < 0)
// Check if the socket has been closed since we last checked.
// This could be a reason for recv failing.
if ((*env)->GetIntField(env, fdObj, IO_fd_fdID) == -1)
NET_ThrowSocketException(env, "Socket closed");
else
/* Map the Winsock error code to the matching Java exception. */
switch (WSAGetLastError())
case WSAEINTR:
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
"socket closed");
break;
case WSAECONNRESET:
case WSAESHUTDOWN:
/*
* Connection has been reset - Windows sometimes reports
* the reset as a shutdown error.
*/
JNU_ThrowByName(env, "sun/net/ConnectionResetException", "");
break;
case WSAETIMEDOUT :
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException", "Read timed out");
break;
default:
NET_ThrowCurrent(env, "recv failed");
/* Release the heap buffer if the stack buffer was not used. */
if (bufP != BUF)
free(bufP);
return nread;
总结: socketRead0 的实现很长,其实我们只需关注核心的实现 nread = recv(fd, bufP, len, 0); 即可,毕竟我们不是专门做 C/C++ 开发的。
四、SocketOutputStream
SocketOutputStream 和 SocketInputStream 类似,就不继续分析了。
每天用心记录一点点。内容也许不重要,但习惯很重要!
以上是关于NIO 源码分析(02-2) BIO 源码分析 Socket的主要内容,如果未能解决你的问题,请参考以下文章