“Python 线程障碍”为啥这段代码有效,有没有更好的方法?

Posted

技术标签:

【中文标题】“Python 线程障碍”为啥这段代码有效,有没有更好的方法?【英文标题】:"Python threading barrier" Why this code works and is there any better way?“Python 线程障碍”为什么这段代码有效,有没有更好的方法? 【发布时间】:2017-04-26 07:23:25 【问题描述】:

我已经搜索过python屏障,但相关问题很少。我仍然对 barrier.wait() 感到困惑,即使我的代码也有效。

我利用python屏障来实现这样一个功能:一个主线程和n个子线程。在每一轮中,主线程等待所有子线程完成他们当前的工作,然后所有线程进入下一轮,直到我们满足某个条件。因此,我发现屏障适合实现此功能,这是我的主线程代码。

 def superstep(self):
    workers = []
    barrier = threading.Barrier(self.num_workers+1)
    for vertex in self.vertices:
        worker = Worker(vertex, barrier)
        workers.append(worker)
        worker.start()

    while self.flag:
        barrier.wait()
        self.distributeMessages()
        self.count += 1
        print ("superstep: ", self.count)
        self.flag = self.isTerminated()

    for worker in workers:
        worker.flag = False

    for worker in workers:
        worker.join()
    第一个“for”循环创建 n 个名为 worker 的线程并存储在一个 worker 列表中。 'while' 循环是等待其他子线程的主线程,当 self.flag 为 False 时中断。 第二个'for'循环用于在每个worker(子线程)中将flag设置为False,告诉他们退出循环。

这是我的 Worker 类。

class Worker(threading.Thread):
    def __init__(self, vertex, barrier):
        threading.Thread.__init__(self)
        self.vertex = vertex
        self.flag = True
        self.barrier = barrier

    def run(self):
        while self.flag:
            self.barrier.wait()
            do something

代码运行良好,所有线程都可以join()。但是当我看到python barrier 时,所有线程都会在所有线程调用wait() 时同时释放。如果主线程从它的while循环中中断并且所有其他线程都在等待它,在这种情况下,第二个'for'循环是无用的,子线程将永远不会加入()。

那么这段代码是如何工作的,有没有其他方法可以退出屏障而不是引发 BrokenBarrierError? 另外,如果我在第二个“for”循环中添加一些代码,打印一些信息或别的东西,程序被阻止了。我猜肯定有一些子线程在wait()中并且没有机会检查flag,所以它们不能从线程的run()中退出。

【问题讨论】:

你也许可以在第二个之后调用barrier.abort(),以释放等待的工人。 @Gribouillis 感谢您的回复。 barrier.abort() 引发了一个 BrokenBarrierError 并且它使我的代码无法执行,所以我想知道是否有更好的方法。 您可以在工作线程中捕获 BrokenBarrierError @Gribouillis 谢谢。这是正确的方法。请原谅我之前对异常处理不熟悉。学习这部分几分钟后,问题就很清楚了。 好的,我把这个作为答案发布,所以你可以接受它 【参考方案1】:

如果您不想使用abort,您可以在每个线程中调用两次Barrier.wait。这会将操作分成两部分。在第一部分,工作线程将完成他们的工作,主线程将更新标志状态。然后在第二部分,每个线程都会检查标志状态并在必要时退出循环。

在代码级别它看起来像这样:

# Main
def superstep(self):
    workers = []
    barrier = threading.Barrier(self.num_workers+1)
    for vertex in self.vertices:
        worker = Worker(vertex, barrier)
        workers.append(worker)
        worker.start()

    while self.flag:
        barrier.wait()
        self.distributeMessages()
        self.count += 1
        print ("superstep: ", self.count)
        self.flag = self.isTerminated()
        for worker in workers:
            worker.flag = self.flag
        barrier.wait()

    for worker in workers:
        worker.join()

# Worker
def run(self):
    while self.flag:
        self.barrier.wait()
        # do something
        self.barrier.wait() 

【讨论】:

【参考方案2】:

你可以打电话

self.barrier.abort()

在第二个for循环后释放等待的worker,并在worker的run()方法中捕获BrokenBarrierError

【讨论】:

再次感谢,我之前接受的答案也很漂亮。

以上是关于“Python 线程障碍”为啥这段代码有效,有没有更好的方法?的主要内容,如果未能解决你的问题,请参考以下文章

或者是无效的 C++:为啥这段代码会编译?

这段代码没有显示任何表格视图,为啥?

为啥此应用程序委托代码有效

为啥这段代码没有收到我的广播?

为啥这段代码没有输出预期的结果?

为啥这段代码没有调用 Matlab 函数