守护进程互斥锁ipc机制生产者消费者模型

Posted kingyanan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了守护进程互斥锁ipc机制生产者消费者模型相关的知识,希望对你有一定的参考价值。

一、补充:
from multiprocessing import Process
import time, os

def task():
print(‘%s is running‘ % os.getpid())
time.sleep(3)

if __name__ == ‘__main__‘:
p = Process(target=task)
p.start()
p.join() # 等待进程p结束后,join函数内部会发送系统调用wait,去告诉操作系统回收掉进程p的id号

print(p.pid) # ???此时能否看到子进程p的id号
print(‘主‘)

答案:可以
分析:
p.join()是像操作系统发送请求,告知操作系统p的id号不需要再占用了,回收就可以,
此时在父进程内还可以看到p.pid,但此时的p.pid是一个无意义的id号,因为操作系统已经将该编号回收


二、守护进程

1、什么守护进程
守护进程其实就是一个‘子进程’,守护进程会伴随主进程的代码运行完毕后而死掉

2、为何用守护进程
关键字就两个:
什么时候需要开启子进程?
当父进程需要将一个任务并发出去执行,需要将该任务放到一个子进程里

什么时候需要将子进程设置为守护进程?
当该子进程内的代码在父进程代码运行完毕后就没有存在的意义了,就应该
将该子进程设置为守护进程,会在父进程代码结束后死掉


from multiprocessing import Process
import time,os

def task(name):
print(‘%s is running‘ %name)
time.sleep(3)

if __name__ == ‘__main__‘:
p1=Process(target=task,args=(‘守护进程‘,))
p2=Process(target=task,args=(‘正常的子进程‘,))

p1.daemon = True # 一定要放到p.start()之前
p1.start()
p2.start()

print(‘主‘)


#主进程代码运行完毕,守护进程就会结束
from multiprocessing import Process

import time
def foo():
print(123)
time.sleep(1)
print("end123")

def bar():
print(456)
time.sleep(3)
print("end456")

if __name__ == ‘__main__‘:
p1=Process(target=foo)
p2=Process(target=bar)

p1.daemon=True
p1.start()
p2.start()
print("main-------")


一般电脑
‘‘‘
main-------
456
enn456
‘‘‘
电脑性能稍微快
‘‘‘
main-------
123
456
enn456
‘‘‘
电脑性能特别快
‘‘‘
123
main-------
456
end456
‘‘‘

永远看不到 end123



三、互斥锁
互斥锁:可以将要执行任务的部分代码(只涉及到修改共享数据的代码)变成串行
牺牲了效率,但保证数据安全。

互斥锁 vs p.join()
1.互斥锁是局部串行
2.p.join()是要执行任务的所有代码整体串行

#文件db的内容为:{"count":1}
#注意一定要用双引号,不然json无法识别

from multiprocessing import Process,Lock
import json
import os
import time
import random

def check():
time.sleep(1) # 模拟网路延迟
with open(‘db.txt‘,‘rt‘,encoding=‘utf-8‘) as f:
dic=json.load(f)
print(‘%s 查看到剩余票数 [%s]‘ %(os.getpid(),dic[‘count‘]))

def get():
with open(‘db.txt‘,‘rt‘,encoding=‘utf-8‘) as f:
dic=json.load(f)
time.sleep(2)
if dic[‘count‘] > 0:
# 有票
dic[‘count‘]-=1
time.sleep(random.randint(1,3))
with open(‘db.txt‘,‘wt‘,encoding=‘utf-8‘) as f:
json.dump(dic,f)
print(‘%s 购票成功‘ %os.getpid())
else:
print(‘%s 没有余票‘ %os.getpid())


def task(mutex):
# 查票
check()

#购票
mutex.acquire() # 互斥锁不能连续的acquire,必须是release以后才能重新acquire
get()
mutex.release()


# with mutex:
# get()

if __name__ == ‘__main__‘:
mutex=Lock()
for i in range(10):
p=Process(target=task,args=(mutex,))
p.start()
# p.join()


四、IPC机制(*****)
IPC:进程间通信,有两种实现方式
1、pipe:
2、queue:pipe+锁

q=Queue() #先进先出
注意:
1、队列占用的是内存空间
2、不应该往队列中放大数据,应该只存放数据量较小的消息

掌握的
q.put(‘first‘)
q.put({‘k‘:‘sencond‘})
q.put([‘third‘,])
# q.put(4)

print(q.get())
print(q.get())
print(q.get())
print(q.get())

了解的
q=Queue(3) #先进先出
q.put(‘first‘,block=True,timeout=3)
q.put({‘k‘:‘second‘},block=True,timeout=3)
q.put([‘third‘,],block=True,timeout=3)
print(‘===>‘)
# q.put(4,block=True,timeout=3)

print(q.get(block=True,timeout=3))
print(q.get(block=True,timeout=3))
print(q.get(block=True,timeout=3))
print(q.get(block=True,timeout=3))

block=True时才能与timeout一起用



q=Queue(3) #先进先出
q.put(‘first‘,block=False,)
q.put({‘k‘:‘sencond‘},block=False,)
q.put([‘third‘,],block=False,)
print(‘===>‘)
# q.put(4,block=False,) # block=False 队列满了直接抛出异常,不会阻塞

通用方式:
for i in range(10):
q.put(i,block=False)

print(q.get(block=False))
print(q.get(block=False))
print(q.get(block=False))
print(‘get over‘)
# print(q.get(block=False))



q=Queue(3) #先进先出

q.put_nowait(‘first‘) #相当于q.put(‘first‘,block=False,)
q.put_nowait(2)
q.put_nowait(3)
# q.put_nowait(4)

print(q.get_nowait()) #相当于q.get(block=False)
print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())



五、生产者消费者模型(******)

1 什么是生产者消费者模型
生产者:比喻的是程序中负责产生数据的任务
消费者:比喻的是程序中负责处理数据的任务

生产者->共享的介质(队列)<-消费者

2 为何用
实现了生产者与消费者的解耦和,生产者可以不停地生产,消费者也可以不停地消费
从而平衡了生产者的生产能力与消费者消费能力,提升了程序整体运行的效率

什么时候用?
当我们的程序中存在明显的两类任务,一类负责产生数据,另外一类负责处理数据
此时就应该考虑使用生产者消费者模型来提升程序的效率


简单实现:
from multiprocessing import Queue,Process
import time,os,random

def producer(q):
for i in range(10):
res=‘包子%s‘%i
time.sleep(random.randint(1,3))
# 往队列里丢
q.put(res)
print(‘33[45m%s生产了%s33[0m‘%(os.getpid(),res))


def consumer(q):
while True:
#从队列里取走
res=q.get()
if res==None:break
time.sleep(random.randint(1,3))
print(‘33[43m%s 吃了%s33[0m‘%(os.getpid(),res))

if __name__ == ‘__main__‘:
q=Queue()
# 生产者们
p1=Process(target=producer,args=(q,))
# 消费者们
c1=Process(target=consumer,args=(q,))

p1.start()
c1.start()

print(‘主‘)

问题:程序没有结束,因为p1结束了而c1没有结束。


完全实现:
from multiprocessing import Queue,Process
import time,random

def producer(name,food,q):
for i in range(3):
res=‘%s%s‘%(food,i)
time.sleep(random.randint(1,3))
# 往队列里丢
q.put(res)
print(‘33[45m%s 生产了 %s33[0m‘%(name,res))
# q.put(None) 不应该在这里放none,如果有消费者取走none会结束了(不应该结束,后面还有数据)

def consumer(name,q):
while True:
#从队列里取走
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print(‘33[41m%s 吃了 %s33[0m‘%(name,res))

if __name__ == ‘__main__‘:
q=Queue()
# 生产者们
p1=Process(target=producer,args=(‘egon‘,‘包子‘,q))
p2=Process(target=producer,args=(‘wxx‘,‘饺子‘,q))
p3=Process(target=producer,args=(‘lxx‘,‘混沌‘,q))
# 消费者们
c1=Process(target=consumer,args=(‘alex‘,q))
c2=Process(target=consumer,args=(‘per‘,q))

p1.start()
p2.start()
p3.start()
c1.start()
c2.start()

p1.join()
p2.join()
p3.join()
# 在p1p2p3都结束后,才应该往队列里放结束信号,有几个消费者就应该放几个None
q.put(None)
q.put(None)

终极版:


from multiprocessing import JoinableQueue,Process
import time
import os
import random

def producer(name,food,q):
for i in range(3):
res=‘%s%s‘ %(food,i)
time.sleep(random.randint(1,3))
# 往队列里丢
q.put(res)
print(‘33[45m%s 生产了 %s33[0m‘ %(name,res))
# q.put(None)

def consumer(name,q):
while True:
#从队列里取走
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print(‘33[46m%s 吃了 %s33[0m‘ %(name,res))
q.task_done() #发送一次信号,证明一个数据已经被取走了

if __name__ == ‘__main__‘:
q=JoinableQueue()
# 生产者们
p1=Process(target=producer,args=(‘egon‘,‘包子‘,q,))
p2=Process(target=producer,args=(‘八戒‘,‘泔水‘,q,))
p3=Process(target=producer,args=(‘猴子‘,‘翔‘,q,))
# 消费者们
c1=Process(target=consumer,args=(‘周‘,q,))
c2=Process(target=consumer,args=(‘吴‘,q,))
c1.daemon=True
c2.daemon=True 守护进程(一定要放在start上面)

p1.start()
p2.start()
p3.start()
c1.start()
c2.start()

p1.join()
p2.join()
p3.join()

q.join() #等待队列被取干净
# q.join() 结束意味着:主进程的代码运行完毕--->(生产者运行完毕)+队列中的数据也被取干净了->消费者没有存在的意义

以上是关于守护进程互斥锁ipc机制生产者消费者模型的主要内容,如果未能解决你的问题,请参考以下文章

开启子进程的两种方式,孤儿进程与僵尸进程,守护进程,互斥锁,IPC机制,生产者与消费者模型

守护进程,互斥锁,IPC,生产者与消费者模型

20181229(守护进程,互斥锁,IPC,生产者和消费者模型)

守护进程,互斥锁,IPC,队列,生产者与消费者模型

35 守护进程 互斥锁 IPC 共享内存 的方式 生产者消费者模型

python 39