网络模型之重叠IO
Posted CPP编程客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了网络模型之重叠IO相关的知识,希望对你有一定的参考价值。
前面已经说过了select和Event selct模型,这两个还是比较小型的,今天来说重叠IO,这个可以支持上千个用户,当然理解起来也越来越难了点。
重叠IO模型是典型的非阻塞模型,接收数据和拷贝数据这两部分全部占用系统时间片,实现了效率最大化,只要将一个结构体投给系统它便会替我们完成耗时的处理。
首先来看如何创建一个支持重叠IO的套接字,使用的是WSASocket函数,原型如下:
1SOCKET WSASocket (
2 int af, // 地址族
3 int type, // 传输方式
4 int protocol, // 通信协议
5 LPWSAPROTOCOL_INFO lpProtocolInfo,
6 GROUP g,
7 DWORD dwFlags
8);
所以要创建一个重叠IO套接字,可以这样写:
1WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
对于send和recv也有对应的函数WSASend和WSARecv,先来看WSASend函数:
1int WSASend (
2 SOCKET s, // 套接字句柄
3 LPWSABUF lpBuffers, // 待传输数据的缓冲区
4 DWORD dwBufferCount, // lpBuffers数组的长度
5 LPDWORD lpNumberOfBytesSent, // 保存实际发送的字节数
6 DWORD dwFlags, // 设置数据传输特性
7 LPWSAOVERLAPPED lpOverlapped, // 指向一个WSAOVERLAPPED结构体,这个可用事件确认数据是否传输完毕
8 LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionROUTINE // 是一个函数指针,在传输完成的时候可以自动调用指向的函数
9);
大多参数都不难理解,主要是多了几个新的结构体,我们先来看WSABUF结构体:
1typedef struct __WSABUF {
2 u_longlen; // buffer length
3 char FAR *buf; // pointer to buffer
4} WSABUF, FAR * LPWSABUF;
再来看最后两个参数,所谓异步就是把本来需要使用程序时间片的运算交给系统去处理,这样耗费的就是系统的时间片,这样自己的程序就不会阻塞执行了。而系统把这些耗时操作处理完了还是得通知我们来做后续处理,这就得有确认数据是否处理完毕的方法了。重叠IO有两种方法可以确认数据是否传输完毕,而这两种方法就关系到这后两个参数。
第一种方法是使用内核对象的事件来确认,关于这个大家应该已经很熟悉了,从线程同步时我们就接触了这个,然后又在事件选择模型中主要使用这个来确认是否有消息要进行处理,此时重叠IO又可以使用它来确认数据是否处理完毕。现在来看看WSAOVERLAPPED结构体的原型:
1typedef struct _WSAOVERLAPPED {
2 DWORD Internal;
3 DWORD InternalHigh;
4 DWORD Offset;
5 DWORD OffsetHigh;
6 WSAEVENT hEvent;
7} WSAOVERLAPPED, FAR * LPWSAOVERLAPPED;
Internal和InternalHigh这两个参数原本是供系统内部使用的,后来开放了,Internal中保存着错误码,InternalHigh中保存着传输的字节数。其实在这两个参数未开放时,Windows专门提供了一个GetOverlappedResult函数来获取这两个信息,很多书中也使用的这个,其实直接使用这两个字段获取就好了。
在这里我们主要是使用hEvent这个参数,为了理解其它各个参数的意义,这里便说说文件中的重叠IO。
重叠IO不仅在套接字中会使用,其它设备中也有用到,套接字也是一种文件罢了,在我们使用ReadFile函数读取文件的时候,要是文件太大了也会阻塞在那里,要是观察其原型我们会发现它也提供了一个重叠IO参数:
1BOOL ReadFile(
2 HANDLE hFile, // handle of file to read
3 LPVOID lpBuffer, // pointer to buffer that receives data
4 DWORD nNumberOfBytesToRead, // number of bytes to read
5 LPDWORD lpNumberOfBytesRead, // pointer to number of bytes read
6 LPOVERLAPPED lpOverlapped // pointer to structure for data
7);
可以发现最后一个参数指向一个OVERLAPPED,当指定了重叠IO后,读取大文件时ReadFile就会把请求提交给操作系统,由操作系统来处理,读取完了再通知,我们要做的就是获取这个通知。下面来看使用重叠IO读取文件的一个小例子:
1// 保存读取字节的缓冲区
2std::shared_ptr<byte> buf(new byte[1000], std::default_delete<byte[]>());
3
4// 以重叠IO方式打开文件,注意倒数第2个参数设置了重叠IO标志
5HANDLE hFile = CreateFileW(L"movie.avi", GENERIC_ALL,
6 FILE_SHARE_READ | FILE_SHARE_WRITE,
7 0, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0);
8
9// 获取文件大小
10DWORD dwSize = GetFileSize(hFile, 0);
11
12DWORD dwRead;
13OVERLAPPED ovp = { 0 };
14ovp.Offset = 5; // 从偏移5的位置开始读
15ovp.OffsetHigh = 0;
16
17// 读取文件,因为是异步的IO,只是提交给操作系统了,并没有读取就返回了
18// 真正的读取操作由系统完成
19int ret = ReadFile(hFile, buf.get(), 1000, &dwRead, &ovp);
20if (ret)
21{
22 std::cout << buf.get() << std::endl;
23}
24else
25{
26 // 最后一次错误是ERROR_IO_PENDING时,代表系统还未读完
27 if (GetLastError() == ERROR_IO_PENDING)
28 {
29 // 等待读取完毕
30 WaitForSingleObject(hFile, INFINITE);
31 // 输出错误码,读取的字节数
32 std::cout << ovp.Internal << " " << ovp.InternalHigh << std::endl;
33 // 输出读取的字节
34 std::cout << buf.get() << std::endl;
35 }
36 else // 其它错误则读取出错
37 {
38 DWORD dwError = GetLastError();
39 std::cerr << "read failed! Error:" << dwError << std::endl;
40 }
41}
42
43CloseHandle(hFile); // 关闭文件句柄
从注释已经可以很清晰地理解OVERLAPPED了,这个东西其实就像是一个小纸条,我们在上面写下读取位置,交给系统,系统根据我们给的信息开始读取,再在上面记录下一些信息给我们。
现在回到之前,来说第二种方法,WSASend函数的最后一个参数是一个函数指针,我们需要声明一个Completion Routine函数,当数据操作完成时它会自动调用这个函数。这个函数的原型必须是这样的:
1void CALLBACK CompletionRoutine(
2 DWORD dwError, // 错误信息
3 DWORD cbTransferred, // 实际收发的字节数
4 LPWSAOVERLAPPED lpOverlapped, // 就是WSASend中的lpOverlapped
5 DWORD dwFlags // 标志
6);
关于更多信息我们在实例中讲解,现在来看接收函数WSARecv,原型如下:
1int WSARecv (
2 SOCKET s,
3 LPWSABUF lpBuffers,
4 DWORD dwBufferCount,
5 LPDWORD lpNumberOfBytesRecvd,
6 LPDWORD lpFlags,
7 LPWSAOVERLAPPED lpOverlapped,
8 LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionROUTINE
9);
因为已经详细说过WSASend函数了,所以这个函数已经没什么好说的了。。。
这些基本知识就讲完了,其实第一种方法和事件选择模型有相似之处,而且也有着一样的缺点,我们主要看的是第二种方法,但为了连贯完整性,我还是用第一种方法写了一份完整的例子。
首先先来定义几个结构体方便我们使用:
1typedef struct {
2 SOCKET hSockAddr[NUM_CLIENT];
3 WSAEVENT hEventAddr[NUM_CLIENT];
4}PER_HANDLE_DATA;
5
6typedef struct {
7 OVERLAPPED overlapped;
8 WSABUF wsaBuf;
9 char buf[BUF_SIZE];
10}PER_IO_DATA;
PER_HANDLE_DATA结构体用来保存连接进来的客户端和与其绑定的事件,在上篇我们说过等待事件的函数最多只能监视64个事件,所以这个NUM_CLIENT其实是个WSA_MAXIMUM_WAIT_EVENTS的一个宏定义:
1#define NUM_CLIENT WSA_MAXIMUM_WAIT_EVENTS
要想监视更多事件还是得创建线程来多次调用。
PER_IO_DATA结构体用于保存单IO数据,就是前面说的小纸条,在这个小纸条上我们保存的有重叠结构,缓冲区和接收数据的缓冲区。BUF_SIZE是缓冲区长度,这个我定义的为1024。
接下来来看使用事件的重叠IO服务器的定义:
1class COvpEventServer
2{
3public:
4 COvpEventServer(int port);
5 ~COvpEventServer();
6 void Accpet();
7
8private:
9 void InitSock();
10 void AcceptHandler(); // 处理接收请求
11 void RequestHandler(); // 处理其它请求
12 void Cleanup(int index); // 清除退出的客户端
13
14private:
15 SOCKET m_hListenSock; // 服务器监听套接字
16 SOCKADDR_IN m_listenAddr; // 监听套接字地址
17 std::shared_ptr<PER_IO_DATA> m_hIoInfo[NUM_CLIENT];
18 PER_HANDLE_DATA m_hbInfo; // 保存连接的用户和关系的事件
19 int m_nPort; // 端口号
20 int m_nNumOfClient; // 记录连接的用户数
21};
基本注释已经全部给出,主要来说这个m_hIoInfo,因为每一个连接进来的用户都得分配一个重叠结构和缓冲区以接收系统处理后的结果,所以同样声明为数组。这里使用了智能指针shared_ptr来管理内存。
现在来依次看这些函数的实现,先来看InitSock函数,这里主要是初始化套接字相关操作:
1void COvpEventServer::InitSock()
2{
3 WSADATA wsaData;
4 int ret = WSAStartup(MAKEWORD(2, 2), &wsaData);
5 assert(ret == 0);
6
7 // 创建支持重叠IO的套接字
8 m_hListenSock = WSASocketW(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
9
10 // 更改socket IO选项为非0,非0即为非阻塞套接字
11 ULONG ulMode = 1;
12 ioctlsocket(m_hListenSock, FIONBIO, &ulMode);
13
14 memset(&m_listenAddr, 0, sizeof(m_listenAddr));
15 m_listenAddr.sin_family = AF_INET;
16 m_listenAddr.sin_addr.s_addr = htonl(INADDR_ANY);
17 m_listenAddr.sin_port = htons(m_nPort);
18
19 ret = bind(m_hListenSock, (SOCKADDR *)&m_listenAddr, sizeof(m_listenAddr));
20 assert(ret != SOCKET_ERROR);
21
22 ret = listen(m_hListenSock, 5);
23 assert(ret != SOCKET_ERROR);
24
25 std::thread t(&COvpEventServer::RequestHandler, this);
26 t.detach();
27}
这里我们使用WSASocketW函数创建了一个支持重叠IO的套接字,接着把套接字设为非阻塞的,这是使用ioctsocket函数来完成的,这个函数的原型如下:
1int ioctlsocket (
2 SOCKET s,
3 long cmd,
4 u_long FAR* argp
5);
将第二个参数设为FIONBIO后,第三个参数只要是非0就会把指定的套接字设为非阻塞,设为非阻塞的套接字在调用accept这些函数时就不会阻塞了,会直接返回。
最后,使用thread开了一个线程,这个线程用于处理其它请求的,稍后再来看。
接着来看处理连接请求的AcceptHandler函数:
1void COvpEventServer::AcceptHandler()
2{
3 SOCKET hClntSock;
4 SOCKADDR_IN clntAddr;
5 int addrLen = sizeof(clntAddr);
6 while (TRUE)
7 {
8 // 接受请求,因为已经设为非阻塞套接字,所以不会阻塞
9 hClntSock = accept(m_hListenSock, (SOCKADDR *)&clntAddr, &addrLen);
10 if (hClntSock == INVALID_SOCKET)
11 {
12 // 若无连接请求,最后一次错误会设为WSAEWOULDBLOCK
13 if (WSAGetLastError() == WSAEWOULDBLOCK)
14 continue;
15 else
16 {
17 std::cerr << "accept error!" << std::endl;
18 break;
19 }
20 }
21 std::cout << "connected client..." << std::endl;
22
23 // 创建手动重置的事件
24 WSAEVENT newEvent = WSACreateEvent();
25
26 // 分配内存,设置小纸条
27 m_hIoInfo[m_nNumOfClient] = std::make_shared<PER_IO_DATA>();
28 memset(&m_hIoInfo[m_nNumOfClient]->overlapped, 0, sizeof(WSAOVERLAPPED));
29 m_hIoInfo[m_nNumOfClient]->overlapped.hEvent = newEvent;
30 m_hIoInfo[m_nNumOfClient]->wsaBuf.buf = m_hIoInfo[m_nNumOfClient]->buf;
31 m_hIoInfo[m_nNumOfClient]->wsaBuf.len = BUF_SIZE;
32
33 // 同步更新客户套接字和关联的事件
34 m_hbInfo.hSockAddr[m_nNumOfClient] = hClntSock;
35 m_hbInfo.hEventAddr[m_nNumOfClient] = newEvent;
36
37 DWORD recvBytes = 0;
38 DWORD flags = 0;
39 // 接收数据
40 WSARecv(hClntSock, &m_hIoInfo[m_nNumOfClient]->wsaBuf, 1, &recvBytes, &flags, &m_hIoInfo[m_nNumOfClient]->overlapped, NULL);
41
42 m_nNumOfClient++;
43 }
44}
在这里主要处理的是连接请求,因为套接字已经是非阻塞的了,所以accept不会阻塞,但需要加个判断。
接着设置重叠结构和缓冲区,就是所谓的填写小纸条,设置分配的事件和接收数据的缓冲区。这个小纸条是传给WSARecv函数的,因为重叠结构是和事件绑定上的,所以我们便可坐享其成,等到系统为我们处理完成以后,它会设置这个事件状态,我们只要监视这个事件状态就可以了。
其它细节便不多说了,除了几个重叠IO相关的函数,事件这些我们在事件选择模型里面已经详细说过了,若是忘了可再次参考那篇文章。
现在来看处理其它请求的RequestHandler函数:
1void COvpEventServer::RequestHandler()
2{
3 int posInfo, index;
4 while (true)
5 {
6 // 检测发生事件的索引
7 posInfo = WSAWaitForMultipleEvents(m_nNumOfClient, m_hbInfo.hEventAddr, FALSE, 1000, FALSE);
8 if (posInfo == WSA_WAIT_FAILED || posInfo == WSA_WAIT_TIMEOUT)
9 continue;
10
11 // 得到索引
12 index = posInfo - WSA_WAIT_EVENT_0;
13
14 // 手动重置事件
15 WSAResetEvent(m_hbInfo.hEventAddr[index]);
16
17 // 获取实际接收的字节数
18 DWORD dwRecvBytes = m_hIoInfo[index]->overlapped.InternalHigh;
19 if (dwRecvBytes == 0) // 字节数为0则表示客户端退出
20 {
21 std::cout << "disconnected..." << m_hbInfo.hSockAddr[index] << std::endl;
22 Cleanup(index);
23 }
24 else
25 {
26 // 输出接收的消息
27 std::cout << m_hIoInfo[index]->buf << std::endl;
28 send(m_hbInfo.hSockAddr[index], m_hIoInfo[index]->buf, dwRecvBytes, 0);
29
30 // 再次接收其它消息
31 DWORD recvBytes = 0;
32 DWORD flags = 0;
33 WSARecv(m_hbInfo.hSockAddr[index], &m_hIoInfo[index]->wsaBuf, 1,
34 &recvBytes, &flags, &m_hIoInfo[index]->overlapped, NULL);
35 }
36 }
37}
基本框架和上篇差不多,但我感觉这里其实要比事件选择模型简单清晰点的。
还是通过WSAWaitForMultipleEvents函数检测发生消息的事件位置,然后减去WSA_WAIT_EVENT_0得到最小索引。
得到索引就等于得到了对应的套接字和PER_IO_DATA,因为我们是同步更新这三个地方的。
接着通过OVERLAPPED结构体的InternalHigh字段来得到实际接收的字节数,忘了的可以看前面这个结构的说明。
字节数不为0就表示有收到数据,我们通过PER_IO_DATA的buf字段可以得到接收的数据,我们把小纸条投递上去,系统已经帮我们把这些内容都准备好了。
大家可能会说这里怎么还是使用的send函数发送数据,而非WSASend函数呢?可以想一下。
WSARecv和WSASend函数其实更多的是为Completion Routine方式使用的,也就是前面所说的第二种方法,因为我们这里需要系统帮我们填写小纸条中的数据,所以有必要使用WSARecv函数。但在发送数据的时候已经万事俱备了,数据,大小,套接字都准备好了,所以也就没必要使用WSASend函数了。
最后,再次调用一次WSARecv函数以接收其它的数据请求。当客户端退出时,收到的字节数会为0,在这里我们专门使用Cleanup函数来处理:
1void COvpEventServer::Cleanup(int index)
2{
3 closesocket(m_hbInfo.hSockAddr[index]);
4 WSACloseEvent(m_hbInfo.hEventAddr[index]);
5 m_hIoInfo[index].reset();
6 for (int i = index; i < m_nNumOfClient; ++i)
7 {
8 m_hbInfo.hSockAddr[index] = m_hbInfo.hSockAddr[index + 1];
9 m_hbInfo.hEventAddr[index] = m_hbInfo.hEventAddr[index + 1];
10 m_hIoInfo[index] = m_hIoInfo[index + 1];
11 }
12}
这个函数比较简单,在这里关闭断开连接的套接字和事件,对于引用计数,可以调用reset()重置下计数。然后重置下数组,没什么好说的。
第一种方法说完了,可以结合事件选择模型对比着看,测试就不截图了。
其实重叠IO主要用的是第2种方式,这种方式没有客户端的限制,实现起来也稍微简单点,但这篇好像已经有点长了,就放到下篇来专门说吧。
以上是关于网络模型之重叠IO的主要内容,如果未能解决你的问题,请参考以下文章