多线程通用处理队列类

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程通用处理队列类相关的知识,希望对你有一定的参考价值。

  1     public class DownLoadFile
  2     {
  3         public string FileName { get; set; }
  4 
  5     }
  6     /// <summary>
  7     /// 下载线程队列
  8     /// </summary>
  9     public class DownLoadQueueThread : QueueThreadBase<DownLoadFile>
 10     {
 11         /// <summary>
 12         /// 
 13         /// </summary>
 14         /// <param name="list">下载的列表ID</param>
 15         public DownLoadQueueThread(IEnumerable<DownLoadFile> list)
 16             : base(list)
 17         {
 18 
 19         }
 20         /// <summary>
 21         /// 每次多线程都到这里来,处理多线程
 22         /// </summary>
 23         /// <param name="pendingValue"列表ID></param>
 24         /// <returns></returns>
 25         protected override DoWorkResult DoWork(DownLoadFile pendingFile)
 26         {
 27             try
 28             {
 29                 //Thread.Sleep(5000);
 30                 for (int i = 0; i < 100000; i++)
 31                 {
 32                     //Thread.Sleep(100);
 33                 }
 34                 //..........多线程处理....
 35                 return DoWorkResult.ContinueThread;//没有异常让线程继续跑..
 36             }
 37             catch (Exception)
 38             {
 39                 return DoWorkResult.AbortCurrentThread;//有异常,可以终止当前线程.当然.也可以继续,
 40                 //return  DoWorkResult.AbortAllThread; //特殊情况下 ,有异常终止所有的线程...
 41             }
 42 
 43             //return base.DoWork(pendingValue);
 44         }
 45     }
 46 
 47     /// <summary>
 48     /// 队列多线程,T 代表处理的单个类型~
 49     /// </summary>
 50     /// <typeparam name="T"></typeparam>
 51     public abstract class QueueThreadBase<T>
 52     {
 53         #region 变量&属性
 54         /// <summary>
 55         /// 待处理结果
 56         /// </summary>
 57         private class PendingResult
 58         {
 59             /// <summary>
 60             /// 待处理值
 61             /// </summary>
 62             public T PendingValue { get; set; }
 63             /// <summary>
 64             /// 是否有值
 65             /// </summary>
 66             public bool IsHad { get; set; }
 67         }
 68         /// <summary>
 69         /// 线程数
 70         /// </summary>
 71         public int ThreadCount
 72         {
 73             get { return this.m_ThreadCount; }
 74             set { this.m_ThreadCount = value; }
 75         }
 76         private int m_ThreadCount = 5;
 77         /// <summary>
 78         /// 取消=True
 79         /// </summary>
 80         public bool Cancel { get; set; }
 81         /// <summary>
 82         /// 线程列表
 83         /// </summary>
 84         List<Thread> m_ThreadList;
 85         /// <summary>
 86         /// 完成队列个数
 87         /// </summary>
 88         private volatile int m_CompletedCount = 0;
 89         /// <summary>
 90         /// 队列总数
 91         /// </summary>
 92         private int m_QueueCount = 0;
 93         /// <summary>
 94         /// 全部完成锁
 95         /// </summary>
 96         private object m_AllCompletedLock = new object();
 97         /// <summary>
 98         /// 完成的线程数
 99         /// </summary>
100         private int m_CompetedCount = 0;
101         /// <summary>
102         /// 队列锁
103         /// </summary>
104         private object m_PendingQueueLock = new object();
105         private Queue<T> m_InnerQueue;
106         #endregion
107 
108         #region 事件相关
109         /// <summary>
110         /// 全部完成事件
111         /// </summary>
112         public event Action<CompetedEventArgs> AllCompleted;
113         /// <summary>
114         /// 单个完成事件
115         /// </summary>
116         public event Action<T, CompetedEventArgs> OneCompleted;
117         /// <summary>
118         /// 引发全部完成事件
119         /// </summary>
120         /// <param name="args"></param>
121         private void OnAllCompleted(CompetedEventArgs args)
122         {
123             if (AllCompleted != null)
124             {
125                 try
126                 {
127                     AllCompleted(args);//全部完成事件
128                 }
129                 catch { }
130             }
131         }
132         /// <summary>
133         /// 引发单个完成事件
134         /// </summary>
135         /// <param name="pendingValue"></param>
136         /// <param name="args"></param>
137         private void OnOneCompleted(T pendingValue, CompetedEventArgs args)
138         {
139             if (OneCompleted != null)
140             {
141                 try
142                 {
143                     OneCompleted(pendingValue, args);
144                 }
145                 catch { }
146 
147             }
148         }
149         #endregion
150 
151         #region 构造
152         public QueueThreadBase(IEnumerable<T> collection)
153         {
154             m_InnerQueue = new Queue<T>(collection);
155             this.m_QueueCount = m_InnerQueue.Count;
156         }
157 
158         #endregion
159 
160         #region 主体
161         /// <summary>
162         /// 初始化线程
163         /// </summary>
164         private void InitThread()
165         {
166             m_ThreadList = new List<Thread>();
167             for (int i = 0; i < ThreadCount; i++)
168             {
169                 Thread t = new Thread(new ThreadStart(InnerDoWork));
170                 m_ThreadList.Add(t);
171                 t.IsBackground = true;
172                 t.Start();
173             }
174         }
175         /// <summary>
176         /// 开始
177         /// </summary>
178         public void Start()
179         {
180             InitThread();
181         }
182         /// <summary>
183         /// 线程工作
184         /// </summary>
185         private void InnerDoWork()
186         {
187             try
188             {
189                 Exception doWorkEx = null;
190                 DoWorkResult doworkResult = DoWorkResult.ContinueThread;
191                 var t = CurrentPendingQueue;
192                 while (!this.Cancel && t.IsHad)
193                 {
194                     try
195                     {
196                         doworkResult = DoWork(t.PendingValue);
197                     }
198                     catch (Exception ex)
199                     {
200                         doWorkEx = ex;
201                     }
202                     m_CompletedCount++;
203                     int precent = m_CompletedCount * 100 / m_QueueCount;
204                     OnOneCompleted(t.PendingValue, new CompetedEventArgs() { CompetedPrecent = precent, InnerException = doWorkEx });
205                     if (doworkResult == DoWorkResult.AbortAllThread)
206                     {
207                         this.Cancel = true;
208                         break;
209                     }
210                     else if (doworkResult == DoWorkResult.AbortCurrentThread)
211                     {
212                         break;
213                     }
214                     t = CurrentPendingQueue;
215                 }
216 
217                 lock (m_AllCompletedLock)
218                 {
219                     m_CompetedCount++;
220                     if (m_CompetedCount == m_ThreadList.Count)
221                     {
222                         OnAllCompleted(new CompetedEventArgs() { CompetedPrecent = 100 });
223                     }
224                 }
225 
226             }
227             catch
228             {
229                 throw;
230             }
231         }
232         /// <summary>
233         /// 子类重写
234         /// </summary>
235         /// <param name="pendingValue"></param>
236         /// <returns></returns>
237         protected virtual DoWorkResult DoWork(T pendingValue)
238         {
239             return DoWorkResult.ContinueThread;
240         }
241         /// <summary>
242         /// 获取当前结果
243         /// </summary>
244         private PendingResult CurrentPendingQueue
245         {
246             get
247             {
248                 lock (m_PendingQueueLock)
249                 {
250                     PendingResult t = new PendingResult();
251                     if (m_InnerQueue.Count != 0)
252                     {
253                         t.PendingValue = m_InnerQueue.Dequeue();
254                         t.IsHad = true;
255                     }
256                     else
257                     {
258                         t.PendingValue = default(T);
259                         t.IsHad = false;
260                     }
261                     return t;
262                 }
263             }
264         }
265 
266         #endregion
267 
268         #region 相关类&枚举
269         /// <summary>
270         /// dowork结果枚举
271         /// </summary>
272         public enum DoWorkResult
273         {
274             /// <summary>
275             /// 继续运行,默认
276             /// </summary>
277             ContinueThread = 0,
278             /// <summary>
279             /// 终止当前线程
280             /// </summary>
281             AbortCurrentThread = 1,
282             /// <summary>
283             /// 终止全部线程
284             /// </summary>
285             AbortAllThread = 2
286         }
287         /// <summary>
288         /// 完成事件数据
289         /// </summary>
290         public class CompetedEventArgs : EventArgs
291         {
292             public CompetedEventArgs()
293             {
294 
295             }
296             /// <summary>
297             /// 完成百分率
298             /// </summary>
299             public int CompetedPrecent { get; set; }
300             /// <summary>
301             /// 异常信息
302             /// </summary>
303             public Exception InnerException { get; set; }
304         }
305         #endregion
306 
307     }

 

以上是关于多线程通用处理队列类的主要内容,如果未能解决你的问题,请参考以下文章

多线程通用处理队列类

C#多线程处理多个队列数据的方法

多线程批处理队列

队列与多线程间关系——个人理解

python 多线程与队列

Python线程队列与多处理管道