如何在 .NET 3.5 中从 .NET 4 功能实现 Barrier 类

Posted

技术标签:

【中文标题】如何在 .NET 3.5 中从 .NET 4 功能实现 Barrier 类【英文标题】:How to implement Barrier class from .NET 4 functionality in .NET 3.5 【发布时间】:2011-10-16 22:02:57 【问题描述】:

出于某些原因,我必须坚持使用 .NET 3.5,并且我需要 .NET 4 中的 Barrier 类的功能。我有一堆线程在做一些工作,我希望它们互相等待,直到全部完成.当一切都完成后,我希望他们以类似的方式一次又一次地完成这项工作。 在Difference between Barrier in C# 4.0 and WaitHandle in C# 3.0? 线程的鼓励下,我决定使用 AutoResetEvent 和 WaitHandle 类来实现 Barrier 功能。 尽管我的代码遇到了问题:

class Program

    const int numOfThreads = 3;

    static AutoResetEvent[] barrier = new AutoResetEvent[numOfThreads];
    static Random random = new Random(System.DateTime.Now.Millisecond);

    static void barriers2(object barrierObj)
    
        AutoResetEvent[] barrierLocal = (AutoResetEvent[])barrierObj;
        string name = Thread.CurrentThread.Name;
        for (int i = 0; i < 10; i++)
        
            int sleepTime = random.Next(2000, 10000);
            System.Console.Out.WriteLine("Thread 0 at the 'barrier' will sleep for 1.", name, sleepTime);
            Thread.Sleep(sleepTime);
            System.Console.Out.WriteLine("Thread 0 at the 'barrier' with time 1.", name, sleepTime);
            int currentId = Convert.ToInt32(name);
            //for(int z = 0; z < numOfThreads; z++)
                barrierLocal[currentId].Set();
            WaitHandle.WaitAll(barrier);
            /*
            for (int k = 0; k < numOfThreads; k++)
            
                if (k == currentId)
                
                    continue;
                
                System.Console.Out.WriteLine("Thread 0 is about to wait for the singla from thread: 1", name, k);
                barrierLocal[k].WaitOne();
                System.Console.Out.WriteLine("Thread 0 is about to wait for the singla from thread: 1. done", name, k);
            
            */
        
    

    static void Main(string[] args)
    
        for (int i = 0; i < numOfThreads; i++)
        
            barrier[i] = new AutoResetEvent(false);
        
        for (int i = 0; i < numOfThreads; i++)
        
            Thread t = new Thread(Program.barriers2);
            t.Name = Convert.ToString(i);
            t.Start(barrier);
        
    

我收到的输出如下:

“屏障”处的线程 0 将休眠 7564 'barrier' 处的线程 1 将休眠 5123 'barrier' 处的线程 2 将休眠 4237 线程 2 在 'barrier' 与时间 4237 线程 1 在 'barrier' 与时间 5123 线程 0 在“障碍”处,时间为 7564 'barrier' 处的线程 0 将休眠 8641 线程 0 在 'barrier' 处,时间为 8641

就是这样。在最后一行之后没有更多的输出并且应用程序不会终止。看起来有某种僵局。但是找不到问题。欢迎任何帮助。

谢谢!

【问题讨论】:

【参考方案1】:

那是因为您使用了 AutoResetEvent。线程的 WaitAll() 调用之一将首先完成。这会自动导致所有 ARE 上的 Reset()。这会阻止其他线程完成他们的 WaitAll() 调用。

此处需要 ManualResetEvent。

【讨论】:

汉斯,谢谢你的回答。这解释了问题,但在 ManualResetEvent 类的情况下,必须有一些线程在所有线程都通过“障碍”后重置 MRE。知道如何在我的场景中完成吗? 从阅读 Barrier 源代码中获得一些灵感,比如 Reflector。我认为它使用两组 MRE(分别命名为奇数和偶数)并在它们之间交替。 你怎么看这样一个简单的解决方案:
 lock (obj)  threadsCount++; if (threadsCount == numOfThreads)  System.Console.WriteLine("所有线程都完成了。");线程数 = 0; Monitor.PulseAll(obj);  else  Monitor.Wait(obj);  
【参考方案2】:

下载 .NET 3.5 的 Reactive Extensions 反向端口。您会发现 Barrier 类以及 .NET 4.0 中发布的其他有用的并发数据结构和同步机制。

【讨论】:

嗨,感谢您的回答。是否有可能从这个 backport 获取 Barrier 类,或者我总是必须引用整个包?谢谢。 问题是,它是为了一个大项目的目的,不能强迫许多其他人在生产环境中安装扩展+。 您不必在目标机器上安装 RX 框架。事实上,如果您不想,您不必将它安装在您的开发机器上。只需抓住它附带的 System.Threading.dll 库并像引用任何其他 3rd 方库一样引用它。我就是这么做的。【参考方案3】:

这是我用于 XNA game 的实现。当我写这篇文章时,障碍不可用,我仍然坚持使用 .Net 3.5。它需要三组 ManualResetEvents,和一个计数器数组来保持相位。

using System;
using System.Threading;

namespace Colin.Threading

    /// <summary>
    /// Threading primitive for "barrier" sync, where N threads must stop at certain points 
    /// and wait for all their bretheren before continuing.
    /// </summary>
    public sealed class NThreadGate
    
        public int mNumThreads;
        private ManualResetEvent[] mEventsA;
        private ManualResetEvent[] mEventsB;
        private ManualResetEvent[] mEventsC;
        private ManualResetEvent[] mEventsBootStrap;
        private Object mLockObject;
        private int[] mCounter;
        private int mCurrentThreadIndex = 0;

        public NThreadGate(int numThreads)
        
            this.mNumThreads = numThreads;

            this.mEventsA = new ManualResetEvent[this.mNumThreads];
            this.mEventsB = new ManualResetEvent[this.mNumThreads];
            this.mEventsC = new ManualResetEvent[this.mNumThreads];
            this.mEventsBootStrap = new ManualResetEvent[this.mNumThreads];
            this.mCounter = new int[this.mNumThreads];
            this.mLockObject = new Object();

            for (int i = 0; i < this.mNumThreads; i++)
            
                this.mEventsA[i] = new ManualResetEvent(false);
                this.mEventsB[i] = new ManualResetEvent(false);
                this.mEventsC[i] = new ManualResetEvent(false);
                this.mEventsBootStrap[i] = new ManualResetEvent(false);
                this.mCounter[i] = 0;
            
        

        /// <summary>
        /// Adds a new thread to the gate system.
        /// </summary>
        /// <returns>Returns a thread ID for this thread, to be used later when waiting.</returns>
        public int AddThread()
        
            lock (this.mLockObject)
            
                this.mEventsBootStrap[this.mCurrentThreadIndex].Set();
                this.mCurrentThreadIndex++;
                return this.mCurrentThreadIndex - 1;
            
        

        /// <summary>
        /// Stop here and wait for all the other threads in the NThreadGate. When all the threads have arrived at this call, they
        /// will unblock and continue.
        /// </summary>
        /// <param name="myThreadID">The thread ID of the caller</param>
        public void WaitForOtherThreads(int myThreadID)
        
            // Make sure all the threads are ready.
            WaitHandle.WaitAll(this.mEventsBootStrap);

            // Rotate between three phases.
            int phase = this.mCounter[myThreadID];
            if (phase == 0)        // Flip
            
                this.mEventsA[myThreadID].Set();
                WaitHandle.WaitAll(this.mEventsA);
                this.mEventsC[myThreadID].Reset();
            
            else if (phase == 1)    // Flop
            
                this.mEventsB[myThreadID].Set();
                WaitHandle.WaitAll(this.mEventsB);
                this.mEventsA[myThreadID].Reset();
            
            else    // Floop
            
                this.mEventsC[myThreadID].Set();
                WaitHandle.WaitAll(this.mEventsC);
                this.mEventsB[myThreadID].Reset();
                this.mCounter[myThreadID] = 0;
                return;
            

            this.mCounter[myThreadID]++;
        
    

设置线程门:

private void SetupThreads()

    // Make an NThreadGate for N threads.
    this.mMyThreadGate = new NThreadGate(Environment.ProcessorCount);

    // Make some threads...
    // e.g. new Thread(new ThreadStart(this.DoWork);

线程工作者方法:

private void DoWork()

    int localThreadID = this.mMyThreadGate.AddThread();

    while (this.WeAreStillRunning)
    
        // Signal this thread as waiting at the barrier
        this.mMyThreadGate.WaitForOtherThreads(localThreadID);

        // Synchronized work here...

        // Signal this thread as waiting at the barrier
        this.mMyThreadGate.WaitForOtherThreads(localThreadID);

        // Synchronized work here...

        // Signal this thread as waiting at the barrier
        this.mMyThreadGate.WaitForOtherThreads(localThreadID);
    

【讨论】:

以上是关于如何在 .NET 3.5 中从 .NET 4 功能实现 Barrier 类的主要内容,如果未能解决你的问题,请参考以下文章

我可以在面向 .Net 3.5 的项目中使用所有 C# 4.0 功能吗?

针对 .NET Framework 3.5 编译的项目允许 C# 4.0 功能

如何启用.NET Framework 3.5功能 详细�0�3

您需要同时安装 .net 3.5 和 4.0 还是只需要 .net 4?

win10怎么启用net framework 3.5

如何将 .NET 4 的 XmlConvert.IsWhitespaceChar 反向移植到 .NET 3.5?