1、开启进程的两种方式
方式一
from multiprocessing import Process


def task(name):
    """Entry point for the child process: print a start message."""
    print(" %s start..." % name)


if __name__ == "__main__":
    # The __main__ guard is required on Windows/macOS spawn start method.
    p = Process(target=task, args=("sb",))
    p.start()
方式二
from multiprocessing import Process


class Piao(Process):
    """Way 2: subclass Process and override run() with the child's work."""

    def __init__(self, name):
        super().__init__()
        # Reuses Process's `name` property to store the label shown in run().
        self.name = name

    def run(self):
        # run() is executed in the child process after start() is called.
        print("%s start.." % self.name)


if __name__ == "__main__":
    p1 = Piao("sb")
    p1.start()
terminate和is_alive
from multiprocessing import Process
import time


def task(name):
    """Sleep briefly, then print a completion message."""
    time.sleep(1)
    print("%s done.." % name)


if __name__ == "__main__":
    p1 = Process(target=task, args=("sb",))
    p1.start()
    p1.terminate()  # sends the kill signal; it is asynchronous
    # Usually prints True: terminate() has been requested but the OS
    # has not reaped the child yet at this point.
    print(p1.is_alive())
    # True
    print("主")
    time.sleep(1)
name与pid
from multiprocessing import Process
import os


def task(name):
    """Print a start message for the child process."""
    print("%s start..." % name)


if __name__ == "__main__":
    # The name= keyword argument sets the process's display name.
    p = Process(target=task, args=("sb",), name="子进程1")
    p.start()
    # Process objects expose .name and .pid but have no .ppid attribute.
    print(p.name, p.pid, os.getppid())
    # NOTE: this runs in the parent, so os.getpid() here is the parent's
    # pid; p.pid equals os.getpid() only when called inside the child.
    print(os.getpid())
守护进程
一、守护进程在主进程执行结束终止
二、守护进程内无法开启子进程。
from multiprocessing import Process
import time


def foo():
    # Never executes: p1 is a daemon, and the main process exits
    # right after print("main-------"), killing daemon children.
    print(123)
    time.sleep(1)
    print("end123")


def bar():
    print(456)
    time.sleep(3)
    print("end456")


if __name__ == "__main__":
    p1 = Process(target=foo)
    p2 = Process(target=bar)
    p1.daemon = True  # must be set before start()
    p1.start()
    p2.start()
    print("main-------")
互斥锁
前戏:进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或者打开同一个打印终端,共享带来竞争。
互斥锁:互斥锁的意思就是互相排斥。保证了数据安全不错乱。
from multiprocessing import Process
from multiprocessing import Lock
import time


def task(name, mutex):
    """Print two lines under the lock so each process's output is not interleaved."""
    # `with` guarantees release even if the body raises,
    # unlike a bare acquire()/release() pair.
    with mutex:
        print("%s 1" % name)
        time.sleep(1)
        print("%s 2" % name)


if __name__ == "__main__":
    mutex = Lock()
    for i in range(2):
        # The same Lock object must be passed to every child.
        p = Process(target=task, args=("进程%s" % i, mutex))
        p.start()
互斥锁和join的区别
互斥锁是让加锁部分变成串行,join是让整段代码都变成串行。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# __author__:JasonLIN
from multiprocessing import Process, Lock
import time
import json


def search(name):
    """Read the remaining ticket count from data.txt and report it (no lock: reads may run concurrently)."""
    time.sleep(1)
    # `with` closes the file; json.load(open(...)) leaked the handle.
    with open("data.txt", "r") as f:
        data = json.load(f)
    print("<%s>剩余票数[%s]" % (name, data["count"]))


def buy(name):
    """Decrement the ticket count if any remain and write it back to data.txt."""
    with open("data.txt", "r") as f:
        data = json.load(f)
    if int(data["count"]) > 0:
        data["count"] -= 1
        time.sleep(2)  # simulate network latency during the purchase
        with open("data.txt", "w") as f:
            json.dump(data, f)
        print("%s 购票成功" % name)


def task(name, mutex):
    """Search concurrently, then buy under the mutex so only the purchase is serialized."""
    search(name)
    with mutex:
        buy(name)


if __name__ == "__main__":
    mutex = Lock()
    for i in range(10):
        p = Process(target=task, args=("路人%s" % i, mutex))
        p.start()
join方法,search和buy都变成串行,效率更低。
from multiprocessing import Process
import time
import json


def search(name):
    """Read the remaining ticket count from data.txt and report it."""
    with open("data.txt", "r") as f:  # close the file instead of leaking the handle
        data = json.load(f)
    time.sleep(1)
    print("<%s>剩余票数[%s]" % (name, data["count"]))


def buy(name):
    """Decrement the ticket count if any remain and write it back to data.txt."""
    with open("data.txt", "r") as f:
        data = json.load(f)
    if int(data["count"]) > 0:
        data["count"] -= 1
        time.sleep(2)
        with open("data.txt", "w") as f:
            json.dump(data, f)
        print("%s 购票成功" % name)


def task(name):
    search(name)
    buy(name)


if __name__ == "__main__":
    for i in range(10):
        p = Process(target=task, args=("路人%s" % i,))
        p.start()
        # join() inside the loop serializes BOTH search and buy,
        # which is slower than only locking the buy step.
        p.join()
队列
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。
创建队列的类(底层就是以管道和锁定的方式实现)
Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递
Queue([maxsize]):是队列中允许最大项数,省略则无大小限制
q.put方法用以插入数据到队列中。
q.get方法可以从队列读取并且删除一个元素。
代码实例
from multiprocessing import Process,Queue
q=Queue(3)
# Main Queue methods: put, get, put_nowait, get_nowait, full, empty
q.put(1)
q.put(2)
q.put(3)
print(q.full()) # queue is full now
# q.put(4) # a fourth put would block here
print(q.get())
print(q.get())
print(q.get())
print(q.empty()) # queue is empty now
# print(q.get()) # another get would block here
生产者消费者模型
为什么要使用生产者消费者模型?
生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者和消费者模式?
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯。
这个阻塞队列就是用来给生产者和消费者解耦的
from multiprocessing import Process, Queue
import time
import random


def producer(q):
    """Put three items on the shared queue, pausing between each."""
    for i in range(3):
        res = "包子%s" % i
        time.sleep(0.5)
        print("生产了%s" % res)
        q.put(res)


def consume(q):
    """Consume items until the None sentinel arrives."""
    while True:
        res = q.get()
        # Compare against the sentinel explicitly; `if not res` would also
        # stop on legitimate falsy items such as "" or 0.
        if res is None:
            break
        time.sleep(random.randint(1, 3))
        print("吃了%s" % res)


if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))
    c1 = Process(target=consume, args=(q,))
    c2 = Process(target=consume, args=(q,))
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()
    # One sentinel per consumer, sent only after all producers finished.
    q.put(None)
    q.put(None)
    print("zhu")
from multiprocessing import Process, Queue
import time
import random


def producer(q):
    """Put three items on the shared queue, pausing between each."""
    for i in range(3):
        res = "包子%s" % i
        time.sleep(0.5)
        print("生产了%s" % res)
        q.put(res)


def consume(q):
    """Consume items until the None sentinel arrives."""
    while True:
        res = q.get()
        # Explicit sentinel check; `if not res` would also stop on "" or 0.
        if res is None:
            break
        time.sleep(random.randint(1, 3))
        print("吃了%s" % res)


if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))
    c1 = Process(target=consume, args=(q,))
    c2 = Process(target=consume, args=(q,))
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()
    # One sentinel per consumer, sent only after all producers finished.
    q.put(None)
    q.put(None)
    print("zhu")
JoinableQueue([maxsize])
from multiprocessing import Process, JoinableQueue
import time
import random


def producer(q):
    """Produce three items, then wait until consumers task_done() them all."""
    for i in range(3):
        res = "包子%s" % i
        time.sleep(0.5)
        print("生产了%s" % res)
        q.put(res)
    # Blocks until every put() item has been matched by a task_done().
    q.join()


def consume(q):
    """Consume items forever; daemonized, so killed when the main process exits."""
    while True:
        res = q.get()
        # No sentinel is ever sent in this version; explicit None check
        # avoids wrongly stopping on falsy items such as "" or 0.
        if res is None:
            break
        time.sleep(random.randint(1, 3))
        print("吃了%s" % res)
        # Signals q.join() that one queued item has been fully processed.
        q.task_done()


if __name__ == "__main__":
    q = JoinableQueue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))
    c1 = Process(target=consume, args=(q,))
    c2 = Process(target=consume, args=(q,))
    # Daemon consumers die with the main process instead of needing sentinels.
    c1.daemon = True
    c2.daemon = True
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()
    print("zhu")