IOCP 的内存使用情况[关闭]

Posted

技术标签:

【中文标题】IOCP 的内存使用情况[关闭]【英文标题】:Memory usage with IOCP [closed] 【发布时间】:2017-06-28 18:23:47 【问题描述】:

我正在把我们的代码改为使用 IOCP，目前通信已相对稳定，但应用程序的内存使用量在不断增加。看起来（在完成函数被调用时）我取回的 OverlappedEx 对象比我创建的要少得多。代码如下。我做错了什么？

#ifndef NETWORK_DATA
#define NETWORK_DATA

#include <afxwin.h>
#include <vector>
#include <string>
#include "CriticalSectionLocker.h"

using namespace std;

// Completion-port worker loop: dequeues completion packets from
// s_IOCompletionPort and hands each finished receive/send to the ServerData
// that was registered as the packet's completion key.
//
// param: unused.
// Returns 0 when the loop ends (shutdown event signalled, or the port itself
// fails).
//
// Every dequeued OverlappedEx is handed back to its ServerData, including
// failed and zero-byte completions. Previously those packets were dropped, so
// their OverlappedEx never returned to the pool and StartReceiving /
// StartSending kept allocating replacements (likely the growing memory usage).
DWORD NetworkManager::NetworkThread(void* param)
{
    bool bRun = true;

    while (bRun)
    {
        // NOTE(review): GetQueuedCompletionStatus below waits INFINITE, so this
        // event is only re-checked after the next packet is dequeued; shutdown
        // code presumably also needs to wake the thread, e.g. with
        // PostQueuedCompletionStatus(s_IOCompletionPort, 0, 0, nullptr).
        DWORD wait = ::WaitForSingleObject(CCommunicationManager::s_hShutdownEvent, 0);
        if (WAIT_OBJECT_0 == wait)
        {
            bRun = false;
            DEBUG_LOG0("Shutdown event was signalled thread");
            continue;
        }

        DWORD dwBytesTransfered = 0;
        ULONG_PTR completionKey = 0;   // The key is pointer-sized (PULONG_PTR in the API).
        OVERLAPPED* pOverlapped = nullptr;

        BOOL bReturn = ::GetQueuedCompletionStatus(s_IOCompletionPort,
                                                   &dwBytesTransfered,
                                                   &completionKey,
                                                   &pOverlapped,
                                                   INFINITE);

        if (!bReturn && nullptr == pOverlapped)
        {
            // No packet was dequeued. With an INFINITE timeout this means the
            // port itself failed (e.g. it was closed); retrying would spin.
            DEBUG_LOG0("GetQueuedCompletionStatus failed without dequeuing a packet");
            bRun = false;
            continue;
        }

        ServerData* networkData = reinterpret_cast<ServerData*>(completionKey);
        if (nullptr == networkData)
        {
            DEBUG_LOG0("invalid context");
            continue;
        }

        if (nullptr == pOverlapped)
        {
            // A packet posted with a key but no OVERLAPPED carries no I/O to complete.
            continue;
        }

        // OverlappedEx derives from OVERLAPPED, so static_cast is the correct downcast.
        OverlappedEx* data = static_cast<OverlappedEx*>(pOverlapped);
        const bool bSucceeded = bReturn && dwBytesTransfered > 0;

        switch (data->m_opType)
        {
        case OverlappedEx::OP_READ:
            if (bSucceeded || UDP == networkData->GetNetworkType())
            {
                // CompleteReceive returns the context to the pool and posts the
                // next receive; with 0 bytes it enqueues nothing. For UDP this
                // keeps the socket receiving after a failed or empty datagram.
                networkData->CompleteReceive(bSucceeded ? dwBytesTransfered : 0, data);
            }
            else
            {
                // TCP: zero bytes means the peer closed the connection, FALSE
                // means it failed. Re-posting a receive on a closed socket would
                // complete again immediately, so only recycle the context.
                // CompleteSend just releases the context and returns it to the
                // pool; it does not start new I/O.
                /*DEBUG_LOG2("GetQueuedCompletionStatus failed: bReturn: %d dwBytesTransferred: %u", bReturn, dwBytesTransfered);*/
                networkData->CompleteSend(0, data);
            }
            break;
        case OverlappedEx::OP_WRITE:
            // Return the context to the pool whether or not the send succeeded.
            networkData->CompleteSend(dwBytesTransfered, data);
            break;
        }
    }
    return 0;
}


// Transport used by a ServerData socket: UDP sockets use WSARecvFrom/WSASendTo,
// TCP sockets use WSARecv/WSASend.
enum NetworkType
{
    UDP,
    TCP
};

struct OverlappedEx : public OVERLAPPED

    enum OperationType
    
        OP_READ,
        OP_WRITE
    ;  

    const static int MAX_PACKET_SIZE = 2048;
    WSABUF m_wBuf;
    char m_buffer[MAX_PACKET_SIZE];
    OperationType m_opType; 

    OverlappedEx()
           
        Clear();
        m_refCount = 1;
    

    void AddRef()
    
        ::InterlockedIncrement(&m_refCount);
    

    void Release()
    
        ::InterlockedDecrement(&m_refCount);
    

    int Refcount() const
           
        return InterlockedExchangeAdd((unsigned long*)&m_refCount, 0UL);
    

    ~OverlappedEx()
           
        Clear();        
    

    void Clear()
    
        memset(m_buffer, 0, MAX_PACKET_SIZE);
        m_wBuf.buf = m_buffer;
        m_wBuf.len = MAX_PACKET_SIZE;
        Internal = 0;
        InternalHigh = 0;
        Offset = 0;
        OffsetHigh = 0;
        hEvent = nullptr;
        m_opType = OP_READ;
    

private:
    volatile LONG m_refCount;
;


// Per-endpoint network state. Owns the SOCKET (closed in the destructor).
// Keeps a pool of OverlappedEx contexts for overlapped receives and sends
// (m_olps, accessed under m_objectLock outside the constructor). Keeps a
// bounded queue of received payloads (m_receiveDataQueue, guarded by
// m_receiveQueLock). The constructor registers `this` as the socket's IOCP
// completion key.
class ServerData
       
public:
    // NOTE(review): MAX_REVEIVE_QUEUE_SIZE is a misspelled duplicate of
    // MAX_RECEIVE_QUEUE_SIZE; it and MAX_SEND_QUEUE_SIZE are not used in this class.
    const static int MAX_REVEIVE_QUEUE_SIZE = 100;
    const static int MAX_PACKET_SIZE = 2048;
    const static int MAX_SEND_QUEUE_SIZE = 10;
    const static int MAX_RECEIVE_QUEUE_SIZE = 100;  
    // Number of OverlappedEx contexts pre-allocated by the constructor.
    const static int MAX_OVERLAPPED_STRUCTS = 20;   


    // Stores the endpoint settings, pre-allocates MAX_OVERLAPPED_STRUCTS
    // contexts and, when sock is non-zero, associates the socket with IOPort.
    // NOTE(review): an invalid socket is INVALID_SOCKET rather than 0, and the
    // result of CreateIoCompletionPort is not checked.
    ServerData(NetworkType netType, const string& sName, CCommunicationManager::CommHandle handle,
                            SOCKET sock, HANDLE IOPort) :   
        m_sName(sName)
    
        InitializeCriticalSection(&m_receiveQueLock);
        InitializeCriticalSection(&m_objectLock);
        m_Handle = handle;
        m_Socket = sock;
        m_nIPAddress = 0;           
        m_netType = netType;
        m_bEnabled = true;
        m_ovlpIndex = 0;

        for (int i = 0; i < MAX_OVERLAPPED_STRUCTS; ++i)
        
            m_olps.push_back(new OverlappedEx);
        

        /* Associate socket with completion handle */
        if (m_Socket != 0)
        
            // The completion key is `this`; NetworkThread casts it back to ServerData*.
            CreateIoCompletionPort( reinterpret_cast<HANDLE>(m_Socket), IOPort, reinterpret_cast<ULONG_PTR>(this), 0 );
        
           

    // Deletes both critical sections and closes the socket.
    // NOTE(review): `lock` still holds m_receiveQueLock when DeleteCriticalSection
    // runs; if CriticalSectionLocker leaves the section in its destructor (as
    // its name suggests), it then touches a deleted critical section (UB).
    // NOTE(review): the OverlappedEx objects in m_olps are never deleted (leak),
    // and I/O still pending when the socket is closed will presumably complete
    // later with this destroyed object as its completion key.
    ~ServerData()
               
        CriticalSectionLocker lock(&m_receiveQueLock);
        DeleteCriticalSection(&m_receiveQueLock);

        DeleteCriticalSection(&m_objectLock);
        closesocket(m_Socket);  
    

    const string& Name() const  return m_sName; 
    bool Enabled() const  return m_bEnabled; 
    void SetEnabled(bool bEnabled)
    
        m_bEnabled = bEnabled;
    

    int Handle() const  return m_Handle; 
    void SetHandle(int handle)
    
        m_Handle = handle;
    

    unsigned long IPAddress() const  return m_nIPAddress; 

    SOCKET Socket() const
    
        return m_Socket;
    

    void SetSocket(SOCKET sock)
    
        m_Socket = sock;
    

    void SetIPAddress(unsigned long nIP)
    
        m_nIPAddress = nIP;
           

    // Always returns false; the telegram argument is not inspected.
    bool ValidTelegram(const vector<char>& telegram) const
    
        return false;
    

    // Pops the next pooled context, or returns nullptr if the pool is empty.
    // Takes no lock itself; StartReceiving and StartSending call it while
    // holding m_objectLock.
    OverlappedEx* GetBuffer()
    
        OverlappedEx* ret = nullptr;

        if (!m_olps.empty())
        
            ret = m_olps.front();
            m_olps.pop_front();
        

        return ret;
    


    // Called from NetworkThread when a read completes. Copies numBytes bytes
    // from data's buffer into a ReceivedData and enqueues it (if numBytes > 0),
    // then releases data, returns it to the pool and posts the next receive.
    void CompleteReceive(size_t numBytes, OverlappedEx* data)
           
        //DEBUG_LOG1("%d buffers are available", AvailableBufferCount());

        if (numBytes > 0)
           
            vector<char> v(data->m_buffer, data->m_buffer + numBytes);          
            ReceivedData rd;
            rd.SetData(v);
            EnqueReceiveMessage(rd);    
           

        data->Release();
        
            CriticalSectionLocker lock(&m_objectLock);
            m_olps.push_back(data);
//          DEBUG_LOG1("Queue size: %d", m_olps.size());
        

        StartReceiving();

    

    // Called from NetworkThread when a send completes: releases data and
    // returns it to the pool. numBytes is only used by the commented-out log.
    // Starts no new I/O.
    void CompleteSend(size_t numBytes, OverlappedEx* data)
    

        data->Release();    
        
            CriticalSectionLocker lock(&m_objectLock);
            m_olps.push_back(data);
            //DEBUG_LOG1("Queue size: %d", m_olps.size());
        

        //DEBUG_LOG2("Object: %s num sent: %d", Name().c_str(), numBytes);
    

    // Takes a context from the pool and posts one overlapped receive:
    // WSARecvFrom for UDP, WSARecv for TCP. When the pool is empty it either
    // allocates a new context or logs and returns, depending on conditions
    // elided from the post. If the post fails with anything other than
    // WSA_IO_PENDING, the context is released and returned to the pool.
    // NOTE(review): after such an immediate failure no further receive is
    // posted, so receiving on this socket stops.
    // NOTE(review): senderAddr and senderAddrSize are stack locals, but an
    // overlapped WSARecvFrom fills them in when the operation completes, after
    // this function has returned. They should live in the OverlappedEx (the
    // sender address is also never read here).
    void StartReceiving()
    
        DWORD bytesRecv = 0;
        sockaddr_in senderAddr;
        DWORD flags = 0;
        int senderAddrSize = sizeof(senderAddr);
        int rc = 0;

        CriticalSectionLocker lock(&m_objectLock);
        auto olp = GetBuffer();
        if (!olp)
               
            // NOTE(review): the condition ("...") was elided in the original post.
            if (...)
            
                m_olps.push_back(new OverlappedEx);
                olp = GetBuffer();
            
            else
            
                // NOTE(review): the condition ("...") was elided in the original post.
                if (...)
                
                    DEBUG_LOG1("Name: %s ************* NO AVAILABLE BUFFERS - bailing ***************", Name().c_str());
                
                return;
            
        

        olp->Clear();
        olp->m_opType = OverlappedEx::OP_READ;
        olp->AddRef();

        switch(GetNetworkType())
        
        case UDP:
            
                rc = WSARecvFrom(Socket(),
                                &olp->m_wBuf,
                                1,
                                &bytesRecv,
                                &flags,
                                (SOCKADDR *)&senderAddr,
                                &senderAddrSize, (OVERLAPPED*)olp, NULL);
            
            break;
        case TCP:
            
                rc = WSARecv(Socket(), 
                            &olp->m_wBuf,
                            1, 
                            &bytesRecv, 
                            &flags, 
                            (OVERLAPPED*)olp, NULL);
            
            break;
        

        if (SOCKET_ERROR == rc)
        
            DWORD err = WSAGetLastError();
            if (err != WSA_IO_PENDING)
            
                olp->Release();
                m_olps.push_back(olp);
            
        
    


    // Copies up to MAX_PACKET_SIZE bytes of msg's payload into data and points
    // data's WSABUF at them; longer payloads are silently truncated.
    // NOTE(review): &msg.Data()[0] is undefined for an empty payload, and min()
    // compares a size_t with an int.
    void SetWriteBuf(const SendData& msg, OverlappedEx* data)
    
        int len = min(msg.Data().size(), MAX_PACKET_SIZE);
        memcpy(data->m_buffer, &msg.Data()[0], len);
        data->m_wBuf.buf = data->m_buffer;
        data->m_wBuf.len = len;
    

    // Posts one overlapped send of msg: WSASendTo to msg.SendAddress() for UDP,
    // WSASend for TCP. Pool handling and immediate-failure handling mirror
    // StartReceiving.
    // NOTE(review): `sock` is unused; the "%d" format is given a size_t
    // (m_olps.size()); confirm SendAddress() returns a reference, otherwise
    // `&msg.SendAddress()` takes the address of a temporary.
    void StartSending(const SendData& msg)
    

        DEBUG_LOG1("device name: %s", Name().c_str());

        int rc = 0;
        DWORD bytesSent = 0;
        DWORD flags = 0;    
        SOCKET sock = Socket();

        int addrSize = sizeof(sockaddr_in);     

        CriticalSectionLocker lock(&m_objectLock);
        //UpdateOverlapped(OverlappedEx::OP_WRITE);

        auto olp = GetBuffer();
        if (!olp)
        
            // NOTE(review): the condition ("...") was elided in the original post.
            if (...)
            
                m_olps.push_back(new OverlappedEx);
                olp = GetBuffer();
                DEBUG_LOG2("name: %s ************* NO AVAILABLE BUFFERS new size: %d ***************", Name().c_str(), m_olps.size());
            
            else
            
                // NOTE(review): the condition ("...") was elided in the original post.
                if (...)
                
                    DEBUG_LOG1("Name: %s ************* NO AVAILABLE BUFFERS - bailing ***************", Name().c_str());
                
                return;
            
        

        olp->Clear();
        olp->m_opType = OverlappedEx::OP_WRITE;
        olp->AddRef();
        SetWriteBuf(msg, olp);

        switch(GetNetworkType())
        
        case UDP:

            rc = WSASendTo(Socket(), &olp->m_wBuf, 1,
                            &bytesSent, flags, (sockaddr*)&msg.SendAddress(),
                            addrSize, (OVERLAPPED*)olp, NULL);
            break;
        case TCP:

            rc = WSASend(Socket(), &olp->m_wBuf, 1,
                        &bytesSent, flags, (OVERLAPPED*)olp, NULL);
            break;
        

        if (SOCKET_ERROR == rc)
        
            DWORD err = WSAGetLastError();
            if (err != WSA_IO_PENDING)
            
                olp->Release();
                m_olps.push_back(olp);
            
        
    

    // Number of received payloads waiting to be consumed.
    size_t ReceiveQueueSize()
    
        CriticalSectionLocker lock(&m_receiveQueLock);
        return m_receiveDataQueue.size();
    

    // Moves every queued payload, oldest first, to the end of `data`.
    void GetAllData(vector <ReceivedData> & data)
    
        CriticalSectionLocker lock(&m_receiveQueLock);
        while (m_receiveDataQueue.size() > 0)
        
            data.push_back(m_receiveDataQueue.front());
            m_receiveDataQueue.pop_front();
        
    

    // Pops the oldest queued payload into msg; leaves msg untouched when the
    // queue is empty.
    void DequeReceiveMessage(ReceivedData& msg)
    
        CriticalSectionLocker lock(&m_receiveQueLock);
        if (m_receiveDataQueue.size() > 0)
        
            msg = m_receiveDataQueue.front();
            m_receiveDataQueue.pop_front();
        
    

    // Appends data to the receive queue unless it is full; otherwise drops it
    // and logs once every 100 drops.
    // NOTE(review): `<=` admits MAX_RECEIVE_QUEUE_SIZE + 1 entries;
    // push_back(data) copies even when an rvalue is passed (no std::forward);
    // s_nLogCount is one static per template instantiation, shared by all
    // ServerData objects but updated under a per-object lock only.
    template <class T>
    void EnqueReceiveMessage(T&& data)
    
        CriticalSectionLocker lock(&m_receiveQueLock);
        if (m_receiveDataQueue.size() <= MAX_RECEIVE_QUEUE_SIZE)
        
            m_receiveDataQueue.push_back(data);
        
        else
        
            static int s_nLogCount = 0;
            if (s_nLogCount % 100 == 0)
            
                DEBUG_LOG2("Max queue size was reached handle id: %d in %s", Handle(), Name().c_str());
            
            s_nLogCount++;
        
               

    // Transport chosen at construction; selects the Winsock calls used for I/O.
    NetworkType GetNetworkType() const
    
        return m_netType;
       

private:        
    // Declared but not defined: makes ServerData non-copyable.
    ServerData(const ServerData&);
    ServerData& operator=(const ServerData&);
private:
    bool m_bEnabled;                                                            //!< This member flags if this reciever is enabled for receiving incoming connections.
    int m_Handle;                                                   //!< This member holds the handle for this receiver.
    SOCKET m_Socket;                                                        //!< This member holds the socket information for this receiver.        
    unsigned long m_nIPAddress;                                             //!< This member holds an IP address the socket is bound to.
    deque < ReceivedData > m_receiveDataQueue;                              // Received payloads; guarded by m_receiveQueLock.
    CRITICAL_SECTION m_receiveQueLock;  
    CRITICAL_SECTION m_objectLock;                                          // Guards m_olps.
    string m_sName; 
    NetworkType m_netType;
    deque<OverlappedEx*> m_olps;                                            // Pool of idle OverlappedEx contexts.
    size_t m_ovlpIndex;                                                     // NOTE(review): set in the constructor, otherwise unused.
;


#endif

【问题讨论】:

您应该先调试自己的代码：通过在代码中加入检测（插桩）或使用 [UMDH] (docs.microsoft.com/en-us/windows-hardware/drivers/debugger/…) 来定位泄漏。 Isalamon：我大致知道发生了什么。每次要进行异步发送或接收时，我都会检查存放 OverlappedEx 对象的双端队列中是否还有可用对象；如果没有，就在堆上创建一个新对象。当完成例程返回或发生错误时，我会把该对象放回这个双端队列中以便重用。 代码虽然很多，但仍然不是一个完整的 minimal reproducible example（最小可复现示例）。题外话：把 using namespace std; 放在头文件中，对所有包含该头文件的人来说都是件坏事。 @user4581301：它仍处于原型设计阶段。 我还建议您使用 RIO（Registered I/O，仅适用于 Windows 8 及以上版本）——它对 UDP 有很大好处。 【参考方案1】：

您对 void Release() 的实现毫无意义——您递减了 m_refCount，然后呢？应该是：

void Release()

    if (!InterlockedDecrement(&m_refCount)) delete this;

因此您从不释放 OverlappedEx* data——这是我一眼就看到的问题，它会导致内存泄漏。

另外建议：用 WaitForSingleObject(CCommunicationManager::s_hShutdownEvent, 0); 来检测关闭是个坏主意。由于 GetQueuedCompletionStatus 以 INFINITE 等待，关闭事件只有在下一个完成包到达之后才会被检查到。线程中应当只调用 GetQueuedCompletionStatus；关闭时调用 PostQueuedCompletionStatus(s_IOCompletionPort, 0, 0, 0) 若干次（次数等于在 s_IOCompletionPort 上等待的线程数），线程一旦看到 pOverlapped == 0 就退出。

使用

OverlappedEx* data = static_cast<OverlappedEx*>(pOverlapped);

而不是reinterpret_cast

将 ~OverlappedEx() 设为 private——它不应被直接调用，只能经由 Release 调用。

            olp->Release();
            m_olps.push_back(olp);

在对象上调用 Release() 之后，就不能再访问它了，所以这里只能二选一：要么 olp->Release()，要么 m_olps.push_back(olp);，不能两者都用——否则 Release 的全部逻辑就失去了意义。也许您需要重载 OverlappedEx 的 operator delete，并在其中调用 m_olps.push_back(olp);，当然也需要相应地重载 operator new。

还有 (OVERLAPPED*)olp——这里做这种（reinterpret_cast 式的）强制转换是为了什么？您的结构体继承自 OVERLAPPED，编译器会在这里自动完成从派生类指针到基类指针的类型转换。

【讨论】:

RbMm：非常感谢。每次完成例程被调用、取回对象时，我都会把它放回双端队列中——这就是我重用它们的方式。 RbMm：为什么要用 static_cast？OverlappedEx 派生自 OVERLAPPED。 @MUXCAH——正是因为这一点，所以必须用 static_cast。 @MUXCAH——这里用 dynamic_cast 做什么？！需要且正确的恰恰是 static_cast。在您当前的实现中，OVERLAPPED 与 OverlappedEx 的地址相同，所以 reinterpret_cast 和 static_cast 给出相同的结果。但如果您修改了 OverlappedEx 的定义（比如在 OVERLAPPED 之前再加一个基类，或添加虚函数），reinterpret_cast 就会出错，而 static_cast 仍能正确工作。 @MUXCAH——明确地说：在当前的二进制布局下，OverlappedEx* data = reinterpret_cast<OverlappedEx*>(pOverlapped); 是正确的，但按照 C++ 规则，这里需要的是 static_cast（而不是 reinterpret_cast 或 dynamic_cast）。

以上是关于IOCP 的内存使用情况[关闭]的主要内容,如果未能解决你的问题,请参考以下文章

在cpp中测量函数内存使用情况[关闭]

如何使用 IOCP 发送文件?

IOCP 线程 - 澄清?

发送后如何正确关闭套接字(使用 IOCP)?

内存使用量急剧增加[关闭]

网络模型之IOCP服务器实例二