使用两个队列同步线程
Posted
技术标签:
【中文标题】使用两个队列同步线程【英文标题】:Synchronize threads using two queues 【发布时间】:2012-12-11 21:42:28 【问题描述】:假设我有一个线程池。这个线程池使用两个队列q1
和q2
。它从q1
读取并在q2
上写入新项目。当q1
为空时,我们交换两个队列q1, q2, = q2, q1
并重复该过程直到两者都为空。为了同步线程,我使用了另一个仅包含一个项目的队列,并在进程结束时删除了该项目。
我认为这是一种非常愚蠢的做法。有什么改进建议吗?
docs 上有一个非常好的简单示例,但只有一个队列。我的解决方案看起来不太好,如果正确的话:
global flag
global lock
global barrier
global q1
global q2
global q
while True:
if q1.empty():
flag = False
barrier.wait() # wait for all the theads to reach this point.
# execute the code of swapping queues only once
with lock:
if not flag:
flag = True
if q2.empty():
q.get()
q.task_done()
else:
q1, q2 = q2, q1
process_items_in_q1()
【问题讨论】:
您使用 2 个输入队列的任何原因?如果你有一个工作线程池,为什么不让工作人员从一个输入队列中获得工作呢?先进先出?双输入队列是必需的吗? 我正在为我们发现的每个节点实施 BFS + 一些东西。搜索的每个阶段都可以并行,但是当一个阶段完成时,我们才能进入下一个阶段。当然,除非我想念一个更好的主意。 【参考方案1】:我认为你的方法有效。以下是您可能会或可能不会发现更好的另外两个:
您可以只使用一个队列并推送 N 个标记来分隔“级别”,其中 N 是线程数。当一个线程读取一个标记时,它只调用barrier.wait()。如果目标只是确保所有 N 个线程在级别之间调用 barrier.wait(),这应该就足够了。
或者,您可以稍微简化上面发布的代码:使用 2 个普通列表而不是 2 个队列。所有线程都从list1
弹出并追加到list2
,这在Queue的逻辑中不需要特别注意。此外,如果每个“级别”都相对较大,您可以简单地为每个级别重复创建线程并等待它们全部完成的整个代码。这将使一次一层的逻辑更加明显。
(最后,像往常一样,请注意,到目前为止,在 Python 中使用线程并没有获得任何处理能力:这是 GIL。)
【讨论】:
感谢您的意见!带有标记的替代解决方案应该是等效的。我最好的猜测是带有 task_done 的队列机制抽象了障碍。不过我还没有查到。至于列表,它们实现pop()
操作的效率很低。至于 GIL,我的 process_items 是 I/O 绑定的,所以线程没问题。
抱歉不清楚:我想到了list1.pop()
,没有参数,即弹出列表的最后一个元素(这是有效的)。如果需要,您可以在列表从 list1
移动到 list2
时将其 reverse()
一次。或使用collection.deque
。以上是关于使用两个队列同步线程的主要内容,如果未能解决你的问题,请参考以下文章