C#多线程总结
Posted skig
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C#多线程总结相关的知识,希望对你有一定的参考价值。
线程的创建
Thread
1 var thread = new Thread(() => 2 { 3 Console.WriteLine("thread start:" + Thread.CurrentThread.ManagedThreadId); //ManagedThreadId为线程的id 4 Thread.Sleep(10000); 5 Console.WriteLine("thread end:" + Thread.CurrentThread.ManagedThreadId); 6 }); 7 //设置是否为后台线程: 8 // 前台线程:所有前台线程执行结束后,该进程才会关闭退出(主线程和通过Thread类创建的线程默认是前台线程) 9 // 后台线程:所有前台结束后,后台线程就会立即结束(不管是否执行完成都会结束) 10 thread.IsBackground = true; 11 thread.Start();//开启线程,不传递参数 12 13 //传递参数的 14 var thread1 = new Thread(param => 15 { 16 Thread.Sleep(3000); 17 Console.WriteLine(param); 18 }); 19 thread1.Start("val"); 20 thread1.Join(); //等待线程执行完成(使当前调用Join的线程阻塞) 21 //暂停和恢复线程都标志为已过时了,不建议使用 22 //thread1.Suspend(); 23 //thread1.Resume(); 24 //设置线程的优先级,注意:在NT内核的Windows平台上建议不使用优先级来影响线程优先调度的行为,因为根本没法预期一个高优先级的线程必然会先于一个低优先级的线程执行,所以也就失去了控制线程调度的价值 25 //thread1.Priority = ThreadPriority.Highest; 26 //thread1.Abort(); //暴力的终止线程,一般不建议使用
Sleep/ SpinWait
Sleep与SpinWait的区别:
使用Thread.Sleep()会导致等待过于进行切换,等待时间不准确,而且会由用户模式切换到内核模式;使用SpinWait(一个轻量同步类型(结构体))来进行等待的处理,等待过程中会使用自旋等待,从而避免线程频繁的用户模式和内核模式切换,一般用于短时的等待操作:
1 //参数一为Func<bool>,就是自旋时的循环体,直到返回true或者过时为止 2 SpinWait.SpinUntil(() => 3 { 4 Console.WriteLine("Spin Waiting"); 5 return false; 6 }, 1000); 7 SpinWait.SpinUntil(() => false, 1000); //返回false会进入等待状态,类似于Thread.Sleep()等待,但是会盘旋CPU周期,在短期内等待事件准确度都高于Sleep 8 SpinWait.SpinUntil(() => true, 1000); //返回true会自动跳出等待状态,不再休眠,继续执行下面的代码
使用SpinWait做一些多线程的流程控制
1 int i = 0; 2 Task.Run(() => 3 { 4 Thread.Sleep(1000); //模拟一些操作 5 Interlocked.Increment(ref i); 6 }); 7 Task.Run(() => 8 { 9 Thread.Sleep(1000); //模拟一些操作 10 SpinWait.SpinUntil(() => i == 1); //等待1完成 11 Thread.Sleep(1000); //模拟一些操作 12 Interlocked.Increment(ref i); 13 }); 14 SpinWait.SpinUntil(() => i == 2); //等待所有流程完成 15 Console.WriteLine("Completed!");
ThreadPool
通过线程池创建线程,池中的线程都是后台线程
使用线程更应该使用线程池来创建:比如一个服务器需要处理成千上万个客户端链接,并处理不同的请求时,这种情况下如果简单通过Thread来创建线程处理,那么就是需要创建成千上万个线程了,那么多线程会频繁的调度切换,资源浪费严重、性能十分低下,因此需要线程池来维护多线程(会动态调整线程数量)
1 ThreadPool.QueueUserWorkItem(param => 2 { 3 Console.WriteLine(param); //val,param为传递过来的参数 4 }, "val");
Task
通过Task来创建线程(线程也是由线程池维护,也是后台线程),比ThreadPool更加灵活方便
1 var tasks = new List<Task>(); 2 tasks.Add(Task.Factory.StartNew(param => 3 { 4 Thread.Sleep(5000); 5 Console.WriteLine(param); 6 }, "val")); 7 tasks.Add(Task.Run(() => Console.WriteLine(Thread.CurrentThread.ManagedThreadId))); 8 Task.WaitAny(tasks.ToArray()); //等待(阻塞)只要有一个Task执行完毕就不再等待了 9 Task.WaitAll(tasks.ToArray()); //等待(阻塞)所有Task执行结束 10 11 //带返回值的 12 var task = Task.Run<string>(() => 13 { 14 Thread.Sleep(3000); 15 return "rtn Val"; 16 }); 17 //task.Wait(); //等待执行结束 18 Console.WriteLine(task.Result); //获取返回的结果,调用Result就会等待Task执行结束返回结果,因此也会造成阻塞
ConfigureAwait
1 Task.Run(() => 2 { 3 Thread.Sleep(1000); 4 Console.WriteLine("Async"); 5 6 //ConfigureAwait为false发生异常的时候不会回取捕捉原始Context(上下文), 7 //这样子就是在线程池中运行,而不是在ASP.NET/UI的Context的上下文线程中运 8 //行了,这样子性能上提高了 9 }).ConfigureAwait(false);
1 // Thread.Sleep是同步延迟, Task.Delay异步延迟; 2 // Thread.Sleep不能取消,Task.Delay可以。 3 Task.Run(async () => 4 { 5 //将任务延迟1000毫秒后运行,如果无限等待那么指定为-1 6 await Task.Delay(1000); 7 Console.WriteLine("Task Start"); 8 //CancellationToken设置为true就是标志Task任务取消,为false和 await Task.Delay(1000)一样将任务延迟1000毫秒后运行 9 await Task.Delay(1000, new CancellationToken(true)); 10 Console.WriteLine("这里不会被执行,因为任务取消了~"); 11 });
Task与async/await
1 public class TaskTest 2 { 3 public Task DoAsync(string param) 4 { 5 return Task.Run(() => 6 { 7 //调用Result会阻塞直到获取到返回值 8 NextDo(LongTimeDoAsync(param).Result); 9 }); 10 } 11 12 public async Task Do1Async(string param) 13 { 14 //对比上面的DoAsync方法,执行结果一样,但是使用async/await配合Task使用,节省了代码量,而且也方便外部的调用和等待处理等等 15 NextDo(await LongTimeDoAsync(param)); 16 } 17 18 async Task<object> LongTimeDoAsync(string param) 19 { 20 return await Task.Run<object>(() => 21 { 22 //执行一些耗时的操作 23 Thread.Sleep(10000); 24 return param + " ok"; 25 }); 26 } 27 28 void NextDo(object result) 29 { 30 Console.WriteLine(result); 31 } 32 }
调用:
1 var test = new TaskTest(); 2 test.DoAsync("DoAsync"); 3 test.Do1Async("Do1Async");
并发集合
在System.Collections.Concurrent下的集合类,都是些多线程安全集合,而ConcurrentXXX为并发集合,有不少方法带有Try前缀,这些方法在多线程下执行过程中可能会失败返回false,因此不要相信这些操作会一定完成任务,需要判断返回的结果;还有BlockingCollection<T>是阻塞集合,就是添加/获取元素的时候会阻塞线程直到操作完成。
ConcurrentDictionary
1 ConcurrentDictionary<string, string> dict = new ConcurrentDictionary<string, string>(); 2 dict.TryAdd("key1", "val1"); 3 string val; 4 dict.TryGetValue("key1", out val); 5 dict.TryUpdate("key1", "val2", val);//最后参数为比较的值,值不同才会更新 6 dict.TryRemove("key1", out val); 7 Console.WriteLine(val); //val2 8 9 val = dict.GetOrAdd("key1", "val3"); 10 val = dict.GetOrAdd("key1", "val4"); 11 Console.WriteLine(val); //val3 12 13 dict["key1"] = null; 14 //对于AddOrUpdate方法,如果指定的key已经存在,那么调用第三个参数进行UpdateValue 15 //如果不存在,那么调用第二个参数进行AddValue 16 val = dict.AddOrUpdate("key1", "val5", (key, oldVal) => 17 { 18 Console.WriteLine(oldVal); //null 19 return "val6"; 20 }); 21 Console.WriteLine(val); //val6 22 23 val = dict.AddOrUpdate("key2", key => 24 { 25 return "val7"; 26 }, (key, oldVal) => 27 { 28 Console.WriteLine(oldVal); 29 return "val8"; 30 }); 31 Console.WriteLine(val); //val7
ConcurrentQueue
1 ConcurrentQueue<string> q = new ConcurrentQueue<string>(); 2 q.Enqueue("val1"); 3 q.Enqueue("val2"); 4 string val; 5 q.TryPeek(out val); 6 Console.WriteLine(val); //val1 7 q.TryDequeue(out val); 8 Console.WriteLine(val); //val1
ConcurrentStack
1 ConcurrentStack<string> s = new ConcurrentStack<string>(); 2 s.Push("val1"); 3 s.Push("val2"); 4 string val; 5 s.TryPeek(out val); 6 Console.WriteLine(val); //val2 7 s.TryPop(out val); 8 Console.WriteLine(val); //val2
ConcurrentBag
1 //ConcurrentBag:无序的并发集合(相同元素可重复添加) 2 ConcurrentBag<object> bag = new ConcurrentBag<object>(); 3 var obj = new object(); 4 bag.Add(obj); 5 bag.Add(obj); 6 Console.WriteLine(bag.Count); //2 7 while (!bag.IsEmpty) //判断集合是否为空 8 { 9 bag.TryTake(out obj); //获取 10 }
并行计算
Parallel
For
1 //并行计算,调用的线程会等待直到并行执行完毕 2 Parallel.For(2, 10, i => 3 { 4 //i的值为[2, 10)(不包括10),就是执行次数为8次 5 Console.WriteLine(i); 6 });
1 //MaxDegreeOfParallelism为指定并行计算的最大线程数 2 Parallel.For(1, 10, new ParallelOptions { MaxDegreeOfParallelism = 3 }, i => 3 { 4 Console.WriteLine(Thread.CurrentThread.ManagedThreadId); 5 });
1 int result = 0; 2 Parallel.For(0, 100, new ParallelOptions { MaxDegreeOfParallelism = 4 }, 3 //初始化localState 4 () => 0, 5 //并行循环体(i为[0, 100),也就是会执行100次) 6 (i, loop, localState) => 7 { 8 //localState从0开始,不断累加i的值 9 return localState + i; //循环体中返回的结果会在下面的回调中进行值的合并(结果的合并必须在下面进行) 10 }, 11 //合并计算的结果 12 localState => Interlocked.Add(ref result, localState) 13 ); 14 Console.WriteLine("真实结果: {0}. 预期结果:4950.", result);
ForEach
1 int aCount = 0; 2 //并行计算,会等待(阻塞)直到执行完成 3 Parallel.ForEach("aaaabbbbbcccc", 4 //设置并行计算的最大线程数 5 new ParallelOptions { MaxDegreeOfParallelism = 4 }, 6 c => 7 { 8 //计算\'a\'的个数 9 if (c == \'a\') 10 { 11 Interlocked.Increment(ref aCount); 12 } 13 }); 14 Console.WriteLine(aCount); //4
1 //Partitioner为设置策略分区:例如值范围为[0, 100],每个区域的大小为4 2 Parallel.ForEach(Partitioner.Create(0, 10, 4), 3 val => 4 { 5 Console.WriteLine(val); //val是一个Tuple<int, int>,分成的区间值有:(0, 4),(4, 8),(8, 10) 6 }); 7 8 int result = 0; 9 Parallel.ForEach(Partitioner.Create(1, 101, 10), 10 val => 11 { 12 for (int i = val.Item1; i < val.Item2; i++) 13 { 14 Interlocked.Add(ref result, i); 15 } 16 }); 17 Console.WriteLine(result); //输出:5050
1 int[] vals = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; 2 int sum = 0; 3 Parallel.ForEach( 4 vals, 5 //localSum的初始值 6 () => 0, 7 //并行执行的循环体 8 (val, loopState, localSum) => 9 { 10 //val为集合vals中的值 11 //这里的操作是并行计算集合中值的总和 12 localSum += val; 13 return localSum; //循环体中返回的结果会在下面的回调中进行值的合并(结果的合并必须在下面进行) 14 }, 15 //合并计算的结果 16 (localSum) => Interlocked.Add(ref sum, localSum) 17 ); 18 Console.WriteLine(sum); //55
Invoke
1 int i = 0; 2 Action action = () => Interlocked.Increment(ref i); 3 Action action1 = () => Interlocked.Add(ref i, 2); 4 Action action2 = () => Interlocked.Add(ref i, 3); 5 //并行调用Action,调用的线程会等待直到并行执行完毕 6 Parallel.Invoke(action, action1, action2); 7 //Parallel.Invoke(new ParallelOptions { MaxDegreeOfParallelism = 3 }, action, action1, action2); 8 Console.WriteLine(i); //输出:6
PLINQ
1 var list = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; 2 //PLINQ,进行并行计算,但是PLINQ不能限定并行计算时的最大线程数 3 list.AsParallel().ForAll(l => 4 { 5 Console.WriteLine(Thread.CurrentThread.ManagedThreadId); 6 }); 7 8 Console.WriteLine(list.AsParallel().Where(l => l > 5).Sum()); //40 9 Console.WriteLine(list.AsParallel().Aggregate((sum, val) => 10 { 11 return val + sum + 1; 12 })); //64 13 14 var list1 = new int[] { 1, 1, 1, 2, 2, 2, 3 }; 15 Console.WriteLine(list1.AsParallel().GroupBy(l => l).Count()); //3
线程同步
lock(Monitor) / SpinLock
lock
lock使用起来很简单,为Monitor封装的语法糖:
1 lock (obj) 2 { 3 //同步操作. 4 }
锁的对象不要为类型Type,因为性能上会损失大:lock(typeof(Class))
Monitor
1 Monitor.Enter(obj); 2 try 3 { 4 //同步操作 5 } 6 finally 7 { 8 Monitor.Exit(obj); 9 }
使用Monitor的主要优点就是可设置等待的超时值:
1 bool lockTaken = false; 2 Monitor.TryEnter(obj, 1000, ref lockTaken); 3 if (lockTaken) 4 { 5 try 6 { 7 //同步操作 8 } 9 finally 10 { 11 Monitor.Exit(obj); 12 } 13 } 14 else 15 { 16 Console.WriteLine("C#多线程总结