网络模型之Event select

Posted CPP编程客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了网络模型之Event select相关的知识,希望对你有一定的参考价值。

开始之前,再来看看这个图,

我们说程序阻塞的主要有两部分,一个是等待数据到来,一个是将数据从内核复制到程序缓冲区。

这个事件选择模型,其实是在select模型的基础上做了更进一步地优化,这次优化把等待数据到来的这部分变成了非阻塞。而这主要是使用WSAEventSelect函数完成的,其原型如下:

1int WSAEventSelect (
2  SOCKET s,  //套接字        
3  WSAEVENT hEventObject,  //事件对象  
4  long lNetworkEvents  //网络事件
5)
;

事件选择给每个套接字绑定了一个事件,当发生指定网络事件的时候它会把绑定的事件变为signaled状态。

这个WSAEVENT,其实和之前在windows线程同步中所说的内核对象事件是一样的,只是在这个函数中需要手动重置的事件,为了清晰方便,所以弄了个typedef,并提供了一个WSACreateEvent函数来创建手动重置的事件。

1WSAEVENT event = WSACreateEvent();

其实这个WSACreateEvent函数就相当于调用了CreateEvent(NULL, TRUE, FALSE, NULL)。

lNetworkEvents可以指定一些指定的网络事件,我们感兴趣的值如下:

1FD_READ
2FD_WRITE
3FD_ACCEPT
4FD_CONNECT
5FD_CLOSE

当将套接字和事件绑定起来后,在网络事件发生的时候,OS就把事件设为signaled状态,然后我们就可以根据事件来进行处理。

那么如何验证是否发生了事件呢?

大家可能还记得在Windows线程中所说的WaitForMultipleObjects,它用于等待多个内核对象变为signaled状态。在这里,它同样提供了一个WSAWaitForMultipleEvents函数来验证是否发生事件,原型如下:

1DWORD WSAWaitForMultipleEvents(
2  DWORD cEvents,  // 对象个数
3  const WSAEVENT FAR *lphEvents,  // 事件数组地址  
4  BOOL fWaitAll,  // 等待标志
5  DWORD dwTimeOUT,  //超时时间             
6  BOOL fAlertable                 
7)
;

前4个参数的意义和WSAForMultipleObjects相同,这最后一个参数fAlerable用于触发alertable wait状态,这个在重叠IO中会用到,现在只需填入FALSE即可。

若是超时,则会返回WAIT_TIMEOUT;否则会返回一个常量,使用这个常量减去WSA_WAIT_EVENT_0可以得到事件数组中变为signaled状态的事件索引,若有多个事件都为signaled状态,那么返回的便是最小的那个索引。知道最小索引也就知道全部的了,可以对后面的依次再调用一次WSAWaitForMultipleEvents来确认所有发生事件的索引,具体的操作可以稍后在代码中看。

因为这个函数最多可等待64个事件,所以一次就只能监视64个,但可以创建线程或扩展保存句柄的数组,然后多次调用这个函数以监视更多事件。

现在已经得到发生事件的索引了,接下来如何确认发生的事件类型呢?

这就得使用这个函数了:

1int WSAEnumNetworkEvents (
2  SOCKET s,                           
3  WSAEVENT hEventObject,              
4  LPWSANETWORKEVENTS lpNetworkEvents  
5)
;

前两个分别是发生事件的套接字和与之相连的事件,这两个参数通过上一步得到的索引便能得到了。lpNetworkEvents是个指向WSANETWORKEVENTS的结构体,用于保存发生的事件类型或错误信息,其原型如下:

1typedef struct _WSANETWORKEVENTS {
2   long lNetworkEvents;
3   int iErrorCode[FD_MAX_EVENTS];
4} WSANETWORKEVENTS, FAR * LPWSANETWORKEVENTS;

lNetworkEvents就是用于保存发生的事件类型的,比如是FD_READ事件它就为FD_READ。若是有错误,iErrorCode中就会记录,若是FD_READ, 则iErrorCode[FD_READ_BIT]中就为非0,应该加以判断。

现在,创建一个CEventSelect类来实现上述知识,

 1class CEventSelect
2{

3public:
4    CEventSelect(int port);
5    ~CEventSelect();
6    void Accept();
7
8private:
9    void InitSock();  // 初始化相关操作
10    void RequestHandler();  // 处理请求
11    void CompressSocksAndEvents(int index);  // 有客户端退出时,同步删除套接字和对应的事件
12
13private:
14    WSADATA m_wsaData;
15    SOCKET m_servSock, m_clntSock;
16    SOCKADDR_IN m_servAddr, m_clntAddr;
17    SOCKET m_hSockAddr[WSA_MAXIMUM_WAIT_EVENTS];  // 套接字数组
18    WSAEVENT m_hEventAddr[WSA_MAXIMUM_WAIT_EVENTS];  // 事件数组
19    int m_nPort;  // 端口
20    int m_nNumOfSock;  // 记录当前的套接字数量
21};

同样先从InitSock中来看,

 1void CEventSelect::InitSock()
2{
3    // 初始化套按字库
4    int ret = WSAStartup(MAKEWORD(22), &m_wsaData);
5    assert(ret == 0);
6
7    // 创建服务端套接字
8    m_servSock = socket(PF_INET, SOCK_STREAM, 0);
9    memset(&m_servAddr, 0sizeof(m_servAddr));
10    m_servAddr.sin_family = AF_INET;
11    m_servAddr.sin_addr.s_addr = htonl(ADDR_ANY);
12    m_servAddr.sin_port = htons(m_nPort);
13
14    // 绑定地址
15    ret = bind(m_servSock, (SOCKADDR *)&m_servAddr, sizeof(m_servAddr));
16    assert(ret != SOCKET_ERROR);
17
18    //监听
19    ret = listen(m_servSock, 5);
20    assert(ret != SOCKET_ERROR);
21
22    // 创建一个事件
23    WSAEVENT newEvent = WSACreateEvent();
24
25    //将服务端套接字与新建的事件绑定起来,关注FD_ACCEPT和FD_CLOSE消息
26    ret = WSAEventSelect(m_servSock, newEvent, FD_ACCEPT | FD_CLOSE);
27    assert(ret != SOCKET_ERROR);
28
29    //将套接字与事件同时保存入数组
30    m_hSockAddr[m_nNumOfSock] = m_servSock;
31    m_hEventAddr[m_nNumOfSock] = newEvent;
32    m_nNumOfSock++;  //套接字计数++
33}

主要的就是WSAEventSelect函数,这里我们创建了一个事件来和服务端的套接字绑定起来,并监听FD_ACCEPT和FD_CLOSE消息,这样就把本来占用程序时间片的accept交给了OS,由它去处理,有了消息再通知我们。此处就解决了等待数据到来的这部分阻塞,这也正是事件选择模型的改进效果。

我们把每一个套接字和与之相对应的事件同步保存到数组中,这步操作很重要,因为套接字要和事件一一对应,得能通过套接字找到事件,也能通过事件找到套接字。

同样在构造函数中去调用:

1CEventSelect::CEventSelect(int port)
2    : m_nPort(port),
3      m_nNumOfSock(0)
4{
5    InitSock();
6}

现在的工作就都剩下处理OS给我们的通知了,这也是关键所在:

 1void CEventSelect::RequestHandler()
2{
3    int posInfo, startIndex;
4    while (true)
5    {
6        // 1.验证是否发生了事件
7        posInfo = WSAWaitForMultipleEvents(m_nNumOfSock, m_hEventAddr, FALSE, WSA_INFINITE, FALSE);
8        startIndex = posInfo - WSA_WAIT_EVENT_0;  // 得到最小索引
9
10        for (int i = startIndex; i < m_nNumOfSock; ++i)
11        {
12            int sigEventIndex = WSAWaitForMultipleEvents(1, &m_hEventAddr[i], TRUE, 0, FALSE);
13            if (sigEventIndex == WSA_WAIT_FAILED || sigEventIndex == WSA_WAIT_TIMEOUT)
14            {
15                continue;
16            }
17            else
18            {
19                // 2.区分事件类型
20                WSANETWORKEVENTS netEvents;
21                WSAEnumNetworkEvents(m_hSockAddr[i], m_hEventAddr[i], &netEvents);
22                if (netEvents.lNetworkEvents & FD_ACCEPT)  // 请求连接时
23                {
24                    if (netEvents.iErrorCode[FD_ACCEPT_BIT] != 0)  // Error
25                        break;
26
27                    int clntAddrSize = sizeof(m_clntAddr);
28                    m_clntSock = accept(m_servSock, (SOCKADDR *)&m_clntAddr, &clntAddrSize);
29
30                    WSAEVENT newEvent = WSACreateEvent();
31                    WSAEventSelect(m_clntSock, newEvent, FD_READ | FD_CLOSE);
32                    m_hSockAddr[m_nNumOfSock] = m_clntSock;
33                    m_hEventAddr[m_nNumOfSock] = newEvent;
34                    m_nNumOfSock++;
35                    printf("connected new client:%d \n", m_clntSock);
36                }
37
38                if (netEvents.lNetworkEvents & FD_READ)  // 接收数据时
39                {
40                    if (netEvents.iErrorCode[FD_READ_BIT] != 0)
41                        break;
42
43                    char buf[BUF_SIZE] = "";
44                    int recvLen = recv(m_hSockAddr[i], buf, sizeof(buf), 0);
45                    send(m_hSockAddr[i], buf, recvLen, 0);
46                }
47
48                if (netEvents.lNetworkEvents & FD_CLOSE)  // 断开连接时
49                {
50                    if (netEvents.iErrorCode[FD_CLOSE_BIT] != 0)
51                        break;
52
53                    WSACloseEvent(m_hEventAddr[i]);
54                    closesocket(m_hSockAddr[i]);
55                    m_nNumOfSock--;
56                    printf("closed client-%d...\n", m_hSockAddr[i]);
57
58                    CompressSocksAndEvents(i);
59                }
60            }
61        }
62    }
63}

先来看第一部分验证是否发生了事件,这主要在前16行。

 1int posInfo, startIndex;
2    while (true)
3    {
4        //验证是否发生了事件
5        posInfo = WSAWaitForMultipleEvents(m_nNumOfSock, m_hEventAddr, FALSE, WSA_INFINITE, FALSE);
6        startIndex = posInfo - WSA_WAIT_EVENT_0;  //得到最小索引
7
8        for (int i = startIndex; i < m_nNumOfSock; ++i)
9        {
10            int sigEventIndex = WSAWaitForMultipleEvents(1, &m_hEventAddr[i], TRUE, 0, FALSE);
11            if (sigEventIndex == WSA_WAIT_FAILED || sigEventIndex == WSA_WAIT_TIMEOUT)
12            {
13                continue;
14            }
15.....

通过调用WSAWaitForMultipleEvents函数来等待系统通知我们,第一个参数是m_nNumOfSock,基中就保存着当前拥有的套接字数量,第二个参数是事件数组,这个的个数和m_nNumOfSock是同步的,所以也就是一样的。

此处无需等待所有的事件,只要有一个有消息了我们就返回,所以第三个参数传入FALSE。然后传入WSA_INFINITE来一直等待系统的通知。

有通知了后减去WSA_WAIT_EVENT_0便可得到对应的最小索引,我们再依次对最小索引后面的事件依次调用WSAWaitForMultipleEvents就能得到所以发生消息的事件了,只是这里我们只监听一个事件,所以第一个参数只需传入1便可。接着就是错误检验了,没啥好说的。

现在来看第二部分,得到了发生的事件索引,现在就得区分事件类型:

1WSANETWORKEVENTS netEvents;
2WSAEnumNetworkEvents(m_hSockAddr[i], m_hEventAddr[i], &netEvents);

只需给它个netEvents用来保存类型或错误,很简单。

当有客户端请求连接时,我们处理FD_ACCEPT消息:

 1if (netEvents.lNetworkEvents & FD_ACCEPT)  //请求连接时
2{
3    if (netEvents.iErrorCode[FD_ACCEPT_BIT] != 0)  //Error
4        break;
5
6    int clntAddrSize = sizeof(m_clntAddr);
7    m_clntSock = accept(m_servSock, (SOCKADDR *)&m_clntAddr, &clntAddrSize);
8
9    WSAEVENT newEvent = WSACreateEvent();
10    WSAEventSelect(m_clntSock, newEvent, FD_READ | FD_CLOSE);
11    m_hSockAddr[m_nNumOfSock] = m_clntSock;
12    m_hEventAddr[m_nNumOfSock] = newEvent;
13    m_nNumOfSock++;
14    printf("connected new client:%d \n", m_clntSock);
15}

只需和对应的消息进行与操作就能检验是否发生了该消息,接着再这里我们又看到了accept函数,因为现在肯定已经有消息了,所以accept就能立即处理,而不用一直傻傻地等着了。

然后为每位连接进来的客户端也绑定上事件,关注FD_READ和FD_CLOSE消息,这样客户端发消息过来或退出我们就能知道了。

接收数据没什么特别的就不说了,现在来看退出消息:

 1if (netEvents.lNetworkEvents & FD_CLOSE)  //断开连接时
2{
3    if (netEvents.iErrorCode[FD_CLOSE_BIT] != 0)
4        break;
5
6    WSACloseEvent(m_hEventAddr[i]);
7    closesocket(m_hSockAddr[i]);
8    m_nNumOfSock--;
9    printf("closed client-%d...\n", m_hSockAddr[i]);
10
11    CompressSocksAndEvents(i);
12}

在关闭了对应的事件和套接字后,要对计数进行更新,也要在数组中删除对应的事件和套接字,这个主要是在CompressSocksAndEvents函数中完成的:

1void CEventSelect::CompressSocksAndEvents(int index)
2{
3    for (int i = index; i < m_nNumOfSock; ++i)
4    {
5        m_hSockAddr[i] = m_hSockAddr[i + 1];
6        m_hEventAddr[i] = m_hEventAddr[i + 1];
7    }
8}

这里同步更新数组,这小算法没啥说的,现在就使用事件选择写好了数据库,我们在Accept函数中调用RequestHanlder以供用户使用就好。

可以看出事件选择主要是对套接字绑定一个事件,然后提供感兴趣的消息,交给OS去处理,只需经过一个短暂的阻塞(将数据从网卡缓冲区拷贝到程序的内存),就可以完成接收过程了。

这个模型稍微扩展便可以监听几百个事件,而且使用简单,对于这个规模的服务器可以使用其来开发。

但扩展太多管理可能有点麻烦,且还是存在第二部分阻塞的,所以想要支持更多的客户端还是得使用重叠IO和完成端口,这些下次来看。

以上是关于网络模型之Event select的主要内容,如果未能解决你的问题,请参考以下文章

31网络通信之Select模型

网络模型之select

IO模型之二-linux网络IO模式select,poll,epoll

js经常用到的代码片段

Python网络编程之高级篇三

Socket I/O模型之select模型