35 守护进程 互斥锁 IPC 共享内存 的方式 生产者消费者模型
Posted komorebi
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了35 守护进程 互斥锁 IPC 共享内存 的方式 生产者消费者模型相关的知识,希望对你有一定的参考价值。
from multiprocessing import Processimport time
def task():
print(‘老了。。。。‘)
time.sleep(2)
print(‘睡了一会。。‘)
if __name__ == ‘__main__‘:
print(‘长大了。。。。‘)
p=Process(target=task)
p.daemon=True
#一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
p.start()
time.sleep(2)
print(‘关闭了。。。。‘)
p.terminate()
使用场景:父进程交给子进程一个任务,任务还没有完成父进程就结束了,此时子进程就没意义了。
互斥锁
互斥锁:
互相排斥的锁(如果这个资源已经被锁了,其他进程就无法使用了)
需要强调的是: 锁 并不是真的把资源锁起来了,只是在代码层面限制你的代码不能执行。
使用场景:
并发将带来资源的竞争问题,
当多个进程同时要操作同一个资源时,将会导致数据错乱的问题。
解决方案1:
? 加join,
例子:
from multiprocessing import Process
import time
def task1():
print(‘你好,阿森。。。‘)
time.sleep(3)
print(‘吃饭‘)
time.sleep(3)
print(‘下雨‘)
def task2():
print(‘你好,阿三。。。‘)
time.sleep(3)
print(‘吃面‘)
time.sleep(3)
print(‘下雪‘)
def task3():
print(‘你好,阿四。。。‘)
time.sleep(3)
print(‘吃米‘)
time.sleep(3)
print(‘下冰雹‘)
if __name__ == ‘__main__‘:
p1 = Process(target=task1)
p2 = Process(target=task2)
p3 = Process(target=task3)
p1.start()
p1.join()
p2.start()
p2.join()
p3.start()
p3.join()
小知识点:
andom.randint(a,b)用于生成一个指定范围内的整数。
其中参数a是下限,参数b是上限,生成的随机数n: a <= n <= b。
? 弊端: 1.把原本并发的任务变成了穿行,避免了数据错乱问题,但是效率降低了,这样就没必要开子进程了。
? 2.原本多个进程之间是公平竞争,join执行的顺序就定死了,这是不合理的。
解决方案2:
? 就是给公共资源加 锁----互斥锁。
例子:
from multiprocessing import Process,Lock
import time,random
def task1(lock):
# 上锁
lock.acquire() #就等同于一个if判断
print(‘你好,阿森。。。‘)
time.sleep(random.randint(0,3))
print(‘吃饭‘)
time.sleep(random.randint(0, 3))
print(‘下雨‘)
#解锁
lock.release()
def task2(lock):
lock.acquire()
print(‘你好,阿三。。。‘)
time.sleep(random.randint(0, 3))
print(‘吃面‘)
time.sleep(random.randint(0, 3))
print(‘下雪‘)
lock.release()
def task3(lock):
lock.acquire()
print(‘你好,阿四。。。‘)
time.sleep(random.randint(0, 3))
print(‘吃米‘)
time.sleep(random.randint(0, 3))
print(‘下冰雹‘)
lock.release()
if __name__ == ‘__main__‘:
lock=Lock()
p1=Process(target=task1,args=(lock,))
p2=Process(target=task2,args=(lock,))
p3=Process(target=task3,args=(lock,))
p1.start()
p2.start()
p3.start()
# 注意1: 不要对同一把执行多出acquire 会锁死导致程序无法执行 一次acquire必须对应一次release
from multiprocessing import Lock
l=Lock()
l.acquire()
print(‘抢到了‘)
l.release()
l.acquire()
print(‘接着抢‘)
# 注意2:想要保住数据安全,必须保住所有进程使用同一把锁
锁和join的区别:
1. join是固定了执行顺序,会造成父进程等待子进程完成后完成
? 锁是公平竞争谁先抢到谁先执行,父进程可以做其他事情、
2. join是把进程的任务全部串行
? 锁可以锁任意代码 ,一行也可以 可以自己调整粒度
粒度:
粒度越大意味着锁住的代码越多 效率越低
粒度越小意味着锁住的代码越少 效率越高
mysql 中不同隔离级别 其实就是不同的粒度
小练习----------抢票
"""
过程:
1.查看余票,
2.有就买,无失败
os.getpid()获取当前进程id
#文件db的内容为:"count":1
#注意一定要用双引号,不然json无法识别
"""
from multiprocessing import Process, Lock
import json, random, time, os
def check_ticket():
with open(‘ticket.json‘, ‘rt‘, encoding=‘utf-8‘)as a:
data = json.load(a)
time.sleep(random.randint(0, 3))
print(‘%s正在查票,票还有%s‘ % (os.getpid(), data["count"]))
def buy_ticket():
with open(‘ticket.json‘, ‘rt‘, encoding=‘utf-8‘)as a:
data = json.load(a)
if data["count"] > 0:
data["count"] -= 1
# 模拟延迟
time.sleep(random.randint(0, 3))
with open(‘ticket.json‘, ‘wt‘, encoding=‘utf-8‘)as a:
json.dump(data, a)
print(‘%s恭喜你抢票成功‘ % os.getpid())
else:
print(‘%s抢票失败,开个加速包试一试‘ % os.getpid())
def task(lock):
check_ticket()
lock.acquire()
buy_ticket()
lock.release()
if __name__ == ‘__main__‘:
lock = Lock()
# 三个人抢票
for i in range(10):
p = Process(target=task, args=(lock,))
p.start()
IPC
进程间通讯
通讯指的就是交换数据
? 进程之间内存是相互隔离的,当一个进程想要把数据给另外一个进程,就需要考虑IPC
方式:
? 管道: 只能单向通讯,数据都是二进制
? 文件: 在硬盘上创建共享文件
? 缺点:速度慢
? 优点:数据量几乎没有限制
? socket:
? 编程复杂度较高
? 共享内存:必须由操作系统来分配 要掌握的方式*****
? 优点: 速度快
? 缺点: 数据量不能太大
共享内存的方式
? Manager提供很多数据结构 list dict等等
? Manager所创建出来的数据结构,具备进程间共享的特点
from multiprocessing import Process, Manager, Lock
import time
# 传参给父进程
def task(data, l):
# 上锁
l.acquire()
num = data[‘num‘]
time.sleep(0.1)
# 时间过长,子进程就会死掉
data[‘num‘] = num - 1
# 解锁
l.release()
if __name__ == ‘__main__‘:
# 让Manager开启一个共享的字典
m = Manager()
data = m.dict(‘num‘: 10)
l = Lock()
for i in range(10):
p = Process(target=task, args=(data, l))
p.start() # 创建子进程
time.sleep(2)
print(data)
Queue队列 帮我们处理了锁的问题
队列是一种特殊的数据结构,先存储的先取出 就像排队 先进先出
? 相反的是堆栈,先存储的后取出, 就像衣柜 桶装薯片 先进后出
? 扩展:
? 函数嵌套调用时 执行顺序是先进后出 也称之为函数栈
? 调用 函数时 函数入栈 函数结束就出栈
from multiprocessing import Queue
# 创建队列创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递
# maxsize是队列中允许最大项数,省略则无大小限制。
# Queue(maxsize):
q=Queue(3)
# 存储元素
q.put(‘123‘)
q.put(‘qwe‘)
q.put(‘利物浦‘)
print(q.get())
q.put(‘句柄hi‘)
# 如果容量已经满了,在调用put时将进入阻塞状态 直到有人从队列中拿走数据有空位置 才会继续执行
#取出元素
print(q.get())
print(q.get())
print(q.get())
q.get(block=True,timeout=2)
1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。 2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常. 看一下:
3 q.get_nowait():同q.get(False) 4 q.put_nowait():同q.put(False) 5 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。 6 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。 7 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
8 q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞 9 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
10. q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为
生产者消费者模型
? 模型 就是解决某个问题套路
? 产生数据的一方称之为生产者
? 处理数据的一方称之为消费者
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
解决的方案:
? 将双方分开来.一专门负责生成,一方专门负责处理
? 这样一来数据就不能直接交互了 双方需要一个共同的容器
? 生产者完成后放入容器,消费者从容器中取出数据
? 这样就解决了双发能力不平衡的问题,做的快的一方可以继续做,不需要等待另一方
基于队列实现生产者消费者模型
from multiprocessing import Process, Queue
import time, random
def make_rose(q):
for i in range(6):
print(‘%s个青椒炒鸡蛋制作完成‘ % i)
rose = ‘%s个青椒炒鸡蛋‘ % i
# 将生成完成的数据放入队列中
q.put(rose)
def eat(q):
for i in range(6):
rose = q.get()
time.sleep(random.randint(0, 3))
print(rose, ‘吃完了‘)
if __name__ == ‘__main__‘:
q = Queue()
make_p = Process(target=make_rose, args=(q,))
eat_p = Process(target=eat, args=(q,))
make_p.start()
eat_p.start()
#引入生产者消费者模型为了解决的问题是: 平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度 #如何实现: 生产者<-->队列<——>消费者 #生产者消费者模型实现类程序的解耦和
以上是关于35 守护进程 互斥锁 IPC 共享内存 的方式 生产者消费者模型的主要内容,如果未能解决你的问题,请参考以下文章
20181229(守护进程,互斥锁,IPC,生产者和消费者模型)