多线程环境下队列操作之锁的教训

Posted 拙园

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程环境下队列操作之锁的教训相关的知识,希望对你有一定的参考价值。

之前一直在研究多线程环境下的编程方法,却很少实战体验,以至于我一提到多线程编程,我总是信心不足,又总是说不出到底哪里不明白。今天工程现场反馈了一个“老问题”,我一直担心的是DAServer的运行机制有什么我不明白的地方,DAS Toolkit中总有一部分是我没有仔细研究的,在我心中有阴影,所以工程出了问题我第一反应就是“会不会问题出在阴影里?”。结果,至今为止,我总结起来问题90%都是在自己编码部分。

  我的DAServer中有一个需求,就是上送某个定点数据的速度不能太快,否则后台接收不过来。于是,我设计了一个类,包含了两条队列,其中一条队列是需要上送的数据,另一条队列是历史上曾经更新过的点记录及上送时间。每次组装报文时需要检查队列中是否有某个点,以及该点是否在1.5秒内“曾经上送过”。如果上送过就等下次再来检查,如果没有上送过则上送此信息并记录一下上送时间。

  我在设计这个类时一直在关注逻辑,却忽略了这是一个多线程环境的应用——装入队列数据的线程和取出队列数据的线程是不同的。好吧,只能说明我们的应用场景太低端,多线程争用资源的场景不是太频繁,以至于我过了这么久才明白过来。这个类的设计代码如下:

技术分享图片
技术分享图片
 1 struct SUpdateValue
 2 {
 3     int m_dataID;            // 数据点号
 4     DASVariant m_dataValue;    // 数据值
 5     SUpdateValue* next;
 6 
 7     SUpdateValue() : m_dataID(0), next(NULL){ }
 8     SUpdateValue(int id, DASVariant& val) : m_dataID(id), m_dataValue(val), next(NULL){    }
 9 };
10 
11 struct SUpdateTime
12 {
13     int dataID;            // 数据点号
14     long lastSendTime;    // 上次上送缓冲数据的时间戳(毫秒)
15     SUpdateTime* next;
16 
17     SUpdateTime() : dataID(0), lastSendTime(0), next(NULL){}
18 };
19 
20 class CDelayQueue
21 {
22 public:
23     CDelayQueue();
24     ~CDelayQueue();
25     bool EnqueFrame(int dataID, DASVariant& dataValue);        // 向延迟队列中装入新数据
26     bool DequeFrame(int dataID, DASVariant& dataValue);        // 从延迟队列中取出指定的数据
27     void DelayUpdateTime(int dataID);    // 设定数据点的更新时间戳,一般用于防止过快上送
28     int GetFrameCount(void);            // 获取延迟队列中的对象个数
29     string GetFrameList(void);            // 获取对象名称列表,以逗号分隔
30 
31 protected:
32     bool AskForUpdate(int dataID);        // 申请上送新数据
33 
34 private:
35     SUpdateValue* m_pHead;        // 需要延迟发送的数据队列
36     SUpdateValue* m_pTail;
37     SUpdateTime* m_pTimeHead;    // 已经发送的数据队列的历史记录(超时后失去记录)
38 };
技术分享图片

  实现部分如下:

技术分享图片
技术分享图片
  1 CDelayQueue::CDelayQueue() : m_pHead(NULL), m_pTail(NULL), m_pTimeHead(NULL)
  2 {
  3 }
  4 
  5 CDelayQueue::~CDelayQueue()
  6 {
  7     while (NULL != m_pHead)
  8     {
  9         SUpdateValue* p = m_pHead;
 10         m_pHead = m_pHead->next;
 11         delete p;
 12     }
 13 
 14     while (NULL != m_pTimeHead)
 15     {
 16         SUpdateTime* p = m_pTimeHead;
 17         m_pTimeHead = m_pTimeHead->next;
 18         delete p;
 19     }
 20 }
 21 
 22 bool CDelayQueue::EnqueFrame(int dataID, DASVariant& dataValue)
 23 {
 24     SUpdateValue* pNew = new SUpdateValue(dataID, dataValue);
 25     if (NULL == pNew)
 26     {
 27         return false;
 28     }
 29 
 30     if (NULL == m_pHead)
 31     {
 32         m_pHead = m_pTail = pNew;
 33     }
 34     else
 35     {
 36         m_pTail->next = pNew;
 37         m_pTail = m_pTail->next;
 38     }
 39 
 40     return true;
 41 }
 42 
 43 bool CDelayQueue::DequeFrame(int dataID, DASVariant& dataValue)
 44 {
 45     if (NULL == m_pHead)
 46     {
 47         return false;
 48     }
 49 
 50     // 检查队列中是否存在该点
 51     if (m_pHead->m_dataID == dataID)
 52     {
 53         if (AskForUpdate(dataID))
 54         {
 55             dataValue = m_pHead->m_dataValue;
 56             SUpdateValue* pDel = m_pHead;
 57             m_pHead = m_pHead->next;
 58             delete pDel;
 59             return true;
 60         }
 61         return false;
 62     }
 63     
 64     SUpdateValue* pPre = m_pHead;
 65     SUpdateValue* pValue = m_pHead->next;
 66     while (pValue != NULL)
 67     {
 68         if (pValue->m_dataID == dataID)
 69         {
 70             if (AskForUpdate(pValue->m_dataID))
 71             {
 72                 dataValue = pValue->m_dataValue;
 73                 pPre->next = pValue->next;
 74                 delete pValue;
 75                 return true;
 76             }
 77             return false;
 78         }
 79         pPre = pValue;
 80         pValue = pPre->next;
 81     }
 82 
 83     return false;
 84 }
 85 
 86 bool CDelayQueue::AskForUpdate(int dataID)
 87 {
 88     long curTime = GetTickCount();
 89 
 90     // 检查是否在短时间内更新过该数据点
 91     SUpdateTime* pList = m_pTimeHead;
 92     while (pList)
 93     {
 94         if (pList->dataID == dataID)
 95         {
 96             if ((curTime - pList->lastSendTime) < 1500)        // xiaoku
 97             {
 98                 return false;
 99             }
100             pList->lastSendTime = curTime;
101             return true;
102         }
103         pList = pList->next;
104     }
105 
106     // 如果记录中没有目标点,则创建历史记录
107     if (NULL == pList)
108     {
109         pList = new SUpdateTime();
110         pList->dataID = dataID;
111         pList->lastSendTime = curTime;
112         pList->next = m_pTimeHead;
113         m_pTimeHead = pList;
114     }
115 
116     return true;
117 }
118 
119 void CDelayQueue::DelayUpdateTime(int dataID)
120 {
121     long curTime = ::GetTickCount();
122     SUpdateTime* pList = m_pTimeHead;
123     while (pList)
124     {
125         if (pList->dataID == dataID)
126         {
127             pList->lastSendTime = curTime - 500;
128             return ;
129         }
130         pList = pList->next;
131     }
132 
133     // 如果记录中没有目标点,则创建历史记录
134     if (NULL == pList)
135     {
136         pList = new SUpdateTime();
137         pList->dataID = dataID;
138         pList->lastSendTime = curTime - 500;
139         pList->next = m_pTimeHead;
140         m_pTimeHead = pList;
141     }
142 }
143 
144 int CDelayQueue::GetFrameCount()
145 {
146     if (NULL == m_pHead)
147     {
148         return 0;
149     }
150 
151     int count = 1;
152     SUpdateValue* p = m_pHead;
153     while(p != m_pTail)
154     {
155         ++count;
156         p = p->next;
157     }
158     return count;
159 }
160 
161 string CDelayQueue::GetFrameList()
162 {
163     string strNameList("");
164     if (NULL == m_pHead)
165     {
166         return strNameList;
167     }
168     SUpdateValue* p = m_pHead;
169     char szData[16];
170     sprintf_s(szData, 16, "%d", p->m_dataID);
171     strNameList += szData;
172     while(p != m_pTail)
173     {
174         sprintf_s(szData, 16, "%d", p->next->m_dataID);
175         strNameList += ";";
176         strNameList += szData;
177         p = p->next;
178     }
179 
180     return strNameList;
181 }
技术分享图片

  最关键两个接口是EnqueFrame() 和 DequeFrame() ,分别是装入队列和从队列中取数。都是在操作队列,如果是不同的线程,怎么能不考虑线程互斥的问题呢?好吧,迅速引入LockerGuard。

  这个失败的例子放在这里警示一下自己!

以上是关于多线程环境下队列操作之锁的教训的主要内容,如果未能解决你的问题,请参考以下文章

多线程之锁机制

无锁队列的实现

java浅谈线程安全之锁

《Python》线程之锁信号量事件条件定时器队列

计算机基础之锁的分类

java并发之锁的使用浅析