如何结合python多处理和管道技术?
Posted
技术标签:
【中文标题】如何结合python多处理和管道技术?【英文标题】:How to combine python multiprocessing and pipeline technique? 【发布时间】:2019-04-15 19:04:54 【问题描述】:你好,我是新来的,我想问一些问题。现在我正在使用 python 多处理来处理队列中的数据。示例我有 3 个函数来计算队列中的数据,在队列中我有 3 个数据。是否可以将管道技术与多处理一起使用以使其更快?
在这段代码中,我尝试使用多处理队列在多处理进程之间进行通信,并使用锁定来防止其他进程在队列中的数据从前一个函数完成之前使用它。但它
from multiprocessing import Process, current_process, cpu_count, Queue, Pool, Lock, Array
from threading import Thread, current_thread
import time
import os
def a(pid, q1, q2, lock):
while not q1.empty():
data = q1.get()
print("data from q1 is %s" % data)
# for i in range(1000000):
new_data = data*2
lock.acquire()
q2.put(new_data)
print(pid)
lock.release()
def b(pid, q2, q3, lock):
while not q2.empty():
data = q2.get()
print("data from q2 is %s" % data)
# for i in range(1000000):
lock.acquire()
new_data = data*3
q3.put(new_data)
print(pid)
lock.release()
def c(pid, q3, q4, lock):
while not q3.empty():
data = q3.get()
print("data from q3 is %s" % data)
# for i in range(1000000):
lock.acquire()
new_data = data*4
q4.put(new_data)
print(pid)
lock.release()
if __name__ == "__main__":
number = [1,2,3]
lock = Lock()
q1 = Queue()
q2 = Queue()
q3 = Queue()
q4 = Queue()
for data in number:
q1.put(data)
p1 = Process(target=a,args=(1, q1, q2, lock))
p2 = Process(target=b,args=(2, q2, q3, lock))
p3 = Process(target=c,args=(3, q3, q4, lock))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
for i in range(q4.qsize()):
print(q4.get())
我希望管道序列应该像这样执行 f1 | f1 f2 | f1 f2 f3 | f2 f3 |如果我的信息正确,f3 和队列中的解决方案是 24、48、72。我尽力解释这些事情应该如何工作,因为这是我第一次在 *** 中提问,而且我的英语水平不好,而且我真的需要帮助。
【问题讨论】:
请告诉我们您已经尝试过什么以及取得了什么成果。 【参考方案1】:您的问题是您正在使用q.empty()
来终止循环。其中一些Queues
将在开始时为空,而Process
将过早终止。您需要一种不同的技术来让p2
和p3
进程知道何时退出。
这是对您的代码的修改,它使用None
作为队列中的标志以在完成时发出信号:
from multiprocessing import Process, current_process, cpu_count, Queue, Pool, Lock, Array
from threading import Thread, current_thread
import time
import os
def a(pid, q1, q2, lock):
while not q1.empty():
data = q1.get()
print("data from q1 is %s" % data)
# for i in range(1000000):
new_data = data*2
lock.acquire()
q2.put(new_data)
print(pid)
lock.release()
q2.put(None)
def b(pid, q2, q3, lock):
while True:
data = q2.get()
if data is None:
q3.put(None)
return
print("data from q2 is %s" % data)
# for i in range(1000000):
lock.acquire()
new_data = data*3
q3.put(new_data)
print(pid)
lock.release()
def c(pid, q3, q4, lock):
while True:
data = q3.get()
if data is None:
return
print("data from q3 is %s" % data)
# for i in range(1000000):
lock.acquire()
new_data = data*4
q4.put(new_data)
print(pid)
lock.release()
if __name__ == "__main__":
number = [1,2,3]
lock = Lock()
q1 = Queue()
q2 = Queue()
q3 = Queue()
q4 = Queue()
for data in number:
q1.put(data)
p1 = Process(target=a,args=(1, q1, q2, lock))
p2 = Process(target=b,args=(2, q2, q3, lock))
p3 = Process(target=c,args=(3, q3, q4, lock))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
for i in range(q4.qsize()):
print(q4.get())
另外,您实际上并不需要Lock
。根据documentation:
队列模块实现了多生产者、多消费者队列。它 在线程编程中特别有用,当信息必须是 在多个线程之间安全地交换。这个队列类 模块实现了所有需要的锁定语义。
【讨论】:
以上是关于如何结合python多处理和管道技术?的主要内容,如果未能解决你的问题,请参考以下文章
ElasticSearch实战(三十七)-Ingest Pipeline + Painless Script (多管道脚本处理器)
ElasticSearch实战(三十七)-Ingest Pipeline + Painless Script (多管道脚本处理器)