IO 完成端口初始读取和双向数据

Posted

技术标签:

【中文标题】IO 完成端口初始读取和双向数据【英文标题】:IO Completion Port Initial Read and Bi-Directional Data 【发布时间】:2016-06-07 18:27:02 【问题描述】:

我有以下简化的 IO 完成端口服务器 C++ 代码:

int main(..)

    startCompletionPortThreadProc();

    // Await client connection

    sockaddr_in clientAddress;
    int clientAddressSize = sizeof( clientAddress );
    SOCKET acceptSocket = WSAAccept( serverSocket, (SOCKADDR*)&clientAddress, &clientAddressSize, NULL, NULL);  

    // Connected

    CreateIoCompletionPort( (HANDLE)acceptSocket, completionPort, 0, 0 );

    // Issue initial read
    read( acceptSocket );



DWORD WINAPI completionPortThreadProc( LPVOID param )

    DWORD bytesTransferred = 0;
    ULONG_PTR completionKey = NULL;
    LPPER_IO_DATA perIoData = NULL;

    while( GetQueuedCompletionStatus( completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE ) )
    
        if( WaitForSingleObject( exitEvent, 0 ) == WAIT_OBJECT_0 )
        
            break;
        

        if( !perIoData )
            continue;

        if( bytesTransferred == 0 )
        
            //TODO
        

        switch( perIoData->operation )
        
            case OPERATION_READ:
            
                // Bytes have been received

                if( bytesTransferred < perIoData->WSABuf.len )
                
                    // Terminate string
                    perIoData->WSABuf.buf[bytesTransferred]   = '\0';
                    perIoData->WSABuf.buf[bytesTransferred+1] = '\0';
                

                // Add data to message build
                message += std::tstring( (TCHAR*)perIoData->WSABuf.buf );

                // Perform next read
                    perIoData->WSABuf.len = sizeof( perIoData->inOutBuffer );
                    perIoData->flags = 0; 

                    if( WSARecv( perIoData->socket, &( perIoData->WSABuf ), 1, &bytesTransferred, &( perIoData->flags ), &( perIoData->overlapped ), NULL ) == 0 )
                    
                        // Part message
                        continue;
                    

                    if( WSAGetLastError() == WSA_IO_PENDING )
                    
                        // End of message
//TODO: Process message here
                        continue;
                    
                
            
            break;

            case OPERATION_WRITE:
            
                perIoData->bytesSent += bytesTransferred;

                if( perIoData->bytesSent < perIoData->bytesToSend )
                
                    perIoData->WSABuf.buf = (char*)&( perIoData->inOutBuffer[perIoData->bytesSent] );
                    perIoData->WSABuf.len = ( perIoData->bytesToSend - perIoData->bytesSent);
                
                else
                
                    perIoData->WSABuf.buf  = (char*)perIoData->inOutBuffer;
                    perIoData->WSABuf.len  = _tcslen( perIoData->inOutBuffer ) * sizeof( TCHAR );
                    perIoData->bytesSent   = 0;
                    perIoData->bytesToSend = perIoData->WSABuf.len;
                

                if( perIoData->bytesToSend )
                
                    if( WSASend( perIoData->socket, &( perIoData->WSABuf ), 1, &bytesTransferred, 0, &( perIoData->overlapped ), NULL ) == 0 )
                        continue;

                    if( WSAGetLastError() == WSA_IO_PENDING )
                        continue;
                
            
            break;
        
    

    return 0;


bool SocketServer::read( SOCKET socket, HANDLE completionPort )

    PER_IO_DATA* perIoData = new PER_IO_DATA;
    ZeroMemory( perIoData, sizeof( PER_IO_DATA ) );

    perIoData->socket            = socket;
    perIoData->operation         = OPERATION_READ;
    perIoData->WSABuf.buf        = (char*)perIoData->inOutBuffer; 
    perIoData->WSABuf.len        = sizeof( perIoData->inOutBuffer );
    perIoData->overlapped.hEvent = WSACreateEvent();

    DWORD bytesReceived = 0;
    if( WSARecv( perIoData->socket, &( perIoData->WSABuf ), 1, &bytesReceived, &( perIoData->flags ), &( perIoData->overlapped ), NULL ) == SOCKET_ERROR )
    
        int gle = WSAGetLastError();
        if( WSAGetLastError() != WSA_IO_PENDING )
        
            delete perIoData;
            return false;
        
    

    return true;


bool SocketServer::write( SOCKET socket, std::tstring& data )

    PER_IO_DATA* perIoData = new PER_IO_DATA;
    ZeroMemory( perIoData, sizeof( PER_IO_DATA ) );

    perIoData->socket            = socket; 
    perIoData->operation         = OPERATION_WRITE;
    perIoData->WSABuf.buf        = (char*)data.c_str();
    perIoData->WSABuf.len        = _tcslen( data.c_str() ) * sizeof( TCHAR );
    perIoData->bytesToSend       = perIoData->WSABuf.len;  
    perIoData->overlapped.hEvent = WSACreateEvent();

    DWORD bytesSent = 0;
    if( WSASend( perIoData->socket, &( perIoData->WSABuf ), 1, &bytesSent, 0, &( perIoData->overlapped ), NULL ) == SOCKET_ERROR )
    
        if( WSAGetLastError() != WSA_IO_PENDING )
        
            delete perIoData;
            return false;
        
    

    return true;

1) 我遇到的第一个问题是初读。

在客户端连接(接受)时,我发出读取。由于客户端还没有发送任何数据,WSAGetLastError() 为 WSA_IO_PENDING 并且 read 方法返回。

当客户端随后发送数据时,线程仍停留在 GetQueuedCompletionStatus 调用中(因为我假设我需要另一个 WSARecv 调用?)。

我是否应该一直循环读取方法直到数据到达?这似乎不合逻辑,我认为通过发出初始读取 GetQueuedCompletionStatus 会在数据到达时完成。

2) 我需要在没有确认的情况下双向读写数据。因此,我还使用 IOCP 线程创建了一个客户端。是否真的可以使用完成端口来执行此操作,还是必须在读取之后进行写入?

对不起,感觉像是基本问题,但是在搜索互联网并构建 IOCP 示例之后,我仍然无法回答这些问题。

提前非常感谢。

【问题讨论】:

【参考方案1】:

在客户端连接(接受)时,我发出读取。由于客户端还没有发送任何数据,WSAGetLastError() 为 WSA_IO_PENDING 并且 read 方法返回。

这是正常行为。

当客户端随后发送数据时,线程仍停留在 GetQueuedCompletionStatus 调用中(因为我假设我需要另一个 WSARecv 调用?)。

不,您不需要另一个电话。如果它卡住了,那么你没有正确地将读取与 I/O 完成端口相关联。

我是否应该一直循环读取方法直到数据到达?

没有。您需要致电WSARecv() 一次以进行初次阅读。 WSA_IO_PENDING 错误意味着读取正在等待数据,并会在数据实际到达时向 I/O 完成端口发出信号。在该信号实际到达之前,请勿调用WSARecv()(或任何其他读取函数)。然后您可以再次拨打WSARecv() 等待更多数据。重复直到套接字断开。

我认为通过发出初始读取 GetQueuedCompletionStatus 会在数据到达时完成。

这正是应该发生的事情。

2) 我需要在没有确认的情况下双向读写数据。因此,我还使用 IOCP 线程创建了一个客户端。是否真的可以使用完成端口来做到这一点

是的。读取和写入是独立的操作,它们不相互依赖。

读后一定要写吗?

如果您的协议不需要它,则不需要。

现在,话虽如此,您的代码存在一些问题。

顺便说一句,WSAAccept() 是同步的,您应该考虑改用AcceptEx(),以便它可以使用相同的 I/O 完成端口来报告新连接。

但更重要的是,当挂起的 I/O 操作失败时,GetQueuedCompletionStatus() 返回 FALSE,返回的 LPOVERLAPPED 指针将非 NULL,GetLastError() 将报告 I/O 操作失败的原因/O 操作失败。但是,如果GetQueuedCompletionStatus()本身失败,返回的LPOVERLAPPED指针将为NULL,GetLastError()将报告GetQueuedCompletionStatus()失败的原因。 documentation 中清楚地解释了这种差异,但您的 while 循环没有考虑到它。请改用do..while 循环并根据LPOVERLAPPED 指针操作:

DWORD WINAPI completionPortThreadProc( LPVOID param )

    DWORD bytesTransferred = 0;
    ULONG_PTR completionKey = NULL;
    LPPER_IO_DATA perIoData = NULL;

    do
    
        if( GetQueuedCompletionStatus( completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE ) )
        
            // I/O success, handle perIoData based on completionKey as needed...
        
        else if( perIoData )
        
            // I/O failed, handle perIoData based on completionKey as needed...
        
        else
        
            // GetQueuedCompletionStatus() failure...
            break;
            
    
    while( WaitForSingleObject( exitEvent, 0 ) == WAIT_TIMEOUT );

    return 0;

附带说明,不要使用事件对象来发出completionPortThreadProc() 应该退出的信号,而是考虑使用PostQueuedCompletionionStatus() 将终止completionKey 发布到I/O 完成端口,然后您的循环可以查找该值:

DWORD WINAPI completionPortThreadProc( LPVOID param )

    DWORD bytesTransferred = 0;
    ULONG_PTR completionKey = NULL;
    LPPER_IO_DATA perIoData = NULL;

    do
    
        if( GetQueuedCompletionStatus( completionPort, &bytesTransferred, &completionKey, (LPOVERLAPPED*)&perIoData, INFINITE ) )
        
            if( completionKey == MyTerminateKey )
                break;

            if( completionKey == MySocketIOKey )
            
                // I/O success, handle perIoData as needed...
            
        
        else if( perIoData )
        
            // I/O failed, handle perIoData based on completionKey as needed...
        
        else
        
            // GetQueuedCompletionStatus() failure...
            break;
            
    
    while( true );

    return 0;

CreateIoCompletionPort( (HANDLE)acceptSocket, completionPort, MySocketIOKey, 0 );

PostQueuedCompletionStatus( completionPort, 0, MyTerminateKey, NULL );

【讨论】:

嗨 Remy,非常感谢您长时间的解释,非常感谢,我在这里发疯了。我将通过您的 cmets 工作并报告!我从我认为有效的其他示例中提取了大量代码。 嗨 Remy,好的,新循环是关键,专门检查“if(perIoData)”中的 GetLastError。 I/O 因 WSA_OPERATION_ABORTED 而失败,因此它从未完成。我有一个接受线程发出读取然后结束。我原以为它仍然可以工作,但显然我的设计是错误的。非常感谢您的帮助,我将在我的新设计中使用您的想法。希望您的 cmets 也能帮助其他人,因为很多示例似乎都使用了像我这样的代码。 当一个线程终止时,它已启动但仍处于挂起状态的任何 I/O 操作都会自动中止。您应该在一个线程内的循环中调用WSAAccept(),该线程至少在侦听套接字的生命周期内(或将客户端接受移动到 IO 完成端口并从此类线程发出初始接受),因此 I/O 不会'不要中止.. Remy,线程终止和未解决的 I/O 问题从 Windows Vista 开始不再是问题...通过 C++ (Richter) 查看 Windows (lenholgate.com/blog/2008/02/…)

以上是关于IO 完成端口初始读取和双向数据的主要内容,如果未能解决你的问题,请参考以下文章

完成端口

使用套接字时,哪些 IO 操作会导致完成数据包发送到完成端口?

IO 完成端口:WSARecv() 是如何工作的?

使用 io 完成端口时,WriteFile 是不是会在完成时立即发布完成数据包

stm32的双向io口

IO 完成端口和重叠管理