multiprocessing模块

Posted lalavender

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了multiprocessing模块相关的知识,希望对你有一定的参考价值。

multiprocessing模块为在子进程中运行任务、通信和共享数据,以及执行各种形式的同步提供支持。进程没有任何共享状态,如果某个进程修改数据,改动只限于该进程内。

Process()类:表示运行在一个子进程中的任务。

class Process(object):
def init(self, group=None, target=None, name=None, args=(), kwargs={}):

参数:

group:未使用

target:当前进程启动时执行的可调用对象

name:为进程指定描述性名称的字符串

args:传递给target的位置参数的元组

kwargs:传递给target的关键字参数的字典

实例p有以下方法和属性:


'authkey','daemon', 'exitcode', 'ident', 'is_alive', 'join', 'name', 'pid', 'run', 'sentinel', 'start', 'terminate'

方法

1.p.is_alive():如果p仍然运行,返回True

2.p.join():等待进程p终止。timeout是可选的超市期限。进程可以被连接无数次,但如果连接自身则会出错。

3.p.run():进程启动时运行的方法。默认情况下,会调用传递给Process构造函数的target。定义进程的另一个方法是继承Process类并重新实现run()函数。

4.p.start():启动进程。这将运行代表进程的子进程,并调用该子进程中的p.run()函数

5.p.terminate():强制终止进程。如果调用此函数,进程P将被立即终止,同时不会进行任何清理动作。如果进程p创建了它自己的子进程,这些进程将变为僵尸进程。使用此方法时需特别小心。如果p保存了一个锁或参与了进程间通信,那么终止它可能会导致死锁或I/O损坏。

属性

6.p.authkey:进程的身份验证键。除非显式设定,这是由os. urandom()函数生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性。这类连接只有在两端具有相同的身份验证键时才能成功。

7.p.daemon:

一个布尔标志,指示进程是否是后台进程。当创建它的 Python进程终止时,后台( Daemonic)进程将自动终止。另外,禁止后台进程创建自己的新进程。p.daemon的值必须在使用 p.start()函数启动进程之前进行设置。

8.p.exitcode:进程的整数退出代码。如果进程仍然在运行,它的值为None。如果值为负数,-N表示进程由信号N所终止。

9.p.name:进程的名称。

10.p.pid:进程的整数进程ID。

11.p.ident:

12.p.sentinel:

  • 例子:demo3.py

进程间通信

  • multiprocessing模块支持进程间通信的两种主要形式:管道和队列。这两种方法都是使用消息传递实现的,但队列接口有意模仿线程程序中常见的队列用法。

队列

Queue( [maxsize])


class Queue(object):

def __init__(self, maxsize=-1):

self._maxsize = maxsize
  • 创建共享的进程队列。 maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底

层队列使用管道和锁实现。另外,还需要运行攴持线程以便将队列中的数据传输到底层管道中

  • Queue的实例q具有以下方法:

'cancel_join_thread', 'close', 'empty', 'full', 'get', 'get_nowait', 'join_thread', 'put', 'put_nowait', 'qsize'
  1. q.cancel_join_thread():

  2. q.close():

  3. q.empty():

  4. q.full():

  5. q.get():

  6. q.get_nowait():

  7. q.join_thread():

  8. q.put():

  9. q.put_nowait():

  10. q.qsize():

JoinableQueue([maxsize])

  • 创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项的消费者通知生产者项已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

  • JoinableQueue的实例p除了与 Queue对象相同的方法之外,还具有以下方法:


'cancel_join_thread', 'close', 'empty', 'full', 'get', 'get_nowait', 'join', 'join_thread', 'put', 'put_nowait', 'qsize', 'task_done'
  1. p.task_done():消费者使用此方法发出信号,表示qget()返回的项已经被处理。如果调用此方法的次数大于从队列中删除的项的数量,将引发 Valueerror异常。

  2. p.join():生产者使用此方法进行阻塞,直到队列中的所有项均被处理。阻塞将持续到为队列中的每个项均调用q.task done()方法为止。

  • 例子:demo4.py

管道

Pip([duplex])

在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn和conn2是表示管道两端的 Connection对象。默认情况下,管道是双向的。如果将 duplex置为 False,conn只能用于接收,而con2只能用于发送。必须在创建和启动使用管道的 Process对象之前调用pipe()方法。


def Pipe(duplex=True):

return Connection(), Connection()

Pipe()方法返回的 Connection对象的实例c具有以下方法和属性。

  1. c.close():关闭连接。如果c被垃圾回收,将自动调用此方法。

  2. c.fileno():返回连接使用的整数文件描述符。

  3. c.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将 timeout置为None,操作将无限期地等待数据到达。

  4. c.recv():接收c.send()方法返回的对象。如果连接的另一端已经关闭,再也不存在任何数据,将引发EOFERror异常。

  5. c.recv_bytes([maxlength]):

接受c.send_ bytes()方法发送的一条完整的字节消息。 maxlength指定要接收的最大字节数。如果进入的消息超过了这个最大值,将引发 IOError异常,并且在连接上无法进行进一步读取。如果连接的另一端已经关闭,再也不存在任何数据,将引EOFERror异常。

  1. c.recv_bytes_into(buffer [ offset]):接收一条完整的字节消息,并把它保存在 buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发 BufferTooShort异常。

  2. c.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象

  3. C.send_bytes(buffer [,offset [,size]]):通过连接发送字节数据缓冲区。 buffer是支持缓冲区接口的任意对象, offset是缓冲区中的字

节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收。

如何使用管道实现前面的生产者-消费者问题


# import multiprocessing

# def consumer(output_p,input_p):

# input_p.close()

# while True:

# try:

# item = output_p.recv()

# except EOFError:

# break

# print(item)

# print("Consumer done")

#

#

# def producer(sequence,input_P):

# for item in sequence:

# input_P.send(item)

#

#

# if __name__ == '__main__':

# (output_p,input_p) = multiprocessing.Pipe()

# cons_p = multiprocessing.Process(target=consumer,args=(output_p,input_p),)

# cons_p.start()

#

#

# output_p.close()

#

# sequence = [1,2,3,4]

# producer(sequence,input_p)

# input_p.close()

#

# cons_p.join()

管道可用于双向通信。利用通常在客户端/服务器计算中使用的请求响应模型或远程过程调用,就可以使用管道编写与进程交互的程序,例如:


import multiprocessing

def add(pipe):

server_P, client_P = pipe

client_P.close()

while True:

try:

x,y = server_P.recv()

except EOFError:

break

result = x + y

server_P.send(result)

print("Server done")

if __name__ == '__main__':

(server_P, client_P) = multiprocessing.Pipe()

adder_p = multiprocessing.Process(target=add,args=((server_P, client_P),))

adder_p.start()

server_P.close()

client_P.send((3,4))

print(client_P.recv())

client_P.send(('hello','world'))

print(client_P.recv())

client_P.close()

adder_p.join()
  • 如何使用管道实现前面的生产者-消费者问题这个例子中有一个错误:

output_p, input_p= pipe

传进来的只有一个参数却赋值给两个参数。

以上是关于multiprocessing模块的主要内容,如果未能解决你的问题,请参考以下文章

multiprocessing模块

Python多进程 - subprocess & multiprocess

multiprocessing模块

有没有办法在嵌套函数或模块中使用 multiprocessing.pool ?

进程的创建-multiprocessing

multiprocessing模块创建进程