IOCP 的内存使用情况[关闭]
Posted
技术标签:
【中文标题】IOCP 的内存使用情况[关闭]【英文标题】:Memory usage with IOCP [closed] 【发布时间】:2017-06-28 18:23:47 【问题描述】:我正在把我们的代码改为使用 IOCP,通信已经相对稳定,但应用程序的内存使用量一直在增长。看起来(在完成函数被调用时)我收回的 OverlappedEx 对象比我创建的要少得多。我的代码如下。我做错了什么?
#ifndef NETWORK_DATA
#define NETWORK_DATA
#include <afxwin.h>
#include <vector>
#include <string>
#include "CriticalSectionLocker.h"
using namespace std;
/// Worker-thread entry point that drains the shared I/O completion port.
///
/// Every iteration first polls CCommunicationManager::s_hShutdownEvent with a
/// zero timeout and leaves the loop once it is signalled. Otherwise it blocks
/// in GetQueuedCompletionStatus and hands the dequeued packet to the
/// ServerData object whose address was registered as the completion key.
///
/// NOTE(review): because the dequeue uses INFINITE, a thread that is already
/// blocked only notices the shutdown event after some packet wakes it up;
/// whoever signals shutdown presumably has to post a packet as well - verify.
///
/// @param param Not used by this function.
/// @return Always 0.
DWORD NetworkManager::NetworkThread(void* param)
{
    (void)param;

    bool bRun = true;
    while (bRun)
    {
        const DWORD wait = ::WaitForSingleObject(CCommunicationManager::s_hShutdownEvent, 0);
        if (WAIT_OBJECT_0 == wait)
        {
            bRun = false;
            DEBUG_LOG0("Shutdown event was signalled thread");
        }
        else
        {
            DWORD dwBytesTransfered = 0;
            // The completion key is pointer-sized; receiving it through an
            // LPDWORD cast would only be correct on 32-bit builds.
            ULONG_PTR completionKey = 0;
            OVERLAPPED* pOverlapped = nullptr;
            const BOOL bReturn = GetQueuedCompletionStatus(s_IOCompletionPort,
                                                           &dwBytesTransfered,
                                                           &completionKey,
                                                           &pOverlapped,
                                                           INFINITE);

            ServerData* networkData = reinterpret_cast<ServerData*>(completionKey);
            // OverlappedEx derives from OVERLAPPED, so the downcast is a static_cast.
            OverlappedEx* data = static_cast<OverlappedEx*>(pOverlapped);

            if (nullptr == networkData)
            {
                DEBUG_LOG0("invalid context");
            }
            else if (nullptr != data)
            {
                // A packet was dequeued. Even when the operation failed or moved
                // zero bytes, the OverlappedEx must go back to its owner's pool;
                // dropping it here is what made the pool shrink and forced new
                // allocations.
                const bool bTransferred = bReturn && dwBytesTransfered > 0;
                const DWORD dwUsable = bTransferred ? dwBytesTransfered : 0;

                switch (data->m_opType)
                {
                case OverlappedEx::OP_READ:
                    if (bTransferred || UDP == networkData->GetNetworkType())
                    {
                        // CompleteReceive ignores a zero byte count, recycles the
                        // buffer and posts the next receive. For UDP this keeps
                        // the socket listening after an empty datagram or a
                        // failed receive.
                        networkData->CompleteReceive(dwUsable, data);
                    }
                    else
                    {
                        // Stream socket closed or failed: re-posting a receive
                        // could complete with zero bytes again and spin, so the
                        // buffer is only returned to the pool (CompleteSend does
                        // exactly that and nothing else).
                        networkData->CompleteSend(0, data);
                    }
                    break;

                case OverlappedEx::OP_WRITE:
                    networkData->CompleteSend(dwUsable, data);
                    break;
                }
            }
            // else: the dequeue failed without yielding a packet - nothing to recycle.
        }
    }
    return 0;
}
/// Transport used by a ServerData socket; it selects which Winsock calls are
/// issued when a receive or send is started.
enum NetworkType
{
    UDP,  ///< Datagram socket: WSARecvFrom / WSASendTo are used.
    TCP   ///< Stream socket: WSARecv / WSASend are used.
};
struct OverlappedEx : public OVERLAPPED
enum OperationType
OP_READ,
OP_WRITE
;
const static int MAX_PACKET_SIZE = 2048;
WSABUF m_wBuf;
char m_buffer[MAX_PACKET_SIZE];
OperationType m_opType;
OverlappedEx()
Clear();
m_refCount = 1;
void AddRef()
::InterlockedIncrement(&m_refCount);
void Release()
::InterlockedDecrement(&m_refCount);
int Refcount() const
return InterlockedExchangeAdd((unsigned long*)&m_refCount, 0UL);
~OverlappedEx()
Clear();
void Clear()
memset(m_buffer, 0, MAX_PACKET_SIZE);
m_wBuf.buf = m_buffer;
m_wBuf.len = MAX_PACKET_SIZE;
Internal = 0;
InternalHigh = 0;
Offset = 0;
OffsetHigh = 0;
hEvent = nullptr;
m_opType = OP_READ;
private:
volatile LONG m_refCount;
;
// State kept for one socket that is serviced through an I/O completion port:
// the socket itself, a pool of reusable OverlappedEx contexts (m_olps) and a
// queue of received packets (m_receiveDataQueue).
// NOTE(review): this listing has lost its braces, and several conditions appear
// only as `if (...)`; scopes described in the comments below are inferred from
// statement order and should be confirmed against the real source.
class ServerData
public:
// NOTE(review): MAX_REVEIVE_QUEUE_SIZE is not referenced anywhere in this class
// and looks like a misspelled duplicate of MAX_RECEIVE_QUEUE_SIZE below.
const static int MAX_REVEIVE_QUEUE_SIZE = 100;
const static int MAX_PACKET_SIZE = 2048;
// Not referenced in the visible code.
const static int MAX_SEND_QUEUE_SIZE = 10;
// Threshold used by EnqueReceiveMessage before incoming packets are dropped.
const static int MAX_RECEIVE_QUEUE_SIZE = 100;
// Number of OverlappedEx objects allocated up front by the constructor.
const static int MAX_OVERLAPPED_STRUCTS = 20;
// Stores the arguments, initialises both critical sections, pre-allocates the
// OverlappedEx pool and, when the socket value is non-zero, associates the
// socket with IOPort using this object's address as the completion key.
// NOTE(review): the result of CreateIoCompletionPort is not checked, and the
// guard compares against 0 rather than INVALID_SOCKET.
ServerData(NetworkType netType, const string& sName, CCommunicationManager::CommHandle handle,
SOCKET sock, HANDLE IOPort) :
m_sName(sName)
InitializeCriticalSection(&m_receiveQueLock);
InitializeCriticalSection(&m_objectLock);
m_Handle = handle;
m_Socket = sock;
m_nIPAddress = 0;
m_netType = netType;
m_bEnabled = true;
m_ovlpIndex = 0;
for (int i = 0; i < MAX_OVERLAPPED_STRUCTS; ++i)
m_olps.push_back(new OverlappedEx);
/* Associate socket with completion handle */
if (m_Socket != 0)
CreateIoCompletionPort( reinterpret_cast<HANDLE>(m_Socket), IOPort, reinterpret_cast<ULONG_PTR>(this), 0 );
// Deletes both critical sections and closes the socket.
// NOTE(review): `lock` still exists when m_receiveQueLock is deleted; if
// CriticalSectionLocker (not shown) leaves the section in its destructor, that
// would touch a deleted critical section - verify.
// NOTE(review): the OverlappedEx objects held in m_olps are not deleted here, so
// they appear to leak unless they are freed elsewhere.
~ServerData()
CriticalSectionLocker lock(&m_receiveQueLock);
DeleteCriticalSection(&m_receiveQueLock);
DeleteCriticalSection(&m_objectLock);
closesocket(m_Socket);
// Plain accessors and mutators; none of them takes a lock.
const string& Name() const return m_sName;
bool Enabled() const return m_bEnabled;
void SetEnabled(bool bEnabled)
m_bEnabled = bEnabled;
int Handle() const return m_Handle;
void SetHandle(int handle)
m_Handle = handle;
unsigned long IPAddress() const return m_nIPAddress;
SOCKET Socket() const
return m_Socket;
void SetSocket(SOCKET sock)
m_Socket = sock;
void SetIPAddress(unsigned long nIP)
m_nIPAddress = nIP;
// Stub: always reports the telegram as invalid.
bool ValidTelegram(const vector<char>& telegram) const
return false;
// Removes and returns the first pooled OverlappedEx, or nullptr when the pool
// is empty. It takes no lock itself; the callers in this class acquire
// m_objectLock before calling it.
OverlappedEx* GetBuffer()
OverlappedEx* ret = nullptr;
if (!m_olps.empty())
ret = m_olps.front();
m_olps.pop_front();
return ret;
// Handles a finished receive: copies the first numBytes bytes of the buffer
// into a ReceivedData and queues it (skipped when numBytes is 0), puts `data`
// back into the pool and starts the next receive.
void CompleteReceive(size_t numBytes, OverlappedEx* data)
//DEBUG_LOG1("%d buffers are available", AvailableBufferCount());
if (numBytes > 0)
vector<char> v(data->m_buffer, data->m_buffer + numBytes);
ReceivedData rd;
rd.SetData(v);
EnqueReceiveMessage(rd);
// Release only decrements the counter, so pushing `data` back afterwards is safe.
data->Release();
CriticalSectionLocker lock(&m_objectLock);
m_olps.push_back(data);
// DEBUG_LOG1("Queue size: %d", m_olps.size());
// StartReceiving acquires m_objectLock again.
// NOTE(review): whether `lock` is still held at this point cannot be told from this listing.
StartReceiving();
// Handles a finished send: puts `data` back into the pool. numBytes is unused.
void CompleteSend(size_t numBytes, OverlappedEx* data)
data->Release();
CriticalSectionLocker lock(&m_objectLock);
m_olps.push_back(data);
//DEBUG_LOG1("Queue size: %d", m_olps.size());
//DEBUG_LOG2("Object: %s num sent: %d", Name().c_str(), numBytes);
// Takes a context from the pool (allocating a new one or bailing out under
// conditions that are elided in this listing) and posts one overlapped receive:
// WSARecvFrom for UDP, WSARecv for TCP.
// NOTE(review): senderAddr, senderAddrSize, flags and bytesRecv are locals of
// this function, yet their addresses are handed to an overlapped call that can
// complete after the function has returned - check the lifetime requirements
// in the WSARecvFrom documentation.
void StartReceiving()
DWORD bytesRecv = 0;
sockaddr_in senderAddr;
DWORD flags = 0;
int senderAddrSize = sizeof(senderAddr);
int rc = 0;
CriticalSectionLocker lock(&m_objectLock);
auto olp = GetBuffer();
if (!olp)
if (...)
m_olps.push_back(new OverlappedEx);
olp = GetBuffer();
else
if (...)
DEBUG_LOG1("Name: %s ************* NO AVAILABLE BUFFERS - bailing ***************", Name().c_str());
return;
olp->Clear();
olp->m_opType = OverlappedEx::OP_READ;
olp->AddRef();
switch(GetNetworkType())
case UDP:
rc = WSARecvFrom(Socket(),
&olp->m_wBuf,
1,
&bytesRecv,
&flags,
(SOCKADDR *)&senderAddr,
&senderAddrSize, (OVERLAPPED*)olp, NULL);
break;
case TCP:
rc = WSARecv(Socket(),
&olp->m_wBuf,
1,
&bytesRecv,
&flags,
(OVERLAPPED*)olp, NULL);
break;
// Any error other than WSA_IO_PENDING means no completion will be delivered
// for this context, so it is returned to the pool right away.
if (SOCKET_ERROR == rc)
DWORD err = WSAGetLastError();
if (err != WSA_IO_PENDING)
olp->Release();
m_olps.push_back(olp);
// Copies at most MAX_PACKET_SIZE bytes of msg into data's buffer and sets the
// WSABUF length to the number of bytes copied.
// NOTE(review): &msg.Data()[0] is evaluated even if the payload is empty; the
// container type is not shown here - verify that this is safe.
void SetWriteBuf(const SendData& msg, OverlappedEx* data)
int len = min(msg.Data().size(), MAX_PACKET_SIZE);
memcpy(data->m_buffer, &msg.Data()[0], len);
data->m_wBuf.buf = data->m_buffer;
data->m_wBuf.len = len;
// Takes a context from the pool (same elided allocate-or-bail logic as
// StartReceiving), fills it from msg and posts one overlapped send: WSASendTo
// for UDP, WSASend for TCP. Payload beyond MAX_PACKET_SIZE is not sent.
void StartSending(const SendData& msg)
DEBUG_LOG1("device name: %s", Name().c_str());
int rc = 0;
DWORD bytesSent = 0;
DWORD flags = 0;
// `sock` is not used below; the calls go through Socket() directly.
SOCKET sock = Socket();
int addrSize = sizeof(sockaddr_in);
CriticalSectionLocker lock(&m_objectLock);
//UpdateOverlapped(OverlappedEx::OP_WRITE);
auto olp = GetBuffer();
if (!olp)
if (...)
m_olps.push_back(new OverlappedEx);
olp = GetBuffer();
DEBUG_LOG2("name: %s ************* NO AVAILABLE BUFFERS new size: %d ***************", Name().c_str(), m_olps.size());
else
if (...)
DEBUG_LOG1("Name: %s ************* NO AVAILABLE BUFFERS - bailing ***************", Name().c_str());
return;
olp->Clear();
olp->m_opType = OverlappedEx::OP_WRITE;
olp->AddRef();
SetWriteBuf(msg, olp);
switch(GetNetworkType())
case UDP:
rc = WSASendTo(Socket(), &olp->m_wBuf, 1,
&bytesSent, flags, (sockaddr*)&msg.SendAddress(),
addrSize, (OVERLAPPED*)olp, NULL);
break;
case TCP:
rc = WSASend(Socket(), &olp->m_wBuf, 1,
&bytesSent, flags, (OVERLAPPED*)olp, NULL);
break;
// Same immediate-failure handling as in StartReceiving.
if (SOCKET_ERROR == rc)
DWORD err = WSAGetLastError();
if (err != WSA_IO_PENDING)
olp->Release();
m_olps.push_back(olp);
// Returns the number of queued received packets, read under m_receiveQueLock.
size_t ReceiveQueueSize()
CriticalSectionLocker lock(&m_receiveQueLock);
return m_receiveDataQueue.size();
// Moves every queued packet, oldest first, to the end of `data` and leaves the
// queue empty.
void GetAllData(vector <ReceivedData> & data)
CriticalSectionLocker lock(&m_receiveQueLock);
while (m_receiveDataQueue.size() > 0)
data.push_back(m_receiveDataQueue.front());
m_receiveDataQueue.pop_front();
// Pops the oldest queued packet into msg; msg is left unchanged when the queue
// is empty.
void DequeReceiveMessage(ReceivedData& msg)
CriticalSectionLocker lock(&m_receiveQueLock);
if (m_receiveDataQueue.size() > 0)
msg = m_receiveDataQueue.front();
m_receiveDataQueue.pop_front();
// Appends a packet to the receive queue. Because the check is `<=`, the queue
// can grow to MAX_RECEIVE_QUEUE_SIZE + 1 entries. Once it is full the packet is
// dropped and a message is logged on every 100th drop.
// `data` is a named variable and is passed to push_back without
// std::forward, so it is always copied.
template <class T>
void EnqueReceiveMessage(T&& data)
CriticalSectionLocker lock(&m_receiveQueLock);
if (m_receiveDataQueue.size() <= MAX_RECEIVE_QUEUE_SIZE)
m_receiveDataQueue.push_back(data);
else
// Function-local static: the drop counter is shared by all ServerData objects.
static int s_nLogCount = 0;
if (s_nLogCount % 100 == 0)
DEBUG_LOG2("Max queue size was reached handle id: %d in %s", Handle(), Name().c_str());
s_nLogCount++;
NetworkType GetNetworkType() const
return m_netType;
private:
// Declared private so that the class cannot be copied.
ServerData(const ServerData&);
ServerData& operator=(const ServerData&);
private:
bool m_bEnabled; //!< This member flags if this reciever is enabled for receiving incoming connections.
int m_Handle; //!< This member holds the handle for this receiver.
SOCKET m_Socket; //!< This member holds the socket information for this receiver.
unsigned long m_nIPAddress; //!< This member holds an IP address the socket is bound to.
// Received packets waiting to be collected; accessed under m_receiveQueLock.
deque < ReceivedData > m_receiveDataQueue;
CRITICAL_SECTION m_receiveQueLock;
// Taken around accesses to m_olps in the Start*/Complete* methods.
CRITICAL_SECTION m_objectLock;
string m_sName;
NetworkType m_netType;
// Pool of OverlappedEx contexts that are not currently attached to an I/O call.
deque<OverlappedEx*> m_olps;
// Set to 0 by the constructor and not used anywhere else in the visible code.
size_t m_ovlpIndex;
;
#endif
【问题讨论】:
您应该先调试自己的代码:通过给代码加上检测手段,或使用 [UMDH] (docs.microsoft.com/en-us/windows-hardware/drivers/debugger/…) 来定位泄漏。 Isalamon:我大致知道发生了什么:每次要发起异步发送或接收时,我都会检查存放 OverlappedEx 对象的双端队列里是否还有可用对象;如果没有,就在堆上新建一个。当完成例程返回或发生错误时,我把该对象放回双端队列以便重用。 代码虽然很多,但仍然不是一个完整的 minimal reproducible example。题外话:把 using namespace std;
放在头文件里是一种不好的做法。
@user4581301:它仍处于原型设计阶段。
我也可以建议你使用 rio (如果你只使用 win8+) - 这对 udp 有很大的好处
【参考方案1】:
您对 void Release()
的实现毫无意义——您只是递减了 m_refCount,
然后呢?它应该是:
void Release()
if (!InterlockedDecrement(&m_refCount)) delete this;
因此您永远不会释放 OverlappedEx* data
——这是我一眼就看到的问题,它会导致内存泄漏。
另外建议:用 WaitForSingleObject(CCommunicationManager::s_hShutdownEvent, 0);
来检测关闭是个坏主意。线程里只调用 GetQueuedCompletionStatus,
关闭时调用 PostQueuedCompletionStatus(s_IOCompletionPort, 0, 0, 0)
若干次(次数等于在 s_IOCompletionPort
上监听的线程数);线程如果看到 pOverlapped==0
就退出。
使用
OverlappedEx* data = static_cast<OverlappedEx*>(pOverlapped);
而不是 reinterpret_cast。
把 ~OverlappedEx()
设为 private
——它不能被直接调用,只能通过 Release
调用。
olp->Release();
m_olps.push_back(olp);
在对象上调用 Release()
之后,就不能再访问它了,所以要么调用 olp->Release(),
要么调用 m_olps.push_back(olp);
但不能两者都做——那样会破坏 Release
的全部逻辑。也许您需要重载 OverlappedEx
的 operator delete,
并在其中调用 m_olps.push_back(olp);
当然也需要重载 operator new。
再者,(OVERLAPPED*)olp
——这里为什么要做 reinterpret_cast
式的强制转换?您的结构体继承自 OVERLAPPED,
编译器在这里会自动完成类型转换。
【讨论】:
RbMm:非常感谢。每次我在调用完成例程时取回对象时,我都会将其放回双端队列中。这就是我重复使用它们的方式。 RbMm:为什么要静态转换? OverlappedEx 派生自 OVERLAPPED。 @MUXCAH - 正是因为这个 - 并且必须是static_cast
@MUXCAH - 但这里用 dynamic_cast
做什么?!这里需要的、也是正确的,恰恰是 static_cast。
在您当前的实现中,OVERLAPPED
和 OverlappedEx
对象的地址是相同的,所以 reinterpret_cast
和 static_cast
给出相同的结果。但是,如果您更改 OverlappedEx
的定义(比如在 OVERLAPPED
之前再加一个基类,或者加入虚函数),reinterpret_cast
就会出错,而 static_cast
仍能正确工作。
@MUXCAH - 明确地说:在当前的二进制布局下,OverlappedEx* data = reinterpret_cast<OverlappedEx*>(pOverlapped);
能得到正确的结果。但按照 C++ 的规则,这里需要的是 static_cast
(而不是 reinterpret_cast 或 dynamic_cast)。以上是关于IOCP 的内存使用情况[关闭]的主要内容,如果未能解决你的问题,请参考以下文章