清空队列时遇到问题

Posted

技术标签:

【中文标题】清空队列时遇到问题【英文标题】:Trouble emptying Queue 【发布时间】:2021-07-10 02:37:08 【问题描述】:

问题

我是多处理的新手,我尝试的所有事情都无济于事。每次我想我想出一些东西时,我都会遇到一个新的障碍。我的目标是使用多个进程加载队列,然后使用多个进程从队列中拉取数据并处理数据。我已经尝试恢复到基本的队列处理,但是一旦我实现了多个进程,我就无法从队列中取出任何东西。我错过了什么?

代码

rom multiprocessing import Process, Lock
from queue import Queue
import os

q = Queue(5)


def get_from_q():
    print('trying to get')
    print(q.get())


if __name__ == '__main__':

    # put items at the end of the queue
    for x in range(6):
        print('adding ', x)
        q.put(x)

    PROCESSOR_COUNT = os.cpu_count()
    processes = []
    for p in range(PROCESSOR_COUNT):
        print('spawning process')
        p = Process(target=get_from_q)
        processes.append(p)

    for p in processes:
        print('starting')
        p.start()

    for p in processes:
        print('joining')
        p.join()

结果:

    adding 0
    adding 1
    adding 2
    adding 3
    adding 4
    adding 5

预期结果

    adding 0
    adding 1
    adding 2
    adding 3
    adding 4
    adding 5
    spawning process
    spawning process
    spawning process
    spawning processv
    starting
    starting
    starting
    starting
    trying to get 
    0
    trying to get 
    1 
    trying to get 
    2 
    trying to get 
    3 
    trying to get 
    4
    trying to get 
    5
    joining
    joining
    joining
    joining

【问题讨论】:

您是否尝试过将 queue.Queue 换成 multiprocessing.Queue? 是的,我从那个开始,然后移到这个,因为当我有 mp.Queue 时它没有向队列添加任何内容 您的队列应该是multiprocessing.Queue,只能容纳5 条记录。然而,你做的第一件事就是尝试写 6 条记录。您将在第 6 次写入时阻塞。然后您正在创建要读取的cpu_count() 进程。如果您拥有的处理器数量大于 6,则您创建的额外进程将永远等待尝试从空队列中读取。您应该为我们这些试图帮助您的凡人明确设置池大小。您可能知道您拥有的处理器数量,但我们不知道。 @Booboo 我可以用if q.full(): break 之类的东西杀死额外的进程吗? @Branden-Pincince 查看full() 的文档。它说:如果队列已满,则返回True,否则返回False。由于多线程/多处理语义,这是不可靠的。 你想要可靠 代码还是只是胡闹?如果q.full() 是可靠的呢?所以你最终只会写 5 条记录。但是第六张唱片呢?你不在乎它是否从未被写过?问问自己,“我想要完成什么?”我真的说不出来。 【参考方案1】:

如果你在使用spawn创建新进程的平台下运行,那么在创建新进程时,不是继承主进程的地址空间,而是通过重新初始化新地址空间从程序顶部执行所有代码。这意味着您在全局范围内定义的任何内容都会重新执行,例如在您的代码中:

q = Queue(5)

这意味着这段代码正在由您正在创建的每个进程执行,这意味着每个进程都有其自己的q副本。这行不通。您需要创建一次q 并将其作为参数传递。我还在 print 函数中添加了 flush=True,以减少各种进程的输出交错的机会。

from multiprocessing import Process, Lock, Queue
import os


def get_from_q(q):
    print('trying to get', q.get(), flush=True)


if __name__ == '__main__':
    PROCESSOR_COUNT = os.cpu_count()

    q = Queue(PROCESSOR_COUNT) # or put no size limitation on this

    # put items at the end of the queue
    for x in range(PROCESSOR_COUNT):
        print('adding ', x)
        q.put(x)

    processes = []
    for p in range(PROCESSOR_COUNT):
        print('spawning process')
        p = Process(target=get_from_q, args=(q,))
        processes.append(p)

    for p in processes:
        print('starting', flush=True)
        p.start()

    for p in processes:
        print('joining', flush=True)
        p.join()

打印:

adding  0
adding  1
adding  2
adding  3
adding  4
adding  5
adding  6
adding  7
spawning process
spawning process
spawning process
spawning process
spawning process
spawning process
spawning process
spawning process
starting
starting
starting
starting
starting
starting
starting
starting
joining
trying to get 0
trying to get 1
trying to get 2
trying to get 3
trying to get 4
trying to get 5
trying to get 6
joining
joining
joining
trying to get 7
joining
joining
joining
joining

使用进程池

这里的队列被池实现隐藏了:

from multiprocessing import Pool, cpu_count


def worker(x):
    print('x =', x, flush=True)
    return x ** 2


if __name__ == '__main__':
    PROCESSOR_COUNT = cpu_count()

    pool = Pool(PROCESSOR_COUNT) #
    print(pool.map(worker, range(PROCESSOR_COUNT)))

打印:

x = 0
x = 1
x = 2
x = 3
x = 4
x = 5
x = 6
x = 7
[0, 1, 4, 9, 16, 25, 36, 49]

【讨论】:

以上是关于清空队列时遇到问题的主要内容,如果未能解决你的问题,请参考以下文章

20210826每日总结

C++ 我在设置我创建的类的优先级队列时遇到问题

在我的二进制堆/优先级队列实现中实现删除时遇到问题

打开队列管理器名称时遇到错误 = .... 原因代码 = 2354

学习js,遇到坑爹的combobox的text值的清空问题

Java反射机制清空字符串导致业务异常分析