在多线程环境中满足条件(LessThan 常量)时增加变量
Posted
技术标签:
【中文标题】在多线程环境中满足条件(LessThan 常量)时增加变量【英文标题】:Increment a variable when condition (LessThan a constant) is met in multithreading environment 【发布时间】:2020-09-10 22:25:08 【问题描述】:这是上下文:
我需要一个接一个地运行一个方法 n 次(不是同时),并且 n 可以由多个线程递增。我想将此限制为 255 次(条件),所以我有下一个代码:
public class MyClass
int number = 0;
public void Caller()
Thread thread = new Thread(new ThreadStart(Request));
thread.Start();
Thread thread2 = new Thread(new ThreadStart(Request));
thread2.Start();
Thread thread3 = new Thread(new ThreadStart(Request));
thread3.Start();
public void Request()
// Condition
if (number < 255)
// Queue a new method
System.Threading.Interlocked.Increment(ref number);
// If it is the first time that Request is called, then it starts to run Method "number" times
if (number == 1)
StartRunnigMethods();
public void StartRunningMethods()
while (number > 0)
Method();
System.Threading.Interlocked.Decrement(ref number);
public void Method()
...
由于多线程的性质,如果我修改数字小于 255,我会担心请求方法。所以我实现了一个解决方案,但我不确定它是否是一个好的实现。
修改代码:
public void Request()
InterlockedIncrementIfLessThan(ref number, 255);
// It is the fisrt time Request is called
if( number == 1)
StartToRunTheMethod();
public bool InterlockedIncrementIfLessThan(ref int value, int condition)
int newValue, currentValue;
do
int initialValue = value;
currentValue = Thread.VolatileRead(ref value);
if (currentValue >= condition) return false;
newValue = initialValue + 1;
while (System.Threading.Interlocked.CompareExchange(ref value, newValue, initialValue) != initialValue);
return true;
执行比较(小于)的最佳方法是什么,如果为真,则增加变量(数字)?
我是这些主题的新手,所以您可能会向我推荐一些好的参考资料。
【问题讨论】:
什么是Interlock.Increment(ref number);
? Afaik,它只是 .NET 中的 Interlocked
类。是你自己的课吗?
抱歉,已联锁。
@CarlosPozos 所以Method
不应该并行运行?但是多个线程可以调用吗?并且它不应该运行超过 255 次?
@CarlosPozos 或方法一次不应运行超过 255 个并行实例?
@Guru Stron 不应该并行运行,也不应该运行超过 255 次
【参考方案1】:
如果您不坚持使用Interlocked
,并且您的问题不是并行运行方法并且运行次数不超过 255 次,正如您在 cmets 中所述,我建议使用 SemaphoreSlim(甚至只是简单的锁定),maxCount 设置为 1:
private static SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
private volatile int counter = 0; // to switch
public void ShouldNotRunInParallelAndMoreThan255()
if (counter >= 255)
throw new Exception("Limit reached"); // or just return
semaphore.Wait(); // or semaphore.WaitAsync() in case of async or long workloads
try
if(counter >= 255)
throw new Exception("Limit reached"); // or just return;
counter++;
// Do you stuff
finally
semaphore.Release();
【讨论】:
把“Do your Stuff”移到保护区外怎么样?长时间持有锁会产生争用。 但这就是在这种情况下锁定的重点。 “东西”不应该并行运行。如果需要很长时间,则使用带有 await 的 semaphore.WaitAsync()。 你是对的。我错过了 OP 的 “不应并行运行” 评论。【参考方案2】:通常将Volatile
和Interlocked
操作结合起来并不是一个好主意。最好使用其中一个,最好是语义更清晰的Interlocked
类。您可以通过使用Interlocked.CompareExchange
来实现Thread.VolatileRead
的功能,执行一个实际上不会改变底层值的空操作。
此外,直接访问线程之间共享的变量也不是一个好主意。 if (number < 255)
或 while (number > 0)
之类的行是危险信号(表明可能存在错误),应该避免使用。
您可以使用以下两种方法来条件更新共享变量,使用Interlocked
类:InterlockedUpdate
和InterlockedAdd
。每个方法都有两个重载,一个返回当前值,一个不返回。
public delegate bool InterlockedUpdateDelegate<T>(T existingValue, out T newValue);
public static bool InterlockedUpdate(ref int location1,
InterlockedUpdateDelegate<int> predicate, out int currentValue)
int value1 = Interlocked.CompareExchange(ref location1, 0, 0);
while (true)
bool updateApproved = predicate(value1, out int newValue);
if (!updateApproved) currentValue = value1; return false;
int value2 = Interlocked.CompareExchange(ref location1, newValue, value1);
if (value2 == value1) currentValue = newValue; return true;
value1 = value2;
public static bool InterlockedUpdate(ref int location1,
InterlockedUpdateDelegate<int> predicate)
=> InterlockedUpdate(ref location1, predicate, out _);
public static bool InterlockedAdd(ref int location1, int value,
Func<int, bool> predicate, out int currentValue)
=> InterlockedUpdate(ref location1, (int existingValue, out int newValue) =>
newValue = existingValue + value;
return predicate(existingValue);
, out currentValue);
public static bool InterlockedAdd(ref int location1, int value,
Func<int, bool> predicate)
=> InterlockedAdd(ref location1, value, predicate, out _);
使用示例:
public void Request()
bool incremented = InterlockedAdd(ref number, 1, v => v < 255,
out var currentValue);
if (incremented && currentValue == 1) StartRunningMethods();
public void StartRunningMethods()
while (true)
bool decremented = InterlockedAdd(ref number, -1, v => v > 0);
if (!decremented) break;
Method();
更新:上述InterlockedUpdate
的实现是悲观的,因为它假设观察到的变量大部分时间都是陈旧的,
所以它从请求一个新值开始。这是同一方法的乐观版本,它假设观察到的变量将是新鲜的
大多数时候。我没有测量,但我认为乐观版本在低竞争场景下应该有更好的性能。
public static bool InterlockedUpdate(ref int location1,
InterlockedUpdateDelegate<int> predicate, out int currentValue)
int value1 = location1; // The value1 may be stale at this point
bool isFresh = false;
while (true)
bool updateApproved = predicate(value1, out int newValue);
if (!updateApproved)
if (isFresh) currentValue = value1; return false;
newValue = value1; // Try rewritting the possibly stale value
int value2 = Interlocked.CompareExchange(ref location1, newValue, value1);
if (value2 == value1) currentValue = newValue; return updateApproved;
value1 = value2;
isFresh = true;
【讨论】:
@CarlosPozos 我更新了我的答案,添加了一个乐观的实现。以上是关于在多线程环境中满足条件(LessThan 常量)时增加变量的主要内容,如果未能解决你的问题,请参考以下文章