Python多进程,多线程和异步实例

Posted 汤米先生

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python多进程,多线程和异步实例相关的知识,希望对你有一定的参考价值。


前言

分享一些多进程,多线程和异步实例,顺便写个笔记记录一下

一、多进程

1. 进程间通信使用Queue队列

Queue的特点:先进先出,后进后出

Queue的使用:可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序

Queue.qsize() :返回当前队列包含的消息数据

Queue.empty() :如果队列为空,返回True,反之False

Queue.full() :如果队列满了,返回True,反之False

Queue.get([block[,timeout]]) :获取队列中的一条消息,然后将从队列中移除,block默认为True

Queue.put([item,[block[,timeout]]]) :将item消息写入队列中,block默认是True

from multiprocessing import Queue
q=Queue(2)
q.qsize()  #0
q.put('a')
q.put('b')
#q.put('c')  放不下了,因为队列中只能是2个  【阻塞】
q.get()  #a
q.get()  #b
#q.get()   取不了,因为队列中只能是2个   【阻塞】

2. 多进程中的通信【一个往Queue里写,一个从Queue里读】

from multiprocessing import Process,Queue
import os,time,random
def write(q): #写数据进程执行的代码
    for item in ['A','B','C']:
        print('Put {} to queue...'.format(item))
        q.put(item)
        time.sleep(random.randint(1,10))
def read(q):  #读数据进程执行的代码
    while True:
        if not q.empty():
            item=q.get(True)
            print('Get {} from queue'.format(item))
            time.sleep(random.randint(1,10))
        else:
            break
if __name__ == '__main__':
    q=Queue() #父进程创建Queue,并传给各个子进程
    pw=Process(target=write,args=(q,))
    pr=Process(target=read,args=(q,))
    pw.start() #启动子进程pw,写入
    pw.join()  #等待pw结束
    pr.start()  #启动子进程pr,读出
    pr.join()

3. 进程池中的通信 【只需要就上述的Queue()转换成Manager().Queue()】

from multiprocessing import Manager,Pool
import os
def reader(q):
    print('reader启动{},父进程为{}'.format(os.getpid(),os.getppid()))
    for i in range(q.qsize()):
        print('reader从Queue获取到消息:{}'.format(q.get(True)))

def writer(q):
    print('writer启动{},父进程为{}'.format(os.getpid(),os.getppid()))
    for i in 'PSY':
        q.put(i)
if __name__ == '__main__':
    print('{} start'.format(os.getpid()))
    q=Manager().Queue()  #使用Mananger中的Queue来初始化
    po=Pool()
    po.apply(writer,(q,))
    po.apply(reader,(q,))
    po.close()
    po.join()
    print('{} End'.format(os.getpid()))

结果显示:

15636 start
writer启动13592,父进程为15636
reader启动16656,父进程为15636
reader从Queue获取到消息:P
reader从Queue获取到消息:S
reader从Queue获取到消息:Y
15636 End

4. 多进程拷贝文件 【多个文件的拷贝】

from multiprocessing import Pool,Manager
import os

def copyFileTask(name,oldFolderName,newFolderName,queue):  #完成copy一个文件的功能
    fr=open(oldFolderName+'/'+name,'r')
    fw=open(newFolderName+'/'+name,'w')
    content=fr.read()
    fw.write(content)
    fr.close()
    fw.close()
    queue.put(name)

def main():
    #0、获取要copy的文件夹的名字
    oldFolderName=input('请输入文件夹的名字:').strip()


    #1、创建一个文件夹
    newFolderName=oldFolderName+'-复件'
    os.mkdir(newFolderName)

    #2、获取old文件夹中的所有的文件名字
    fileNames=os.listdir(oldFolderName)  #fileNames是一个列表
    allNum=len(fileNames)

    #3、使用多进程的方法,copy原文件夹中的所有文件到新的文件夹中
    pool=Pool(5)
    queue=Manager().Queue()
    for name in fileNames:
        pool.apply_async(copyFileTask,args=(name,oldFolderName,newFolderName,queue))

    num=0
    while num<allNum:
        queue.get()  #这边的queue就是用来测试copy是否全复制完
        num+=1
        copyRate=num/allNum
        print('copy的进度为{:.2f}%'.format(copyRate*100))

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

二、多线程

  1. 多线程的模块 from threading import Thread

  2. 多线程实现的几种方式:
    创建线程要执行的函数,把这个函数传递进Thread对象,让它来执行
    继承Thread类,创建一个新的class,将要执行的代码写到run函数里面

  3. 多线程的例子

1. 加入互斥锁

from threading import Thread,Lock
import time
class mythread(Thread):
    def __init__(self,threadname):
        Thread.__init__(self,name=threadname)
    def run(self):
        global x
        mutex.acquire()  #上锁
        for i in range(3):
            x = x+1
        time.sleep(1)
        print(x)  #运行程序,屏幕依次显示:3 6 9 12 15
        mutex.release()  #释放锁

if __name__ == '__main__':
    mutex=Lock()  #创建一把互斥锁,这个锁默认是没有上锁的
    t1 = []
    for i in range(5):
        t = mythread(str(i))
        t1.append(t)
    x = 0
    for i in t1:
        i.start()

结果显示:

3
6
9
12
15

2. 不加入互斥锁

from threading import Thread,Lock
import time
class mythread(Thread):
    def __init__(self,threadname):
        Thread.__init__(self,name=threadname)
    def run(self):
        global x
        #mutex.acquire()
        for i in range(3):
            x = x+1
        time.sleep(1)
        print(x)  #运行程序,屏幕依次显示:3 6 9 12 15
        #mutex.release()  #释放锁

if __name__ == '__main__':
    #mutex=Lock()  #创建一把互斥锁,这个锁默认是没有上锁的
    t1 = []
    for i in range(5):
        t = mythread(str(i))
        t1.append(t)
    x = 0
    for i in t1:
        i.start()

结果显示:

15
15
15
15
15

3. 在屏幕上连续打印10次ABC

屏幕连续打印10次ABC

from threading import Thread,Lock
import time,random
mutex=Lock()
def print1(lock):
    lock.acquire()
    print('A')
    #time.sleep(random.randint(1,10))
    lock.release()
def print2(lock):
    lock.acquire()
    print('B')
    #time.sleep(random.randint(1,10))
    lock.release()
def print3(lock):
    lock.acquire()
    print('C')
    #time.sleep(random.randint(1,10))
    lock.release()
for i in range(10):
    t1=Thread(target=print1,args=(mutex,))
    t1.start()
    t1.join()
    t2 = Thread(target=print2, args=(mutex,))
    t2.start()
    t2.join()
    t3 = Thread(target=print3, args=(mutex,))
    t3.start()
    t3.join()

4. 死锁的产生

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()
class MyThread1(Thread):
    def run(self):
        if mutexA.acquire():
            print(self.name+'---do1---up---')
            time.sleep(1)
            if mutexB.acquire():
                print(self.name+'---do1---down---')
                mutexB.release()
            mutexA.release()
class MyThread2(Thread):
    def run(self):
        if mutexB.acquire():
            print(self.name+'---do1---up---')
            time.sleep(1)
            if mutexA.acquire():
                print(self.name+'---do1---down---')
                mutexA.release()
            mutexB.release()
if __name__ == '__main__':
    t1=MyThread1()
    t2=MyThread2()
    t1.start()
    t2.start()

结果:【产生死锁】

Thread-1---do1---up---
Thread-2---do1---up---

注:若将例4中的if name == ‘main’:中的函数段修改为如下的代码,则不会产生死锁

if __name__ == '__main__':
    t1=MyThread1()
    t2=MyThread2()
    t1.start()
    t1.join()
    t2.start()
    t2.join()

结果:

Thread-1---do1---up---
Thread-1---do1---down---
Thread-2---do1---up---
Thread-2---do1---down---

5. 针对死锁的处理方法

···程序设计时自己避免
mutex=Lock()
mutex.acquire([blocking])  #锁定
mutex.release()  #释放
其中acquire可以有一个blocking参数,如果设定blocking为True,则当前线程会阻塞,知道获取到这个锁为止,若没有指定,则默认为True
                                 如果设定blocking为False,则当前线程不会堵塞
···自己设置超时时间,若是到达该时间,就释放该锁

6. 生产者消费者模型 【常用】 —>比如爬虫:爬取数据+处理数据—>使用队列进行缓冲

from queue import Queue  #队列,先进先出  【用于解耦合】
from threading import Thread
import time
class Producer(Thread):
    def run(self):
        global queue
        count=0
        while True:
            if queue.qsize()<1000:
                for i in range(100):  #只要当前队列中商品的数量小于1000,生产者开始生产产品(在主程序中定义了2个生产者)
                    count+=1
                    msg='生产产品{}'.format(count)
                    queue.put(msg)
                    print(msg)
            time.sleep(0.5)

class Consumer(Thread):
    def run(self):
        global queue
        while True:
            if queue.qsize()>100:
                for i in range(3):  #只要当前队列中商品的数量大于100,消费者开始消费产品(在主程序中定义了5个消费者)
                    msg = self.name+'消费了'+queue.get()
                    print(msg)
            time.sleep(1)
if __name__ == '__main__':
    queue=Queue()
    for i in range(50):
        queue.put('初始化商品:{}'.format(i))
    for i in range(2):  #创建2个生产者
        p=Producer()
        p.start()
    for i in range(5):  #创建5个消费者
        c=Consumer()
        c.start()

二、异步

1、概念

同步调用:比如你喊你朋友吃饭,你朋友在忙,你就一直在那等,等你朋友忙完了,你们一起去吃饭。

异步调用:比如你喊你朋友吃饭,你朋友说知道了,待会忙完去找你,你就去做别的事了。

2、异步调用的小例子:

from multiprocessing import Pool
import os,time

def test():
    print('--进程池中的进程--pid={}--ppid={}--'.format(os.getpid(),os.getppid()))
    for i in range(3):
        print('{}'.format(i))
        time.sleep(1)
    return 'hahaha'

def test2(args):
    print('--callback func--pid={}--'.format(os.getpid()))
    print('--callback func--args={}--'.format(args))

if __name__ == '__main__':
    pool=Pool(3)
    pool.apply_async(func=test,callback=test2)  #callback就是回调
    while True:
        time.sleep(1)
        print('----主进程--pid={}'.format(os.getpid()))

结果显示:

--进程池中的进程--pid=16412--ppid=5292--
0
----主进程--pid=5292
1
----主进程--pid=5292
2
----主进程--pid=5292
--callback func--pid=5292--
--callback func--args=hahaha--
----主进程--pid=5292
----主进程--pid=5292
----主进程--pid=5292
----主进程--pid=5292
......
......
......

以上是关于Python多进程,多线程和异步实例的主要内容,如果未能解决你的问题,请参考以下文章

122 Python程序中的多进程和多线程

python多进程和多线程的区别

python 多进程和多线程3 —— asyncio - 异步IO

python 多进程和多线程3 —— asyncio - 异步IO

什么是多线程,多进程?

python 多进程/多线程/协程 同步异步