具有动态 maxCount 的 SemaphoreSlim

Posted

技术标签:

【中文标题】具有动态 maxCount 的 SemaphoreSlim【英文标题】:SemaphoreSlim with dynamic maxCount 【发布时间】:2014-07-26 20:38:48 【问题描述】:

我遇到了一个问题,我需要限制对另一个 Web 服务器的调用次数。它会有所不同,因为服务器是共享的,并且可能具有更多或更少的容量。

我正在考虑使用 SemaphoreSlim 类,但没有公共属性可以更改最大计数。

我应该将我的 SemaphoreSlim 类包装在另一个可以处理最大计数的类中吗?有没有更好的办法?

编辑:

这是我正在尝试的:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Semaphore

class Program

    static SemaphoreSlim _sem = new SemaphoreSlim(10,10000);

    static void Main(string[] args)
    
        int max = 15;

        for (int i = 1; i <= 50; i++)
        
            new Thread(Enter).Start(new int[]  i, max);
        

        Console.ReadLine();

        max = 11;

        for (int i = 1; i <= 50; i++)
        
            new Thread(Enter).Start(new int[]  i, max );
        
    

    static void Enter(object param)
    
        int[] arr = (int[])param;
        int id = arr[0];
        int max = arr[1];

        try
        
            Console.WriteLine(_sem.CurrentCount);

            if (_sem.CurrentCount <= max)
                _sem.Release(1);
            else
            
                _sem.Wait(1000);

                Console.WriteLine(id + " wants to enter");

                Thread.Sleep((1000 * id) / 2); // can be here at

                Console.WriteLine(id + " is in!"); // Only three threads

            
        
        catch(Exception ex)
        
            Console.WriteLine("opps ", id);
            Console.WriteLine(ex.Message);
        
        finally            
        
            _sem.Release();
        
    


问题:

1-_sem.Wait(1000) 应该取消执行超过 1000 毫秒的线程,不是吗?

2-我有使用 Release/Wait 的想法吗?

【问题讨论】:

【参考方案1】:

您无法更改最大计数,但您可以创建一个具有非常高的最大计数的SemaphoreSlim,并保留其中一些。见this constructor。

假设并发调用的绝对最大值数为 100,但最初您希望它为 25。您初始化信号量:

SemaphoreSlim sem = new SemaphoreSlim(25, 100);

所以 25 是可以同时处理的请求数。您已保留其他 75 个。

如果您想增加允许的数量,只需致电Release(num)。如果您拨打Release(10),则号码将变为35。

现在,如果你想减少可用请求的数量,你必须多次调用WaitOne。例如,如果您想从可用计数中删除 10:

for (var i = 0; i < 10; ++i)

    sem.WaitOne();

这有可能在其他客户端释放信号量之前阻塞。也就是说,如果您允许 35 个并发请求并且您想将其减少到 25 个,但已经有 35 个客户端具有活动请求,则 WaitOne 将阻塞,直到客户端调用 Release,并且循环不会终止,直到10 个客户端发布。

【讨论】:

这可能会有所帮助,但我需要一些灵活的东西。可以说,最多 1000 个并发,但几个小时后,最大值应该是 600 或 1200。我相信 SemaphoreSlim 不会给我这么灵活。 =( @ThiagoCustodio:你读过答案吗?将第二个参数设置为您将允许的最大值。然后你可以使用ReleaseWaitOne 来调整可用的数量。 你能看看我的示例代码并帮助我吗? @jim-mischel 你能想出一种方法来确保信号量槽的数量不低于给定的最小值吗?我想避免将插槽数减少到零,从而阻止我的任何任务继续进行。这样的事情是否可靠:if (sem.CurrentCount &gt; myMinNumberOfSlots) sem.Wait(); @jim-mischel 我做了一些测试,发现我们可以排除CurrentCount来确定槽数。该属性表示当前可用的槽数,其值随着每次调用ReleaseWait 而上升或下降。【参考方案2】:
    获取信号量。 将容量设置为比您需要的高一些。 将初始容量设置为您希望实际最大容量。 将信号量分发给其他人使用。

此时,您可以在信号量上等待任意时间(无需相应的发布调用)以降低容量。您可以多次释放信号量(无需相应的等待调用)以增加有效容量。

如果这是您做的足够多的事情,您可以创建自己的信号量类,该类构成SemaphoreSlim 并封装此逻辑。如果您的代码已经发布了信号量而没有先等待,那么这种组合也是必不可少的;使用您自己的课程,您可以确保此类版本是无操作的。 (也就是说,你应该避免一开始就将自己置于那个位置。)

【讨论】:

或者直接调用构造函数:msdn.microsoft.com/en-us/library/dd270891(v=vs.110).aspx 即使使用我自己的封装 SemaphoreSlim 的类,我也需要灵活地向上或向下切换最大并发调用。即从 1000 开始,更改为 600,一段时间后更改为 1700。 @JimMischel 当然,虽然如果你想真正改变正确的最大值,你真的需要将它组合成另一种类型,这样你就可以确保在它已经达到最大值时释放它没有首先增加最大值就变成了 noop(或异常)。 @ThiagoCustodio 是的,那有什么问题?您可以使用 SetMaxium 方法,根据当前最大值与所需最大值之间的差异,适当地等待或释放。 我应该更具体。我只是说,如果您在步骤 2 中使用该构造函数,则可以消除您的步骤 3。至于您的其余答案(和您的评论),我完全同意。如果没有封装,这种事情可能会非常危险。【参考方案3】:

这是我解决这种情况的方法:我创建了一个自定义信号量 slim 类,它允许我增加和减少插槽的数量。此类还允许我设置插槽的最大数量,这样我就不会超过“合理”的数量,还可以设置插槽的最小数量,这样我就不会低于“合理”的阈值。

using Picton.Messaging.Logging;
using System;
using System.Threading;

namespace Picton.Messaging.Utils

    /// <summary>
    /// An improvement over System.Threading.SemaphoreSlim that allows you to dynamically increase and
    /// decrease the number of threads that can access a resource or pool of resources concurrently.
    /// </summary>
    /// <seealso cref="System.Threading.SemaphoreSlim" />
    public class SemaphoreSlimDynamic : SemaphoreSlim
    
        #region FIELDS

        private static readonly ILog _logger = LogProvider.GetLogger(typeof(SemaphoreSlimDynamic));
        private readonly ReaderWriterLockSlim _lock;

        #endregion

        #region PROPERTIES

        /// <summary>
        /// Gets the minimum number of slots.
        /// </summary>
        /// <value>
        /// The minimum slots count.
        /// </value>
        public int MinimumSlotsCount  get; private set; 

        /// <summary>
        /// Gets the number of slots currently available.
        /// </summary>
        /// <value>
        /// The available slots count.
        /// </value>
        public int AvailableSlotsCount  get; private set; 

        /// <summary>
        /// Gets the maximum number of slots.
        /// </summary>
        /// <value>
        /// The maximum slots count.
        /// </value>
        public int MaximumSlotsCount  get; private set; 

        #endregion

        #region CONSTRUCTOR

        /// <summary>
        /// Initializes a new instance of the <see cref="SemaphoreSlimDynamic"/> class.
        /// </summary>
        /// <param name="minCount">The minimum number of slots.</param>
        /// <param name="initialCount">The initial number of slots.</param>
        /// <param name="maxCount">The maximum number of slots.</param>
        public SemaphoreSlimDynamic(int minCount, int initialCount, int maxCount)
            : base(initialCount, maxCount)
        
            _lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);

            this.MinimumSlotsCount = minCount;
            this.AvailableSlotsCount = initialCount;
            this.MaximumSlotsCount = maxCount;
        

        #endregion

        #region PUBLIC METHODS

        /// <summary>
        /// Attempts to increase the number of slots
        /// </summary>
        /// <param name="millisecondsTimeout">The timeout in milliseconds.</param>
        /// <param name="increaseCount">The number of slots to add</param>
        /// <returns>true if the attempt was successfully; otherwise, false.</returns>
        public bool TryIncrease(int millisecondsTimeout = 500, int increaseCount = 1)
        
            return TryIncrease(TimeSpan.FromMilliseconds(millisecondsTimeout), increaseCount);
        

        /// <summary>
        /// Attempts to increase the number of slots
        /// </summary>
        /// <param name="timeout">The timeout.</param>
        /// <param name="increaseCount">The number of slots to add</param>
        /// <returns>true if the attempt was successfully; otherwise, false.</returns>
        public bool TryIncrease(TimeSpan timeout, int increaseCount = 1)
        
            if (increaseCount < 0) throw new ArgumentOutOfRangeException(nameof(increaseCount));
            else if (increaseCount == 0) return false;

            var increased = false;

            try
            
                if (this.AvailableSlotsCount < this.MaximumSlotsCount)
                
                    var lockAcquired = _lock.TryEnterWriteLock(timeout);
                    if (lockAcquired)
                    
                        for (int i = 0; i < increaseCount; i++)
                        
                            if (this.AvailableSlotsCount < this.MaximumSlotsCount)
                            
                                Release();
                                this.AvailableSlotsCount++;
                                increased = true;
                            
                        

                        if (increased) _logger.Trace($"Semaphore slots increased: this.AvailableSlotsCount");

                        _lock.ExitWriteLock();
                    
                
            
            catch (SemaphoreFullException)
            
                // An exception is thrown if we attempt to exceed the max number of concurrent tasks
                // It's safe to ignore this exception
            

            return increased;
        

        /// <summary>
        /// Attempts to decrease the number of slots
        /// </summary>
        /// <param name="millisecondsTimeout">The timeout in milliseconds.</param>
        /// <param name="decreaseCount">The number of slots to add</param>
        /// <returns>true if the attempt was successfully; otherwise, false.</returns>
        public bool TryDecrease(int millisecondsTimeout = 500, int decreaseCount = 1)
        
            return TryDecrease(TimeSpan.FromMilliseconds(millisecondsTimeout), decreaseCount);
        

        /// <summary>
        /// Attempts to decrease the number of slots
        /// </summary>
        /// <param name="timeout">The timeout.</param>
        /// <param name="decreaseCount">The number of slots to add</param>
        /// <returns>true if the attempt was successfully; otherwise, false.</returns>
        public bool TryDecrease(TimeSpan timeout, int decreaseCount = 1)
        
            if (decreaseCount < 0) throw new ArgumentOutOfRangeException(nameof(decreaseCount));
            else if (decreaseCount == 0) return false;

            var decreased = false;

            if (this.AvailableSlotsCount > this.MinimumSlotsCount)
            
                var lockAcquired = _lock.TryEnterWriteLock(timeout);
                if (lockAcquired)
                
                    for (int i = 0; i < decreaseCount; i++)
                    
                        if (this.AvailableSlotsCount > this.MinimumSlotsCount)
                        
                            if (Wait(timeout))
                            
                                this.AvailableSlotsCount--;
                                decreased = true;
                            
                        
                    

                    if (decreased) _logger.Trace($"Semaphore slots decreased: this.AvailableSlotsCount");

                    _lock.ExitWriteLock();
                
            

            return decreased;
        

        #endregion
    

【讨论】:

【参考方案4】:

好的,我可以通过单声道项目解决我的问题。

// SemaphoreSlim.cs
//
// Copyright (c) 2008 Jérémie "Garuma" Laval
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//
//

using System;
using System.Diagnostics;
using System.Threading.Tasks;

namespace System.Threading

    public class SemaphoreSlimCustom : IDisposable
    
        const int spinCount = 10;
        const int deepSleepTime = 20;
        private object _sync = new object();


        int maxCount;
        int currCount;
        bool isDisposed;

        public int MaxCount
        
            get  lock (_sync)  return maxCount;  
            set
            
                lock (_sync)
                
                    maxCount = value;
                
            
        

        EventWaitHandle handle;

        public SemaphoreSlimCustom (int initialCount) : this (initialCount, int.MaxValue)
        
        

        public SemaphoreSlimCustom (int initialCount, int maxCount)
        
            if (initialCount < 0 || initialCount > maxCount || maxCount < 0)
                throw new ArgumentOutOfRangeException ("The initialCount  argument is negative, initialCount is greater than maxCount, or maxCount is not positive.");

            this.maxCount = maxCount;
            this.currCount = initialCount;
            this.handle = new ManualResetEvent (initialCount > 0);
        

        public void Dispose ()
        
            Dispose(true);
        

        protected virtual void Dispose (bool disposing)
        
            isDisposed = true;
        

        void CheckState ()
        
            if (isDisposed)
                throw new ObjectDisposedException ("The SemaphoreSlim has been disposed.");
        

        public int CurrentCount 
            get 
                return currCount;
            
        

        public int Release ()
        
            return Release(1);
        

        public int Release (int releaseCount)
        
            CheckState ();
            if (releaseCount < 1)
                throw new ArgumentOutOfRangeException ("releaseCount", "releaseCount is less than 1");

            // As we have to take care of the max limit we resort to CAS
            int oldValue, newValue;
            do 
                oldValue = currCount;
                newValue = (currCount + releaseCount);
                newValue = newValue > maxCount ? maxCount : newValue;
             while (Interlocked.CompareExchange (ref currCount, newValue, oldValue) != oldValue);

            handle.Set ();

            return oldValue;
        

        public void Wait ()
        
            Wait (CancellationToken.None);
        

        public bool Wait (TimeSpan timeout)
        
            return Wait ((int)timeout.TotalMilliseconds, CancellationToken.None);
        

        public bool Wait (int millisecondsTimeout)
        
            return Wait (millisecondsTimeout, CancellationToken.None);
        

        public void Wait (CancellationToken cancellationToken)
        
            Wait (-1, cancellationToken);
        

        public bool Wait (TimeSpan timeout, CancellationToken cancellationToken)
        
            CheckState();
            return Wait ((int)timeout.TotalMilliseconds, cancellationToken);
        

        public bool Wait (int millisecondsTimeout, CancellationToken cancellationToken)
        
            CheckState ();
            if (millisecondsTimeout < -1)
                throw new ArgumentOutOfRangeException ("millisecondsTimeout",
                                                       "millisecondsTimeout is a negative number other than -1");

            Stopwatch sw = Stopwatch.StartNew();

            Func<bool> stopCondition = () => millisecondsTimeout >= 0 && sw.ElapsedMilliseconds > millisecondsTimeout;

            do 
                bool shouldWait;
                int result;

                do 
                    cancellationToken.ThrowIfCancellationRequested ();
                    if (stopCondition ())
                        return false;

                    shouldWait = true;
                    result = currCount;

                    if (result > 0)
                        shouldWait = false;
                    else
                        break;
                 while (Interlocked.CompareExchange (ref currCount, result - 1, result) != result);

                if (!shouldWait) 
                    if (result == 1)
                        handle.Reset ();
                    break;
                

                SpinWait wait = new SpinWait ();

                while (Thread.VolatileRead (ref currCount) <= 0) 
                    cancellationToken.ThrowIfCancellationRequested ();
                    if (stopCondition ())
                        return false;

                    if (wait.Count > spinCount) 
                        int diff = millisecondsTimeout - (int)sw.ElapsedMilliseconds;

                        int timeout = millisecondsTimeout < 0 ? deepSleepTime :


                            Math.Min (Math.Max (diff, 1), deepSleepTime);
                        handle.WaitOne (timeout);
                     else
                        wait.SpinOnce ();
                
             while (true);

            return true;
        

        public WaitHandle AvailableWaitHandle 
            get 
                return handle;
            
        

        public Task WaitAsync ()
        
            return Task.Factory.StartNew (() => Wait ());
        

        public Task WaitAsync (CancellationToken cancellationToken)
        
            return Task.Factory.StartNew (() => Wait (cancellationToken), cancellationToken);
        

        public Task<bool> WaitAsync (int millisecondsTimeout)
        
            return Task.Factory.StartNew (() => Wait (millisecondsTimeout));
        

        public Task<bool> WaitAsync (TimeSpan timeout)
        
            return Task.Factory.StartNew (() => Wait (timeout));
        

        public Task<bool> WaitAsync (int millisecondsTimeout, CancellationToken cancellationToken)
        
            return Task.Factory.StartNew (() => Wait (millisecondsTimeout, cancellationToken), cancellationToken);
        

        public Task<bool> WaitAsync (TimeSpan timeout, CancellationToken cancellationToken)
        
            return Task.Factory.StartNew (() => Wait (timeout, cancellationToken), cancellationToken);
        
    

【讨论】:

【参考方案5】:

更新 .Net Core 5 答案:

假设我想要一个最多 10 个请求的锁,但大多数时候我只想要 1 个。

private readonly static SemaphoreSlim semLock = new(1, 10);

现在当我想释放一些资源时,我可以这样做:

semLock.Release(Math.Min(9, requiredAmount));

请注意,9 比 10 小一,因为我们最初已经发布了一个版本。

一旦我想再次限制可用资源,我可以调用:

while(semLock.CurrentCount > 1)

    await semLock.WaitAsync();

等待将其恢复到 1

【讨论】:

以上是关于具有动态 maxCount 的 SemaphoreSlim的主要内容,如果未能解决你的问题,请参考以下文章

死磕 java同步系列之Semaphore源码解析

SQL Server rand() 聚合

来自 calcOpticalFlowPyrLK 的“标准”epsilon 和 maxCount 是啥

RxSwift - 使用 maxCount 重试网络请求直到成功

python MaxCounters

js助手函数