python 进程

Posted nonzero

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 进程相关的知识,希望对你有一定的参考价值。

导航:

1、创建进程的两种方式
2、Process的方法
3、进程间的通讯1,进程队列Queue--先进先出
4、进程间的通讯2,管道通讯 Pipe
5、进程间的数据共享,Manager
6、多进程同步问题
7、进程池Pool

 

python中多进程可以解决cpython解释器多线程中GIL存在的问题,可以利用CPU的多核资源,实现真的并发效果。操作系统中每个线程有自己的内存空间,数据并不共享。

python中使用multiprocessing包提供的接口给我们创建多进程,multiprocessing与threading的使用方法相似。

 

1、创建进程的两种方式

1)通过multiprocessing.Process创建

  Process(group=None, target=None, name=None, args=(), kwargs={})

  • group    线程组,没什么用,默认为空就好
  • target    要执行的方法
  • name    进程的名字
  • args/kwargs 执行target方法要传入的参数
 1 def fn(word):
 2     # ------子进程的逻辑-----
 3     print("父进程", os.getppid())
 4     print("子进程", os.getpid())
 5     print("传入的参数:", word)
 6 
 7 
 8 if __name__ == "__main__":
 9     p = Process(target=fn, args=("haha",))  # 创建子进程
10     p.start()  # 开启子进程
11     print("主进程", os.getpid())
12 
13 
14 # 输出结果
15 主进程 5916
16 父进程 5916
17 子进程 2936
18 传入的参数: haha
通过Process创建一个子进程

很简单这样就可以创建一个子进程了,可以看第3 4 11行打印的进程id可以知道这是不同的进程。

这里需要说明的是:在所有的进程当中子进程都是由父进程创建出来的,在这个例子中,子进程的的父进程就是主进程,可以看到第3 11行打印的进程id。

在windows下,创建进程一定要放在__main__中,不然会报错

2)通过继承Process类创建子进程

 1 from multiprocessing import Process
 2 import os
 3 
 4 class MyProcess(Process):
 5     def __init__(self, name):
 6         super().__init__()  # 如果重写了初始化方法,在初始化方法中一定要调用父类的初始化方法
 7         self.name = name
 8 
 9     def run(self):
10         # ------子进程的逻辑-----
11         print("父进程", os.getppid())
12         print("子进程", os.getpid())
13         print("子进程的名字", self.name)
14 
15 
16 if __name__ == "__main__":
17     p = MyProcess("haha")  # 创建子进程
18     p.start()  # 开启子进程
19     print("主进程", os.getpid())
20 
21 
22 # 输出结果
23 主进程 740
24 父进程 740
25 子进程 7052
26 子进程的名字 haha
通过继承Process创建子进程

通过继承Process创建子进程需要重写 run 方法,这是子进程逻辑的入口,当开启子进程时会自动调用这个方法

上边这两种方式中,主进程执行完所有的逻辑后会等待子进程结束在一起结束,与fork函数创建的方式不一样

 

2、Process的方法

1)is_alive()

is_alive方法判断指定对象进程的存活状态。进程 start 后一直到该进程结束都返回True。进程 start 前 或进程已经结束返回False

 1 from multiprocessing import Process
 2 import time
 3 
 4 def fn():
 5     # ------子进程的逻辑-----
 6     time.sleep(1)  # 模拟子进程执行需要消耗的时间
 7 
 8 
 9 if __name__ == "__main__":
10     p = Process(target=fn)  # 创建子进程
11     print("p.start前-->", p.is_alive())
12     p.start()  # 开启子进程
13     print("p.start后-->", p.is_alive())
14     time.sleep(2)  # 模拟主进程执行需要消耗的时间,为了确保子进程先结束
15     print("子进程结束后-->", p.is_alive())
16 
17 
18 # 输出结果
19 p.start前--> False
20 p.start后--> True
21 子进程结束后--> False
is_alive方法

2)join()

join(timeout=None)方法,堵塞当前环境的进程,直到调此方法的进程结束后再继续往下执行。可设置timeout值,最多堵塞timeout时间(秒)。注意:join方法只能在start()后才可以使用

 1 from multiprocessing import Process
 2 import time
 3 
 4 def fn():
 5     # ------子进程的逻辑-----
 6     time.sleep(3)  # 模拟子进程执行需要消耗的时间
 7     print("子进程中-->", time.ctime())
 8 
 9 
10 if __name__ == "__main__":
11     p = Process(target=fn)  # 创建子进程
12     p.start()  # 开启子进程
13     print("p.join前-->", time.ctime())
14     p.join()  # 堵塞当前环境的进程,直到调此方法的进程结束后再继续往下执行
15     print("p.join后-->", time.ctime())
16 
17 
18 # 输出内容
19 p.join前--> Sat Sep 29 16:03:40 2018
20 子进程中--> Sat Sep 29 16:03:43 2018
21 p.join后--> Sat Sep 29 16:03:43 2018
join方法

这里可以看到第15行语句一直等到第7行执行完才输出

 1 from multiprocessing import Process
 2 import time
 3 
 4 def fn():
 5     # ------子进程的逻辑-----
 6     time.sleep(2)  # 模拟子进程执行需要消耗的时间
 7     print("子进程中-->", time.ctime())
 8 
 9 
10 if __name__ == "__main__":
11     p = Process(target=fn)  # 创建子进程
12     p.start()  # 开启子进程
13     print("p.join前-->", time.ctime())
14     p.join(1)  # 设置超时1秒
15     print("p.join后-->", time.ctime())
16 
17 
18 # 输出内容
19 p.join前--> Sat Sep 29 16:04:02 2018
20 p.join后--> Sat Sep 29 16:04:03 2018
21 子进程中--> Sat Sep 29 16:04:04 2018
join设置timeout值

这里可以看到第15行语句只堵塞了1秒的时间

3)start(),进程准备就绪,等待cpu的执行(调度)

4)run(),继承Process类的子类,需要重写的方法,当进程对象调用 start 方法时自动执行 run 方法,也是进程的入口

5)terminate(),不管进程是否执行完,直接终止进程

6)daemon属性True/False

与线程的setDeamon()一样。将该进程对象设置为守护进程,效果:父进程将不再等待子进程,父进程结束时,子进程一起结束。注意:daemon属性只能在 start() 前设置

 1 from multiprocessing import Process
 2 import time
 3 
 4 def fn():
 5     # ------子进程的逻辑-----
 6     time.sleep(2)  # 模拟子进程执行需要消耗的时间
 7     print("----------")
 8 
 9 
10 if __name__ == "__main__":
11     p = Process(target=fn)  # 创建子进程
12     p.daemon = True
13     p.start()  # 开启子进程
14     print("++++++++++")
15 
16 
17 # 输出内容
18 ++++++++++
daemon属性

子进程中的第7行语句并没有执行,即:子进程在父进程结束时也跟着结束了

 

3、进程间的通讯1,进程队列Queue--先进先出

Queue(maxsize=-1),maxsize=-1默认队列长度没有最大值,maxsize=5表示队列长度最大值为5

1)put(obj, block=True, timeout=None)

  • obj  添加进队列的值,可以添加任意类型的值
  • block 默认为True,当队列满时,继续添加则发生堵塞,直到队列get()值出去;block=False,队列满时继续添加不堵塞,但会抛出queue.Full异常
  • timeout  堵塞超时,当队列满时,继续添加发生堵塞,堵塞超时timeout秒,超时则会抛出queue.Full异常
 1 >>> from multiprocessing import Process, Queue
 2 >>> q = Queue(3)  # 创建进程队列,队列最大长度为3
 3 >>> q.put("haha")  # 往队列添加字符串
 4 >>> q.put([])  # 往队列添加列表
 5 >>> q.put({})  # 往队列添加字典
 6 >>> q.put(5)  # 往队列添加数字
 7 _  # 满时继续添加,发生堵塞(光标会一直卡在这)
 8 
 9 >>> q.put(5,block=False)  # 往队列添加数字,不堵塞添加
10 Traceback (most recent call last):
11   File "<stdin>", line 1, in <module>
12   File "C:\\D_Program\\Python\\Python37\\lib\\multiprocessing\\queues.py", line 83, in put
13     raise Full
14 queue.Full
15 
16 >>> q.put(5, timeout=3)  # 往队列添加数字,并设置超时3秒
17 Traceback (most recent call last):
18   File "<stdin>", line 1, in <module>
19   File "C:\\D_Program\\Python\\Python37\\lib\\multiprocessing\\queues.py", line 83, in put
20     raise Full
21 queue.Full
put方法简单使用,这里还不涉及进程间的通讯

2)get(block=True, timeout=None)  使用方法与put一样的用法,返回先添加的值

 1 >>> q.get()
 2 \'haha\'
 3 >>> q.get()
 4 []
 5 >>> q.get()
 6 {}
 7 >>> q.get()
 8 _  # 为空时继续取值,发生堵塞(光标会一直卡在这)
 9 
10 >>> q.get(timeout=3)
11 Traceback (most recent call last):
12   File "<stdin>", line 1, in <module>
13   File "C:\\D_Program\\Python\\Python37\\lib\\multiprocessing\\queues.py", line 105, in get
14     raise Empty
15 _queue.Empty
get方法简单使用,这里还不涉及进程间的通讯

3)qsize()  返回当前队列的长度

 1 >>> from multiprocessing import Queue
 2 >>> q = Queue(3)
 3 >>> q.put("haha")
 4 >>> q.qsize()
 5 1
 6 >>> q.put(100)
 7 >>> q.qsize()
 8 2
 9 >>> q.get()
10 \'haha\'
11 >>> q.qsize()
12 1
qsize返回当前队列长度,这里还不涉及进程间的通讯

4)put_nowait(obj)  相当于put(obj, block=False)

5)get_nowait()  相当于get(block=False)

6)empty()   判断队列是否为空,True:空,False:不为空

7)full()   判断队列是否已满,True:已满,False:未满

8)close()  关闭队列,关闭后,将不能添加或取出值

9)通过队列实现多个进程间的通讯

 1 from multiprocessing import Process, Queue
 2 import time
 3 
 4 def write_queue(q):
 5     for i in range(5):
 6         q.put(i)  # 往队列添加值
 7         print("put %d" % i)
 8 
 9 
10 def read_queue(q):
11     time.sleep(1)  # 确保write_queue中队列已经有值
12     while not q.empty():
13         s = q.get()  # 取出队列的值
14         print("get %s" % s)
15 
16 
17 if __name__ == "__main__":
18     q = Queue(5)
19     p1 = Process(target=write_queue, args=(q,))  # 创建子进程
20     p2 = Process(target=read_queue, args=(q,))  # 创建子进程
21     p1.start()  # 开启子进程
22     p2.start()  # 开启子进程
23 
24 
25 # 输出结果
26 put 0
27 put 1
28 put 2
29 put 3
30 put 4
31 get 0
32 get 1
33 get 2
34 get 3
35 get 4
通过队列的多进程间通信,target
 1 from multiprocessing import Process, Queue
 2 import time
 3 
 4 class WriteProcess(Process):
 5     def __init__(self, q):
 6         super().__init__()
 7         self.q = q
 8 
 9     def run(self):
10         print("子进程WriteProcess-->")
11         for i in range(5):
12             self.q.put(i)
13             print("put %d" % i)
14 
15 
16 class ReadProcess(Process):
17     def __init__(self, q):
18         super().__init__()
19         self.q = q
20 
21     def run(self):
22         time.sleep(1)  # 确保write_queue中队列已经有值
23         print("子进程ReadProcess-->")
24         while not self.q.empty():
25             s = self.q.get()
26             print("get %s" % s)
27 
28 
29 if __name__ == "__main__":
30     q = Queue(5)
31     p1 = WriteProcess(q)  # 创建子进程
32     p2 = ReadProcess(q)  # 创建子进程
33     p1.start()  # 开启子进程
34     p2.start()  # 开启子进程
35 
36 
37 # 输出结果
38 子进程WriteProcess-->
39 put 0
40 put 1
41 put 2
42 put 3
43 put 4
44 子进程ReadProcess-->
45 get 0
46 get 1
47 get 2
48 get 3
49 get 4
通过队列的多进程间通讯--run方法

 

4、进程间的通讯2,管道通讯  Pipe

1)Pipe(duplex=True)

Pipe是一个函数,返回元组(Connection(), Connection()).   即返回管道的两端。默认duplex=True为全双工模式,duplex=Fasle中第一个Connection只能接收信息,第二个Connection只能发送消息

2)Connection常用方法

  • send(obj)     将对象obj发送到管道另一端,发送的数据必须是可序列化的对象。
  • recv()           从管道的另一端接收send()发送的数据。没有数据可接收,将发生堵塞。
  • send_bytes(buffer, offset=-1, size=-1)    发送字节缓冲区,buffer是支持支持字节缓冲的任意对象,offset为buffer的字节偏移量(可以当初下标),size为要发送的字节数。
  • recv_bytes(maxlength=-1)    接收send_bytes()发送的一次数据,maxlength指定接收长度,超出这个长度则抛出将引发IOError异常,没有数据可接收,将发生堵塞。
  • poll([timeout])      返回 True/False,判断管道内是否有数据可以接收,True:数据可接收。timeout为堵塞的时间秒,timeout=None时一直堵塞,直到有数据可以接收
  • close()        关闭链接,关闭链接后将不能继续使用管道,当不再使用管道时可将其关闭
 1 from multiprocessing import Process, Pipe
 2 import time
 3 
 4 class MyProcess1(Process):
 5     def __init__(self, con):
 6         super().__init__()
 7         self.con = con
 8 
 9     def run(self):
10         self.con.send("12345")
11         print("MyProcess1--send-->", "12345")
12         msg = self.con.recv_bytes().decode(encoding="utf-8")
13         print("MyProcess1--recv_bytes-->", msg)
14 
15 
16 class MyProcess2(Process):
17     def __init__(self, con):
18         super().__init__()
19         self.con = con
20 
21     def run(self):
22         msg = self.con.recv()
23         print("MyProcess2--recv-->", msg)
24         self.con.send_bytes("哈哈".encode("UTF-8"))
25         print("MyProcess2--send_bytes-->", "哈哈")
26 
27 
28 if __name__ == "__main__":
29     con1, con2 = Pipe()
30     p1 = MyProcess1(con1)  # 创建子进程
31     p2 = MyProcess2(con2)  # 创建子进程
32     p1.start()  # 开启子进程
33     p2.start()  # 开启子进程
34 
35 
36 # 输出结果
37 MyProcess1--send--> 12345
38 MyProcess2--recv--> 12345
39 MyProcess2--send_bytes--> 哈哈
40 MyProcess1--recv_bytes--> 哈哈
多进程间的管道通讯

 

5、进程间的数据共享,Manager

Manager提供多进程间的数据共享,Manager内的主要方法有  dict(mapping_or_sequence), list(sequence), Value(typecode, value), Array(typecode, sequence)