线程顺序同步

Posted

技术标签:

【中文标题】线程顺序同步【英文标题】:Thread order synchronization 【发布时间】:2012-11-01 06:54:26 【问题描述】:

假设我有一个任务队列,每个任务都有一个锁定对象(syncObject)来控制共享资源访问,队列可以有多个任务共享相同的同步对象实例。而且我有 N 个并发线程应该使任务出队并按队列顺序处理它们,这意味着按队列顺序获取同步对象的锁定。

代码说明:

abstract class Task

    public readonly Object SyncObject = new Object();


Queue<Task> taskQueue = new Queue<Task>();
Object queueLock = new Object();

void TakeItemThreadedMethod()

    Task task;
    lock(queueLock) task = taskQueue.Dequeue();
    //Between this lines is my problem,
    //Other thread can dequeue next task and it may share same syncObject and
    //acquire lock on it before this thread, thought this task was first in queue
    lock(task.SyncObject)
    
        //Do some work here
    

如何开始处理按队列中的顺序共享相同 SyncObject 的任务(获取 Task.SyncObject 锁)。

【问题讨论】:

我一定是误解了一些东西,但是如果你必须按顺序处理任务,那么使用多个线程将无济于事,因为只有 1 个线程会运行,而其余线程会阻塞。 您可以为它们使用不同的队列,而不是让“任务共享相同的同步对象实例”。 @L.B:很高兴听到我的想法不是一个疯狂的想法 :) 当您添加该评论时,我一定是在写它... 【参考方案1】:

听起来您的队列可能不应该包含单个任务 - 而是任务队列,其中每个子队列是“共享同步锁的所有任务”。

因此,您的处理器会:

从主队列中取出一个子队列 将第一个任务从子队列中取出并处理它 完成后将子队列放回主队列的末尾(或任何地方,实际上 - 确定您希望调度如何工作)

这将确保每个子队列一次只执行一个任务。

您可能需要从锁到子队列的映射,以便任何创建 工作都可以将其添加到正确的子队列。假设您完全需要该功能,您需要自动计算何时从地图中删除子队列(而不是将其放回主队列)。

编辑:作为对上述内容的优化,您可以将子队列本身放入您用作共享同步锁的任何内容中。它可以引用或者“下一次执行的单个任务”“任务队列”——只是懒惰地创建队列。然后,您将同步锁(实际上不再需要用作锁)放在队列中,每个消费者只会要求它执行下一个任务。如果只有一个任务可用,则返回它(并且“下一个任务”变量设置为 null)。如果有多个任务可用,则将第一个任务出列。

生产者添加新任务时,如果“第一个任务”变量之前为空,则将其设置为要执行的任务,或者如果没有队列,则创建队列但已经是一项任务,或者如果队列已经存在,则只是添加到队列中。这解决了不必要的队列创建的低效率。

同样,棘手的部分将是解决如何以原子方式丢弃共享资源锁 - 因为您只想在处理最后一项之后这样做,但同样您不想这样做错过了一项任务,因为您碰巧在错误的时间添加了它。它应该不会太糟糕,但同样你需要仔细考虑。

【讨论】:

这不合适,因为 1. 我想控制工作线程的数量,2. Tasks 共享某些资源的情况非常罕见,我会有大量的队列与 1 项和大量线程 @AlexBurtsev:我不明白 1) 是怎么回事——你仍然可以控制从主队列中拉出子队列的工作线程的数量。至于 2) - 您是否有任何证据表明引入的轻微低效率实际上会很重要?一般来说,我宁愿有一个工作但效率低下的解决方案,而不是损坏的代码......(请注意,如果您要对构成合适答案的内容添加限制,那么在问题中说明这些限制会很有帮助。 ) @AlexBurtsev:查看我的编辑以了解如何有效地执行此操作。 我猜,您对问题 1) 的看法是正确的。我同意这是一个可能的解决方案,但任务的全局顺序对我来说很重要,即使任务不共享任何锁,我也希望保留处理顺序或使其尽可能接近队列顺序。 @AlexBurtsev:同样,这是您在原始帖子中从未提及的要求 - 基本上与拥有多个消费者线程有些不一致。但是,如果任务共享相同锁的情况相对较少,则它们按顺序出列。当您为现有资源创建任务时,它最终可能会比其他情况更早或更晚地执行。【参考方案2】:

这个方法怎么样:

使用列表而不是队列 让每个工作线程按顺序循环通过队列,直到找到“解锁”任务

类似的东西(未经测试):

abstract class Task

    public readonly Object SyncObject = new Object();


List<Task> taskList = new List<Task>();

void TakeItemThreadedMethod()

    Task task = null;
    bool found = false;

    try
    
        // loop until found an task whose SyncObject is free
        while (!found)
        
            lock (taskList)
            
                for (int i = 0; i < taskList.Count; i++)
                
                    object syncObj = taskList[i].SyncObject;
                    if (found = Monitor.TryEnter(syncObj))
                    
                        for (int x = 0; x < taskList.Count; x++)
                        
                            if (Object.ReferenceEquals(
                                    syncObj, taskList[x].SyncObject))
                            
                                task = taskList[x];
                                taskList.RemoveAt(x);
                                break;
                            
                        
                        break;
                    
                
            
        

        // process the task...
        DoWork(task);
    
    finally
    
        if (found) Monitor.Exit(task.SyncObject);
    


void QueueTask(Task task)

    lock (taskList)
    
        taskList.Add(task);
    

【讨论】:

非常有趣的解决方案,乍一看应该可以,而且非常简单高效,需要一些时间来分析和测试。谢谢。 不错的尝试,但是这个算法有一个缺陷。当您的搜索循环工作时,其他线程可以在任务上获取和释放锁。假设您有一个任务队列 [1,2,3,1] Lock1 被某个工作人员占用,您的算法忽略它并且当前正在尝试 Lock2(假设 2,3 被占用),此时 Lock1 被释放,您的算法将选择最后一个带有 Lock1 的任务,它将在第一个带有 Lock1 的任务之前被处理 嗯,不错。虽然它应该是一个简单的修复。您只需要返回并找到具有相同SyncObject 的第一个任务。我更新了代码。【参考方案3】:

我用过QueuedLock class suggested by Matthew Brindley,稍作修改,我将Enter函数拆分为TakeTicket和Enter which blocks。

现在我可以在共享 QueueLock 中使用 TakeTicket 而不会阻塞整个队列。

修改代码:

abstract class Task

    public readonly QueuedLock SyncObject = new QueuedLock();


Queue<Task> taskQueue = new Queue<Task>();
Object queueLock = new Object();

void TakeItemThreadedMethod()

    Task task;
    int ticket;
    lock(queueLock) 
    
        task = taskQueue.Dequeue();
        ticket = task.SyncObject.TakeTicket();
    
    task.SyncObject.Enter(ticket);
    //Do some work here
    task.SyncObject.Exit();

【讨论】:

基本上这就是 Jon Skeet 所建议的,我已经将队列队列集成到 LockObject 中。 是的,我认为这可行(这是我可能会建议的,但不想继续努力解决要求不断变化的问题)。不过,您可能希望将 Exit 放入 finally 块中。

以上是关于线程顺序同步的主要内容,如果未能解决你的问题,请参考以下文章

多线程如何按指定顺序同步执行

CAS无锁队列与线程同步

线程同步

Windows线程同步详解

三个线程顺序打印ABC?我有十二种做法,彻底掌握多线程同步通信机制

三个线程顺序打印ABC?我有十二种做法,彻底掌握多线程同步通信机制