c#:ThreadPool实现并行分析,并实现线程同步结束
Posted yy
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了c#:ThreadPool实现并行分析,并实现线程同步结束相关的知识,希望对你有一定的参考价值。
- 背景:
一般情况下,经常会遇到一个单线程程序时执行对CPU,MEMORY,IO利用率上不来,且速度慢下问题;那么,怎么解决这些问题呢?
据我个人经验来说有以下两种方式:
1、并行、多线程(Parallel、Task、ThreadPool)
2、多进程MutilpleProcess
恰好工作中又一次遇到单线程程序性能低的问题,本次我主要想尝试使用ThreadPool来实现多线程,并且在实现多线程任务同步结束。
测试代码:
1 static void Main(string[] args) 2 { 3 using (ManualResetEvent finish = new ManualResetEvent(false)) 4 { 5 int maxThreadCount = 100; 6 for (var i = 0; i < 100; i++) { 7 ThreadPool.QueueUserWorkItem((Object state)=> { 8 Console.WriteLine("task:{0}",state); 9 10 // 以原子操作的形式递减指定变量的值并存储结果。 11 if (Interlocked.Decrement(ref maxThreadCount) == 0) { 12 // 将事件状态设置为有信号,从而允许一个或多个等待线程继续执行。 13 finish.Set(); 14 } 15 }, i); 16 } 17 18 // 阻止当前线程,直到当前 System.Threading.WaitHandle 收到信号。 19 finish.WaitOne(); 20 } 21 22 Console.WriteLine("Complete!"); 23 Console.ReadKey();
- 实现多线程时,需要注意事项:
可是一般情况下遇到这种业务的情况下,只要修改多线程,必然会遇到某个对象不允许被多个线程操作的问题。
比如:
1、多个线程同时向一个文件中写入内容,这种情况一般使用锁来包成被访问对象的安全性。比如:互斥锁(lock、Mutex)、读写锁(ReadWriteLock)、Monitor、Semaphore(信号灯)、InterLocked(内存共享)等。
2、多个线程同时修改一个非线程安全集合对象(List,Collection,Dictionary,Bag,Queue,Stack,ArrayList,Array,HashTable等)时,往往会抛出异常。针对这种情况,需要使用命名空间System.Collections.Concurrent.*下支持线程安全的集合、字典、队列、栈等对象来替代。
- 业务场景:
我们需要对一个多行文本文件进行解析,根据具体地址解析其中的经纬度信息。如果解析过程中解析失败的行,需要记录到一个_error.txt;解析成功的记录行,记录到_result.txt。使用单线程分析过程中已经遇到了性能低问题,需求解决方案是使用ThreadPool技术。
- 业务实现:
1 private static int maxThreadCount = 0; 2 private static int fakeMaxThreadCount = int.MaxValue; 3 private static ManualResetEvent finish = new ManualResetEvent(false); 4 private static object errorLocker = new object(); 5 private static object resultLocker = new object(); 6 private static object maxThreadCountLcker = new object(); 7 8 public void ParserFile(string filePath) 9 { 10 using (StreamWriter writerError = new StreamWriter(filePath + "_error")) 11 { 12 using (StreamWriter writerResult = new StreamWriter(filePath + "_result")) 13 { 14 finish = new ManualResetEvent(false); 15 using (StreamReader reader = new StreamReader(filePath)) 16 { 17 string line = reader.ReadLine(); 18 while (line != null) 19 { 20 maxThreadCount++; 21 ThreadPool.QueueUserWorkItem(DoWork, new object[] { line, writerError, writerResult 22 }); 23 24 line = reader.ReadLine(); 25 } 26 } 27 28 maxThreadCount++; 29 lock (maxThreadCountLcker) 30 { 31 fakeMaxThreadCount = maxThreadCount; 32 } 33 34 ThreadPool.QueueUserWorkItem(DoWork, new object[] { }); 35 36 finish.WaitOne(); 37 finish.Close(); 38 finish.Dispose(); 39 } 40 } 41 } 42 43 44 45 private void DoWork(object state) 46 { 47 object[] objectItem = state as object[]; 48 if (objectItem.Length != 3) 49 { 50 if (Interlocked.Decrement(ref fakeMaxThreadCount) == 0) 51 { 52 finish.Set(); 53 } 54 return; 55 } 56 string line = objectItem[0].ToString(); 57 StreamWriter writerError = objectItem[1] as StreamWriter; 58 StreamWriter writerResult = objectItem[2] as StreamWriter; 59 60 try 61 { 62 string[] fields = line.Split(new char[] { ‘|‘ }); 63 64 string imsi = fields[0]; 65 string city = fields[1]; 66 string county = fields[2]; 67 string address = fields[3]; 68 69 // http://restapi.amap.com/v3/geocode/geo?key=7de8697669288fc848e12a08f58d995e&s=rsv3&city=**市&address=**省**市**区**路23号 70 string uri = " http://restapi.amap.com/v3/geocode/geo"; 71 string parameter = string.Format("key={0}&s={1}&city={2}&address={3}", "7de8697669288fc848e12a08f58d995e", "rsv3", "**(市名称)", address); 72 73 // {"status":"1","info":"OK","infocode":"10000","count":"1","geocodes":[{"formatted_address":"***省**市**区***路|23号","province":"***","citycode":"***","city":"***市","district":"***区","township":[],"neighborhood":{"name":[],"type":[]},"building":{"name":[],"type":[]},"adcode":"330105","street":[],"number":[],"location":"120.151367,30.362293","level":"门牌号"}]} 74 string result = GetRequesetContext(uri, parameter); 75 if (string.IsNullOrEmpty(result) || result.IndexOf("location") == -1) 76 { 77 lock (errorLocker) 78 { 79 writerError.WriteLine(result); 80 } 81 } 82 else 83 { 84 int indexCount = 0; 85 List<string> lnglatItems = new List<string>(); 86 foreach (string resultItem in result.Split(new string[] { "\",\"", ",\"" }, StringSplitOptions.RemoveEmptyEntries)) 87 { 88 if (resultItem.IndexOf("location") != -1) 89 { 90 indexCount++; 91 lnglatItems.Add(resultItem.Split(new char[] { ‘:‘ })[1].Replace("\"", string.Empty)); 92 } 93 } 94 if (indexCount == 1) 95 { 96 lock (resultLocker) 97 { 98 writerResult.WriteLine(address + "|" + lnglatItems[0] + "|" + imsi); 99 } 100 } 101 else 102 { 103 lock (resultLocker) 104 { 105 writerError.WriteLine(address + "|" + string.Join(",", lnglatItems) + "|" + imsi); 106 } 107 } 108 } 109 } 110 catch (Exception ex) 111 { 112 logger.Error("{0}\r\n{1}", ex.Message, ex.StackTrace); 113 lock (errorLocker) 114 { 115 writerError.WriteLine(line); 116 } 117 } 118 finally 119 { 120 lock (maxThreadCountLcker) 121 { 122 if (Interlocked.Decrement(ref fakeMaxThreadCount) == 0) 123 { 124 finish.Set(); 125 } 126 } 127 } 128 }
以上是关于c#:ThreadPool实现并行分析,并实现线程同步结束的主要内容,如果未能解决你的问题,请参考以下文章
多线程实现Thread.Start()与ThreadPool.QueueUserWorkItem两种方式对比