不同条件下的 Python 多处理队列限制
Posted
技术标签:
【中文标题】不同条件下的 Python 多处理队列限制【英文标题】:Python Multiprocessing queue limitations in different conditions 【发布时间】:2021-11-12 03:24:08 【问题描述】:import multiprocessing
import time
def WORK(x,q,it):
for i in range(it):
t = x + '---'+str(i)
q.put(t)
def cons(q,cp):
while not q.empty():
cp.append(q.get())
return q.put(cp)
if __name__ == '__main__':
cp = []
it = 600 #iteratons
start = time.perf_counter()
q = multiprocessing.Queue()
p1 = multiprocessing.Process(target = WORK, args = ('n',q,it))
p2 = multiprocessing.Process(target=WORK, args=('x',q,it))
p3 = multiprocessing.Process(target=cons, args=(q,cp,))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
print(q.get())
end = time.perf_counter()
print(end - start)
我在 Pycharm 和 Colab 中运行此代码时遇到问题,如果我在 colab 中运行此代码,它只能在 1000 次迭代中正常工作,而在 WORK() 过程中则更少,如果更多 - 它会冻结。 在 Pycharm 中,只有 500 次或更少的迭代才能正常工作 有什么问题???有什么限制吗?
所以我发现不是很好的解决方案是从队列中删除 join 或将其放在 dict 调用之后,这有助于获得 mor 限制,使用此代码它开始在 pycharm 中进行 1000 次迭代,但 10000 次迭代再次陷入僵局
p1.join()
p2.join()
print(q.get())
p3.join()
end = time.perf_counter()
print(end - start)
进一步的变化帮助我通过添加 queuq maxsize 将迭代限制增加到 10000:
q = multiprocessing.Queue(maxsize = 1000)
那么这个队列有什么限制和规律??? 如何管理无休止的队列,例如从 websockets 开始,他们不断地发送数据
【问题讨论】:
阅读multiprocessing.Queue
上的文档!您不能尝试join
一个正在写入队列的进程在您已从该队列读取所有记录之前。
喜欢吗?热帮助(( print(q.get()) p1.join() p2.join() p3.join() end = time.perf_counter() print(end - start)
【参考方案1】:
您的代码有几个问题。首先,根据multiprocessing.Queue
上的文档,empty
方法不可靠。所以在函数cons
中声明while not q.empty():
是有问题的。但是即使方法Queue.empty
是可靠的,你也有一个竞态条件。您已经并行启动了进程WORK
和cons
,其中前者正在将元素写入队列,而后者正在读取,直到发现队列为空。但是如果cons
在WORK
写入它的第一个元素之前运行,它会发现队列立即为空,这不是您预期的结果。正如我在上面的评论中提到的,在检索到进程写入的所有记录之前,您不得尝试加入正在写入队列的进程。
您遇到的另一个问题是您正在向cons
传递一个空列表cp
,您将继续附加到该列表中。但是cons
是属于在不同地址空间中运行的进程的函数,因此它所附加的cp
列表不与主进程中的cp
列表相同。请注意这一点。
最后,cons
将其结果写入它正在读取的同一队列,因此主进程正在从同一队列读取此结果。所以我们有另一个竞争条件:一旦主进程被修改为不从这个队列中读取,直到它加入所有进程之后,主进程和cons
现在都从同一个队列中并行读取。我们现在需要一个单独的输入和输出队列,这样就不会发生冲突。这解决了这种竞争条件。
为了解决第一个竞争条件,WORK
进程应该编写一个特殊的sentinel 记录,作为记录结束 指示器。如果None
不是有效的正常 记录,则它可能是值None
,或者它可能是任何 不能被误认为实际记录的特殊对象。由于我们有两个进程将记录写入同一个输入队列以供cons
读取,因此我们最终将得到两条标记记录,cons
必须寻找它们才能知道确实没有更多记录了。
import multiprocessing
import time
SENTINEL = 'SENTINEL' # or None
def WORK(x, q, it):
for i in range(it):
t = x + '---' + str(i)
q.put(t)
q.put(SENTINEL) # show end of records
def cons(q_in, q_out, cp):
# We now are looking for two end of record indicators:
for record in iter(q_in.get, SENTINEL):
cp.append(record)
for record in iter(q_in.get, SENTINEL):
cp.append(record)
q_out.put(cp)
if __name__ == '__main__':
it = 600 #iteratons
start = time.perf_counter()
q_in = multiprocessing.Queue()
q_out = multiprocessing.Queue()
p1 = multiprocessing.Process(target=WORK, args = ('n', q_in, it))
p2 = multiprocessing.Process(target=WORK, args=('x', q_in, it))
cp = []
p3 = multiprocessing.Process(target=cons, args=(q_in, q_out, cp))
p1.start()
p2.start()
p3.start()
cp = q_out.get()
print(len(cp))
p1.join()
p2.join()
p3.join()
end = time.perf_counter()
print(end - start)
打印:
1200
0.1717168
【讨论】:
非常感谢,您的解决方案在这种情况下有所帮助,但我想它在无休止的请求过程中不起作用,我想使用多处理将 websockets 请求放在单独的进程中,这个请求非常连续,任务实际上是无法结束的,所以哨兵将无法使用,因为进程之间交换套接字数据的过程无穷无尽,所以我开始寻找其他模块,也许它将是共享内存,非常感谢!!!! 您可以随时发布描述新情况的新问题。以上是关于不同条件下的 Python 多处理队列限制的主要内容,如果未能解决你的问题,请参考以下文章