线程顺序同步
Posted
技术标签:
【中文标题】线程顺序同步【英文标题】:Thread order synchronization 【发布时间】:2012-11-01 06:54:26 【问题描述】:假设我有一个任务队列,每个任务都有一个锁定对象(syncObject)来控制共享资源访问,队列可以有多个任务共享相同的同步对象实例。而且我有 N 个并发线程应该使任务出队并按队列顺序处理它们,这意味着按队列顺序获取同步对象的锁定。
代码说明:
abstract class Task
public readonly Object SyncObject = new Object();
Queue<Task> taskQueue = new Queue<Task>();
Object queueLock = new Object();
void TakeItemThreadedMethod()
Task task;
lock(queueLock) task = taskQueue.Dequeue();
//Between this lines is my problem,
//Other thread can dequeue next task and it may share same syncObject and
//acquire lock on it before this thread, thought this task was first in queue
lock(task.SyncObject)
//Do some work here
如何开始处理按队列中的顺序共享相同 SyncObject 的任务(获取 Task.SyncObject 锁)。
【问题讨论】:
我一定是误解了一些东西,但是如果你必须按顺序处理任务,那么使用多个线程将无济于事,因为只有 1 个线程会运行,而其余线程会阻塞。 您可以为它们使用不同的队列,而不是让“任务共享相同的同步对象实例”。 @L.B:很高兴听到我的想法不是一个疯狂的想法 :) 当您添加该评论时,我一定是在写它... 【参考方案1】:听起来您的队列可能不应该包含单个任务 - 而是任务队列,其中每个子队列是“共享同步锁的所有任务”。
因此,您的处理器会:
从主队列中取出一个子队列 将第一个任务从子队列中取出并处理它 完成后将子队列放回主队列的末尾(或任何地方,实际上 - 确定您希望调度如何工作)这将确保每个子队列一次只执行一个任务。
您可能需要从锁到子队列的映射,以便任何创建 工作都可以将其添加到正确的子队列。假设您完全需要该功能,您需要自动计算何时从地图中删除子队列(而不是将其放回主队列)。
编辑:作为对上述内容的优化,您可以将子队列本身放入您用作共享同步锁的任何内容中。它可以引用或者“下一次执行的单个任务”或“任务队列”——只是懒惰地创建队列。然后,您将同步锁(实际上不再需要用作锁)放在队列中,每个消费者只会要求它执行下一个任务。如果只有一个任务可用,则返回它(并且“下一个任务”变量设置为 null)。如果有多个任务可用,则将第一个任务出列。
当生产者添加新任务时,如果“第一个任务”变量之前为空,则将其设置为要执行的任务,或者如果没有队列,则创建队列但已经是一项任务,或者如果队列已经存在,则只是添加到队列中。这解决了不必要的队列创建的低效率。
同样,棘手的部分将是解决如何以原子方式丢弃共享资源锁 - 因为您只想在处理最后一项之后这样做,但同样您不想这样做错过了一项任务,因为您碰巧在错误的时间添加了它。它应该不会太糟糕,但同样你需要仔细考虑。
【讨论】:
这不合适,因为 1. 我想控制工作线程的数量,2. Tasks 共享某些资源的情况非常罕见,我会有大量的队列与 1 项和大量线程 @AlexBurtsev:我不明白 1) 是怎么回事——你仍然可以控制从主队列中拉出子队列的工作线程的数量。至于 2) - 您是否有任何证据表明引入的轻微低效率实际上会很重要?一般来说,我宁愿有一个工作但效率低下的解决方案,而不是损坏的代码......(请注意,如果您要对构成合适答案的内容添加限制,那么在问题中说明这些限制会很有帮助。 ) @AlexBurtsev:查看我的编辑以了解如何有效地执行此操作。 我猜,您对问题 1) 的看法是正确的。我同意这是一个可能的解决方案,但任务的全局顺序对我来说很重要,即使任务不共享任何锁,我也希望保留处理顺序或使其尽可能接近队列顺序。 @AlexBurtsev:同样,这是您在原始帖子中从未提及的要求 - 基本上与拥有多个消费者线程有些不一致。但是,如果任务共享相同锁的情况相对较少,则它们将按顺序出列。当您为现有资源创建任务时,它最终可能会比其他情况更早或更晚地执行。【参考方案2】:这个方法怎么样:
使用列表而不是队列 让每个工作线程按顺序循环通过队列,直到找到“解锁”任务类似的东西(未经测试):
abstract class Task
public readonly Object SyncObject = new Object();
List<Task> taskList = new List<Task>();
void TakeItemThreadedMethod()
Task task = null;
bool found = false;
try
// loop until found an task whose SyncObject is free
while (!found)
lock (taskList)
for (int i = 0; i < taskList.Count; i++)
object syncObj = taskList[i].SyncObject;
if (found = Monitor.TryEnter(syncObj))
for (int x = 0; x < taskList.Count; x++)
if (Object.ReferenceEquals(
syncObj, taskList[x].SyncObject))
task = taskList[x];
taskList.RemoveAt(x);
break;
break;
// process the task...
DoWork(task);
finally
if (found) Monitor.Exit(task.SyncObject);
void QueueTask(Task task)
lock (taskList)
taskList.Add(task);
【讨论】:
非常有趣的解决方案,乍一看应该可以,而且非常简单高效,需要一些时间来分析和测试。谢谢。 不错的尝试,但是这个算法有一个缺陷。当您的搜索循环工作时,其他线程可以在任务上获取和释放锁。假设您有一个任务队列 [1,2,3,1] Lock1 被某个工作人员占用,您的算法忽略它并且当前正在尝试 Lock2(假设 2,3 被占用),此时 Lock1 被释放,您的算法将选择最后一个带有 Lock1 的任务,它将在第一个带有 Lock1 的任务之前被处理 嗯,不错。虽然它应该是一个简单的修复。您只需要返回并找到具有相同SyncObject
的第一个任务。我更新了代码。【参考方案3】:
我用过QueuedLock class suggested by Matthew Brindley,稍作修改,我将Enter函数拆分为TakeTicket和Enter which blocks。
现在我可以在共享 QueueLock 中使用 TakeTicket 而不会阻塞整个队列。
修改代码:
abstract class Task
public readonly QueuedLock SyncObject = new QueuedLock();
Queue<Task> taskQueue = new Queue<Task>();
Object queueLock = new Object();
void TakeItemThreadedMethod()
Task task;
int ticket;
lock(queueLock)
task = taskQueue.Dequeue();
ticket = task.SyncObject.TakeTicket();
task.SyncObject.Enter(ticket);
//Do some work here
task.SyncObject.Exit();
【讨论】:
基本上这就是 Jon Skeet 所建议的,我已经将队列队列集成到 LockObject 中。 是的,我认为这可行(这是我可能会建议的,但不想继续努力解决要求不断变化的问题)。不过,您可能希望将Exit
放入 finally
块中。以上是关于线程顺序同步的主要内容,如果未能解决你的问题,请参考以下文章