第一个进入的线程如何向其他并发线程发出相同方法结束的信号?

Posted

技术标签:

【中文标题】第一个进入的线程如何向其他并发线程发出相同方法结束的信号?【英文标题】:How first entered thread can signal to other concurrent threads the end of same method? 【发布时间】:2011-01-14 22:27:35 【问题描述】:

我有一个名为 PollDPRAM() 的方法。它必须通过网络访问一些慢速硬件并刷新对象私有数据。如果相同的方法被其他线程同时调用,它们一定不要执行此操作,而是等待第一个到来的线程完成工作并简单地退出,因为数据是新鲜的(比如 10-30 毫秒前没有区别) . 在不先进入第二、第三等线程的方法中很容易检测到。我使用联锁计数器来检测并发性。

问题:我通过观察计数器 (Interlocked.Read) 来检测第一个线程的退出是一个错误的选择,以便在计数器减少到小于在 n>1 线程入口处检测到的值之后进行观察。这个选择很糟糕,因为第一个线程几乎可以在它离开后立即重新进入该方法。所以 n>1 个线程永远不会检测到计数器的下降。

所以问题: 如何正确检测第一个进入的线程已经退出方法,即使这个第一个线程可以立即再次进入?

谢谢

附:一段代码

        private void pollMotorsData()
    
        // Execute single poll with "foreground" handshaking 
        DateTime start = DateTime.Now;
        byte retryCount = 0;
        // Pick old data atomically to detect change
        uint motorsDataTimeStampPrev = this.MotorsDataTimeStamp;
        bool changeDetected = false;
        // The design goal of DPRAM is to ease the bottleneck
        // Here is a sensor if bottleneck is actually that tight
        long parallelThreads = Interlocked.Increment(ref this.motorsPollThreadCount);
        try
        
            // For first thread entering the counter will be 1
            if (parallelThreads <= 1)
            
                do
                
                    // Handshake signal to DPRAM write process on controller side that host PC is reading
                    this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, true);
                    try
                    
                        bool canReadMotors = false;
                        byte[] canReadFrozenDataFlag = new byte[2];
                        do
                        
                            this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006E_BIT15_FOREGROUND_DONE, canReadFrozenDataFlag);
                            canReadMotors = (canReadFrozenDataFlag[1] & 0x80) == 0x80;
                            if (canReadMotors) break;
                            retryCount++;
                            Thread.Sleep(1);
                         while (retryCount < 10);
                        if (!canReadMotors)
                        
                            throw new DeltaTauControllerException(this.controller, "Timeout waiting on DPRAM Foreground Handshaking Bit");
                        
                        // The lock is meaningless in contructor as it is certainly single threaded
                        // but for practice sake the access to data should always be serialized
                        lock (motorsDataLock)
                        
                            // Obtain fresh content of DPRAM
                            this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006A_394BYTES_8MOTORS_DATA, this.motorsData);
                            this.motorsDataBorn = DateTime.Now;
                        
                    
                    finally
                    
                        // Handshake signal to DPRAM write process on controller side that host PC has finished reading
                        this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, false);
                    
                    // Check live change in a separate atom
                    changeDetected = this.MotorsDataTimeStamp != motorsDataTimeStampPrev;
                 while ((!changeDetected) && ((DateTime.Now - start).TotalMilliseconds < 255));
                // Assert that result is live
                if (!changeDetected)
                
                    throw new DeltaTauControllerException(this.controller, "DPRAM Background Data timestamp is not updated. DPRAM forground handshaking failed.");
                
            
            else
            
                // OK. Bottleneck ! The concurrent polls have collided 
                // Give the controller a breathe by waiting for other thread do the job
                // Avoid aggressive polling of stale data, which is not able to be written, locked by reader
                // Just wait for other thread do whole polling job and return with no action
                // because the data is milliseconds fresh
                do
                
                    // Amount of parallel threads must eventually decrease
                    // But no thread will leave and decrease the counter until job is done
                    if (Interlocked.Read(ref this.motorsPollThreadCount) < parallelThreads)
                    
                        // Return is possible because decreased value of concurrentThreads means that
                        // this very time other thread has finished the poll 1 millisecond ago at most
                        return;
                    
                    Thread.Sleep(1);
                    retryCount++;
                 while ((DateTime.Now - start).TotalMilliseconds < 255);
                throw new DeltaTauControllerException(this.controller, "Timeout 255ms waiting on concurrent thread to complete DPRAM polling");
            
        
        finally
        
            // Signal to other threads that work is done
            Interlocked.Decrement(ref this.motorsPollThreadCount);
            // Trace the timing and bottleneck situations
            TimeSpan duration = DateTime.Now - start;
            if (duration.TotalMilliseconds > 50 || parallelThreads > 1 || retryCount > 0)
            
                Trace.WriteLine(string.Format("Controller 0, DPRAM poll 1:0 ms, threads 2, retries 3",
                    this.controller.number,
                    duration.TotalMilliseconds,
                    parallelThreads,
                    retryCount));
            
        
    

【问题讨论】:

【参考方案1】:

同步方法并在方法内部检查最后一次完成网络访问的时间记录,以确定是否需要再次完成。

【讨论】:

我选择了尽可能多地上网,除了不必要的旅行。线程的碰撞是唯一不需要跳闸的情况。所以为了简单起见,最好避免像时间戳这样的额外数据 从逻辑上讲,有一段时间(可能以毫秒为单位),在此之后您会认为数据是陈旧的,在此之前它已经足够好了。任何进入临界区的线程都应该根据数据是否足够新鲜来进行网络检索。这就像记录每次检索的实时毫秒数一样简单(在 Java 中使用 System.currentTimeMillis 完成,在 C# 中可能非常相似)并在关键部分的开头检查它。【参考方案2】:

您可以使用“lock”关键字支持的 C# 监视器类。

基本上你的方法可以被包裹在 lock(lockobj) CallMethod()

假设所有线程都在同一个进程中,这将为您提供保护。

如果您需要跨进程锁定,则需要使用 Mutex。

至于您的程序,我会考虑将静态时间戳和缓存值放入您的方法中。所以当方法进入的时候,如果timestamp在我可以接受的范围内,就返回缓存的值,否则直接执行fetch。结合锁定机制,这应该可以满足您的需要。

当然,这是假设在 C# 监视器上占用和阻塞的时间不会影响您的应用程序的性能。

更新: 我已经更新了您的代码,以向您展示我对使用缓存和时间戳的含义。我假设您的“motorsData”变量是从电机轮询返回的东西,因此我没有它的变量。但是,如果我误解了,只需添加一个变量来存储从代码返回的数据。请注意,我没有为您进行任何错误检查,因此您需要处理您的异常。

    static DateTime lastMotorPoll;
    const TimeSpan CACHE_PERIOD = new TimeSpan(0, 0, 0, 0, 250);
    private object cachedCheckMotorsDataLock = new object();

    private void CachedCheckMotorsData()
    
        lock (cachedCheckMotorsDataLock)  //Could refactor this to perform a try enter which returns quickly if required
        
            //If the last time the data was polled is older than the cache period, poll
            if (lastMotorPoll.Add(CACHE_PERIOD) < DateTime.Now)
            
                pollMotorsData();
                lastMotorPoll = DateTime.Now;
            
            else //Data is fresh so don't poll
            
                return;
            
               
    

    private void pollMotorsData()
    
        // Execute single poll with "foreground" handshaking 
        DateTime start = DateTime.Now;
        byte retryCount = 0;
        // Pick old data atomically to detect change
        uint motorsDataTimeStampPrev = this.MotorsDataTimeStamp;
        bool changeDetected = false;
        try
        
            do
            
                // Handshake signal to DPRAM write process on controller side that host PC is reading
                this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, true);
                try
                
                    bool canReadMotors = false;
                    byte[] canReadFrozenDataFlag = new byte[2];
                    do
                    
                        this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006E_BIT15_FOREGROUND_DONE, canReadFrozenDataFlag);
                        canReadMotors = (canReadFrozenDataFlag[1] & 0x80) == 0x80;
                        if (canReadMotors) break;
                        retryCount++;
                        Thread.Sleep(1);
                     while (retryCount < 10);
                    if (!canReadMotors)
                    
                        throw new DeltaTauControllerException(this.controller, "Timeout waiting on DPRAM Foreground Handshaking Bit");
                    
                    // Obtain fresh content of DPRAM
                    this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006A_394BYTES_8MOTORS_DATA, this.motorsData);
                    this.motorsDataBorn = DateTime.Now;
                
                finally
                
                    // Handshake signal to DPRAM write process on controller side that host PC has finished reading
                    this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, false);
                
                // Check live change in a separate atom
                changeDetected = this.MotorsDataTimeStamp != motorsDataTimeStampPrev;
             while ((!changeDetected) && ((DateTime.Now - start).TotalMilliseconds < 255));

            // Assert that result is live
            if (!changeDetected)
            
                throw new DeltaTauControllerException(this.controller, "DPRAM Background Data timestamp is not updated. DPRAM forground handshaking failed.");
            
        
    

【讨论】:

是的。通过临界区排序将在每次轮询时进行刷新。但临界区不会减少网络负载。设计目标是缓解慢速周边件的使用寿命。算法必须在方法内部进行分支以进行旅行与不进行旅行。 100% 同意 Spence。我只想补充一点,因为用户喜欢查看正在发生的事情(进度条已经失去了一些信任),所以最好提供回调事件通知机制来通知正在发生的事情。但这更多的是 Windows 工作流基础和 MVC 的东西。 为了在网络上留下负载,您只需在执行网络查询代码之前提供 Thread.Sleep(ms) 指令。 Nickolodeon。谢谢你。但是这个Thread.sleep是邪恶的,会导致锁升级。【参考方案3】:

有很多不同的方法可以做到这一点。正如有人已经提到的那样,您可以使用关键部分,但是如果另一个线程阻塞,这不会给您“退出”的行为。为此,您需要某种标志。您可以使用 volatile bool 并锁定对该 bool 的访问,或者您可以使用具有单个计数的信号量。最后,您可以使用互斥锁。使用同步对象的好处是您可以执行 WaitForSingleObject 并将超时设置为 0。然后您可以检查等待是否成功(如果是,则第一个线程已退出)(在这种情况下,第一个线程是仍在运行)。

【讨论】:

直接退出是不行的。因为 poll 方法在每一个真实的数据使用方法之前,它必须依赖于一定年龄的数据。说 15 毫秒或更长时间 啊,所以您不仅关心第一个线程是否正在运行,还关心数据是否存在一定的年龄?那么你肯定需要一个变量(当然是同步访问)来保存最后一次。 谢谢。带有年龄检查的简单锁对我有用。抱歉,周五晚上的构建很忙

以上是关于第一个进入的线程如何向其他并发线程发出相同方法结束的信号?的主要内容,如果未能解决你的问题,请参考以下文章

停止 MFC 线程

Java并发编程:可重入内置锁

并发

转:Java并发编程之一:可重入内置锁

如何从一个进程向多个其他进程发出信号?

队列和线程的并发请求