C#的并发机制优秀在哪?

Posted beyondma

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C#的并发机制优秀在哪?相关的知识,希望对你有一定的参考价值。

笔者上次用C#写.Net代码差不多还是10多年以前,由于当时Java已经颇具王者风范,Net几乎被打得溃不成军。因此当时笔者对于这个.Net的项目态度比较敷衍了事,没有对其中一些优秀机制有很深的了解,在去年写《C和Java没那么香了,高并发时代谁能称王》时都没给.Net以一席之地,不过最近恰好机缘巧合,我又接手了一个Windows方面的项目,这也让我有机会重新审视一下自己关于.Net框架的相关知识。

项目原型要实现的功能并不复杂,主要就是记录移动存储设备中文件拷出的记录,而且需要尽可能少的占用系统资源,而在开发过程中我无意中加了一行看似没有任何效果的代码,使用Invoke方法记录文件拷出情况,这样的操作却让程序执行效率明显会更高,这背后的原因特别值得总结。

一行没用的代码却提高了效率?

由于我需要记录的文件拷出信息并没有回显在UI的需要,因此也就没考虑并发冲突的问题,在最初版本的实现中,我对于filesystemwatcher的回调事件,都是直接处理的,如下:

private void DeleteFileHandler(object sender, FileSystemEventArgs e)

        

            if(files.Contains(e.FullPath))

            

                files.Remove(e.FullPath);

               //一些其它操作
  
            

        

这个程序的处理效率在普通的办公PC上如果同时拷出20个文件,那么在拷贝过程中,U盘监测程序的CPU使用率大约是0.7%。

但是一个非常偶然的机会,我使用了Event/Delegate的Invoke机制,结果发现这样一个看似的废操作,却让程序的CPU占用率下降到0.2%左右

 private void UdiskWather_Deleted(object sender, FileSystemEventArgs e)

        

            if(this.InvokeRequired)

            

                this.Invoke(new DeleteDelegate(DeleteFileHandler), new object[]  sender,e );               

            else

            

                DeleteFileHandler(sender, e);

            

        

在我最初的认识中.net中的Delegate机制在调用过程中是要进行拆、装箱操作的,因此这不拖慢操作就不错了,但实际的验证结果却相反。

 看似没用的Invoke到底有什么用

这里先给出结论,Invoke能提升程序执行效率,其关键还是在于线程在多核之间切换的消耗要远远高于拆、装箱的资源消耗,我们知道我们程序的核心就是操作files这个共享变量,每次在被检测的U盘目录中如果发生文件变动,其回调通知函数可能都运行在不同的线程,如下:

Invoke机制的背后其实就是保证所有对于files这个共享变量的操作,全部都是由一个线程执行完成的。

目前.Net的代码都开源的,下面我们大致讲解一下Invoke的调用过程,不管是BeginInvoke还是Invoke背后其实都是调用的MarshaledInvoke方法来完成的,如下:

​
public IAsyncResult BeginInvoke(Delegate method, params Object[] args) 

            using (new MultithreadSafeCallScope()) 

                Control marshaler = FindMarshalingControl();

                return(IAsyncResult)marshaler.MarshaledInvoke(this, method, args, false);

            

        

​

MarshaledInvoke的主要工作是创建ThreadMethodEntry对象并把它放在一个链表里进行管理然后调用PostMessage将相关信息发给要通信的线程如下:

​
private Object MarshaledInvoke(Control caller, Delegate method, Object[] args, bool synchronous) 

     

            if (!IsHandleCreated) 

                throw new InvalidOperationException(SR.GetString(SR.ErrorNoMarshalingThread));

            

            

            ActiveXImpl activeXImpl = (ActiveXImpl)Properties.GetObject(PropActiveXImpl);

            if (activeXImpl != null) 

                IntSecurity.UnmanagedCode.Demand();

            

            // We don't want to wait if we're on the same thread, or else we'll deadlock.

            // It is important that syncSameThread always be false for asynchronous calls.

            //

            bool syncSameThread = false;

            int pid; // ignored

            if (SafeNativeMethods.GetWindowThreadProcessId(new HandleRef(this, Handle), out pid) == SafeNativeMethods.GetCurrentThreadId()) 

                if (synchronous)

                    syncSameThread = true;

            

            // Store the compressed stack information from the thread that is calling the Invoke()

            // so we can assign the same security context to the thread that will actually execute

            // the delegate being passed.

            //

            ExecutionContext executionContext = null;

            if (!syncSameThread) 

                executionContext = ExecutionContext.Capture();

            

            ThreadMethodEntry tme = new ThreadMethodEntry(caller, this, method, args, synchronous, executionContext);

            lock (this) 

                if (threadCallbackList == null) 

                    threadCallbackList = new Queue();

                

            

            lock (threadCallbackList) 

                if (threadCallbackMessage == 0) 

                    threadCallbackMessage = SafeNativeMethods.RegisterWindowMessage(Application.WindowMessagesVersion + "_ThreadCallbackMessage");

                

                threadCallbackList.Enqueue(tme);

            

            if (syncSameThread) 

                InvokeMarshaledCallbacks();

              else 

                //

                UnsafeNativeMethods.PostMessage(new HandleRef(this, Handle), threadCallbackMessage, IntPtr.Zero, IntPtr.Zero);

            

            if (synchronous) 

                if (!tme.IsCompleted) 

                    WaitForWaitHandle(tme.AsyncWaitHandle);

                

                if (tme.exception != null) 

                    throw tme.exception;

                

                return tme.retVal;

            

            else 

                return(IAsyncResult)tme;

            

        

​

Invoke的机制就保证了一个共享变量只能由一个线程维护,这和GO语言使用通信来替代共享内存的设计是暗合的,他们的理念都是 "让同一块内存在同一时间内只被一个线程操作" 。这和现代计算体系结构的多核CPU(SMP)有着密不可分的联系,

这里我们先来科普一下CPU之间的通信MESI协议的内容。我们知道现代的CPU都配备了高速缓存,按照多核高速缓存同步的MESI协议约定,每个缓存行都有四个状态,分别是E(exclusive)、M(modified)、S(shared)、I(invalid),其中:

M:代表该缓存行中的内容被修改,并且该缓存行只被缓存在该CPU中。这个状态代表缓存行的数据和内存中的数据不同。

E:代表该缓存行对应内存中的内容只被该CPU缓存,其他CPU没有缓存该缓存对应内存行中的内容。这个状态的缓存行中的数据与内存的数据一致。

I:代表该缓存行中的内容无效。

S:该状态意味着数据不止存在本地CPU缓存中,还存在其它CPU的缓存中。这个状态的数据和内存中的数据也是一致的。不过只要有CPU修改该缓存行都会使该行状态变成 I 。

四种状态的状态转移图如下:

​我们上文也提到了,不同的线程是有大概率是运行在不同CPU核上的,在不同CPU操作同一块内存时,站在CPU0的角度上看,就是CPU1会不断发起remote write的操作,这会使该高速缓存的状态总是会在S和I之间进行状态迁移,而一旦状态变为I将耗费比较多的时间进行状态同步。

因此我们可以基本得出 this.Invoke(new DeleteDelegate(DeleteFileHandler), new object[] sender,e );   ;这行看似无关紧要的代码之后,无意中使files共享变量的维护操作,由多核多线程共同操作,变成了众多子线程向主线程通信,所有维护操作均由主线程进行,这也使最终的执行效率有所提高。

​深度解读,为何要加两把锁

在当前使用通信替代共享内存的大潮之下,锁其实是最重要的设计。

我们看到在.Net的Invoke实现中,使用了两把锁lock (this) lock (threadCallbackList)

lock (this) 

                if (threadCallbackList == null) 

                    threadCallbackList = new Queue();

                

            



            lock (threadCallbackList) 

                if (threadCallbackMessage == 0) 

                    threadCallbackMessage = SafeNativeMethods.RegisterWindowMessage(Application.WindowMessagesVersion + "_ThreadCallbackMessage");

                

                threadCallbackList.Enqueue(tme);

            

在.NET当中lock关键字的基本可以理解为提供了一个近似于CAS的锁(Compare And Swap)。CAS的原理不断地把"期望值"和"实际值"进行比较,当它们相等时,说明持有锁的CPU已经释放了该锁,那么试图获取这把锁的CPU就会尝试将"new"的值(0)写入"p"(交换),以表明自己成为spinlock新的owner。伪代码演示如下:

void CAS(int p, int old,int new)

    if *p != old
        do nothing
    else 
     *p ← new

基于CAS的锁效率没问题,尤其是在没有多核竞争的情况CAS表现得尤其优秀,但CAS最大的问题就是不公平,因为如果有多个CPU同时在申请一把锁,那么刚刚释放锁的CPU极可能在下一轮的竞争中获取优势,再次获得这把锁,这样的结果就是一个CPU忙死,而其它CPU却很闲,我们很多时候诟病多核SOC“一核有难,八核围观”其实很多时候都是由这种不公平造成的。

为了解决CAS的不公平问题,业界大神们又引入了TAS(Test And Set Lock)机制,个人感觉还是把TAS中的T理解为Ticket更好记一些,TAS方案中维护了一个请求该锁的头尾索引值,由"head"和"tail"两个索引组成。

struct lockStruct
    int32 head;
    int32 tail;
 ;

"head"代表请求队列的头部,"tail"代表请求队列的尾部,其初始值都为0。

最一开始时,第一个申请的CPU发现该队列的tail值是0,那么这个CPU会直接获取这把锁,并会把tail值更新为1,并在释放该锁时将head值更新为1。

在一般情况下当锁被持有的CPU释放时,该队列的head值会被加1,当其他CPU在试图获取这个锁时,锁的tail值获取到,然后把这个tail值加1,并存储在自己专属的寄存器当中,然后再把更新后的tail值更新到队列的tail当中。接下来就是不断地循环比较,判断该锁当前的"head"值,是否和自己存储在寄存器中的"tail"值相等,相等时则代表成功获得该锁。

TAS这类似于用户到政务大厅去办事时,首先要在叫号机取号,当工作人员广播叫到的号码与你手中的号码一致时,你就获取了办事柜台的所有权。

但是TAS却存在一定的效率问题,根据我们上文介绍的MESI协议,这个lock的头尾索引其实是在各个CPU之间共享的,因此tail和head频繁更新,还是会引发调整缓存不停的invalidate,这会极大的影响效率。

因此我们看到在.Net的实现中干脆就直接引入了threadCallbackList的队列,并不断将tme(ThreadMethodEntry)加入队尾,而接收消息的进程,则不断从队首获取消息.

lock (threadCallbackList) 

                if (threadCallbackMessage == 0) 

                    threadCallbackMessage = SafeNativeMethods.RegisterWindowMessage(Application.WindowMessagesVersion + "_ThreadCallbackMessage");

                

                threadCallbackList.Enqueue(tme);

            

当队首指向这个tme时,消息才被发送,其实是一种类似于MAS的实现,当然MAS实际是为每个CPU都建立了一个专属的队列,和Invoke的设计略有不同,不过基本的思想是一致的。

很多时候年少时不是品不出很多东西背后味道的,这也让我错过了很多非常值得总结的技术要点,因此在春节假期总结一下最近使用C#的心得,以飨读者,顺祝大家新春愉快!

以上是关于C#的并发机制优秀在哪?的主要内容,如果未能解决你的问题,请参考以下文章

C#的并发机制优秀在哪?

Excel 两个日期之间间隔的工作日数,非头尾,大家帮我看看我这样做,结果不对呀?问题出在哪了?不甚感激!

#yyds干货盘点#dart系列之:dart优秀的秘诀-隔离机制

C#正则表达式保留头尾替换中间为*

Java并发编程实战—– AQS:CLH同步队列

有关Java的优秀博客集锦