python学习_多进程
Posted laura-l
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python学习_多进程相关的知识,希望对你有一定的参考价值。
1、基础知识:
计算机的硬件组成:
主板 固化(寄存器,是直接和cpu进行交互的硬件)
cpu 中央处理器:计算(数字计算和逻辑计算) 和控制(控制所有的硬件协调工作)
存储 硬盘 内存
输入设备 键盘 鼠标 话筒
输出设备 显示器 音响 打印机
早期的计算机是以计算为核心的,现在的计算机是以存储为核心的
操作系统是一个软件 是一个能直接操作硬件的一个软件
操作系统的目标:让用户使用更加轻松,高可用,低耦合,封装了所有硬件的接口,使用户更方便的使用,对于计算机内的所有资源,进行一个合理的调度和分配
进程相关基础知识:
进程:正在执行的程序,是程序执行过程中的相关指令,数据集等的集合,也可以叫做程序的一次执行过程,是一个动态的概念
进程的组成:代码段 数据段 PCB:进程控制块
进程的三大基本状态:
就绪状态:已经获得了运行所需要的所有资源,除了cpu
执行状态:已经获得了所有资源,包括cpu,处于正在执行的状态
阻塞状态:因为各种原因,进程放弃了cpu,导致进程无法继续执行,此时进程处于内存中,继续等待获取cpu
一个特殊状态:挂起状态,因为各种原因,进程放弃了cpu,导致进程无法继续执行,此时进程被踢出内存
2、进程
multiprocessing python的内置模块,用于多进程编程 Process from multiprocessing import Process
并行:指两件或者多事情,在同一个时间点开始执行
并发:指两件或者多件事情,在同一个时间间隔内同时执行
同步:某一个任务的执行必须依赖另一个任务的返回结果
异步;某一个任务的执行不需要依赖另一个任务的返回,只需要告诉另一个任务一声
阻塞:程序因为类似IO等待,等待时间等无法继续执行
非阻塞:程序遇到类似IO操作时候,不在阻塞等待,如果没有及时处理IO,就报错或者跳过
进程的方法或者属性:
os.getpid() 获取当前进程的pid os.getppid()获取当前进程的父进程id
start() 开启一个子进程
join()异步变同步,让父进程等待子进程执行结束,再继续执行(当主进程执行到这条语句的时候,主进程阻塞,等待子进程执行完毕,主进程继续执行),join必须放在start之后
is_alive() 判断进程是否活着
terminate() 杀死进程
属性;
name: 子进程的名字
pid:子进程的pid
deamon:设置进程为守护进程,给True代表为守护进程,默认为false,不是守护进程
守护进程:p.daemon=True
随着父进程的代码执行完毕就结束(敲黑板,划重点!!是代码执行完毕--主进程不会阻塞等待)
守护进程不能创建子进程
守护进程必须在进程start之前设置
IPC:进程间通信
锁机制:为了多进程通信的时候,保护数据的安全性
from multiprocessing import Lock
l = lock()
l.acquire() 获得锁(此时其他进程不可以访问锁住的资源)
l.release() 释放锁(其他进程可以访问)
信号机制:
sem = Semaphore(n)
n:初始化的时候一把锁配几把钥匙,int
l.acquire()
l.release()
信号量机制比锁机制多了个计数器,这个计数器用来记录当前剩余几把锁,计数器为0,表达当前没有钥匙,acquire()处于阻塞状态,acquire()一次,计数器内部减1,release一次,计数器加一;
事件机制:
e = Event()
初始为false,阻塞状态
e.set() 设置is_set()为True,代表非阻塞状态
e.clear() 设置is_set()为false,代表阻塞状态
e.wait()判断is_set的值,True为非阻塞。fase为阻塞
e.is_set() 标志
3、生产者消费者模型
简要介绍:主要是用来解耦,借助队列来实现生产者消费者模型
栈:先进后出
队列:先进先出
import queue 不能进行多进程之间的数据传输
from multiprocessing import Queue 借助Queue解决生产者消费者模型
队列是安全的
q = Queue(num---队列的最大长度)
q.get() 阻塞等待获取数据,如果有数据直接获取,没有则阻塞等待
q.put() 阻塞 如果可以继续往队列中放数据,就直接放,不能放则阻塞等待
q.get_nowait() 不阻塞,如果有数据直接获取,没有数据就报错
q.put_nowait() 不阻塞,如果可以继续往队列中放数据,就直接放,不能就报错
from multiprocessing import JoinableQueue 可连接的队列
继承Queue 可以使用queue的方法
增加的方法:
q.join() 用户生产者接收消费者的返回结果,接收全部生产的数量,以便知道什么时候队列里的数据被消费完了
q.task_done() 每消费一个数据,就返回一个表示返回结果,生产者就您呢个获得当前消费者消费了多少个数据,每消费队列里的一个数据,就给join返回一个表示
管道:
from multiprocessing import Pipe
con1, con2 = Pipe()
管道是不安全的
管道是用于多线程之间通信的一种方式
单进程中:
con1发则con2收,con2收则con1发
多进程中:
父进程con1发,子进程的con2收
管道中错误EOFError 父进程中如果关闭发送端,子进程还在继续接收,就回导致EOFError
4、进程池
一个池子,里面有固定数量的进程,且处在待命状态,一旦有任务来,马上就有进程去处理
开启进程需要操作系统消耗大量的事件去管理它,大量的事件让cpu去调度它
进程池会帮助程序员去管理池中的进程
from multiprocessing import Pool
p = Pool(os.cpu_count() + 1)
进程池的三个方法:
map(func, iterable)
func: 进程池中进程执行的任务函数
iterable:可迭代对象,是把可迭代对象中的每一个元素传给任务函数当参数
from multiprocessing import Pool import os def func(num): num += 1 print(num) return num if __name__ == ‘__main__‘: p = Pool(os.cpu_count() + 1) print(os.cpu_count()) res = p.map(func, [i for i in range(20)]) . # i作为参数传入func中 p.close() # 表示不能再向进程池中添加任务 p.join() # 表示等待进程池中所有任务执行完毕 print(type(res))
apply(func,arg=()) 同步的执行,即池中的进程一个个的去执行任务
func:进程池中进程执行的任务函数
args:可迭代对象的参数,是传给任务函数的参数
同步执行任务,不需要close和join, 进程池中所有的进程都是普通进程(主进程需要等待其结束)
from multiprocessing import Pool import requests import time def func(url): res = requests.get(url) if res.status_code == 200: return ‘ok‘ if __name__ == ‘__main__‘: p = Pool(5) l = [‘https://www.baidu.com‘, ‘http://www.jd.com‘, ‘http://www.taobao.com‘, ‘http://www.mi.com‘, ‘http://www.cnblogs.com‘, ‘https://www.bilibili.com‘, ] start = time.time() for i in l: p.apply(func, args=(i,)) #即使有n个线程也是一个一个的去执行 print(time.time() - start) start = time.time() for i in l: p.apply_async(func, args=(i, )) p.close() p.join() print(time.time() - start)
apply_async(func,args=(), callback=None) 异步:池中的进程一次性去执行任务
func:进程池中进程执行的任务函数
args:可迭代对象的参数,是传给任务函数的参数
callback: 回调函数 当进程池中有进程处理完任务来,返回的结果可以交给回调函数,由回调函数进程进一步的处理,这是只有异步才有的
异步处理任务,需要close和join
异步处理任务时,进程池中所有的进程都是守护进程
回调函数:
进程的任务函数的返回值,被当成回调函数的形参接收到,一次进一步的处理操作
回调函数是由主进程调用的,而不是子进程,子进程只负责把结果给回调函数
from multiprocessing import Pool import requests import time import os def func(url): res = requests.get(url) print("子进程的pid:%s, 父进程的pid:%s" % (os.getpid(), os.getppid())) if res.status_code == 200: return url def cal_back(sta): url = sta print("回调函数的pid:%s" % os.getpid()) with open(‘content.txt‘, ‘a+‘) as f: f.write(url + "\\n") if __name__ == ‘__main__‘: p = Pool(5) l = [‘https://www.baidu.com‘, ‘http://www.jd.com‘, ‘http://www.taobao.com‘, ‘http://www.mi.com‘, ‘http://www.cnblogs.com‘, ‘https://www.bilibili.com‘, ] print("主进程pid:%s" % os.getpid()) for i in l: p.apply_async(func, args=(i,), callback=cal_back) p.close() p.join()
进程和进程池对比
from multiprocessing import Process, Pool import time def func(num): num += 1 # print(num) if __name__ == ‘__main__‘: p = Pool(5) start = time.time() p.map(func, [i for i in range(1000)]) p.close() # 指不能再向进程池中添加任务 p.join() # 等待进程池中的所有任务执行完毕 print(time.time() - start) p_l = [] start = time.time() for i in range(1000): p = Process(target=func, args=(i,)) p.start() p_l.append(p) [i.join() for i in p_l] print(time.time() - start)
5、生产者消费者模型实例
队列实现生产者消费者模型
from multiprocessing import Queue, Process import time def producer(q, name): for i in range(20): q.put(name) print("生产第%s个%s" % (i, name)) # q.put(None) def consumer(q, name, color): while 1: info = q.get() if info: print("%s %s拿走来%s\\033[0m" % (color, name, info)) else: break if __name__ == ‘__main__‘: q = Queue(10) p = Process(target=producer, args=(q, ‘number one‘)) p1 = Process(target=producer, args=(q, ‘number two‘)) p2 = Process(target=producer, args=(q, ‘number three‘)) c1 = Process(target=consumer, args=(q, ‘alex‘, ‘\\033[31m‘)) c2 = Process(target=consumer, args=(q, ‘wusir‘, ‘\\033[32m‘)) p_l = [p, p1, p2, c2, c1] [i.start() for i in p_l] p.join() p1.join() p2.join() q.put(None) q.put(None) # 设置标志 表示没有数据了,生产者不再生产数据
joinableQueue实现生产者消费者进程
from multiprocessing import JoinableQueue, Process def producer(q, name): for i in range(20): q.put(name) print("生产第%s个%s" % (i, name)) q.join() 生产者进程等待消费者进程消费完成 def consumer(q, name): while 1: q.get() print("%s 拿走了一个" % name) q.task_done() if __name__ == ‘__main__‘: q = JoinableQueue(10) p1 = Process(target=producer, args=(q, ‘one‘)) c1 = Process(target=consumer, args=(q, ‘alex‘)) c1.daemon = True # 设置守护进程 p1.start() c1.start() p1.join() # 主进程等待生产者进程 # 主进程等待生产者进程结束 # 程序有3个进程,主进程和生产者进程和消费者进程。 当主进程执行到35行代码时,主进程会等待生产进程结束 # 而生产进程中(第26行)会等待消费者进程把所有数据消费完,生产者进程才结束。 # 现在的状态就是 主进程等待生产者进程结束,生产者进程等待消费者消费完所有数据 # 所以,把消费者设置为守护进程。 当主进程执行完,就代表生产进程已经结束,也就代表消费者进程已经把队列中数据消费完 # 此时,主进程一旦结束,守护进程也就是消费者进程也就跟着结束。 整个程序也就能正常结束了。
6、其他相关知识
进程间的内存共享
from multiprocessing import Manager, Value
m = manager()
num = m.dict() num = m.list([])
IPC:管道 队列(锁 信号量 事件)
多个进程之间不能共享内存,不能修改全局变量
from multiprocessing import value
num = Value("i"--数据类型, num--数据)
num.value() 值
Manager模块:多进程间共享数据
m = Manager()
num = m.list([1,2,3])
以上是关于python学习_多进程的主要内容,如果未能解决你的问题,请参考以下文章