I/O多路复用

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了I/O多路复用相关的知识,希望对你有一定的参考价值。

//1.select():
//  第一个参数:文件描述符的最大值加1,在windows下被忽略。
//  第二个参数:fd_set的结构体指针,包含可读性文件描述符
//             其包含的SOCKET在满足如下条件时候就被设置为就绪状态:
//               A:如果调用了listen()并且有新连接加入的时候,则调用accept会成功。
//               B:有数据可读的时候
//               C:连接断开的时候。(利用这个特性可以方便的实现客户端断线重连)
//  第三个参数:fd_set的结构体指针,包含可写性文件描述符
//             其包含的SOCKET在满足如下条件时候就被设置为就绪状态:
//               A:连接成功的时候。
//               B:有数据可以发送的时候。
//  第四个参数:fd_set的结构体指针,包含发送错误的文件描述符
//             其包含的SOCKET在满足如下条件时候就被设置为就绪状态:
//               A:调用connect()函数,但是连接失败的时候
//               B:有带外数据可以读取
//  第五个参数:select()函数的等待时间,若为0则阻塞(除非select()中的某个文件描述符发生变化才返回)
//  此函数返回所有fd_set结构体中就绪的文件描述符数。如果超时返回0,如果发生错误返回SOCKET_ERROR(一种情况是:当select()的三个fd_set结构体都为空的时候,就会返回SOCKET_ERROR)
//  由于select()会改变描述集合,所以每次都要重新FD_ZERO和FD_SET

//2.FD_CLR:从集合中删除指定SOCKET
//  FD_ISSET:判断指定SOCKET是否在集合中
//  FD_SET:将指定SOCKET加入集合
//  FD_ZERO:将集合清空

//3.fd_set结构体
typedef struct fd_set {
    u_int fd_count;                 /* how many are SET? */
    SOCKET  fd_array[FD_SETSIZE];   /* an array of SOCKETs */
} fd_set;
#define FD_SETSIZE      64
//  由fd_set结构体得出,fd_set每次最多可以操作64个SOCKET,若想对128(举个例子)个SOCKET进行操作可以在第一次轮询时,将前64个SOCKET放入队列中用Select进行状态查询, 
//  待本次操作全部结束后.将后64个SOCKET再加入轮询队列中进行轮询处理.这样处理需要在非阻塞式下工作.以此类推,Select也能支持无限多个。

1.服务器代码

#include <WinSock2.h>
#include <vector>
using std::vector;

#pragma comment(lib, "ws2_32.lib")

int _tmain(int argc, _TCHAR* argv[])
{
    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)                             {WSACleanup(); return false;}
    if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2)        {WSACleanup(); return false;}

    SOCKET SockListen = socket(AF_INET, SOCK_STREAM, 0);
    if (INVALID_SOCKET == SockListen)    {WSACleanup(); return false;}

    SOCKADDR_IN sockAddr;
    sockAddr.sin_family = AF_INET;
    sockAddr.sin_port = htons(8000);
    sockAddr.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");

    if (SOCKET_ERROR == bind(SockListen, (SOCKADDR*)&sockAddr, sizeof SOCKADDR))
    {
        closesocket(SockListen);
        WSACleanup();
        return false;
    }
    if (SOCKET_ERROR == listen(SockListen, 5))
    {
        closesocket(SockListen);
        WSACleanup();
        return false;
    }

    vector<SOCKET> vecSockClient;

    while (1)
    {
        fd_set setAccept = {};
        FD_ZERO(&setAccept);

        FD_SET(SockListen, &setAccept);

        timeval timeVal;
        timeVal.tv_sec = 1;
        timeVal.tv_usec = 0;

        int selValue = select(0, &setAccept, nullptr, nullptr, &timeVal);
        if (SOCKET_ERROR == selValue)
        {
            closesocket(SockListen);
            for (int i = 0; i < vecSockClient.size(); ++i)    {closesocket(vecSockClient[i]);}
            WSACleanup();
            return false;
        }
        else if (selValue > 0)
        {
            SOCKADDR_IN temSockAddr = {};
            int temSize = sizeof SOCKADDR;
            SOCKET temSock = accept(SockListen, (SOCKADDR*)&temSockAddr, &temSize);
            if (SOCKET_ERROR == temSock)    {continue;}
            const char *temStr = "Welcome to connect!";
            send(temSock, temStr, strlen(temStr), 0);
            printf("第%d个连接建立\n", vecSockClient.size());
            vecSockClient.push_back(temSock);
        }

        fd_set SetClientRead = {};
        FD_ZERO(&SetClientRead);

        for (int i = 0; i < vecSockClient.size(); ++i)    {FD_SET(vecSockClient[i], &SetClientRead);}
    
        if (vecSockClient.empty())    {Sleep(1);continue;}

        selValue = select(0, &SetClientRead, nullptr, nullptr, &timeVal);
        if (SOCKET_ERROR == selValue)
        {
            closesocket(SockListen);
            for (int i = 0; i < vecSockClient.size(); ++i)    {closesocket(vecSockClient[i]);}
            WSACleanup();
            return false;
        }
        else if (0 == selValue)    {Sleep(1); continue;}
        else
        {
            int vecSize = vecSockClient.size();
            for (int i = 0, j = 0; i < vecSize; ++i, ++j)
            {
                if (FD_ISSET(vecSockClient[j], &SetClientRead))
                {
                    char revBuff[10] = {0};
                    int recvLength = recv(vecSockClient[j], revBuff, 9, 0);
                    if (recvLength <= 0)
                    {
                        FD_CLR(vecSockClient[j], &SetClientRead);
                        vecSockClient.erase(vecSockClient.begin() + j, vecSockClient.begin() + j + 1);
                        printf("第%d个连接断开\n", i);
                        --j;
                    }
                    else     {printf("第%d个连接,发送数据为:%s\n", j, revBuff);}
                }
            }
        }
    }
    return 0;
}

2.客户端代码

#include <WinSock2.h>

#pragma comment(lib, "ws2_32.lib")

int _tmain(int argc, _TCHAR* argv[])
{
    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)    {return false;}

    if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2)    {WSACleanup(); return false;}

    SOCKET sockClient = socket(AF_INET, SOCK_STREAM, 0);                
    if (INVALID_SOCKET == sockClient)                                      {WSACleanup(); return false;}

    SOCKADDR_IN sockAddr;
    sockAddr.sin_family = AF_INET;
    sockAddr.sin_port = htons(8000);
    sockAddr.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");

    while (1)
    {
        fd_set setError = {};
        FD_ZERO(&setError);
        FD_SET(sockClient, &setError);

        connect(sockClient, (SOCKADDR*)&sockAddr, sizeof SOCKADDR);

        timeval timeVal = {};
        timeVal.tv_sec = 1;

        int selValue = select(0, nullptr, nullptr, &setError, &timeVal);
        if (0 == selValue)    {break;}
    }

    while (1)
    {
        fd_set setRecv = {};
        FD_ZERO(&setRecv);
        FD_SET(sockClient, &setRecv);

        timeval timVal = {};
        timVal.tv_sec = 2;
        timVal.tv_usec = 0;
        int selValue = select(0, &setRecv, nullptr, nullptr, &timVal);
        if (SOCKET_ERROR == selValue)
        {
            closesocket(sockClient);
            sockClient = INVALID_SOCKET;
            WSACleanup();
            return false;
        }
        else if (selValue > 0)
        {
            char recvBuff[1024] = {0};
            int recvLength = recv(sockClient, recvBuff, 1023, 0);
            if (recvLength <= 0)
            {
                FD_CLR(sockClient, &setRecv);
                closesocket(sockClient);
                WSACleanup();
                return false;
            }
            if (FD_ISSET(sockClient, &setRecv))
            {
                send(sockClient, "123", 3, 0);
                
                closesocket(sockClient);
                sockClient = INVALID_SOCKET;
                WSACleanup();
                return true;
            }
        }
    }

    system("pause");
    return 0;
}

 

以上是关于I/O多路复用的主要内容,如果未能解决你的问题,请参考以下文章

Redis 和 I/O 多路复用

什么是Redis I/O 多路复用?

I/O多路复用——epoll函数

I/O多路复用-EPOLL探索

I/O多路复用

第15章 高并发服务器编程_I/O多路复用