没有完成键的 IOCP 通知

Posted

技术标签:

【中文标题】没有完成键的 IOCP 通知【英文标题】:IOCP notification without completion key 【发布时间】:2015-06-08 04:31:13 【问题描述】:

我正在构建一个 IOCP/RIO Winsock 服务器,当我的 AcceptEx() 调用由客户端请求触发时,我一直在努力从完成端口获取正确的通知。

当我在发送客户端请求后调用 GetQueuedCompletionStatus() 时,我会成功返回,但没有包含任何数据的完成键。

我使用 PostQueuedCompletionStatus() 来获取完成键,但是当我的 AcceptEx() 调用发布通知时,我没有获得完成键,只是通知。

我确定我没有正确设置或调用某些东西,我只是找不到它在哪里。 here 的问题出奇的相似,但最终答案并没有解决我的问题。

为了将问题提炼成最简单的形式,我开始了一个新项目,只是复制了基本部分以复制问题。所有变量都是全局的,所以没有什么超出范围,为了发布代码,我删除了错误检查和任何循环。我们只需创建一个单独的套接字并给客户端几秒钟的时间来请求某些东西,然后我们检查是否发送了完成密钥。这就是这个错误证明项目的内容。

全局变量的 cmets 按照它们在代码中出现的顺序排列,并复制代码 cmets 以供参考。

//server main variables
const int maxSockets = 10;
const int bufferSlicesPerSocket = 3;
const int bufferSliceSize = 400;
const int numRecvToPostPerSocket = 2;
atomic<bool> runApp(true);
bool bResult;
int iResult;

//initWinsock
WSADATA wsaData;
struct addrinfo *ptr = NULL;
struct addrinfo hints;

//resolveAddress
struct addrinfo *result = NULL;

//createIOCP1
HANDLE hCompPort1;

//createListenSocket
SOCKET listenSocket = INVALID_SOCKET;

//associateListenSocketWithIOCP

//bindListenSocket

//startListenOnSocket

//getAcceptExPointer
LPFN_ACCEPTEX pAcceptEx = NULL;
GUID guidForAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes = 0;

//getRIO
RIO_EXTENSION_FUNCTION_TABLE rio;
GUID functionTableId = WSAID_MULTIPLE_RIO;

//registerBigBuffer
RIO_BUFFERID bigBufferId;
const int totalBufferSlices = maxSockets * bufferSlicesPerSocket;
const int bigBufferSize = totalBufferSlices * bufferSliceSize;
char bigBuffer[bigBufferSize];
char * pBigBuffer = bigBuffer;

//fillBufferSliceQueue
RIO_BUF bufferSliceArray[totalBufferSlices];
concurrent_queue<int> availableBufferSliceIdQueue;

//createCompletionQueue
RIO_CQ recvCQ;
RIO_CQ sendCQ;
const int RQMaxRecvs = 5;
const int RQMaxSends = 5;
DWORD recvCQsize = maxSockets * RQMaxRecvs;
DWORD sendCQsize = maxSockets * RQMaxSends;
RIO_NOTIFICATION_COMPLETION notify;
WSAOVERLAPPED ol;

//fill empty socket queue with all Ids
concurrent_queue<int> availableEmptySocketIdQueue;

//set per-handle data
ULONG_PTR cKeyArray[maxSockets];

//check for an empty socketId and create a socket
int socketId;

//create a socket
SOCKET socketArray[maxSockets] =  INVALID_SOCKET ;

//then post an accept on the socket

//associate the socket with the completion port
HANDLE hCompPort2;

//empty overlapped structure
WSAOVERLAPPED olArray[maxSockets];

//post an acceptEx on socket
const int acceptExAddrBufferLength = (sizeof(sockaddr) + 16) * 2 * maxSockets;
char acceptExAddrBuffer[acceptExAddrBufferLength];

//then create a request queue on the socket
RIO_RQ requestQueueArray[maxSockets];

//then post a receive on the socket

//get next available buffer slice id
int bufferSliceId;
int postRecv_counter;

//post a recv

//zero memory for overlapped struct
LPOVERLAPPED pol;

//check for completion
ULONG_PTR pKey;

//record socket as being in use
bool socketInUse[maxSockets];
atomic<int> numSocketsInUse = 0;

//update new socket with the listening socket's options
int numResults;
const int maxDequeue = 10;
RIORESULT rioResultArray[maxDequeue];

//calculate contexts
int socketContext;
int requestContext;

这里是main()和初始化:

int _tmain(int argc, _TCHAR* argv[]) 
    //initWinsock
    iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);

    ZeroMemory(&hints, sizeof(hints));

    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;
    hints.ai_flags = AI_PASSIVE;

    //resolve local address and port to be used by the server
    iResult = getaddrinfo(NULL, DEFAULT_PORT, &hints, &result);

    //create a handle for the first completion port
    hCompPort1 = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (u_long)0, 0);

    //create a socket for the server to listen to for client connections
    listenSocket = ::WSASocket(
        AF_INET,
        SOCK_STREAM,
        IPPROTO_TCP,
        NULL,
        0,
        WSA_FLAG_REGISTERED_IO | WSA_FLAG_OVERLAPPED);

    //associate the listening socket with the first completion port
    CreateIoCompletionPort((HANDLE)listenSocket, hCompPort1, (u_long)0, 0);

    //bind the listening socket
    iResult = ::bind(listenSocket, result->ai_addr, (int)result->ai_addrlen);

    //start listen on socket
    iResult = listen(listenSocket, SOMAXCONN);

    //get the AcceptEx pointer
    iResult = WSAIoctl(
        listenSocket,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        &guidForAcceptEx,
        sizeof(guidForAcceptEx),
        &pAcceptEx,
        sizeof(pAcceptEx),
        &dwBytes,
        NULL,
        NULL);

    //get RIO
    if (0 != WSAIoctl(
        listenSocket,
        SIO_GET_MULTIPLE_EXTENSION_FUNCTION_POINTER,
        &functionTableId,
        sizeof(GUID),
        (void**)&rio,
        sizeof(rio),
        &dwBytes,
        0,
        0
        )) 

    //register big buffer
    bigBufferId = rio.RIORegisterBuffer(pBigBuffer, bigBufferSize);

    //fill bufferSlice queue
    for (int i = 0; i < totalBufferSlices; i++) 
        bufferSliceArray[i].BufferId = bigBufferId;
        bufferSliceArray[i].Offset = i * bufferSliceSize;
        bufferSliceArray[i].Length = bufferSliceSize;
        availableBufferSliceIdQueue.push(i);
    

    //createCompletionQueue
    notify.Type = RIO_IOCP_COMPLETION;
    notify.Iocp.IocpHandle = hCompPort1;
    notify.Iocp.Overlapped = &ol;
    notify.Iocp.CompletionKey = NULL;

    //create completion queue
    recvCQ = rio.RIOCreateCompletionQueue(
        recvCQsize, //size of queue
        &notify);   //notification mechanism
    sendCQ = rio.RIOCreateCompletionQueue(
        sendCQsize, //size of queue
        &notify);   //notification mechanism

    socketId = 1; //simplified for demo

    //create a socket
    socketArray[socketId] = ::WSASocket(
        AF_INET,                                        //af
        SOCK_STREAM,                                    //type
        IPPROTO_TCP,                                    //protocol
        NULL,                                           //lpProtocolInfo
        0,                                              //group
        WSA_FLAG_REGISTERED_IO | WSA_FLAG_OVERLAPPED);  //dwFlags

    //set completion key
    cKeyArray[socketId] = socketId;

    //then post an accept on the socket

    //empty overlapped structure
    memset(&olArray[socketId], 0, sizeof(WSAOVERLAPPED));

我们现在开始将套接字与完成端口相关联:

//associate the socket with the completion port
hCompPort2 = CreateIoCompletionPort(
    (HANDLE)socketArray[socketId],  //file handle
    hCompPort1,                     //completion port
    cKeyArray[socketId],            //completion key
    NULL);                          //number of threads

//post an acceptEx on socket
bResult = pAcceptEx(
    listenSocket,                                                   //listen socket
    socketArray[socketId],                                          //accept socket
    &acceptExAddrBuffer[socketId * (sizeof(sockaddr) + 16) * 2],    //output buffer
    0,                                                              //data length
    sizeof(sockaddr) + 16,                                          //local address length
    sizeof(sockaddr) + 16,                                          //remote address length
    &dwBytes,                                                       //bytes received
    &olArray[socketId]);                                            //overlapped struct

//then create a request queue on the socket
requestQueueArray[socketId] = rio.RIOCreateRequestQueue(
    socketArray[socketId],  //socket
    RQMaxRecvs,             //max RECVs on queue
    1,                      //max recv buffers, set to 1
    RQMaxSends,             //max outstanding sends
    1,                      //max send buffers, set to 1
    recvCQ,                 //recv completion queue
    sendCQ,                 //send completion queue
    &socketArray[socketId]);//socket context

//then post a receive on the socket

//get next available buffer slice id
bufferSliceId = 1; //simplified for demo

//post a recv
bResult = rio.RIOReceive(
    requestQueueArray[socketId],        //socket request queue
    &bufferSliceArray[bufferSliceId],   //buffer slice
    1,                                  //set to 1
    0,                                  //flags
    &bufferSliceArray[bufferSliceId]);  //request context

wprintf(L"Please send request now...\n");

std::this_thread::sleep_for(std::chrono::milliseconds(3500));

最后,将通知出队,由于某种原因不包括完成键。

//send a dummy message to check the IOCP
//bResult = PostQueuedCompletionStatus(hCompPort2,(DWORD)0,(ULONG_PTR)socketId,NULL);   

//zero memory for overlapped struct
ZeroMemory(&pol, sizeof(LPOVERLAPPED));

//check for completion
iResult = GetQueuedCompletionStatus(
    hCompPort2, //completion port
    &dwBytes,   //number of bytes transferred
    &pKey,      //completion key pointer
    &pol,       //overlapped pointer
    0);         //milliseconds to wait

//update new socket with the listening socket's options
iResult = setsockopt(
    socketArray[pKey],          //socket
    SOL_SOCKET,                 //level
    SO_UPDATE_ACCEPT_CONTEXT,   //optname
    (char*)&listenSocket,       //*optval
    sizeof(listenSocket));      //optlen

wprintf(L"accepted socketId: %d\n", pKey);

return 0;

完成键没有数据,但是当我手动发送 PostQueuedCompletionStatus() 时,我会得到带有通知的键。我在我的 AcceptEx()CreateIoCompletionPort() 中没有做什么???

【问题讨论】:

建立连接时,pKey 返回什么值? (也许它正在返回与listenSocket 关联的密钥,而不是与socketArray[1] 关联的密钥?) 0。这就是让我吃惊的地方。除非我使用 PostQueuedCompletionStatus() 手动将一个值放在那里,否则我从未从该键中获得值。我考虑过它是否是 AcceptEx() 问题、socket-association-with-IOCP 问题、套接字创建问题、监听套接字问题等。我真的很难过。我检查过它实际上不是listenSocket id。键的值一直是 0x00000000。 【参考方案1】:

您将 0 指定为侦听套接字和 RIO 完成队列的完成键。您正在为AcceptEx() 在接受客户端时填充的客户端套接字分配其他完成键。 GetQueuedCompletionStatus() 报告正在执行排队 IOCP 操作的套接字的完成键。

AcceptEx() 完成时,GetQueuedCompletionStatus() 报告监听套接字的完成键(即 0)。

RIOReceive() 完成时,GetQueuedCompletionStatus() 报告从中读取的客户端套接字/队列的完成键。

因此您需要为每种类型的套接字(侦听与客户端)分配唯一的完成键,并且仅在收到侦听套接字的完成键时才调用setsockopt(SO_UPDATE_ACCEPT_CONTEXT)。如果您不以这种方式使用完成键,则必须使用 OVERLAPPED 结构来传递您自己的自定义每个套接字上下文信息,因此您仍然可以将 AcceptEx() 操作与其他操作区分开来。

【讨论】:

太棒了,谢谢雷米。所以要明确一点,当AcceptEx()调用完成时,它只显示监听套接字的键,所以识别新接受的套接字的唯一方法是通过OVERLAPPED条目? @Michael220:当AcceptEx() 完成时,GetQueuedCompletionStatus() 将提供监听套接字的完成键,以及您传递给AcceptEx()OVERLAPPED。因此,如果要识别接受的套接字,则必须扩展 OVERLAPPED 以包含客户端 SOCKET,或至少包含其 socketid

以上是关于没有完成键的 IOCP 通知的主要内容,如果未能解决你的问题,请参考以下文章

Windows完成端口 IOCP模型

IOCP 线程 - 澄清?

有啥方法可以使用 IOCP 来通知套接字何时可读/可写?

谁能给我解释一下这个 IOCP 图?

技术派-epoll和IOCP之比较

IOCP模型