我应该将池或进程与队列一起使用吗?
Posted
技术标签:
【中文标题】我应该将池或进程与队列一起使用吗?【英文标题】:Should I use Pool or Process with a Queue? 【发布时间】:2021-05-03 19:56:16 【问题描述】:我有一个函数 (A
) 以恒定速率创建数据,比如说每秒 100 个。我想在 A
创建的数据上运行另一个函数 (B
)。函数B
的运行时间可能比0.01s
长,但我不希望它备份数据流。我是否应该创建一个Pool
的B
并将一个通用的Queue
传递给A
和B
以使用(如下面的代码)?我还看到您应该使用Pool
s 来处理数据列表。这是他们应该如何使用(关于我描述的方法)?我应该只使用两个Process
s 并交替向它们发送数据吗?
def A(queue):
while True:
data = data_getter()
queue.put(data)
def B(queue):
while True:
data = queue.get(True):
do_something(data)
# main.py
q = Queue()
pool = Pool(initializer=B, initargs=[q])
A(q)
【问题讨论】:
【参考方案1】:这是我的简短回答:
进程池存在的目的是允许您以并行方式处理 N 个“作业”,以尽可能充分地处理,因为您已为该任务分配了 M 个物理处理器。 p>
创建一个队列,一个Process
实例正在写入N次(相当于提交N个“作业”)并让M个Process
实例读取和处理这些消息,即“作业”,并处理它们,实际上是一种进程池的实现。使用单独的进程池来创建队列的读取进程所需的进程似乎是不必要的复杂层。因此,我将创建 M 个 Process
实例,这些实例从编写器进程向其添加消息的公共队列中读取。
TL;DR(或长答案)
正如您正确推测的那样,您可以通过 (1) 创建单独的 Process
实例或 (2) 使用进程池来实现。方法 1 从直觉上看似乎是最合乎逻辑的方法,但它不一定是最直接的代码。我在下面使用模拟提供了一些方法,其中队列写入器进程每 0.01 秒创建一次队列条目,但队列读取器进程需要 0.06 秒来处理队列条目,因此至少有 6 个这样的进程(从一个共同的队列)需要跟上:
方法 1 -- 显式进程
import multiprocessing as mp
import time
class Sentinel():
pass
def a(queue, n_readers):
for i in range(1000):
time.sleep(.01)
queue.put(i)
print('queue size is now approximately: ', queue.qsize()) # print queue size
# signal readers to terminate:
end_of_queue = Sentinel()
for _ in range(n_readers):
queue.put(end_of_queue)
def b(queue):
while True:
value = queue.get(True)
# signal to terminate?
if isinstance(value, Sentinel):
break
print(value, flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
# create queue readers:
readers = [mp.Process(target=b, args=(queue,)) for _ in range(n_readers)]
for p in readers:
p.start()
# now start queue writer:
writer = mp.Process(target=a, args=(queue, n_readers))
writer.start()
# wait for writer to terminate:
writer.join()
for p in readers:
p.join()
print('Done')
if __name__ == '__main__':
main()
方法 2 - 使用进程池
import multiprocessing as mp
import time
class Sentinel():
pass
def init_pool(q):
global queue
queue = q
def a(n_readers):
for i in range(1000):
time.sleep(.01)
queue.put(i)
print('queue size is now approximately: ', queue.qsize()) # print queue size
end_of_queue = Sentinel()
for _ in range(n_readers):
queue.put(end_of_queue)
def b():
while True:
value = queue.get(True)
# signal to terminate?
if isinstance(value, Sentinel):
break
print(value, flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
pool = mp.Pool(n_readers + 1, initializer=init_pool, initargs=(queue,))
readers_results = [pool.apply_async(b) for _ in range(n_readers)]
# now submit writer:
pool.apply(a, args=(n_readers,))
# wait for readers to finish:
for r in readers_results:
r.get()
print('Done')
if __name__ == '__main__':
main()
第二种方法的唯一优点是,如果工作人员a
和/或b
需要将值返回到主进程,则使用进程池时会变得很简单。
注意
通过使用Pool
构造函数的initializer
参数来实现您的队列读取器进程,函数B
也是可行的(参见下面的方法池2A),但是函数A
必须在主进程下运行.但是这些 Pool 进程是守护进程,并且会在所有非守护进程终止后立即终止。这就是为什么我在方法 2 中安排将特殊标记消息写入队列作为“作业”(但不是运行作业的进程)在读取标记消息时终止的信号。因此,我知道当作业完成后,队列中将不再有消息,并且队列中将不再有任何消息。类似的逻辑适用于方法 1,除了整个过程也终止,我可以使用join
知道何时发生。但是在您的情况下,使用隐式守护线程来执行队列的读取,即使您在读取所有输入队列值并且初始化函数 B
终止时添加附加代码以将标记值添加到队列中,主进程怎么知道?同样,您可以在池上调用方法Pool.join()
,这可以防止任何未来的工作被提交到池(我们实际上从未明确提交工作;所有工作都在池初始化函数中完成)。然后你调用Pool.join()
,等待每个工作进程退出。这将在每个流程实例的池初始化函数完成后立即发生,因为之前对 Pool.close
的调用告诉池永远不会有任何额外的工作添加到池中。
方法 2A - 使用带有池初始化程序的进程池
import multiprocessing as mp
import time
class Sentinel():
pass
def a(queue, n_readers):
for i in range(1000):
time.sleep(.01)
queue.put(i)
end_of_queue = Sentinel()
for _ in range(n_readers):
queue.put(end_of_queue)
def b(the_queue):
global queue
queue = the_queue
while True:
value = queue.get(True)
# signal to terminate?
if isinstance(value, Sentinel):
break
print(value, flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
pool = mp.Pool(n_readers, initializer=b, initargs=(queue,))
a(queue, n_readers)
# wait for readers to finish:
pool.close() # must be called before pool.join()
pool.join()
print('Done')
if __name__ == '__main__':
main()
备注
所有三种方法都将起作用,并且所有三种方法都假定读取器进程不会无限期运行,因此我们对有序终止感兴趣(因此需要哨兵值向读取器进程发出终止信号)。但是如果编写器进程被设计为无限期地运行直到进程被用户中断,那么例如方法2a可以修改为使用用户输入ctrl-C产生的键盘中断来终止执行:
修改方法 2A 仅由键盘中断终止
import multiprocessing as mp
import time
import itertools
def a(queue, n_readers):
try:
for i in itertools.count(0):
time.sleep(.01)
queue.put(i)
except KeyboardInterrupt:
pass
def b(the_queue):
global queue
queue = the_queue
try:
while True:
value = queue.get(True)
print(value, end=' ', flush=True)
time.sleep(.06)
except KeyboardInterrupt:
pass
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
pool = mp.Pool(n_readers, initializer=b, initargs=(queue,))
a(queue, n_readers)
# wait for readers to finish:
pool.close() # must be called before pool.join()
try:
pool.join()
except KeyboardInterrupt:
pool.terminate()
print('Done')
if __name__ == '__main__':
main()
修改方法1仅通过键盘输入终止
import multiprocessing as mp
import time
import itertools
def a(queue, n_readers):
for i in itertools.count(0):
time.sleep(.01)
queue.put(i)
def b(queue):
while True:
value = queue.get(True)
if value % 100 == 0:
print(value, end=' ', flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
# create queue readers:
readers = [mp.Process(target=b, args=(queue,), daemon=True) for _ in range(n_readers)]
for p in readers:
p.start()
# now start queue writer:
writer = mp.Process(target=a, args=(queue, n_readers), daemon=True)
writer.start()
input('Enter return to terminate...')
print()
print('Done')
if __name__ == '__main__':
main()
结论
你显然有选择。如果程序不是无限期运行,并且您希望有序关闭以确保已处理所有已排队的消息,那么我的首选是方法 1。方法 2 和 2a 似乎只是让 N 个进程执行相同的工作,为您提供相同的论据。
另一方面,如果您的编写器进程任务无休止地运行并且您需要终止它并且不介意队列中可能留下一两条未处理的消息(毕竟您正在终止程序任意时间点,所以这应该没什么大不了的),那么如果一个简单的input
语句就足以输入终止命令,那么修改方法 1 似乎是需要最少修改的方法。但是如果正在运行的程序不断地输出消息,input
语句显示的文本就会丢失,你需要依赖对每个进程使用键盘中断处理程序,这涉及更多。如果有任何修改示例,您可以使用此技术;我在修改后的方法 2a 中使用了它作为示例,因为该代码不适合使用 input
语句技术,因为终端输出太多了。毫无疑问,当有 any 终端输出时,最可靠的方法是使用键盘处理程序中断处理程序方法。只要不需要从任何进程取回返回值,我仍然倾向于使用方法 1 及其变体而不是进程池:
【讨论】:
非常感谢!这是一个非常详细的答案!以上是关于我应该将池或进程与队列一起使用吗?的主要内容,如果未能解决你的问题,请参考以下文章
尝试将优先级队列与这个泛型类一起使用时,我应该使用 Comparator 还是 Comparable ?