multiprocessing模块
Posted lalavender
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了multiprocessing模块相关的知识,希望对你有一定的参考价值。
multiprocessing模块为在子进程中运行任务、通信和共享数据,以及执行各种形式的同步提供支持。进程没有任何共享状态,如果某个进程修改数据,改动只限于该进程内。
Process()类:表示运行在一个子进程中的任务。
class Process(object):
def init(self, group=None, target=None, name=None, args=(), kwargs={}):
参数:
group:未使用
target:当前进程启动时执行的可调用对象
name:为进程指定描述性名称的字符串
args:传递给target的位置参数的元组
kwargs:传递给target的关键字参数的字典
实例p有以下方法和属性:
'authkey','daemon', 'exitcode', 'ident', 'is_alive', 'join', 'name', 'pid', 'run', 'sentinel', 'start', 'terminate'
方法
1.p.is_alive():如果p仍然运行,返回True
2.p.join():等待进程p终止。timeout是可选的超市期限。进程可以被连接无数次,但如果连接自身则会出错。
3.p.run():进程启动时运行的方法。默认情况下,会调用传递给Process构造函数的target。定义进程的另一个方法是继承Process类并重新实现run()函数。
4.p.start():启动进程。这将运行代表进程的子进程,并调用该子进程中的p.run()函数
5.p.terminate():强制终止进程。如果调用此函数,进程P将被立即终止,同时不会进行任何清理动作。如果进程p创建了它自己的子进程,这些进程将变为僵尸进程。使用此方法时需特别小心。如果p保存了一个锁或参与了进程间通信,那么终止它可能会导致死锁或I/O损坏。
属性
6.p.authkey:进程的身份验证键。除非显式设定,这是由os. urandom()函数生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性。这类连接只有在两端具有相同的身份验证键时才能成功。
7.p.daemon:
一个布尔标志,指示进程是否是后台进程。当创建它的 Python进程终止时,后台( Daemonic)进程将自动终止。另外,禁止后台进程创建自己的新进程。p.daemon的值必须在使用 p.start()函数启动进程之前进行设置。
8.p.exitcode:进程的整数退出代码。如果进程仍然在运行,它的值为None。如果值为负数,-N表示进程由信号N所终止。
9.p.name:进程的名称。
10.p.pid:进程的整数进程ID。
11.p.ident:
12.p.sentinel:
- 例子:demo3.py
进程间通信
- multiprocessing模块支持进程间通信的两种主要形式:管道和队列。这两种方法都是使用消息传递实现的,但队列接口有意模仿线程程序中常见的队列用法。
队列
Queue( [maxsize])
class Queue(object):
def __init__(self, maxsize=-1):
self._maxsize = maxsize
- 创建共享的进程队列。 maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底
层队列使用管道和锁实现。另外,还需要运行攴持线程以便将队列中的数据传输到底层管道中。
- Queue的实例q具有以下方法:
'cancel_join_thread', 'close', 'empty', 'full', 'get', 'get_nowait', 'join_thread', 'put', 'put_nowait', 'qsize'
q.cancel_join_thread():
q.close():
q.empty():
q.full():
q.get():
q.get_nowait():
q.join_thread():
q.put():
q.put_nowait():
q.qsize():
JoinableQueue([maxsize])
创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项的消费者通知生产者项已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
JoinableQueue的实例p除了与 Queue对象相同的方法之外,还具有以下方法:
'cancel_join_thread', 'close', 'empty', 'full', 'get', 'get_nowait', 'join', 'join_thread', 'put', 'put_nowait', 'qsize', 'task_done'
p.task_done():消费者使用此方法发出信号,表示qget()返回的项已经被处理。如果调用此方法的次数大于从队列中删除的项的数量,将引发 Valueerror异常。
p.join():生产者使用此方法进行阻塞,直到队列中的所有项均被处理。阻塞将持续到为队列中的每个项均调用q.task done()方法为止。
- 例子:demo4.py
管道
Pip([duplex])
在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn和conn2是表示管道两端的 Connection对象。默认情况下,管道是双向的。如果将 duplex置为 False,conn只能用于接收,而con2只能用于发送。必须在创建和启动使用管道的 Process对象之前调用pipe()方法。
def Pipe(duplex=True):
return Connection(), Connection()
Pipe()方法返回的 Connection对象的实例c具有以下方法和属性。
c.close():关闭连接。如果c被垃圾回收,将自动调用此方法。
c.fileno():返回连接使用的整数文件描述符。
c.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将 timeout置为None,操作将无限期地等待数据到达。
c.recv():接收c.send()方法返回的对象。如果连接的另一端已经关闭,再也不存在任何数据,将引发EOFERror异常。
c.recv_bytes([maxlength]):
接受c.send_ bytes()
方法发送的一条完整的字节消息。 maxlength指定要接收的最大字节数。如果进入的消息超过了这个最大值,将引发 IOError异常,并且在连接上无法进行进一步读取。如果连接的另一端已经关闭,再也不存在任何数据,将引EOFERror异常。
c.recv_bytes_into(buffer [ offset]):接收一条完整的字节消息,并把它保存在 buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发 BufferTooShort异常。
c.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
C.send_bytes(buffer [,offset [,size]]):通过连接发送字节数据缓冲区。 buffer是支持缓冲区接口的任意对象, offset是缓冲区中的字
节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收。
如何使用管道实现前面的生产者-消费者问题
# import multiprocessing
# def consumer(output_p,input_p):
# input_p.close()
# while True:
# try:
# item = output_p.recv()
# except EOFError:
# break
# print(item)
# print("Consumer done")
#
#
# def producer(sequence,input_P):
# for item in sequence:
# input_P.send(item)
#
#
# if __name__ == '__main__':
# (output_p,input_p) = multiprocessing.Pipe()
# cons_p = multiprocessing.Process(target=consumer,args=(output_p,input_p),)
# cons_p.start()
#
#
# output_p.close()
#
# sequence = [1,2,3,4]
# producer(sequence,input_p)
# input_p.close()
#
# cons_p.join()
管道可用于双向通信。利用通常在客户端/服务器计算中使用的请求响应模型或远程过程调用,就可以使用管道编写与进程交互的程序,例如:
import multiprocessing
def add(pipe):
server_P, client_P = pipe
client_P.close()
while True:
try:
x,y = server_P.recv()
except EOFError:
break
result = x + y
server_P.send(result)
print("Server done")
if __name__ == '__main__':
(server_P, client_P) = multiprocessing.Pipe()
adder_p = multiprocessing.Process(target=add,args=((server_P, client_P),))
adder_p.start()
server_P.close()
client_P.send((3,4))
print(client_P.recv())
client_P.send(('hello','world'))
print(client_P.recv())
client_P.close()
adder_p.join()
- 在
如何使用管道实现前面的生产者-消费者问题
这个例子中有一个错误:
output_p, input_p= pipe
传进来的只有一个参数却赋值给两个参数。
以上是关于multiprocessing模块的主要内容,如果未能解决你的问题,请参考以下文章
Python多进程 - subprocess & multiprocess