8天玩转并行开发——第五天 同步机制(下)

Posted jeely

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了8天玩转并行开发——第五天 同步机制(下)相关的知识,希望对你有一定的参考价值。

     承接上一篇,我们继续说下.net4.0中的同步机制,是的,当出现了并行计算的时候,轻量级别的同步机制应运而生,在信号量这一块

出现了一系列的轻量级,今天继续介绍下面的3个信号量 CountdownEvent,SemaphoreSlim,ManualResetEventSlim。

 

一:CountdownEvent

     这种采用信号状态的同步基元非常适合在动态的fork,join的场景,它采用“信号计数”的方式,就比如这样,一个麻将桌只能容纳4个

人打麻将,如果后来的人也想搓一把碰碰运气,那么他必须等待直到麻将桌上的人走掉一位。好,这就是简单的信号计数机制,从技术角

度上来说它是定义了最多能够进入关键代码的线程数。

     但是CountdownEvent更牛X之处在于我们可以动态的改变“信号计数”的大小,比如一会儿能够容纳8个线程,一下又4个,一下又10个,

这样做有什么好处呢?还是承接上一篇文章所说的,比如一个任务需要加载1w条数据,那么可能出现这种情况。

 

加载User表:         根据user表的数据量,我们需要开5个task。

加载Product表:    产品表数据相对比较多,计算之后需要开8个task。

加载order表:       由于我的网站订单丰富,计算之后需要开12个task。

 

先前的文章也说了,我们需要协调task在多阶段加载数据的同步问题,那么如何应对这里的5,8,12,幸好,CountdownEvent给我们提供了

可以动态修改的解决方案。

 

技术图片
  1 using System.Collections.Concurrent;
  2 using System.Threading.Tasks;
  3 using System;
  4 using System.Diagnostics;
  5 using System.Collections.Generic;
  6 using System.Linq;
  7 using System.Threading;
  8 
  9 class Program
 10 
 11     //默认的容纳大小为“硬件线程“数
 12     static CountdownEvent cde = new CountdownEvent(Environment.ProcessorCount);
 13 
 14     static void Main(string[] args)
 15     
 16         //加载User表需要5个任务
 17         var userTaskCount = 5;
 18 
 19         //重置信号
 20         cde.Reset(userTaskCount);
 21 
 22         for (int i = 0; i < userTaskCount; i++)
 23         
 24             Task.Factory.StartNew((obj) =>
 25             
 26                 LoadUser(obj);
 27             , i);
 28         
 29 
 30         //等待所有任务执行完毕
 31         cde.Wait();
 32 
 33         Console.WriteLine("\\nUser表数据全部加载完毕!\\n");
 34 
 35         //加载product需要8个任务
 36         var productTaskCount = 8;
 37 
 38         //重置信号
 39         cde.Reset(productTaskCount);
 40 
 41         for (int i = 0; i < productTaskCount; i++)
 42         
 43             Task.Factory.StartNew((obj) =>
 44             
 45                 LoadProduct(obj);
 46             , i);
 47         
 48 
 49         cde.Wait();
 50 
 51         Console.WriteLine("\\nProduct表数据全部加载完毕!\\n");
 52 
 53         //加载order需要12个任务
 54         var orderTaskCount = 12;
 55 
 56         //重置信号
 57         cde.Reset(orderTaskCount);
 58 
 59         for (int i = 0; i < orderTaskCount; i++)
 60         
 61             Task.Factory.StartNew((obj) =>
 62             
 63                 LoadOrder(obj);
 64             , i);
 65         
 66 
 67         cde.Wait();
 68 
 69         Console.WriteLine("\\nOrder表数据全部加载完毕!\\n");
 70 
 71         Console.WriteLine("\\n(*^__^*) 嘻嘻,恭喜你,数据全部加载完毕\\n");
 72 
 73         Console.Read();
 74     
 75 
 76     static void LoadUser(object obj)
 77     
 78         try
 79         
 80             Console.WriteLine("当前任务:0正在加载User部分数据!", obj);
 81         
 82         finally
 83         
 84             cde.Signal();
 85         
 86     
 87 
 88     static void LoadProduct(object obj)
 89     
 90         try
 91         
 92             Console.WriteLine("当前任务:0正在加载Product部分数据!", obj);
 93         
 94         finally
 95         
 96             cde.Signal();
 97         
 98     
 99 
100     static void LoadOrder(object obj)
101     
102         try
103         
104             Console.WriteLine("当前任务:0正在加载Order部分数据!", obj);
105         
106         finally
107         
108             cde.Signal();
109         
110     
111 
技术图片

 

技术图片


我们看到有两个主要方法:Wait和Signal。每调用一次Signal相当于麻将桌上走了一个人,直到所有人都搓过麻将wait才给放行,这里同样要

注意也就是“超时“问题的存在性,尤其是在并行计算中,轻量级别给我们提供了”取消标记“的机制,这是在重量级别中不存在的,比如下面的

重载public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken),具体使用可以看前一篇文章的介绍。

 

二:SemaphoreSlim

     在.net 4.0之前,framework中有一个重量级的Semaphore,人家可以跨进程同步,咋轻量级不行,msdn对它的解释为:限制可同时访问

某一资源或资源池的线程数。关于它的重量级demo,我的上一个系列有演示,你也可以理解为CountdownEvent是 SemaphoreSlim的功能加

强版,好了,举一个轻量级使用的例子。

技术图片
 1 using System.Collections.Concurrent;
 2 using System.Threading.Tasks;
 3 using System;
 4 using System.Diagnostics;
 5 using System.Collections.Generic;
 6 using System.Linq;
 7 using System.Threading;
 8 
 9 class Program
10 
11     static SemaphoreSlim slim = new SemaphoreSlim(Environment.ProcessorCount, 12);
12 
13     static void Main(string[] args)
14     
15         for (int i = 0; i < 12; i++)
16         
17             Task.Factory.StartNew((obj) =>
18             
19                 Run(obj);
20             , i);
21         
22 
23         Console.Read();
24     
25 
26     static void Run(object obj)
27     
28         slim.Wait();
29 
30         Console.WriteLine("当前时间:0任务 1已经进入。", DateTime.Now, obj);
31 
32         //这里busy3s中
33         Thread.Sleep(3000);
34 
35         slim.Release();
36     
37 
技术图片

 

技术图片


同样,防止死锁的情况,我们需要知道”超时和取消标记“的解决方案,像SemaphoreSlim这种定死的”线程请求范围“,其实是降低了扩展性,

所以说,试水有风险,使用需谨慎,在觉得有必要的时候使用它。

 

三: ManualResetEventSlim

     相信它的重量级别大家都知道是ManualReset,而这个轻量级别采用的是"自旋等待“+”内核等待“,也就是说先采用”自旋等待的方式“等待,

直到另一个任务调用set方法来释放它。如果迟迟等不到释放,那么任务就会进入基于内核的等待,所以说如果我们知道等待的时间比较短,采

用轻量级的版本会具有更好的性能,原理大概就这样,下面举个小例子。

技术图片
 1 using System.Collections.Concurrent;
 2 using System.Threading.Tasks;
 3 using System;
 4 using System.Diagnostics;
 5 using System.Collections.Generic;
 6 using System.Linq;
 7 using System.Threading;
 8 
 9 class Program
10 
11     //2047:自旋的次数
12     static ManualResetEventSlim mrs = new ManualResetEventSlim(false, 2047);
13 
14     static void Main(string[] args)
15     
16 
17         for (int i = 0; i < 12; i++)
18         
19             Task.Factory.StartNew((obj) =>
20             
21                 Run(obj);
22             , i);
23         
24 
25         Console.WriteLine("当前时间:0我是主线程1,你们这些任务都等2s执行吧:\\n",
26         DateTime.Now,
27         Thread.CurrentThread.ManagedThreadId);
28         Thread.Sleep(2000);
29 
30         mrs.Set();
31 
32         Console.Read();
33     
34 
35     static void Run(object obj)
36     
37         mrs.Wait();
38 
39         Console.WriteLine("当前时间:0任务 1已经进入。", DateTime.Now, obj);
40     
41 
技术图片

 

技术图片

以上是关于8天玩转并行开发——第五天 同步机制(下)的主要内容,如果未能解决你的问题,请参考以下文章

8天玩转并行开发——第二天 Task的使用

8天玩转并行开发——第七天 简要分析任务与线程池

8天玩转并行开发——第八天 用VS性能向导解剖你的程序

8天玩转并行开发——第六天 异步编程模型

5天玩转C#并行和多线程编程

七天玩转Redis | Day4Redis持久化机制