不同条件下的 Python 多处理队列限制

Posted

技术标签:

【中文标题】不同条件下的 Python 多处理队列限制【英文标题】:Python Multiprocessing queue limitations in different conditions 【发布时间】:2021-11-12 03:24:08 【问题描述】:
import multiprocessing
import time


def WORK(x,q,it):
        
        for i in range(it):
            t = x + '---'+str(i)
               
            q.put(t)
      
def cons(q,cp):
    while not q.empty():
         cp.append(q.get())
    return q.put(cp)

if __name__ == '__main__':
    cp = []
    it = 600 #iteratons
    start = time.perf_counter()
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target = WORK, args = ('n',q,it))
    p2 = multiprocessing.Process(target=WORK, args=('x',q,it))
    p3 = multiprocessing.Process(target=cons, args=(q,cp,))
    
    p1.start()
    p2.start()
    p3.start()


    p1.join()
    p2.join()
    p3.join()
    print(q.get())
    end = time.perf_counter()
    print(end - start)

我在 Pycharm 和 Colab 中运行此代码时遇到问题,如果我在 colab 中运行此代码,它只能在 1000 次迭代中正常工作,而在 WORK() 过程中则更少,如果更多 - 它会冻结。 在 Pycharm 中,只有 500 次或更少的迭代才能正常工作 有什么问题???有什么限制吗?

所以我发现不是很好的解决方案是从队列中删除 join 或将其放在 dict 调用之后,这有助于获得 mor 限制,使用此代码它开始在 pycharm 中进行 1000 次迭代,但 10000 次迭代再次陷入僵局

p1.join()
p2.join()
print(q.get())
p3.join()
end = time.perf_counter()
print(end - start)

进一步的变化帮助我通过添加 queuq maxsize 将迭代限制增加到 10000:

q = multiprocessing.Queue(maxsize = 1000)

那么这个队列有什么限制和规律??? 如何管理无休止的队列,例如从 websockets 开始,他们不断地发送数据

【问题讨论】:

阅读multiprocessing.Queue上的文档!您不能尝试join 一个正在写入队列的进程您已从该队列读取所有记录之前。 喜欢吗?热帮助(( print(q.get()) p1.join() p2.join() p3.join() end = time.perf_counter() print(end - start) 【参考方案1】:

您的代码有几个问题。首先,根据multiprocessing.Queue 上的文档,empty 方法可靠。所以在函数cons 中声明while not q.empty(): 是有问题的。但是即使方法Queue.empty 是可靠的,你也有一个竞态条件。您已经并行启动了进程WORKcons,其中前者正在将元素写入队列,而后者正在读取,直到发现队列为空。但是如果consWORK 写入它的第一个元素之前运行,它会发现队列立即为空,这不是您预期的结果。正如我在上面的评论中提到的,在检索到进程写入的所有记录之前,您不得尝试加入正在写入队列的进程。

您遇到的另一个问题是您正在向cons 传递一个空列表cp,您将继续附加到该列表中。但是cons 是属于在不同地址空间中运行的进程的函数,因此它所附加的cp 列表与主进程中的cp 列表相同。请注意这一点。

最后,cons 将其结果写入它正在读取的同一队列,因此主进程正在从同一队列读取此结果。所以我们有另一个竞争条件:一旦主进程被修改为不从这个队列中读取,直到它加入所有进程之后,主进程和cons现在都从同一个队列中并行读取。我们现在需要一个单独的输入和输出队列,这样就不会发生冲突。这解决了这种竞争条件。

为了解决第一个竞争条件,WORK 进程应该编写一个特殊的sentinel 记录,作为记录结束 指示器。如果None 不是有效的正常 记录,则它可能是值None,或者它可能是任何 不能被误认为实际记录的特殊对象。由于我们有两个进程将记录写入同一个输入队列以供cons 读取,因此我们最终将得到两条标记记录,cons 必须寻找它们才能知道确实没有更多记录了。

import multiprocessing
import time

SENTINEL = 'SENTINEL' # or None

def WORK(x, q, it):
        for i in range(it):
            t = x + '---' + str(i)
            q.put(t)
        q.put(SENTINEL) # show end of records

def cons(q_in, q_out, cp):
    # We now are looking for two end of record indicators:
    for record in iter(q_in.get, SENTINEL):
        cp.append(record)
    for record in iter(q_in.get, SENTINEL):
        cp.append(record)
    q_out.put(cp)

if __name__ == '__main__':
    it = 600 #iteratons
    start = time.perf_counter()
    q_in = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=WORK, args = ('n', q_in, it))
    p2 = multiprocessing.Process(target=WORK, args=('x', q_in, it))
    cp = []
    p3 = multiprocessing.Process(target=cons, args=(q_in, q_out, cp))

    p1.start()
    p2.start()
    p3.start()

    cp = q_out.get()
    print(len(cp))

    p1.join()
    p2.join()
    p3.join()
    end = time.perf_counter()
    print(end - start)

打印:

1200
0.1717168

【讨论】:

非常感谢,您的解决方案在这种情况下有所帮助,但我想它在无休止的请求过程中不起作用,我想使用多处理将 websockets 请求放在单独的进程中,这个请求非常连续,任务实际上是无法结束的,所以哨兵将无法使用,因为进程之间交换套接字数据的过程无穷无尽,所以我开始寻找其他模块,也许它将是共享内存,非常感谢!!!! 您可以随时发布描述新情况的新问题。

以上是关于不同条件下的 Python 多处理队列限制的主要内容,如果未能解决你的问题,请参考以下文章

如何将相同的队列放入不同的多处理文件?

python celery多worker多队列定时任务

Python 多处理:如何启动相互依赖的进程?

如何根据条件对优先级队列使用不同的比较器

分布式环境下的并发编程

python celery 多work多队列