Using multiprocessing to read from a queue

Posted: 2020-12-03 00:48:59

Question:

Here is my code, which uses Python multiprocessing to fill a queue and read from it:

import time
from multiprocessing import Process, Queue
from random import randint

def add_to_queue(tasks_to_accomplish, name):
    # Producer: put a random integer on the queue every 2 seconds, forever
    while True:
        random_int = randint(0, 22)
        print('name', name, "adding", random_int)
        tasks_to_accomplish.put(random_int)
        time.sleep(2)

def read_from_queue(tasks_to_accomplish, name):
    # Consumer: take items off the queue and print them, forever
    while True:
        item = tasks_to_accomplish.get()
        print('name', name, item)

        time.sleep(.01)


if __name__ == '__main__':
    tasks_to_accomplish = Queue()

    p = Process(target=add_to_queue, args=(tasks_to_accomplish, "p"))
    p.start()

    p2 = Process(target=read_from_queue, args=(tasks_to_accomplish, "p2"))
    p2.start()
    p3 = Process(target=read_from_queue, args=(tasks_to_accomplish, "p3"))
    p3.start()

    p.join()
    p2.join()
    p3.join()

The code runs indefinitely. Here is part of the output:

name p adding 3
name p2 3
name p adding 4
name p3 4
name p adding 0
name p2 0
name p adding 22
name p3 22
name p adding 2
name p2 2
name p adding 13
name p3 13
name p adding 0
name p2 0
name p adding 14
name p3 14
name p adding 20
name p2 20
name p adding 4
name p3 4

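Reading from the queue is supposed to take 0.01 seconds: time.sleep(.01). But the p2 and p3 processes don't seem to read from the queue every 0.01 seconds; they clearly block for much longer than that. Have I implemented the reader processes correctly?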

Comments:

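You only add an item to the queue every 2 seconds. Since reading from the queue blocks until data is available, the reader will block until new data arrives, which takes 2 seconds. The sleep in the reader is probably pointless, because the reader always blocks in the get() method until a new item is available.

Answer 1: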

As Daniel pointed out, Queue.get() blocks by default until data is available.
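A small single-process sketch makes the three modes of multiprocessing.Queue.get() visible. The helper timed_get is hypothetical, written only for this illustration: the default call blocks until an item is available, block=False raises queue.Empty immediately on an empty queue, and a timeout waits at most that many seconds before raising queue.Empty. Actual timings will vary by machine.

```python
import queue
import time
from multiprocessing import Queue


def timed_get(q, **get_kwargs):
    """Hypothetical helper: call q.get(**get_kwargs) and time it.

    Returns (item, seconds). On queue.Empty the item is reported as None,
    so this demo cannot tell a queued None apart from an empty queue.
    """
    start = time.monotonic()
    try:
        item = q.get(**get_kwargs)
    except queue.Empty:
        item = None
    return item, time.monotonic() - start


if __name__ == '__main__':
    q = Queue()
    # Empty queue, block=False: queue.Empty is raised right away.
    print('block=False:', timed_get(q, block=False))
    # Empty queue, timeout=0.5: waits roughly 0.5 s, then raises queue.Empty.
    print('timeout=0.5:', timed_get(q, timeout=0.5))
    q.put('data')
    # An item is available: the default blocking get() returns it.
    print('blocking get():', timed_get(q))
```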

You can change that with q.get(block=False), although it then raises a queue.Empty exception whenever the queue is empty:

name p adding 12
name p2 12
Process Process-6:
Traceback (most recent call last):
  File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
  File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-2-4e6d57c64980>", line 15, in read_from_queue
    item = tasks_to_accomplish.get(block=False)
  File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/queues.py", line 110, in get
    raise Empty
_queue.Empty
Process Process-5:
Traceback (most recent call last):
  File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
  File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-2-4e6d57c64980>", line 15, in read_from_queue
    item = tasks_to_accomplish.get(block=False)
  File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/queues.py", line 110, in get
    raise Empty
_queue.Empty
name p adding 2
name p adding 12
name p adding 14
name p adding 21
name p adding 9
name p adding 13

You'd need something like:

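import queue  # multiprocessing.Queue.get() raises queue.Empty

def read_from_queue(tasks_to_accomplish, name):
    while True:
        try:
            # Non-blocking get: raises queue.Empty at once if nothing is queued
            item = tasks_to_accomplish.get(block=False)
        except queue.Empty:
            print('no data for', name)
        else:
            print('name', name, item)

        time.sleep(.01)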

which gives:

name p adding 0
name p2 0
no data for p3
no data for p3
no data for p2
no data for p2
no data for p3
no data for p2
no data for p3
# about 350 more entries like this
name p adding 5
no data for p2
name p3 5
no data for p2
no data for p3
no data for p3
no data for p2
no data for p3
# ...
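If the reader does need to wake up between items (for example, to do some periodic work), a middle ground between the 10 ms polling loop above and an indefinitely blocking get() is get(timeout=...), which blocks for at most the given number of seconds. This is a sketch under stated assumptions: the finite producer produce and the max_idle stop rule are hypothetical, added only so that the example terminates, and the reader runs in the parent process to keep it short.

```python
import queue
import time
from multiprocessing import Process, Queue


def produce(tasks, count, interval):
    """Hypothetical finite producer: puts 0..count-1, one every `interval` s."""
    for i in range(count):
        tasks.put(i)
        time.sleep(interval)


def read_with_timeout(tasks, name, timeout=0.2, max_idle=5):
    """Wait up to `timeout` s per get() instead of polling every 10 ms.

    `max_idle` is a hypothetical stop rule: the reader returns after that
    many consecutive timeouts so this sketch does not run forever.
    """
    received = []
    idle = 0
    while idle < max_idle:
        try:
            item = tasks.get(timeout=timeout)
        except queue.Empty:
            idle += 1
            print('no data for', name)
            # Any periodic work between reads would go here.
            continue
        idle = 0
        received.append(item)
        print('name', name, item)
    return received


if __name__ == '__main__':
    tasks = Queue()
    producer = Process(target=produce, args=(tasks, 3, 0.5))
    producer.start()
    print('received:', read_with_timeout(tasks, 'p2', timeout=0.2, max_idle=5))
    producer.join()
```

With a 0.2 s timeout the reader prints at most a few "no data" lines per gap instead of the hundreds produced by 10 ms polling.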

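Unless you need to do some work between reads, I'd say yes, you have implemented the reader processes correctly (and you can safely remove the call to sleep in the reader).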
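Since the original loops run forever, the blocking reader (without sleep) can also be given a clean way to stop. A common pattern is a sentinel value: when the producer is done it puts one sentinel per reader, and each reader exits when it receives one. This is a sketch under assumptions: None as the sentinel, the finite item list, and the changed function signatures are illustrative choices, not part of the original question or answer.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # hypothetical "no more work" marker chosen for this sketch


def add_to_queue(tasks, name, items, n_readers):
    """Put every item, then one sentinel per reader."""
    for item in items:
        print('name', name, 'adding', item)
        tasks.put(item)
    for _ in range(n_readers):
        tasks.put(SENTINEL)


def read_from_queue(tasks, name):
    """Block on get() with no sleep; stop when the sentinel arrives."""
    handled = []
    while True:
        item = tasks.get()  # waits inside get() until an item is available
        if item is SENTINEL:
            break
        print('name', name, item)
        handled.append(item)
    return handled


if __name__ == '__main__':
    tasks = Queue()
    readers = [Process(target=read_from_queue, args=(tasks, n))
               for n in ('p2', 'p3')]
    for r in readers:
        r.start()
    producer = Process(target=add_to_queue,
                       args=(tasks, 'p', [3, 4, 0, 22], len(readers)))
    producer.start()
    producer.join()
    for r in readers:
        r.join()
```

Because each reader stops reading as soon as it receives a sentinel, putting exactly one sentinel per reader means every reader gets one, and all the join() calls return.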
