Python多进程,多线程和异步实例
Posted 汤米先生
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python多进程,多线程和异步实例相关的知识,希望对你有一定的参考价值。
文章目录
前言
分享一些多进程,多线程和异步实例,顺便写个笔记记录一下一、多进程
1. 进程间通信使用Queue队列
Queue的特点:先进先出,后进后出
Queue的使用:可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序
Queue.qsize() :返回当前队列包含的消息数据
Queue.empty() :如果队列为空,返回True,反之False
Queue.full() :如果队列满了,返回True,反之False
Queue.get([block[,timeout]]) :获取队列中的一条消息,然后将从队列中移除,block默认为True
Queue.put([item,[block[,timeout]]]) :将item消息写入队列中,block默认是True
from multiprocessing import Queue
q=Queue(2)
q.qsize() #0
q.put('a')
q.put('b')
#q.put('c') 放不下了,因为队列中只能是2个 【阻塞】
q.get() #a
q.get() #b
#q.get() 取不了,因为队列中只能是2个 【阻塞】
2. 多进程中的通信【一个往Queue里写,一个从Queue里读】
from multiprocessing import Process,Queue
import os,time,random
def write(q): #写数据进程执行的代码
for item in ['A','B','C']:
print('Put {} to queue...'.format(item))
q.put(item)
time.sleep(random.randint(1,10))
def read(q): #读数据进程执行的代码
while True:
if not q.empty():
item=q.get(True)
print('Get {} from queue'.format(item))
time.sleep(random.randint(1,10))
else:
break
if __name__ == '__main__':
q=Queue() #父进程创建Queue,并传给各个子进程
pw=Process(target=write,args=(q,))
pr=Process(target=read,args=(q,))
pw.start() #启动子进程pw,写入
pw.join() #等待pw结束
pr.start() #启动子进程pr,读出
pr.join()
3. 进程池中的通信 【只需要就上述的Queue()转换成Manager().Queue()】
from multiprocessing import Manager,Pool
import os
def reader(q):
print('reader启动{},父进程为{}'.format(os.getpid(),os.getppid()))
for i in range(q.qsize()):
print('reader从Queue获取到消息:{}'.format(q.get(True)))
def writer(q):
print('writer启动{},父进程为{}'.format(os.getpid(),os.getppid()))
for i in 'PSY':
q.put(i)
if __name__ == '__main__':
print('{} start'.format(os.getpid()))
q=Manager().Queue() #使用Mananger中的Queue来初始化
po=Pool()
po.apply(writer,(q,))
po.apply(reader,(q,))
po.close()
po.join()
print('{} End'.format(os.getpid()))
结果显示:
15636 start
writer启动13592,父进程为15636
reader启动16656,父进程为15636
reader从Queue获取到消息:P
reader从Queue获取到消息:S
reader从Queue获取到消息:Y
15636 End
4. 多进程拷贝文件 【多个文件的拷贝】
from multiprocessing import Pool,Manager
import os
def copyFileTask(name,oldFolderName,newFolderName,queue): #完成copy一个文件的功能
fr=open(oldFolderName+'/'+name,'r')
fw=open(newFolderName+'/'+name,'w')
content=fr.read()
fw.write(content)
fr.close()
fw.close()
queue.put(name)
def main():
#0、获取要copy的文件夹的名字
oldFolderName=input('请输入文件夹的名字:').strip()
#1、创建一个文件夹
newFolderName=oldFolderName+'-复件'
os.mkdir(newFolderName)
#2、获取old文件夹中的所有的文件名字
fileNames=os.listdir(oldFolderName) #fileNames是一个列表
allNum=len(fileNames)
#3、使用多进程的方法,copy原文件夹中的所有文件到新的文件夹中
pool=Pool(5)
queue=Manager().Queue()
for name in fileNames:
pool.apply_async(copyFileTask,args=(name,oldFolderName,newFolderName,queue))
num=0
while num<allNum:
queue.get() #这边的queue就是用来测试copy是否全复制完
num+=1
copyRate=num/allNum
print('copy的进度为{:.2f}%'.format(copyRate*100))
pool.close()
pool.join()
if __name__ == '__main__':
main()
二、多线程
-
多线程的模块 from threading import Thread
-
多线程实现的几种方式:
创建线程要执行的函数,把这个函数传递进Thread对象,让它来执行
继承Thread类,创建一个新的class,将要执行的代码写到run函数里面 -
多线程的例子
1. 加入互斥锁
from threading import Thread,Lock
import time
class mythread(Thread):
def __init__(self,threadname):
Thread.__init__(self,name=threadname)
def run(self):
global x
mutex.acquire() #上锁
for i in range(3):
x = x+1
time.sleep(1)
print(x) #运行程序,屏幕依次显示:3 6 9 12 15
mutex.release() #释放锁
if __name__ == '__main__':
mutex=Lock() #创建一把互斥锁,这个锁默认是没有上锁的
t1 = []
for i in range(5):
t = mythread(str(i))
t1.append(t)
x = 0
for i in t1:
i.start()
结果显示:
3
6
9
12
15
2. 不加入互斥锁
from threading import Thread,Lock
import time
class mythread(Thread):
def __init__(self,threadname):
Thread.__init__(self,name=threadname)
def run(self):
global x
#mutex.acquire()
for i in range(3):
x = x+1
time.sleep(1)
print(x) #运行程序,屏幕依次显示:3 6 9 12 15
#mutex.release() #释放锁
if __name__ == '__main__':
#mutex=Lock() #创建一把互斥锁,这个锁默认是没有上锁的
t1 = []
for i in range(5):
t = mythread(str(i))
t1.append(t)
x = 0
for i in t1:
i.start()
结果显示:
15
15
15
15
15
3. 在屏幕上连续打印10次ABC
屏幕连续打印10次ABC
from threading import Thread,Lock
import time,random
mutex=Lock()
def print1(lock):
lock.acquire()
print('A')
#time.sleep(random.randint(1,10))
lock.release()
def print2(lock):
lock.acquire()
print('B')
#time.sleep(random.randint(1,10))
lock.release()
def print3(lock):
lock.acquire()
print('C')
#time.sleep(random.randint(1,10))
lock.release()
for i in range(10):
t1=Thread(target=print1,args=(mutex,))
t1.start()
t1.join()
t2 = Thread(target=print2, args=(mutex,))
t2.start()
t2.join()
t3 = Thread(target=print3, args=(mutex,))
t3.start()
t3.join()
4. 死锁的产生
from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()
class MyThread1(Thread):
def run(self):
if mutexA.acquire():
print(self.name+'---do1---up---')
time.sleep(1)
if mutexB.acquire():
print(self.name+'---do1---down---')
mutexB.release()
mutexA.release()
class MyThread2(Thread):
def run(self):
if mutexB.acquire():
print(self.name+'---do1---up---')
time.sleep(1)
if mutexA.acquire():
print(self.name+'---do1---down---')
mutexA.release()
mutexB.release()
if __name__ == '__main__':
t1=MyThread1()
t2=MyThread2()
t1.start()
t2.start()
结果:【产生死锁】
Thread-1---do1---up---
Thread-2---do1---up---
注:若将例4中的if name == ‘main’:中的函数段修改为如下的代码,则不会产生死锁
if __name__ == '__main__':
t1=MyThread1()
t2=MyThread2()
t1.start()
t1.join()
t2.start()
t2.join()
结果:
Thread-1---do1---up---
Thread-1---do1---down---
Thread-2---do1---up---
Thread-2---do1---down---
5. 针对死锁的处理方法
···程序设计时自己避免
mutex=Lock()
mutex.acquire([blocking]) #锁定
mutex.release() #释放
其中acquire可以有一个blocking参数,如果设定blocking为True,则当前线程会阻塞,知道获取到这个锁为止,若没有指定,则默认为True
如果设定blocking为False,则当前线程不会堵塞
···自己设置超时时间,若是到达该时间,就释放该锁
6. 生产者消费者模型 【常用】 —>比如爬虫:爬取数据+处理数据—>使用队列进行缓冲
from queue import Queue #队列,先进先出 【用于解耦合】
from threading import Thread
import time
class Producer(Thread):
def run(self):
global queue
count=0
while True:
if queue.qsize()<1000:
for i in range(100): #只要当前队列中商品的数量小于1000,生产者开始生产产品(在主程序中定义了2个生产者)
count+=1
msg='生产产品{}'.format(count)
queue.put(msg)
print(msg)
time.sleep(0.5)
class Consumer(Thread):
def run(self):
global queue
while True:
if queue.qsize()>100:
for i in range(3): #只要当前队列中商品的数量大于100,消费者开始消费产品(在主程序中定义了5个消费者)
msg = self.name+'消费了'+queue.get()
print(msg)
time.sleep(1)
if __name__ == '__main__':
queue=Queue()
for i in range(50):
queue.put('初始化商品:{}'.format(i))
for i in range(2): #创建2个生产者
p=Producer()
p.start()
for i in range(5): #创建5个消费者
c=Consumer()
c.start()
二、异步
1、概念
同步调用:比如你喊你朋友吃饭,你朋友在忙,你就一直在那等,等你朋友忙完了,你们一起去吃饭。
异步调用:比如你喊你朋友吃饭,你朋友说知道了,待会忙完去找你,你就去做别的事了。
2、异步调用的小例子:
from multiprocessing import Pool
import os,time
def test():
print('--进程池中的进程--pid={}--ppid={}--'.format(os.getpid(),os.getppid()))
for i in range(3):
print('{}'.format(i))
time.sleep(1)
return 'hahaha'
def test2(args):
print('--callback func--pid={}--'.format(os.getpid()))
print('--callback func--args={}--'.format(args))
if __name__ == '__main__':
pool=Pool(3)
pool.apply_async(func=test,callback=test2) #callback就是回调
while True:
time.sleep(1)
print('----主进程--pid={}'.format(os.getpid()))
结果显示:
--进程池中的进程--pid=16412--ppid=5292--
0
----主进程--pid=5292
1
----主进程--pid=5292
2
----主进程--pid=5292
--callback func--pid=5292--
--callback func--args=hahaha--
----主进程--pid=5292
----主进程--pid=5292
----主进程--pid=5292
----主进程--pid=5292
......
......
......
以上是关于Python多进程,多线程和异步实例的主要内容,如果未能解决你的问题,请参考以下文章
python 多进程和多线程3 —— asyncio - 异步IO