如何动态锁定线程并避免竞争条件

Posted

技术标签:

【中文标题】如何动态锁定线程并避免竞争条件【英文标题】:How to lock threads dynamically and avoid race condition 【发布时间】:2018-12-24 13:56:22 【问题描述】:

我正在尝试动态锁定线程,但无论我尝试什么,总是会发生某种竞争条件,这似乎是由多个线程同时启动任务引起的,但我一直无法找到很好的答案。

这是一个例子:

using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class Example

public static readonly Object padLock = new Object();
public readonly static ConcurrentDictionary<string, object> locks = new 
ConcurrentDictionary<string, object>();
public static List<string> processes = new List<string>();

public static void Main()

  //Add random list of processes (just testing with one for now)
  for (var i = 0; i < 1; i++) 
     processes.Add("random" + i.ToString()); 
  

  while (true)
  
     foreach (var process in processes )
     
        var currentProc = process ;
        lock (padLock)
        
           if (!locks.ContainsKey(currentProc))
           
              System.Threading.Tasks.Task.Factory.StartNew(() =>
              
                 if (!locks.ContainsKey(currentProc))
                 
                    var lockObject = locks.GetOrAdd(currentProc, new object());
                    lock (lockObject)
                     
                       Console.WriteLine("Currently Executing " + currentProc); 
                       Console.WriteLine("Ended Executing " + currentProc);
                       ((IDictionary)locks).Remove(currentProc);
                    
                 
              );
           
        
       // Thread.Sleep(0);
     
  

  Console.ReadLine();
 

输出:

Started 1

Finished 1

Started 1

Finished 1

Started 1

Finished 1

但有时会得到:

Started 1

Started 1

Finished 1

这是不希望的,动态锁应该锁定它并只执行一次

【问题讨论】:

hrmm Started 没有在你的代码中表示,只是说 你想完成什么OP?如果您只想确保每个对象只处理一次,那么这是一种非常不寻常的模式。 @TheGeneral Started 当前正在执行 @JohnWu 你会怎么做? 我们不知道您要做什么。这样做的目的是什么? 【参考方案1】:

虽然我不知道您要做什么,但您看到的行为是因为您从多个线程向 Console.WriteLine 发送垃圾邮件,它们被乱序打印。我通过将时间戳附加到 Console.WriteLine 来验证这一点:

public class Example

  public static readonly Object padLock = new Object();
  public readonly static ConcurrentDictionary<string, object> locks = new ConcurrentDictionary<string, object>(); 
  public static List<string> processes = new List<string>(); 

  [ThreadStatic]
  private static bool flag = false;

 public static void Main()
 
  //Add random list of processes (just testing with one for now)
  for (var i = 0; i < 10; i++)
  
     processes.Add(i.ToString());
  

  while (true)
  
     foreach (var process in processes)
     
        var currentProc = process; 

        if (!locks.ContainsKey(currentProc))
        
           var lockObject = locks.GetOrAdd(currentProc, new object());
           Task.Factory.StartNew(() =>
            
               lock (lockObject)
               
                  if (flag) throw new Exception();
                  flag = true;
                  Console.WriteLine("Currently Executing " + currentProc);
                  Thread.Sleep(0); // You can siimulate work here
                  Console.WriteLine("Ended Executing " + currentProc);
                  flag = false;
                  ((IDictionary)locks).Remove(currentProc);
               
            );
        
     
   
  

运行如下:

....
Ended Executing 1 4025734
Currently Executing 1 4026419
Ended Executing 1 4028737
Currently Executing 1 4029565
Ended Executing 1 4030472
Currently Executing 1 4031643
Currently Executing 1 3659670
Ended Executing 1 4032900
Currently Executing 1 4033582
Ended Executing 1 4034318
Ended Executing 1 4032786
Currently Executing 1 4038042
Ended Executing 1 4042484
Currently Executing 1 4044967
Currently Executing 1 4007282
...

【讨论】:

这是一个有用的答案,但这并不准确,我按照您的说法附上了刻度线,是的,它们有问题,但是如果您复制并粘贴,您会注意到仍然存在 我在上面的代码中添加了一个标志,如果两个事件出现乱序并且没有抛出异常,它将抛出异常。 我在本地运行了您的代码,但它在@MineR 中引发了异常 这很有趣,我想知道为什么这不起作用@MineR 我将使用我实际运行的代码进行编辑,它运行良好【参考方案2】:

这是一个常见的问题

我将您的要求解读为“获取进程列表,然后使用多个线程对每个进程执行一次操作。”

就我的示例而言,假设 Foo(process) 完成了必须只执行一次的工作单元。

这是一个非常普遍的需求,有几种模式。

Parallel.ForEach

这种技术可以为循环的每次迭代使用不同的线程,这些线程将同时执行。

Parallel.ForEach(processes, process => Foo(process));

是的;就一行代码。

异步任务

如果Foo() 是异步的,这种技术是合适的。它只是为所有进程安排一个任务,然后等待它们,并让 SynchronizationContext 对其进行排序。

var tasks = processes.Select( p => Foo(process) );
await Task.WhenAll(tasks);

生产者/消费者

这使用producer-consumer pattern,这是一个线程添加到队列而另一个线程从队列中取出的传统方式。通过从队列中删除一个项目,它实际上被“锁定”了,这样其他线程就不会尝试处理它。

BlockingCollection<string>() queue = new BlockingCollection<string>();

void SetUpQueue()

    for (int i=0; i<100; i++) queue.Add(i.ToString());
    queue.CompleteAdding();


void Worker()

    while (queue.Count > 0 || !queue.IsAddingCompleted)
    
        var item = queue.Take();
        Foo(item);
    

【讨论】:

Parallel.ForEach 似乎给出了正确的结果,我只是不明白为什么即使这是在一个 while 循环中也能工作,你能详细说明一下吗? 对不起,我不明白你的问题。你遇到了哪个例子? 我的意思是,Parallel.ForEach 似乎工作,但我不明白的是为什么它不会启动同一个线程两次,即使它在一个 while 循环内 source code 非常复杂,但它似乎将列表存储为数组,创建了一系列“worker”(取决于最大并行度和 CPU 中的内核数),然后每个工作人员从数组中抓取项目并对其进行处理。如果列表中的元素过多,实际上可以重用线程,但这是意料之中的。 感谢@JohnWu,非常感谢您的意见,但我觉得 MineR 更符合我想要完成的目标,Parallel.Foreach 肯定值得研究,谢谢

以上是关于如何动态锁定线程并避免竞争条件的主要内容,如果未能解决你的问题,请参考以下文章

如何锁定文件并避免在写入时读取

我是否必须锁定Blueprint实例以避免Flask中的竞争条件?

在带有Redux的ReactJS中避免竞争条件

线程 - AtomicInteger

PHP进程/服务器崩溃时如何避免文件死锁?

竞争条件和解锁写入