WINSOCK.07.完成端口模型
Posted oldmao_2001
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了WINSOCK.07.完成端口模型相关的知识,希望对你有一定的参考价值。
文章目录
https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-socket
基于TCP/IP的网络编程有5种模型:
SELECT模型
事件选择模型
异步选择模型
重叠IO模型
完成端口模型
这次讲第五种:完成端口模型。
完成端口模型简介
完成端口也是Windows的一种机制,它是在重叠IO模型基础上进行的优化,因此其代码和重叠IO模型非常相似。在开始讲解完成端口模型之前,先总结目前重叠IO模型的缺点:
1.对于事件通知而言:
1.1事件的处理是无序的,如果1000个客户端,处理的前后顺序是我们无法指定和管理的;
1.2在WSAWaitForMultipleEvents过程中是单个客户端方式进行循环询问,如果客户端数量较多,那么延迟较高;
1.3在1.2的基础如果改进采用多线程的方式进行维护,那么其实也是非常困难的,每个线程相当老板请来的一个工人,假如我们请10个工人,每个工人负责100个客户端,但是我们无法指定新加入的客户端均匀分配给10个工人,可能加入100个客户端,那么这100个客户端很可能都分配给工人1号,其他九个工人在摸鱼。当然,请的工人越多,花的钱也越多;
2.对于完成例程而言:
虽然相比时间通知,系统自动调用回调函数,效率较高,但是每个客户端都要有一个线程去调用回调函数,这样会导致线程数变多。
针对以上问题,最后一个终极模型的特点是:
1.模仿消息队列,(系统)创建一个通知队列,以保证事件处理的先后顺序
2.充分利用CPU性能,创建最佳数量的线程。
核与线程
单核多线程
在一个时间片(时钟周期),每个线程执行一样的时间。
假这一个时间片是1毫秒,有100个线程在单核上运行,那么每个线程在一个时间片内分到0.01毫秒执行时间,时间到,不管执行到什么位置,立刻切换下一个线程执行,这些线程就在不停的切换中执行,不会因为某个线程出现死循环而影响其他线程的执行。由于时间片很短,速度太快,让人感觉是同时运行的效果。
但是线程越多,每个线程分到的执行时间越短,也就是软件越开得多越卡。
多核多线程
多个核的多线程,达到了真正的同时运行效果。
一个8核CPU,创建8条线程,操作系统会自动把8条线程分给八个核。
还有说法是按软件为单位进行分配线程。
线程数量的优化
理论上,线程数量和CPU核数一样最好,每个线程维护的客户端为64个最好。
具体可以看任务管理器(性能页面):
上面的内核是指CPU的物理内核,是真实的核数。理论上上面的I7-10875H,最好是对应8个线程性能最好,但是CPU的厂商对CPU做了优化(虚拟化?超线程技术),每个核可以掰成两个来用,所以上面的CPU参数可以理解为:8核16线程。如果没有使用超线程技术,那么多少核就对应多少线程,例如:
当然,还有一种说法,根据实作经验,可以为CPU的理论最优线程数多分配几个线程,因为有时候线程会处于挂起状态,挂起状态的线程不占用CPU的时间片,因此多设置几个线程可以更加充分利用CPU性能。
线程小结
1.CPU一个时间片为某个软件的线程分配时间越多,软件运行速度越快,那么其他软件分配时间就会减少,运行就会卡顿。
例如:CPU一个时间片是1秒,目前共有500个线程,其中A软件有100个线程,那么A软件分配到的时间为:
1
500
×
100
=
1
5
秒
\\cfrac{1}{500}\\times100=\\cfrac{1}{5}秒
5001×100=51秒
如果我们为A软件追加500个线程,那么A软件分配到的时间为:
1
1000
×
600
=
3
5
秒
\\cfrac{1}{1000}\\times 600=\\cfrac{3}{5}秒
10001×600=53秒
这也是为什么开了某个软件,你发现系统变卡了。
2.线程越多,那么每个线程之间切换的总时间越长,因此超过某个阈值后,增加线程反而会降低效率。就好比,你和你媳妇两人配合做饭,一人洗切,一人炒,比一个人要快。但是10个人来帮忙反而更慢,厨房都塞不下。
3.具体开多少线程要根据应用和硬件来设置。
完成端口模型逻辑
1.将重叠套接字(客户端+服务器)与一个完成端口(完成端口是某一个类型的变量)绑定在一起;
2.使用AcceptEx、WSARecv、WSASend投递请求(和重叠IO模型一样的代码);
3.当系统异步完成请求,就会把通知存进一个队列,我们就叫它通知队列,该队列由操作系统系统创建,维护;
4.完成端口可以理解为这个队列的头,可通过GetQueuedCompletionStatus从队列头往外取请求,一个一个处理。
完成端口模型代码
完成端口模型代码和重叠IO模型:事件通知的代码除了中间循环等待信号那段代码之外,其他都一样。整体流程而言,前面几个步骤和基本套路一样:
1.加载网络头文件网络库
2.打开网络库
3.校验版本
4.创建SOCKET
5.绑定地址与端口
创建完成端口
将完成端口与服务器SOCKET句柄绑定
6.开始监听
创建线程
开始完成端口
下面详细讲:
创建/绑定完成端口
完成这两个功能只用一个函数,但是传的参数不一样。
完成端口这个机制还可以用在文件操作上,不是网络编程专属。
https://docs.microsoft.com/en-us/windows/win32/fileio/createiocompletionport
HANDLE WINAPI CreateIoCompletionPort(
_In_ HANDLE FileHandle,
_In_opt_ HANDLE ExistingCompletionPort,
_In_ ULONG_PTR CompletionKey,
_In_ DWORD NumberOfConcurrentThreads
);
创建/绑定完成端口 | 绑定完成端口 | |
---|---|---|
参数1 | INVALID_HANDLE_VALUE | 服务器SOCKET句柄 |
参数2 | NULL | 完成端口变量 |
参数3 | 0 | 再次传递服务器SOCKET句柄,也可以传递一个下标(用句柄数组下标)做编号,便于客户端绑定指定完成端口(后面要用这个编号) |
参数4 | 允许此端口上最多同时运行的线程数量,可以用GetSystemInfo获取CPU核数,也可以用0表示默认CPU的核数 | 0 |
返回值 | 成功:返回一个新的完成端口;失败:用GetLastError()获取错误码 | 成功:返回一个与服务器SOCKET句柄绑定后的完成端口变量,实际上也就是原来的完成端口;失败:用GetLastError()获取错误码 |
//以下是完成端口模型代码
garr_sockAll[gi_count] = socketServer;
garr_olpAll[gi_count].hEvent = WSACreateEvent();
gi_count++;
//创建完成端口
hPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);
if (hPort ==0)//出错
{
int porterr = GetLastError();
printf("创建完成端口出错,错误码是:%d\\n",porterr);
closesocket(socketServer);
WSACleanup();
return 0;
}
//绑定完成端口
HANDLE hPortret =CreateIoCompletionPort((HANDLE)socketServer,hPort,0,0);
if (hPort != hPortret)//出错
{
int portreterr = GetLastError();
printf("服务器绑定完成端口出错,错误码是:%d\\n",portreterr);
CloseHandle(hPort);
closesocket(socketServer);
WSACleanup();
return 0;
}
获取CPU核数的函数在这里:
https://docs.microsoft.com/en-us/windows/win32/api/sysinfoapi/nf-sysinfoapi-getsysteminfo
信息是放在SYSTEM_INFO结构里面。
void GetSystemInfo(
LPSYSTEM_INFO lpSystemInfo
);
具体代码:
//获取CPU核数
SYSTEM_INFO systemProcessorsCount;
GetSystemInfo(&systemProcessorsCount);
int nProcessorsCount = systemProcessorsCount.dwNumberOfProcessors;
创建线程
函数介绍看:https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-createthread
例子看:https://docs.microsoft.com/en-us/windows/win32/procthread/creating-threads
HANDLE CreateThread(
LPSECURITY_ATTRIBUTES lpThreadAttributes,
SIZE_T dwStackSize,
LPTHREAD_START_ROUTINE lpStartAddress,
__drv_aliasesMem LPVOID lpParameter,
DWORD dwCreationFlags,
LPDWORD lpThreadId
);
参数1:指定线程句柄是否能被继承(NULL表示不继承);指定线程的执行权限(默认是NULL)
参数2:指定线程栈大小(单位是字节byte),写0,默认为1M大小
参数3:指定线程函数地址,就是指定线程要执行的函数名字(可类别完成例程中的回调函数),使用DWORD WINAPI 函数名( LPVOID lpParam );来定义
参数4:外部给线程函数传递的参数
参数5:设置线程执行方式:
值 | 执行方式 | 备注 |
---|---|---|
0 | 立即执行 | 常用 |
CREATE_SUSPENDED | 创建后进入挂起状态,可调用ResumeThread重新启动线程 | 创建后挂起,再启动比创建后启动速度快 |
STACK_SIZE_PARAM_IS_A_RESERVATION | 设置后,参数2指栈保留大小(虚拟内存的栈大小:1M);未设置,参数2指栈提交大小(物理内存的栈大小:4KB) | 不常用 |
TIPs:
物理内存就是运行内存,通俗的说就是内存条的总大小,这个玩意一般不能让程序员操作,否则会各种报错,或者泄露重要信息。
虚拟内存是把硬盘中的一部分空间用来当做内存使用。虚拟内存在硬盘上存在的是一个文件 PAGEFILE.SYS,大小位置可以自己设置,一般不放系统盘可以提高点点性能。
ResumeThread看这里:
https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-resumethread
参数6:线程编号,用于标识线程,用传址调用获取,不想获取可以写NULL。
返回值:
成功:返回线程句柄,这个要和线程编号区别开来,前者是空指针类型,后者是无符号整型;前者是内核对象,需要使用CloseHandle进行释放。
失败:NULL,需要用GetLastError()获取错误码。
//创建线程
//创建线程数组,因为线程属于内核对象
//需要释放空间,避免泄露
HANDLE *pThread = (HANDLE *)malloc(sizeof(HANDLE)*nProcessorsCount);
for (int i = 0; i < nProcessorsCount; i++)
{
pThread[i] = CreateThread(NULL,0,TreadProc,hPort,0,NULL);
if (pThread[i] == NULL)//出错
{
int err = WSAGetLastError();//取错误码
printf("创建线程失败错误码为:%d!\\n",err);
CloseHandle(hPort);
closesocket(socketServer);//释放
WSACleanup();//清理网络库
return 0;
}
}
//释放线程
for(i = 0; i < nProcessorsCount; i++)
{
CloseHandle(pThread[i]);//关闭线程
}
free(pThread);
另外释放线程代码在点×关闭窗口事件里面也要写一份
int nProcessorsCount//CPU核数
HANDLE *pThread//线程数组
这两个变量弄成全局变量。
操作通知队列
在写线程函数之前要在主函数里面将主线程设置为阻塞,不然主函数不断的运行到后面就return了。
在创建好线程下加入以下代码:
//设置主线程进入阻塞,子线程进入工作状态
while(1)
{
//Sleep是线程挂起状态,不占用CPU时间片
Sleep(1000);
}
然后再写线程绑定函数
这里我们要使用GetQueuedCompletionStatus函数:
https://docs.microsoft.com/en-us/windows/win32/api/ioapiset/nf-ioapiset-getqueuedcompletionstatus
BOOL GetQueuedCompletionStatus(
HANDLE CompletionPort,
LPDWORD lpNumberOfBytesTransferred,
PULONG_PTR lpCompletionKey,
LPOVERLAPPED *lpOverlapped,
DWORD dwMilliseconds
);
参数1:完成端口句柄
参数2:传址调用,发送或者接收的信息的长度
参数3:传址调用,是前面创建完成端口CreateIoCompletionPort的参数3做的标识
参数4:指向重叠结构的指针,接收重叠结构地址
参数5:等待时间,可以设置为INFINITE一直等(反正闲着也是瞎循环)
//线程绑定函数
//LPVOID lpParam代表空指针(通用类型),可以传递任何类型的指针参数
//这里是CreateThread传过来的完成端口句柄,当然也直接用全局的那个句柄也可以
DWORD WINAPI TreadProc( LPVOID lpParam)
{
HANDLE paraPort = (HANDLE)lpParam;//接收参数后转定义
DWORD lpNumberOfBytesTransferred;
ULONG lpCompletionKey;
LPOVERLAPPED *lpOverlapped;
BOOL bFlag = GetQueuedCompletionStatus(paraPort,&lpNumberOfBytesTransferred,&lpCompletionKey,lpOverlapped,INFINITE);
if(bFlag = FALSE)//失败
{
printf("获取线程状态错误码为:%d!\\n",GetLastError());
//continue;
}
return 0;
}
处理通知
接上面的内容,获得通知队列之后,开始处理队头的通知,整体逻辑如下:
//处理通知
//处理ACCEPT
if(lpCompletionKey == 0)//0表示传过来的是服务器,意味着要进行ACCEPT操作
{
}
else
{
if (lpNumberOfBytesTransferred == 0)
{
//没有收到信息,客户端下线
}
else
{
if(gc_recvbuff[0] == 0)
{
//接收buff是空,那就可以在这里send信息
}
else
{
//接收buff不为空,那么就要recv
}
}
}
accept
此时第一先要把客户端SOCKET句柄绑定到完成端口
HANDLE hPortret =CreateIoCompletionPort((HANDLE)garr_sockAll[gi_count],paraPort,gi_count,0);
if (hPort != hPortret)//出错
{
int portreterr = GetLastError();
printf("portreterr出错,错误码是:%d\\n",portreterr);
closesocket(garr_sockAll[gi_count]);
//gi_count--;
//continue;
}
第二要向新客户端投递WSARecv
//再次投递Recv,至于Send可以根据需要调用,这里不用
PostRecv(gi_count);
gi_count++;
PostAccept();
补充:PostAccept中对于立即完成连接上来的新客户端也要绑定完成端口
close
需要注意的内容在注释里面解释了,这里不啰嗦。
//没有收到信息,客户端下线
printf("没有收到信息,客户端下线!\\n");
//关闭SOCKET和重叠IO句柄
closesocket(garr_sockAll[]);
WSACloseEvent(garr_olpAll[]);
//从数组中删除对应句柄,由于这里的SOCKET句柄在创建完成端口的时候已经和数组下标进行绑定
//这里不能直接用之前的和数组最后一个元素交换的方式来删除句柄
//只能设置一个删除标记:0
garr_sockAll[] = 0;
garr_olpAll[] = 0;
//删除完毕后,数组内元素大小不能--变小
//continue;
在之前写的clear函数里面要避免二次释放:
if (garr_sockAll[i] == 0)//在完成端口绑定函数的处理通知中已经释放,不用二次释放
continue;
recv
//接收buff不为空,那么就要recv
printf("%s\\n",gc_recvbuff);//打印接收到的信息
memset(gc_recvbuff,0,sizeof(gc_recvbuff));//清空buff
PostRecv(lpCompletionKey);//再次投递recv
加循环
就是把线程绑定函数里面的主要操作加入循环里面,让其不断执行
优化
PostSend
原来的代码是参考重叠IO模型:事件通知写的,里面考虑了立即完成和稍后完成两种情况。虽然还不知道啥时候会触发那种情况,但是在完成端口模型里面不需要对这两种情况进行判断。
修改前:
//投递WSASend
//参数socketIndex是当前SOCKET数组下标
int PostSend(int socketIndex)
{
WSABUF wsabuff; //接收数据专用
wsabuff.buf = "这是重叠IO模型服务器消息~!";
wsabuff.len= MAX_SEND_LENGTH;
DWORD dwSendedLength;
DWORD dwSendFlag=0;//这里要初始化,否则有错
int iret = WSASend(garr_sockAll[socketIndex],&wsabuff,1,&dwSendedLength,dwSendFlag,&garr_olpAll[socketIndex],NULL);
printf("PostSendto:%d\\n",socketIndex);
if (iret == 0)
{
//立即完成,执行成功
printf("WSASend发送给:%d成功\\n",socketIndex);
//memset(gc_recvbuff,0,MAX_RECV_LENGTH);//清空buff
//根据情况投递send,不需要循环调用
return 0;
}
else
{
int wassenderr = WSAGetLastError();
if (wassenderr == ERROR_IO_PENDING)
{
//延迟处理
return 0;
}
else
{
printf("WSASend发送失败,错误码是:%d\\n",wassenderr);
//出错处理
return wassenderr;
}
}
}
修改后:
//投递WSASend
//参数socketIndex是当前SOCKET数组下标
int PostSend(int socketIndex)
{
WSABUF wsabuff; //接收数据专用
wsabuff.buf = "这是重叠IO模型服务器消息~!";
wsabuff.len= MAX_SEND_LENGTH;
DWORD dwSendedLength;
DWORD dwSendFlag=0;//这里要初始化,否则有错
int iret = WSASend(garr_sockAll[socketIndex],&wsabuff,1,&dwSendedLength,dwSendFlag,&garr_olpAll[socketIndex],NULL);
printf("PostSendto:%d\\n",socketIndex);
int wassenderr = WSAGetLastError();
if (wassenderr != ERROR_IO_PENDING)
{
//有错
printf("PWSASend出错,错误码是:%d\\n",wassenderr);
return 0;
}
}
PostRecv
理由同上。
修改前:
//投递WSARecv
//参数socketIndex是当前SOCKET数组下标
int PostRecv(int socketIndex)
{
WSABUF wsabuff; //接收数据专用
wsabuff.buf = gc_recvbuff;
wsabuff.len= MAX_RECV_LENGTH;
DWORD dwRecvedLength;
DWORD dwRecvFlag=0;//这里要初始化,否则有错
int iret = WSARecv(garr_sockAll[socketIndex],&wsabuff,1,&dwRecvedLength,&dwRecvFlag,&garr_olpAll[socketIndex],NULL);
printf("PostRecv from id:%d\\n",socketIndex);
if (iret == 0)
{
//立即完成,执行成功
//收取信息后返回
printf("%s\\n",wsabuff.buf);
memset(gc_recvbuff,0,MAX_RECV_LENGTH);//清空buff
//根据情况投递send
//跳3.1.1继续投递Recv
PostRecv(socketIndex);
return 0;
}
else
{
int wasrecverr = WSAGetLastError();
if (wasrecverr == ERROR_IO_PENDING)
{
//延迟处理
return 0;
}
else
{
printf("WSARecv接收失败,错误码是:%d\\n",wasrecverr);
//出错处理
return wasrecverr;
}
}
}
修改后:
//投递WSARecv
//参数socketIndex是当前SOCKET数组下标
int PostRecv(int socketIndex)
{
WSABUF wsabuff; //接收数据专用
wsabuff.buf = gc_recvbuff;
wsabuff.len= MAX_RECV_LENGTH;
DWORD dwRecvedLength;
DWORD dwRecvFlag=0;//这里要初始化,否则有错
int iret = WSARecv(garr_sockAll[socketIndex],&wsabuff,1,&dwRecvedLength,&dwRecvFlag,&garr_olpAll[socketIndex],NULL);
printf("PostRecv from id:%d\\n",socketIndex);
int wasrecverr = WSAGetLastError();
if (wasrecverr != ERROR_IO_PENDING)
{
printf("WSARecv接收失败,错误码是:%d\\n",wasrecverr);
//出错处理
return wasrecverr;
}
}
PostAccept
修改前:
//投递AcceptEx
int PostAccept()
{
//客户端句柄加到数组里面,注意gi_count++的位置
garr_sockAll[gi_count]=WSASocket(AF_INET,SOCK_STREAM,IPPROTO_TCP,NULL,0,WSA_FLAG_OVERLAPPED);
garr_olpAll[gi_count].hEvent = WSACreateEvent();//事件初始化
char str[1024] = {0};
DWORD dwRecvCount = 0;
//AcceptEx涉及的SOCKET句柄和重叠事件结构体都是针对服务器的
BOOL bRes = AcceptEx(garr_sockAll[0],garr_sockAll[gi_count],str,0,sizeof(struct sockaddr_in)+16,
sizeof(struct sockaddr_in)+16菜鸟 关于WSASend 函数,IO完成端口模型
使用片段时 Intellij 无法正确识别 Thymeleaf 模型变量