Python并发编程之进程2

Posted 那些峥嵘岁月

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python并发编程之进程2相关的知识,希望对你有一定的参考价值。

引言

本篇介绍Python并发编程下的进程,先介绍进程的相关知识,然后对python中multiprocessing模块进行介绍(ProcessPipeQueue以及 Lock)。

进程(process)

在面向线程设计的系统(如当代多数操作系统、Linux 2.6及更新的版本)中,进程本身不是基本运行单位,而是线程的容器

进程拥有自己独立的内存空间,所属线程可以访问进程的空间。

程序本身只是指令、数据及其组织形式的描述,进程才是程序的真正运行实例。 例如,我们在PyCharm开发环境中写好一个程序,运行的时候python解释器完成解释并执行该程序。

进程 = 程序段 + 数据段 + PCB

全局解释器锁GIL

? GIL是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。即便在多核心处理器上,使用 GIL 的解释器也只允许同一时间执行一个线程。Python的Cpython解释器(普遍使用的解释器)使用GIL,在一个Python解释器进程内可以执行多线程程序,但每次一个线程执行时就会获得全局解释器锁,使得别的线程只能等待,由于GIL几乎释放的同时就会被原线程马上获得,那些等待线程可能刚唤醒,所以经常造成线程不平衡享受CPU资源,此时多线程的效率比单线程还要低下。

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

? 可以说它的初衷是很好的,为了保证线程间的数据安全性;但是随着时代的发展,GIL却成为了python并行计算的最大障碍,但这个时候GIL已经遍布CPython的各个角落,修改它的工作量太大,特别是对这种开源性的语音来说。但幸好GIL只锁了线程,我们可以再新建解释器进程来实现并行,那这就是multiprocessing的工作了。

multiprocessing模块介绍

doc:The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.

开启子进程的两种方式

第一种:直接调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Process
import time
def task(name):
print(f‘{name}is running ‘)
time.sleep(3)
print(f‘{name} is done‘)

if __name__ == ‘__main__‘:
# windows操作系统开启多进程,必须得写在main下面。
p = Process(target=task,args=(‘小黑‘,)) # args 一定是一个元组的形式
p.start()
print(‘===主进程‘)
# start与print几乎是同时发出,但是由于操作系统调用子进程会慢一些。#

# 整个py文件是主进程
# p.start通知操作系统,你给我在内存中开辟一个空间,将p进程放进去,让cpu执行。

第二种 :继承式调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Process
import time
class Myprocess(Process):
def __init__(self,name):
# 这里的顺序是有要求,不然反过来写会覆盖。
super().__init__() # 必须要执行父类的init
self.name = name
def run(self): # 必须定义这个名字
print(f‘{self.name}is running ‘)
time.sleep(3)
print(f‘{self.name} is done‘)

if __name__ == ‘__main__‘:
p = Myprocess(‘小黑‘)
p.start()
print(‘主进程‘)

获取进程以及父进程的pid

操作系统如何区分进程?每个进程都有一个唯一标识,pid

  1. 在终端查看进程的pid,cmd中输入tasklist

  2. 在终端查看指定的进程pid,cmd中输入 tasklist | findstr pycharm

    技术图片

  3. 通过代码查看pid

    1
    2
    3
    4
    5
    import os
    import time
    print(f‘子进程:{os.getpid()}) # 查看当前进程pid
    print(f‘父进程:{os.getppid()}) # 查看父进程pid
    time.sleep(50000)

进程之间的数据隔离

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process
import time
x = 1000
def task():
global x
x = 2

if __name__ == ‘__main__‘:
p1 = Process(target=task,)
p1.start() # 不是瞬发的,可能会等一会儿
time.sleep(1)
print(f‘主进程:{x}) # 主进程没有运行task这个函数,所以主进程内的x并未改变。

# 但上面有些瑕疵,是主进程先执行的,我们想让子进程先执行,这样才能验证是否正的改变x,所以用一下time来使主进程慢一步。

结论:进程之间的数据是相互隔离的。

下面测试是否有小数据池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Process
import time
x = 100 # (-5 - 256)
s = ‘h‘
l1 = [1,2,3]
def task():
print(f‘子进程:{id(x)})
print(f‘子进程:{id(s)})
print(f‘子进程:{id(l1)})

if __name__ == ‘__main__‘:
print(f‘主进程:{id(x)})
print(f‘主进程:{id(s)})
print(f‘主进程:{id(l1)})
p1 = Process(target=task,)
p1.start()

结论:只有数字满足小数据池(-5-256)初始化时子进程与主进程是沿用一个.

join方法

join :阻塞目前父进程,它是通知主进程,等我执行完毕,主进程才能执行。

情景一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from multiprocessing import Process
import time

def task(name,sec):
time.sleep(sec)
print(f‘{name} is running‘)

if __name__ == ‘__main__‘:
p1 = Process(target=task, args=(‘1‘,1))
p2 = Process(target=task, args=(‘2‘,2))
p3 = Process(target=task, args=(‘3‘,3))
start_time = time.time()

p1.start()
p2.start()
p3.start()
p1.join() # p1 1s
p2.join() # p2 2s
p3.join() # p3 3s
print(f‘主进程{time.time()-start_time}秒后执行‘) # 主进程3.194712162017822秒后执行

①当 p1.join 通知主进程等p1结束后,主进程开始执行,这里的主进程是 p2.join以后后面的内容。

②当p1执行结束后,p2.join通知主程序等p2结束后,主程序开始执行,这里面的主程序是 p3.join以及后面的内容。

③当p2执行结束后, p3.join通知主程序等p3结束后,主程序开始执行,这里面的主程序是print.

结论,由于p1,p2,p3是同时发出的通知,它们三个在同时处理,所以,最下面的print要等待这三个程序结束,也就是等待执行时间最长的进程结束后,才开始执行,

情景二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from multiprocessing import Process
import time

def task(name,sec):
time.sleep(sec)
print(f‘{name} is running‘)

if __name__ == ‘__main__‘:
p1 = Process(target=task, args=(‘1‘,1))
p2 = Process(target=task, args=(‘2‘,2))
p3 = Process(target=task, args=(‘3‘,3))
start_time = time.time()

p1.start()
p1.join()
p2.start()
p2.join()
p3.start()
p3.join()
print(f‘主进程{time.time()-start_time}秒后执行‘) # 主进程6.424545526504517秒后执行

①当 p1.join通知主程序等p1 结束后,主程序开始执行,这里的主程序是 p2.start及以后的内容,所以,由于不是同时发出的通知,是一种串行的效果。

进程对象的其它属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from multiprocessing import Process
import time

def task(name):
print(f‘{name} is running‘)
time.sleep(3)
print(f‘{name} is done‘)

if __name__ == ‘__main__‘:
p = Process(target=task, args=(‘1‘,),name = ‘任务1‘) # 给进程对象设置name属性
p.start() # start 只是一个通知,它会慢一拍。
# print(p.pid) # 获取进程pid号
# print(p.name)
# time.sleep(1)
# p.terminate() # 终止(结束)子进程, 它也不是立即的
# terminate 与 start一样的工作原理:通知操作系统终止或开启一个子进程,内存中终止或开启是会耗费时间的。
# print(p.is_alive()) # 判断子进程是否存活
# is_alive 只是查看内存中p子进程是否运行,比terminat,start快。 这是否矛盾呢?
# 问如何主动杀死子进程, terminate
print(‘主进程‘)

对象.pid()

获取对象pid号。

对象.name

在初始化时,给进程对象设置name属性。

对象.terminate()

终止(结束)子进程

terminate 与 start一样的工作原理:通知操作系统终止或开启一个子进程,内存中终止或开启是会耗费时间的。

对象.is_alive()

判断子进程是否存活.

如何terminate()对象.is_alive()挨着,会打印True,因为对象.is_alive()的速度比terminate()快一些

补充:为什么

僵尸进程与孤儿进程(重要)

僵尸进程:在类UNIX系统中,僵尸进程是指子进程完成执行,父进程没有通过wait系统调用来读取这个子进程的退出状态的话(在操作系统的进程表中仍有一个表项(进程控制块PCB)(在Linux中具体是task_struct结构)),这个子进程就会一直维持僵尸进程状态,称为“僵尸进程”。

回收:子进程需要保留表项以允许其父进程读取子进程的 exit status:一旦退出态通过 wait系统调用读取,僵尸进程条目就从进程表中删除,称之为回收(reaped)

并且僵尸进程是无法通过 kill命令来清除。

僵尸进程的状态为EXIT_ZOMBIE,缩写Z,ps命令也会打印僵尸进程,但无法使用kill杀死。

可以使用命令: ps aux | grep Z 查看

在回收僵尸进程之前,如果父进程退出了,则僵尸进程变为“孤儿进程”,进而被init进程接管、回收。

孤儿进程父进程执行完成或被终止后仍继续运行的一类进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Process
import time
import os

def task(name):
print(f‘{name} is running‘)
print(f‘子进程开始了{os.getpid()})
time.sleep(10)

if __name__ == ‘__main__‘:
p = Process(target=task, args=(‘1‘,)) # 给进程对象设置name属性
for i in range(10000):
p =Process(target=task, args=(str(i),))
p.start()
print(f‘主进程开始了{os.getpid()})

以上代码在pycharm运行时,会产生大量的僵尸进程,当我们点击停止(红色正方形)时,会出现一个红色骷髅的标志。

技术图片

僵尸进程有害?

? 一种情景:父进程(僵尸进程)无限的开启子进程,递归的开启,子进程越来越多,僵尸进程越来越多,导致资源泄露

为什么需要僵尸进程?

? 之所以保留 task_struct,是因为它里面保存了进程的pid,退出码、以及一些统计信息,父进程很可能会关心这些信息。

如何清除僵尸进程?

第一种方法:结束父进程,使之成为孤儿进程。当然者个是暴力的手段,因为我们一般肯定是希望父进程继续运行的。

第二种方法:通过wait调用来读取子进程退出状态。比如通过 multiprocessing.Process产出的进程可以通过 子进程.join()的方法来wait,也可以在父进程中处理 SIGCHLD信号,在处理程序中调用wait系统调用或者直接设置为 SIG_IGN来清除僵尸进程。

守护进程(重要)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 生产者与消费者模型会讲到。
from multiprocessing import Process
import time
import os

def task(name):
print(f‘{name} is running‘)
print(f‘子进程开始了{os.getpid()})
time.sleep(50)
print(f‘{name} is done‘)


if __name__ == ‘__main__‘:
p = Process(target=task, args=(‘1‘,)) # 给进程对象设置name属性
p.daemon = True # 将p子进程设置成守护进程,守护主进程,只要主进程结束,子进程无论执行与否,都马上结束
# 守护进程设置必须在 start前面!
p.start()
time.sleep(2)
print(f‘主进程开始了{os.getpid()})

通俗:守护:我守护着你,你要是死了,我就与你一起

doc:当一个进程退出的时候,它试图关闭所有守护着它的子进程。

使用地方:生产者与消费者模型。

守护进程是不能开启子进程的,不然当父进程结束的时候,守护进程结束,那么由守护进程开启的进程会成为孤儿进程。

注意:守护进程设置必须在 start前面!

进程同步

背景

在系统中有一些需要相互合作、协同工作的进程,它们之间的相互联系称为进程的同步

进程同步的主要任务

使并发执行的诸进程之间能有效地共享资源相互合作,从而使程序的并发执行具有可再现性。

进程的两种制约关系

  • 间接制约:竞争同一资源而产生的相互排斥的关系 。

    • 解释: 当某一进程访问某一资源时,不允许别的进程同时访问,这种限制称为互斥, 即多个进程在访问某些资源(如临界资源)时,也要有一种执行次序上的协调 ,当一个进程访问完毕,另一个进程才能访问。所以就其本质来讲,互斥仍是一种同步。
  • 直接制约:进程间共同完成一项任务时直接发生相互作用的关系。

临界资源

  • 临界资源:一次仅允许一个进程访问的资源。例如打印机。
  • 临界区:访问临界资源的代码段,不允许多个并发进程交叉执行的一段程序。

临界区必须互斥访问

  • 进入区:(1) 检查临界资源是否被访问,未被访问,转(2),否则转(1)。

    ? (2) 进入临界区,并设访问标志。

  • 退出区:恢复访问标志,允许其它进程进入

同步机制应遵循的准则

  • 空闲让进——有效利用
  • 忙则等待——互斥
  • 有限等待——避免“死等”
  • 让权等待——避免“忙等”

互斥锁

业务背景:3个进程,同一时刻共抢一个资源:输出平台。

分析:多个进程共抢一个资源,你要是做到结果第一位,效率第二位。你应该牺牲效率,保证结果。做到串行。

方法一:join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from multiprocessing import Process
import time
import random

def task1():
print(‘task1 开始打印‘)
time.sleep(random.randint(1,3))
print(‘task1 打印完成‘)
def task2():
print(‘task2 开始打印‘)
time.sleep(random.randint(1, 3))
print(‘task2 打印完成‘)
def task3():
print(‘task3 开始打印‘)
time.sleep(random.randint(1, 3))
print(‘task3 打印完成‘)

if __name__ == ‘__main__‘:
p1 = Process(target=task1)
p2 = Process(target=task2)
p3 = Process(target=task3)
p1.start()
p1.join()
p2.start()
p2.join()
p3.start()
p3.join()

虽然上面这个版本完成了串行结果,保证了顺序,但是没有保证公平。顺序是人为写好的。我们要做到公平的去抢占资源,谁先抢到,先执行谁。

方法二:Lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from multiprocessing import Process
from multiprocessing import Lock
import time
import random
# lock = Lock() # 这样的话,子进程各会拷贝一份,也就是创建一把锁,会出问题,好多把锁
def task1(lock):
print(‘task1‘) # 第一种验证方法:验证cpu遇到I/O切换了
lock.acquire()
print(‘task1 开始打印‘)
time.sleep(random.randint(1,3))
print(‘task1 打印完成‘)
lock.release()
def task2(lock):
print(‘task2‘)
lock.acquire()
print(‘task2 开始打印‘)
time.sleep(random.randint(1, 3))
print(‘task2 打印完成‘)
lock.release()
def task3(lock):
print(‘task3‘)
lock.acquire()
# lock.acquire() # 多加一次会进入死锁
print(‘task3 开始打印‘)
time.sleep(random.randint(1, 3))
print(‘task3 打印完成‘)
lock.release()
def task4():
print(‘task4 开始打印‘)
time.sleep(random.randint(1, 3))
print(‘task4 打印完成‘)


if __name__ == ‘__main__‘:
lock = Lock() # 在这里实例化是要保证同一把锁! 所以要以参数的形式传入
p1 = Process(target=task1,args=(lock,))
p2 = Process(target=task2,args=(lock,))
p3 = Process(target=task3,args=(lock,))
p4 = Process(target=task4)
p1.start()
p2.start()
p3.start()
# p4.start() # 第二种验证方法,当如果p1拿到锁,并遇到I/O阻塞的时候,cpu会进行切换,找其它可用的进程。

上面的程序中:当第一个到达的时候,假如p1开始执行,遇到阻塞的时候,cpu要切换,发现其它也需要同一把锁,所以cpu就停下来等待p1的阻塞结束。

上锁:一定要是同一把锁:上锁一次,解锁一次。

互斥锁与join区别共同点? (重点)

  • 共同点:都完成了进程之间的串行
  • 区别:join是人为控制的进程串行, 互斥锁是随机的抢占资源

情景二:模拟抢票

需求分析:买票之前需要查票,必经流程,有可能你查票的同时,100个人也在查本次车票。买票时,你要先从服务端获取票数,票数 > 0,买票,然后服务端票数减一,中间肯定有网络延迟。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from multiprocessing import Process
import time
import random
from multiprocessing import Lock
import json
import os

def search():
time.sleep(random.random())
dic = json.load(open(‘db.json‘,encoding=‘utf-8‘))
print(f"剩余票数:{dic[‘count‘]}")

def get():
dic = json.load(open(‘db.json‘, encoding=‘utf-8‘))
time.sleep(random.randint(1,3))
if dic[‘count‘] > 0:
dic[‘count‘] -= 1
json.dump(dic,open(‘db.json‘,mode = ‘w‘ ,encoding=‘utf-8‘))
print(f‘{os.getpid()}用户 购买成功‘)
else:
print(‘没票了....‘)

def task(lock):
search()
lock.acquire()
get()
lock.release()

if __name__ == ‘__main__‘:
lock = Lock()
for i in range(5):
p = Process(target=task,args=(lock,))
p.start()
# with open(‘db.json‘,mode=‘w‘,encoding=‘utf-8‘) as f1:
# json.dump({‘count‘:3},f1) # 写入票数

多进程原则上是不能互相通信的,它们在内存级别数据隔离的。不代表硬盘的数据隔离,它们可以共同操作一个文件。

多个进程抢占同一个()资源,要想公平按照顺序,只能串行。

进程之间的通信:队列(multiprocessing.Queue)

多个进程间的通信:基于文件以及加锁的方式。

缺点:

  1. 操作文件效率低
  2. 自己加锁很麻烦,很容易出现死锁,递归锁。

进程之间的通信最好的方式是基于队列。底层是利用管道和锁。

什么是队列?

队列是存在于内存中的一个容器,最大的一个特点:队列的特性就是FIFO,完全支持先进先出的原则。

1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Queue

q = Queue(3) # 可以设置元素的最大个数
q.put(‘小黑‘)
q.put({‘count‘:1})
def func():
print(‘in func‘)
q.put(func)
# q.put(666) # 当队列中的数据已经达到上限,再插入数据的时候,该进程就会夯住,阻塞,等待别的进程取出数据。
print(q.get())
print(q.get())
print(q.get())
print(q.get()) # 当你将数据取完继续在取值的时候,该进程会夯住,阻塞,等待别的进程插入数据。

队列的maxsize q = Queue(maxsize) 数据量不宜过大。精简的重要的数据。比如:各种请求链接。

常用方法介绍:

put(self, obj, block=True, timeout=None)

  • 当超过最大限度时,默认阻塞 block=True 改成False 如果继续 put 报queue.Full异常
  • timeout 延时报错,超过设置的时间间隔后还插不进去数据,会报queue.Full异常

get(self, block=True, timeout=None)

  • 当队列为空的时候,默认阻塞;改为False后如果继续 get 报queue.Empty异常
  • timeout 延时作业,超过设置的时间间隔还取不出来数据,会报queue.Empty异常。

进程之间的通信实例

背景:抢小米手环4,预期发售10个,100个人去抢。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import os
from multiprocessing import Queue
from multiprocessing import Process
import queue

def task(q):
try:
q.put(f‘{os.getpid()},block=False)
except queue.Full:
pass

if __name__ == ‘__main__‘:
q = Queue(10)
for i in range(100):
p = Process(target=task,args=(q,))
p.start()
for i in range(1,11):
print(f"排名{i}的 用户{q.get()}抢到了")

利用队列进行进程之间通信:简单,方便,不用自己手动加锁。队列自带阻塞,可持续化读取数据。

进程之间的通信:管道(multiprocessing.Pipe)

管道

  • 在UNIX系统中,它连接一个读进程和一个写进程,以实现它们之间通信的共享文件,又称pipe文件。它是以文件为基础,实质是以外存来进行数据通信
  • 在windows中是一段共享内存。这段共享的内存设计采用数据流I/0的方式来访问。由一个进程读、另一个进程写
  • 类似于一根管道的两端,所以这种进程间的通信方式被称作“管道”。
1
2
Python官方文档的描述:
Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.

因此, Pipe仅仅适用于只有两个进程一读一写的单双工情况,也就是说信息是只向一个方向流动。例如电视、广播,看电视的人只能看,电视台是能播送电视节目。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from multiprocessing import Pipe,Process

def task(x,pipe):
_out_pipe, _in_pipe = pipe
print(‘子进程开始啦‘)
# 关闭fork过来的输入端(关闭进水口)
_in_pipe.close()
while True:
try:
# 放水
msg = _out_pipe.recv()
print(msg)
except EOFError:
# 当out_pipe接受不到输出的时候且输入被关闭的时候,会抛出EORFError,可以捕获并且退出子进程
return

if __name__ == ‘__main__‘:
out_pipe, in_pipe = Pipe() # duplex为True,创建的管道是双向的,默认为
p1 = Process(target=task, args=(100, (out_pipe, in_pipe)))
p1.start()

# 等pipe被fork 后,关闭主进程的输出端
# 这样,创建的Pipe一端连接着主进程的输入,一端连接着子进程的输出口
out_pipe.close() # 关闭出水口,开始进水
for x in range(100):
in_pipe.send(x)
print(‘xxxxxxx‘)
in_pipe.close() # 关闭进水口。 当进水口关闭后,子进程才开始使用这个管道。
p1.join()
print("主程序运行结束")

上面的代码主要用到了pipe的send()、recv()、close()方法。当pipe的输入端被关闭,且无法接收到输入的值,那么就会抛出EOFError。

新建一个Pipe(duplex)的时候,如果duplex为True,那么创建的管道是双向的;如果duplex为False,那么创建的管道是单向的。

形象的举例就是:

  • 主程序关闭出水口,放水,当水充满后,关闭进水口;
  • 子程序拿到管道。先关闭进水口,开始放水,等水流完后,关闭出水口。

生产者消费者模型

回顾:以前学到的模型,设计模式(单例模式),归一化设计,理论等待,都是交给你一个编程思路,如果以后遇到类似的情况,直接套用即可。

生产者:生产数据的进程。

消费者:对生产者生产出来的数据做进一步的处理的进程。

吃包子举例:厨师生产出包子,不可能直接塞你嘴里,他要放在中;消费者从盆中取出包子食用。

三个主体:(生产者)厨师、(容器队列)盆、(消费者)人。

为什么夹杂这个容器?

(容器)盆起到一个缓冲区的作用,也起到了解耦的作用(只有生产者和消费者话它们是强耦合的)。

平衡了生产力和消费力。

生产者消费者模型多用于并发

方法一:利用队列实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from multiprocessing import Process
from multiprocessing import Queue
import time
import random

def producer(q):
for i in range(1,6):
time.sleep(random.randint(1,3))
res = f‘{i}号包子‘
q.put(res)
print(f‘33[0;32m 生产者生产了{res}33[0m‘)

def consumer(q):
while 1:
try:
time.sleep(random.randint(1,3))
ret = q.get(timeout= 5)
print(f‘消费者吃了{ret})
except Exception:
return

if __name__ == ‘__main__‘:
q = Queue()
p1 = Process(target=producer,args=(q,))
p2 = Process(target=consumer,args=(q,))
p1.start()
p2.start()

生产者消费者模型:

合理的去调控多个进行去生产数据以及提取数据,中间有个必不可少的环节容器队列。

本质上:利用队列进行通信。

方法二:利用管道实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from multiprocessing import Process,Pipe

def producer(out_pipe, in_pipe):
out_pipe.close()
for i in range(1,101):
s = f"第{i}号包子"
print(f‘生产者生产了{s})
in_pipe.send(s)
in_pipe.close()

def consumer(out_pipe, in_pipe):
in_pipe.close()
# while True:
# try:
# s = out_pipe.recv()
# print(f"消费者吃了{s}")
# except EOFError: # 只有在进水口关闭的时候才能引发异常
# print(‘xxx‘)
# return
for i in range(100):
s = out_pipe.recv()
print(f"消费者吃了{s}")
out_pipe.close()

if __name__ == ‘__main__‘:
out_pipe, in_pipe = Pipe()
pro = Process(target=producer,args=(out_pipe, in_pipe))
con = Process(target=consumer,args=(out_pipe, in_pipe))
pro.start()
con.start()
con.join()
print(‘主进程结束‘)

以上是关于Python并发编程之进程2的主要内容,如果未能解决你的问题,请参考以下文章

Python并发编程之多进程

python并发编程之多进程

python并发编程之多线程基础知识点

python并发编程之多线程守护系列互斥锁生产者消费者模型

Python并发编程之进程

python之并发编程