引子
进程
线程(优先阅读)
协程
进程
概念:就是一个程序在一个数据集上的一次动态执行过程(本质上来讲,就是运行中的程序(代指运行过程),程序不运行就不是进程) 抽象概念
组成:
1、程序:我们编写的程序用来描述进程要完成哪些功能以及如何完成
2、数据集:数据集则是程序在执行过程中所需要使用的资源
3、进程控制块:进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。
阐释:进程与进程之间都占用的是独立的内存块,它们彼此之间的数据也是独立的
优点:同时利用多个CPU,能够同时进行多个操作
缺点:耗费资源(需要重新开辟内存空间)
构造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 线程组,目前还没有实现,库引用中提示必须是None;
target: 要执行的方法;
name: 进程名;
args/kwargs: 要传入方法的参数。
实例方法:
is_alive():返回进程是否在运行。
join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
start():进程准备就绪,等待CPU调度
run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
terminate():不管任务是否完成,立即停止工作进程
属性:
daemon:和线程的setDeamon功能一样
name:进程名字。
pid:进程号。
创建进程的方式有俩种
一,通过调用模块的方式来创建线程
# 进程模块 import multiprocessing import time def f1(): start = time.time() sum = 0 for n in range(100000000): sum += n print(sum) print("data:{}".format(time.time() - start)) if __name__ == \'__main__\': # windows在调用进程的时候,必须加这句话,否则会报错 li = [] p1 = multiprocessing.Process(target=f1) li.append(p1) p2 = multiprocessing.Process(target=f1) li.append(p2) for p in li: p.start() for i in li: i.join() print("ending...")
二,通过继承类的方式(推荐)
import multiprocessing class Process(multiprocessing.Process): def run(self): sum = 0 for n in range(100000000): sum += n print(sum) li = [] for i in range(2): p = Process() li.append(p) if __name__ == \'__main__\': for p in li: p.start() for i in li: i.join() print("ending")
进程之间的通信
创建进程模块的下队列(Queue)
# 进程之间的通信 Queue from multiprocessing import Queue, Process, Pipe import os,time,random def write(q): print("process to write{}".format(os.getpid())) for value in ["A","B","C"]: print("Put {} to queue...".format(value)) q.put(value) time.sleep(random.random()) def read(q): print("process to read{}".format(os.getpid())) while True: value = q.get(True) print("Get {} from queue".format(value)) if __name__ == \'__main__\': q = Queue() pw = Process(target=write,args=(q,)) # 这里传输的q是copy的 pr = Process(target=read,args=(q,)) pw.start() pr.start() pw.join() pr.terminate() # 强行终止进程(因为这个子进程定义了一个死循环)
管道(Pipe)
# 进程之间的通信 Pipe(类似于socket) from multiprocessing import Queue, Process, Pipe import os,time,random # 说明Pipe的send是没有返回值的 pipe = Pipe() # print(pipe) def worker(pipe): time.sleep(random.random()) for i in range(10): print("worker send {}".format(pipe.send(i))) def Boss(pipe): while True: print("Boss recv {}".format(pipe.recv())) p1 = Process(target=worker,args=(pipe[0],)) p2 = Process(target=Boss,args=(pipe[1],)) if __name__ == \'__main__\': p1.start() p2.start()
上述实现了进程间的数据通信,那么进程可以达到数据共享么?Sure。
前一节中, Pipe、Queue 都有一定数据共享的功能,但是他们会堵塞进程, 这里介绍的两种数据共享方式都不会堵塞进程, 而且都是多进程安全的。
A manager object returned by Manager()
controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager()
will support types list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
and Array
.
由上述英文我们了解到,通过Manager()可以实现进程上的数据共享,并且支持的类型也由很多,接下来看代码
from multiprocessing import Process, Manager def f(d,l,n): d["name"] = "alex" d[n] = "1" l.append(n) if __name__ == \'__main__\': with Manager() as manager: # 类似于文件操作的with open(...) d = manager.dict() l = manager.list(range(5)) print(d,l) p_list = [] for n in range(10): p = Process(target=f,args=(d, l, n)) p.start() p_list.append(p) for p in p_list: p.join() # 这儿的join必须加 print(d) print(l) # 关于数据共享的进程等待的问题,鄙人作出一些自己的理解 # 多核CPU的情况下,进程间是可以实现并行的,当然每个核处理的速度又有极其细微的差异性,速度处理稍慢些的进程在还在对数据进行处理的候,同时又想要得到数据了,自然会出现错误,所以要等待进程处理完这份数据的时候再进行操作
from multiprocessing import Process, Manager def func(n,a): n.value = 50 for i in range(len(a)): a[i] += 10 if __name__ == \'__main__\': with Manager() as manager: num = manager.Value("d", 0.0) ints = manager.Array("i", range(10)) p = Process(target=func,args=(num,ints)) p.start() p.join() print(num) print(ints) 输出 Value(\'d\', 50) array(\'i\', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]) # 共享内存有两个结构,一个是 Value, 一个是 Array,这两个结构内部都实现了锁机制,因此是多进程安全的。 # Value 和 Array 都需要设置其中存放值的类型,d 是 double 类型,i 是 int 类型,具体的对应关系在Python 标准库的 sharedctypes 模块中查看。 # 上面的共享内存支持两种结构 Value 和 Array, 这些值在主进程中管理,很分散。 Python 中还有一统天下,无所不能的Manager,专门用来做数据共享。 其支持的类型非常多。
进程同步
Lock
锁是为了确保数据一致性,比如读写锁,每个进程给一个变量增加 1 ,但是如果在一个进程读取但还没有写入的时候,另外的进程也同时读取了,并写入该值,则最后写入的值是错误的,这时候就需要锁。
# 为什么引申进程同步 # 数据的一致性 import time from multiprocessing import Lock, Process def run(i, lock): with lock: # 自动获得锁和释放锁 time.sleep(1) print(i) if __name__ == \'__main__\': lock = Lock() for i in range(10): p = Process(target=run,args=(i,lock,)) p.start()
Lock 同时也实现了 ContextManager API, 可以结合 with 语句使用, 关于 ContextManager, 请移步 Python 学习实践笔记 装饰器 与 context 查看。
Semaphore
Semaphore 和 Lock 稍有不同,Semaphore 相当于 N 把锁,获取其中一把就可以执行了。 信号量的总数 N 在构造时传入,s = Semaphore(N)
。 和 Lock 一样,如果信号量为0,则进程堵塞,直到信号大于0。
进程池
如果有50个任务要去执行,CPU只有4核,那创建50个进程完成,其实大可不必,徒增管理开销。如果只想创建4个进程,让它们轮流替完成任务,不用自己去管理具体的进程的创建销毁,那 Pool 是非常有用的。
Pool 是进程池,进程池能够管理一定的进程,当有空闲进程时,则利用空闲进程完成任务,直到所有任务完成为止
1
2
3
4
5
6
7
8
|
def func(x): return x * x if __name__ = = \'__main__\' : |