[python 并行3]进程
Posted 影灵衣
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[python 并行3]进程相关的知识,希望对你有一定的参考价值。
进程篇
基本使用 1
#coding=utf-8
import multiprocessing
import os # 获取pid用
import time # 延时用
# 子进程要执行的函数
def child_proc(name):
print(f'child process {name} pid: {os.getpid()}')
time.sleep(3)
print(f'{name} finish')
# 主进程,必须在主模块中执行
if __name__ == '__main__':
print(f'parent process {os.getpid()} is running')
# 生成子进程
p1 = multiprocessing.Process(target = child_proc, args = ('child-1',))
p2 = multiprocessing.Process(target = child_proc, args = ('child-2',))
p1.start()
p2.start()
print(f'parent process {os.getpid()} is end')
输出
parent process 20114 is running
parent process 20114 is end
child process child-1 pid: 20115
child process child-2 pid: 20116
child-1 finish
child-2 finish
注意! Python官方文档提到为何必须要使用if __name__ = ‘__main__‘
,由于该包的所有功能都需要将主模块导入到子模块中,但是IDLE无法将__main__
模块导入子模块,所以只能在文件中编辑好程序执行
更多:multiprocessing — Process-based parallelism
multiprocessing模块
注:根据查看multiprocessing
模块声明,第一行注释显示# Package analogous to ‘threading.py‘ but using processes
,可以发现进程的操作与线程相似 (甚至多进程的模块直接就是线程模块改过来的)
函数声明: class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, , daemon=None)
(group: 官方预留的参数)
target: 子线程要执行的函数
name: 给子线程命名
args: 传递参数到要执行的函数中 (类型为元组)*
daemon: 将线程设置为后台线程 2
Thread类包含的方法:
- start(): 开始进程,它会安排在单独的控制进程中使该对象的
run()
方法被调用 (invoked) (如果多次调用,会发生错误AssertionError
)- run(): 你可以在子类中重写这个方法,标准的
run()
方法会在构造器传递了target
参数后调用它- join(timeout=None): 阻塞当前进程,直到等待调用了
join()
的进程结束,或到达设置的超时timeout
的参数为止- name: 进程名
- is_alive(): 判断进程是否在运行
- daemon: 是否为后台进程的属性值
- pid: 返回进程的id
- terminate(): 结束进程
(在Unix上,使用SIGTERM
信号量完成;在windows上使用TerminateProcess())
(请注意,不会执行退出处理程序和最后的子句等。请注意,进程的后代进程不会被终止 - 它们将简单地变成孤立的。)
(注:如果在关联进程使用管道或队列时使用此方法,则管道或队列可能会损坏,并可能被其他进程无法使用。 类似地,如果进程已获得锁或信号量等,则终止它可能导致其他进程死锁。)- kill(): 杀掉进程,与terminate()相同,但在Unix中使用
SIGKILL
信号量- close(): 关闭进程对象,释放所有与之相关的资源。如果是还在运行,会raised错误
ValueError
,第一次调用会返回成功,其它调用会raise错误ValueError
- exitcode: 子进程的退出码。如果不是被terminate终止的,将会是
None
;如果被信号量N
终止的,将会返回-N- authkey: 进程的身份钥匙(1字节字符串)。当
multipriocessing
在主进程中被初始化时,会使用os.urandom()
标记一个随机字符串。当一个进程对象被创建时,它将会从父进程继承这个身份钥匙,虽然它可能会被改为其它字节字符串- sentinel: (哨兵)当进程结束时,一个数值的系统对象处理将变为
ready
。如果你想立即要等待几个事件,你能用这个值使用multiprocessing.connection.wait()
,否则调用join()
更简单。
Pool进程池
如果进程太多,超过了CPU核数,会导致进程之间的来回切换,影响性能。可以通过创建进程池,把进程加入到里面,如果池中进程没满,就会创建一个进程来执行请求;如果池中进程达到规定的最大值,那么请求会等待
函数声明: class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes: 指定池中进程数量,如果不指定,则为None
,默认就会使用os.cpu_count()
initializer: 如果不是None
,每个进程在开始时都会调用initializer(*initargs)
maxtasksperchild: 工作进程在退出和替换新的工作进程之前,可以完成的任务数量,以释放资源。默认为None
,表示工作进程和这个池的生存时间一样长
context: 用来指定工作进程的上下文
下面有2种方法添加进程到进程池中,分别是apply_async
和apply
apply_async
apply_async()
用来同步执行进程,允许多个进程同时进入池;是异步非阻塞的
函数声明: apply_async(func[, args[, kwds[, callback[, error_callback]]]])
callback: 如果指定了回调,那么它应该是一个可调用的,它接受一个参数
#coding=utf-8
import multiprocessing
import time # 延时用
# 子进程要执行的函数
def child_proc(index):
print(f'{index} process is running')
time.sleep(3)
print(f'{index} process is end')
# 主进程,必须在主模块中执行
if __name__ == '__main__':
print(f'all process start')
# 生成进程池
p = multiprocessing.Pool()
for i in range(5):
p.apply_async(func = child_proc, args = (i,))
p.close()
p.join() # 注! 如果不执行此句,将会直接退出主进程
print(f'all process done!')
输出
all process start
0 process is running
1 process is running
2 process is running
3 process is running
4 process is running
0 process is end
3 process is end
2 process is end
4 process is end
1 process is end
all process done!
apply
apply()
: 只允许一个进程进入池,在一个进程结束后,另一个才能进入;是阻塞的
函数声明: apply(func[, args[, kwds]])
p.apply(func = child_proc, args = (i,)) # 将上面代码中的apply_async换成apply即可
输出
all process start
0 process is running
0 process is end
1 process is running
1 process is end
2 process is running
2 process is end
3 process is running
3 process is end
4 process is running
4 process is end
all process done!
分析:对比apply_async和apply的输出,可以发现,apply_async是同步执行的,而apply是一个一个进入池中执行的
数据共享
参考:Python多进程编程-进程间共享数据
(Pipe
、Queue
都有一定的数据共享功能,但他们会阻塞进程)
Queue
:采用共享队列的内存的方式共享数据
注意:存在queue.Queue
、multiprocessing.Queue
两种队列
queue.Queue:是进程内非阻塞队列,各进程私有
multiprocessing.Queue:是跨进程通信队列,各个子进程共有
共享内存:使用multiprocessing的Value
、Array
类,实现共享内存的方式共享数据
共享进程:使用multiprocessing的Manager
类,实现共享进程的方式共享数据
获取返回值(仅主进程获有数据)
针对进程池实现的方式,可以直接通过获取进程对象的返回值
#coding=utf-8
import multiprocessing
# 子进程要执行的函数
def child_proc(x, y):
return x + y
# 主进程,必须在主模块中执行
if __name__ == '__main__':
# 生成进程池
p = multiprocessing.Pool()
z = p.apply(func = child_proc, args = (1, 2))
print(z)
Queue
使用multiprocessing的Queue
类,实现进程之间的数据共享
#coding=utf-8
from multiprocessing import Process, Queue
# 子进程要执行的函数
def child_proc(queue):
num = queue.get()
num += 10
queue.put(num)
# 主进程,必须在主模块中执行
if __name__ == '__main__':
# 创建共享数据
queue = Queue()
queue.put(1000)
# 创建进程
p1 = Process(target = child_proc, args = (queue,))
p2 = Process(target = child_proc, args = (queue,))
p1.start()
p2.start()
p1.join()
p2.join()
# 打印结果
print(queue.get())
Value、Array
共享内存有2个结构 - Value
、Array
,它们内部都实现了锁机制,因此是多进程安全的
#coding=utf-8
from multiprocessing import Process, Value, Array
# 子进程要执行的函数
def child_proc(num, li):
num.value += 100
for i in range(len(li)):
li[i] += 10
# 主进程,必须在主模块中执行
if __name__ == '__main__':
# 创建共享数据
num = Value('d', 0.0)
li = Array('i', range(10))
# 创建进程
p1 = Process(target = child_proc, args = (num, li))
p2 = Process(target = child_proc, args = (num, li))
p1.start()
p2.start()
p1.join()
p2.join()
# 打印结果
print(num.value)
for x in li:
print(x)
附:Value
、Array
都需要设置其中存放值的类型。d:double;i:int;c:char等
详细:转到multiprocessing.sharedctypes可以查看到各种类型的字符串定义
Manager
(上面的共享内存通过Value和Array结构实现,这些值在主进程中管理,很分散)
Manager通过共享内存来实现共享数据,支持的数据类型很多
详细:multiprocessing.managers
#coding=utf-8
from multiprocessing import Process, Manager
# 子进程要执行的函数
def child_proc(dict1, list1):
dict1['yourname'] += ' snow'
list1[3] += 10
# 主进程,必须在主模块中执行
if __name__ == '__main__':
# 创建共享数据
manager = Manager()
dict1 = manager.dict()
list1 = manager.list(range(4))
dict1['yourname'] = 'youmux'
list1[3] = 0
# 创建进程
p1 = Process(target = child_proc, args = (dict1, list1))
p2 = Process(target = child_proc, args = (dict1, list1))
p1.start()
p2.start()
p1.join()
p2.join()
# 打印结果
print(dict1['yourname'])
for x in list1:
print(x)
1.参考书籍: 参考书籍:《Python并行编程手册》
2.后台线程: 后台线程在主线程停止后就直接停止运行。他们资源(如打开的文件,数据库事务等)可能不会被正确的释放。如果你想要你的线程优美的停止,让他们不要变为后台和使用一个合适的信号机制如事件Event
以上是关于[python 并行3]进程的主要内容,如果未能解决你的问题,请参考以下文章