“Python 线程障碍”为啥这段代码有效,有没有更好的方法?
Posted
技术标签:
【中文标题】“Python 线程障碍”为啥这段代码有效,有没有更好的方法?【英文标题】:"Python threading barrier" Why this code works and is there any better way?“Python 线程障碍”为什么这段代码有效,有没有更好的方法? 【发布时间】:2017-04-26 07:23:25 【问题描述】:我已经搜索过python屏障,但相关问题很少。我仍然对 barrier.wait() 感到困惑,即使我的代码也有效。
我利用python屏障来实现这样一个功能:一个主线程和n个子线程。在每一轮中,主线程等待所有子线程完成他们当前的工作,然后所有线程进入下一轮,直到我们满足某个条件。因此,我发现屏障适合实现此功能,这是我的主线程代码。
def superstep(self):
workers = []
barrier = threading.Barrier(self.num_workers+1)
for vertex in self.vertices:
worker = Worker(vertex, barrier)
workers.append(worker)
worker.start()
while self.flag:
barrier.wait()
self.distributeMessages()
self.count += 1
print ("superstep: ", self.count)
self.flag = self.isTerminated()
for worker in workers:
worker.flag = False
for worker in workers:
worker.join()
-
第一个“for”循环创建 n 个名为 worker 的线程并存储在一个 worker 列表中。
'while' 循环是等待其他子线程的主线程,当 self.flag 为 False 时中断。
第二个'for'循环用于在每个worker(子线程)中将flag设置为False,告诉他们退出循环。
这是我的 Worker 类。
class Worker(threading.Thread):
def __init__(self, vertex, barrier):
threading.Thread.__init__(self)
self.vertex = vertex
self.flag = True
self.barrier = barrier
def run(self):
while self.flag:
self.barrier.wait()
do something
代码运行良好,所有线程都可以join()。但是当我看到python barrier 时,所有线程都会在所有线程调用wait() 时同时释放。如果主线程从它的while循环中中断并且所有其他线程都在等待它,在这种情况下,第二个'for'循环是无用的,子线程将永远不会加入()。
那么这段代码是如何工作的,有没有其他方法可以退出屏障而不是引发 BrokenBarrierError? 另外,如果我在第二个“for”循环中添加一些代码,打印一些信息或别的东西,程序被阻止了。我猜肯定有一些子线程在wait()中并且没有机会检查flag,所以它们不能从线程的run()中退出。
【问题讨论】:
你也许可以在第二个之后调用barrier.abort()
,以释放等待的工人。
@Gribouillis 感谢您的回复。 barrier.abort() 引发了一个 BrokenBarrierError 并且它使我的代码无法执行,所以我想知道是否有更好的方法。
您可以在工作线程中捕获 BrokenBarrierError
@Gribouillis 谢谢。这是正确的方法。请原谅我之前对异常处理不熟悉。学习这部分几分钟后,问题就很清楚了。
好的,我把这个作为答案发布,所以你可以接受它
【参考方案1】:
如果您不想使用abort
,您可以在每个线程中调用两次Barrier.wait
。这会将操作分成两部分。在第一部分,工作线程将完成他们的工作,主线程将更新标志状态。然后在第二部分,每个线程都会检查标志状态并在必要时退出循环。
在代码级别它看起来像这样:
# Main
def superstep(self):
workers = []
barrier = threading.Barrier(self.num_workers+1)
for vertex in self.vertices:
worker = Worker(vertex, barrier)
workers.append(worker)
worker.start()
while self.flag:
barrier.wait()
self.distributeMessages()
self.count += 1
print ("superstep: ", self.count)
self.flag = self.isTerminated()
for worker in workers:
worker.flag = self.flag
barrier.wait()
for worker in workers:
worker.join()
# Worker
def run(self):
while self.flag:
self.barrier.wait()
# do something
self.barrier.wait()
【讨论】:
【参考方案2】:你可以打电话
self.barrier.abort()
在第二个for循环后释放等待的worker,并在worker的run()
方法中捕获BrokenBarrierError
。
【讨论】:
再次感谢,我之前接受的答案也很漂亮。以上是关于“Python 线程障碍”为啥这段代码有效,有没有更好的方法?的主要内容,如果未能解决你的问题,请参考以下文章