可以同时运行 2 个 for 循环,一个接一个地循环吗?

Posted

技术标签:

【中文标题】可以同时运行 2 个 for 循环,一个接一个地循环吗?【英文标题】:Can 2 for loops be run simultaneously, looping one after the other? 【发布时间】:2019-08-15 07:19:18 【问题描述】:

我只是想知道,您将如何创建一个循环以使每次迭代一个接一个地发生?我知道多线程是一件事,而且我很熟悉。我无法弄清楚的一件事是如何一个接一个地运行。

例如,假设我有 2 个函数:

def loop_a():
    while True:
        time.sleep(1)
        print("a")

def loop_b():
    while True:
        print("b")

即使time.sleep(1) 出现在第一个函数中,我如何使输出为ababababababababa

我正在使用 mpi4py,并且想知道是否有任何方法可以使用这个库来做到这一点。 我的实际程序需要在函数之间发送消息。否则,使用任何其他 python 库,例如 multiprocessing 应该没问题。

有没有办法使用 threading 做到这一点?

【问题讨论】:

这听起来有点像 XY 问题。为什么要这个输出?您需要实现的实际行为是什么?请edit您的问题包含有关循环中完成的工作之间依赖关系的更多详细信息。 【参考方案1】:

你可以使用协程:

import asyncio

q = asyncio.Queue()

async def loop_a(q):
  for i in range(10):
    value = await q.get()
    print(value)

async def loop_b(q):
  for i in range(10):
    await q.put("a")
    print("b")


loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(loop_a(q), loop_b(q)))

这里有live example

唯一的想法是除非您以某种方式同步它们,否则无法保证执行顺序。

【讨论】:

这只是一个例子。解释一下,这些函数可能有while 循环而不是for 循环。还有更复杂的事情(比如各个函数之间的消息传递会发生)。我认为这个解决方案不符合我的需求,所以让我换个问题 我该怎么做(包括循环) 我更新了。我会用协程来做。我没有检查问题中的通信内容。 运行此代码会打印出aaaaabbbbbbbbbbaaaaa。它不起作用【参考方案2】:

这是您问题第一部分的解决方案 - 如何并行运行进程,以便每个进程等待前一个进程完成以开始处理任务。我没有在这里解决消息传递方面的问题,因为这对我来说似乎有点模糊,并且可以根据问题陈述以不同的方式实现。在这个例子中,我们创建并运行了三个工人,它们通过简单的时间延迟来模拟执行。代码 sn-ps 应保存到可以从命令行运行的单个文件中。

我们首先导入所需的模块:

#!/usr/bin/env python3
import time

from multiprocessing import Process, Event

并实现WorkerQueue 类。这个类使工人保持正确的顺序,并负责启动和终止他们。工作人员之间的通信是使用事件来实现的。每个工人都有 other_readyready Event 字段,相应地指示先前工人和当前工人的完成状态。注意,如果队列中只有一个worker,它的other_readyready是一样的。

class WorkerQueue(object):

    def __init__(self):
        self._workers = []

    def add_worker(self, worker):

        if self._workers:
            worker.other_ready = self._workers[-1].ready
            self._workers[0].other_ready = worker.ready
        else:
            worker.other_ready = worker.ready

        self._workers.append(worker)

    def start_workers(self):

        if not self._workers:
            return

        self._workers[0].other_ready.set()

        for w in self._workers:
            w.start()

    def stop_workers(self):

        for w in self._workers:
            w.join()

然后,我们通过继承Process 类来实现worker 本身。注意,也可以使用threading 代替multiprocessing。在这种情况下,唯一改变的是 Worker 父类,Thread 而不是 Process

class Worker(Process):

    def __init__(self, delay, name=None):
        super().__init__(name=name)
        self.delay = delay
        self.other_ready = Event()
        self.other_ready.set()
        self.ready = Event()
        self.stop = Event()

    def run(self):

        while not self.stop.is_set():

            try:
                self.other_ready.wait()

                t = time.strftime('%H:%M:%S')
                print('Started:', self.name, t, flush=True)

                time.sleep(self.delay)

                t = time.strftime('%H:%M:%S')
                print('Finished:', self.name, t, flush=True)
            except:
                break

            self.other_ready.clear()
            self.ready.set()

    def join(self, timeout=None):
        self.stop.set()
        super().join(timeout)

在这里,您看到,每个工作人员在开始执行命令之前等待前一个工作人员准备好。默认情况下,设置了 other_ready,这样我们就不会在队列中有单个工作人员的情况下遇到死锁。

最后,我们实现了一个main 函数,在该函数中我们定义了worker,将它们添加到worker 队列中并启动它们。

def main():
    first = Worker(delay=1, name='first')
    second = Worker(delay=3, name='second')
    third = Worker(delay=2, name='third')

    queue = WorkerQueue()

    for w in (first, second, third):
        queue.add_worker(w)

    queue.start_workers()

    try:

        # The main infinite loop, do something useful:
        while True:
            time.sleep(1)

    except KeyboardInterrupt:
        pass
    finally:
        queue.stop_workers()

不要忘记在文件末尾添加以下行:

if __name__ == '__main__':
    main()

现在,可以将其保存到一个文件中,例如 proc_queue.py,您可以从命令行运行该文件以查看结果:

$ python3 proc_queue.py 
Started: first 16:04:09
Finished: first 16:04:10
Started: second 16:04:10
Finished: second 16:04:13
Started: third 16:04:13
Finished: third 16:04:15
Started: first 16:04:15
Finished: first 16:04:16
Started: second 16:04:16
Finished: second 16:04:19
Started: third 16:04:19
Finished: third 16:04:21
^C

这可能有点过于复杂,但这是我能想到的唯一解决方案。如果您知道更好的方法,我很乐意了解它:)

【讨论】:

【参考方案3】:

在伪代码中:

main()
1. set lock for loop1
2. start loop1 on background thread
3. start loop2 on background thread
4. wait

loop1()
1. do the following forever:
2.    acquire lock for loop1
3.    print 'a'
4.    release lock for loop2

loop2()
1. do the following forever:
2.    acquire lock for loop2
3.    print 'b'
4.    release lock for loop1

您可以将锁实现为共享内存变量或循环等待从对等方获取消息或其他任何东西。获取锁意味着阻塞或自旋锁(轮询)直到锁准备好;释放锁将适当地设置共享变量或将正确的消息发送给正确的对等方。

编辑:根据评论,这里使用许多可用的实现策略之一对 loop1() 和 loop2() 进行了更完整的开发:

(shared lock in global scope)

main()
1. lock = 1
2. start loop1 on background thread
3. start loop2 on background thread
4. wait

loop1()
1. do the following forever
2.    loop until lock = 1
3.    print 'a'
4.    lock = 2

loop2()
1. do the following forever
2.    loop until lock = 2
3.    print 'b'
4.    lock = 1

此实现使用自旋锁并依赖线程安全共享变量lock 来协调工作。自旋锁可能适合也可能不适合您的应用程序。您可以将这些与一些阻塞机制结合使用,以减少处理时间,但代价是处理过程中的一些延迟。

关键是lock 是有状态的并且(应该)只能由正确的线程获取。如果每个线程都“知道”“下一个”线程并在完成时向它发送消息,那么您可以对消息传递执行相同的操作......然后所有线程都等待收到消息。

main()
1. start loop1 on background thread
2. start loop2 on background thread
3. message loop1
4. wait

loop1()
1. do the following forever
2.    loop until message received
3.    print 'a'
4.    message loop2

loop2()
1. do the following forever
2.    loop until message received
3.    print 'b'
4.    message loop1

【讨论】:

尽管看起来很简单,但它只是不起作用。这是我想到的第一个想法,所以我先测试了它。问题是一个线程可以在第二个线程之前多次获取锁。另外,你不能保证线程执行的顺序。 @constt 不,如果你实施得当,它确实有效。您缺少“for loop1”和“for loop2”部分。 “获取”方法应该只在锁处于正确状态时获取锁。锁可以是“可用于循环1”和“可用于循环2”,并且在这些情况下只能由相应的循环获取。这解决了“多次获取”问题并保证了确定性排序(注意 Main() 程序如何将锁的状态设置为“可用于 loop1”)。那是关键。不要只有 0/1 共享计数器或“可用”广播;使其有状态。 (如果您不希望循环相互了解,很好;将锁实现为您注册的类,然后简单地释放锁以使其可用于下一个注册循环.) 好的,现在我明白了,谢谢。我的评论是关于您发布的第一个伪代码。我错误地认为您试图通过线程之间共享一个锁来解决问题。感谢您的解释!

以上是关于可以同时运行 2 个 for 循环,一个接一个地循环吗?的主要内容,如果未能解决你的问题,请参考以下文章

Spark.read() 一次多条路径,而不是在 for 循环中一个接一个

如何使用 Swift SpriteKit 在 For 循环中一一创建 SKSprite 节点

python:for循环

在for循环打字稿和nhibernate死锁中调用异步函数

同时运行此循环的最佳方法?

在 for 循环打字稿和休眠死锁中调用异步函数