python多进程

Posted Wualin

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python多进程相关的知识,希望对你有一定的参考价值。

多进程

  • 进程:正在进行的过程或者说是一个任务,而负责执行任务则是cpu
  • 同一个程序执行两次是两次进程
  • 并发:
  • 并行:基于多核cpu

unix开子进程的拷贝一份父进程的数据

进行的三个状态:运行,阻塞,就绪

在python中如何开启子进程

  1. multiprocessing模块中的process类
# 方式1
from multiprocessing import Process
import time

def task(name):
    print(‘%s is running‘%name)
    time.sleep(5)
    print(‘%s is done‘%name)

if __name__ ==‘__main__‘:
    p = Process(target=task,args=(‘子进程1‘,))
    p.start() #仅仅只是给操作系统发送了一个信号
#方式二:自定义类继承自process
from multiprocessing import Process
import time
class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self): #函数名必须是run
        print(‘%s is running‘%self.name)
        time.sleep(5)
        print(‘%s is done‘%self.name)

if __name__ == ‘__main__‘:
    p = MyProcess(‘子进程1‘)
    p.start()#本质就是在调用p.run
    print(‘进‘)

查看进程的pid与ppid(进程id)

  • 补充:
    • windows查看正在执行的进程:tasklist | findstr pycharm
    • Mac os查看正在执行的进程:ps aus|grep pycharm
from multiprocessing import Process
import time
import os


def task(name):
    print(‘%s is running,parent id is <%s>‘%(os.getpid(),os.getppid()))
    time.sleep(5)
    print(‘%s is done‘%os.getpid())

if __name__ ==‘__main__‘:
    p = Process(target=task,args=(‘子进程1‘,))
    p.start() #仅仅只是给操作系统发送了一个信号
    print(‘主‘,os.getpid())

process对象的其他属性或方法

  • join()等子进程结束完毕才会继续运行主进程,主进程结束后所有的僵尸进程结束
  • start()只是向操作系统发送信号,并不只是把进程立马开起来,如果连续有几个start有可能执行的先后顺序会错乱,如果start方法后面立马接一个join,多个子进程会变成串行
  • is_alive()查看子进程时候已经结束
  • terminate()杀死这个子进程
  • pid()查看进程id
  • name()查看这个对象的名字

守护进程

  • 当主进程执行结束子进程跟着结束:将daemon属性设置为true,守护进程不能有子进程。代码执行到程序最后一行代表程序运行结束

互斥锁

  • 把并发变成串行:使用multiprocessing模块下的Lock对象
#牺牲效率实现子进程串行
from multiprocessing import Process,Lock
import time

def tack(name,lock):
    lock.acquire()#加锁
    print(‘%s,1‘%name)
    time.sleep(1)
    print(‘%s,2‘%name)
    time.sleep(1)
    print(‘%s,3‘%name)
    lock.release()#释放锁
if __name__ == ‘__main__‘:
    lock = Lock()#实例化锁对象
    for i in range(3):
        p = Process(target=tack,args=(‘进程%s‘%i,lock))#把锁传到子进程中
        p.start()
  • 互斥锁与join的区别:join确实能实现代码串行,但join把整个代码变成串行,但互斥锁可以把部分代码变成串行

事件

  • 一个信号可以使所有的进程都进入阻塞状态,也可以控制所有的进程解除阻塞
  • 一个事件被创建之后,默认是阻塞状态
from multiprocessing import Event
e = Event()# 创建了一事件
print(e.is_set())# 查看一个事件的状态,默认是阻塞
e.set() # 将这个事件状态改为True
print(e.is_set())
e.wait()# 根据e.is_set()的值决定是否阻塞,如果是True就是不阻塞,如果是False就是阻塞状态
e.clear() # 将这个事件状态改为False
  • 事件模拟红绿灯:
import time
import random
from multiprocessing import Process,Event

def light(e):
    while True:
        if e.is_set():
            e.clear()
            print(‘红灯亮了‘)
        else:
            e.set()
            print(‘绿灯亮了‘)
        time.sleep(5)
def cars(i,e):
    if not e.is_set():
        print(‘%s 在等待‘%i)
        e.wait()
    print(‘%s 通行了‘%i)


if __name__ == ‘__main__‘:
    e = Event()
    p1 = Process(target=light,args=(e,))
    p1.start()
    for i in range(20):
        p = Process(target=cars,args=(i,e))
        p.start()
        time.sleep(2)

队列

  • multiprocessing有提供基于ipc通信类(inter-process communication)
from multiprocessing import Queue

q = Queue(4)#指定队列大小,如果不指定大小则大小为无穷尽

q.put(‘hello‘)#插入数据到队列中
print(q.full())#判断队列中数据是否满了
q.get()#从队列读取并删除一个数据
print(q.empty())#判断队列中数据是否空了
  • 生产者消费这模型:
    • 生产者:生产数据任务
    • 消费者:消费数据任务
from multiprocessing import Process,Queue
import time
def producer(q):
    for i in range(3):
        res = ‘包子%s‘%i
        time.sleep(0.5)
        q.put(res)
        print(‘生产者生产了%s‘%res)

def consumer(q):
    while True:
        res = q.get()
        if res == None:break
        time.sleep(0.7)
        print(‘消费者吃了%s‘%res)


if __name__ == ‘__main__‘:
    q = Queue()
    p1 = Process(target=producer,args=(q,))
    c1 = Process(target=consumer,args=(q,))
    p1.start()
    c1.start()
    p1.join()
    c1.join()
    q.put(None)
  • JoinableQueue:
    • 消费者:每次获取一个数据,处理一个数据,并且发送一个记号:标志一个数据被处理成功,计数器-1
    • 生产者:每一次生产一个数据,且每一次生产的数据放在队列中计数器+1,当生产者全部生产完毕之后发送一个join:已经停止生产数据且要等待之前生产的数据被消费完,当数据都被处理完时,join阻塞结束
from multiprocessing import Process,JoinableQueue
import time
def producer(q):
    for i in range(3):
        res = ‘包子%s‘%i
        time.sleep(0.5)
        q.put(res)
        print(‘生产者生产了%s‘%res)
    q.join() #阻塞,直到一个队列中的所有数据,全部被处理完毕

def consumer(q):
    while True:
        res = q.get()
        time.sleep(0.7)
        print(‘消费者吃了%s‘%res)
        q.task_done() # 处理完一个数据,队列中的计数器-1


if __name__ == ‘__main__‘:
    q = JoinableQueue()
    p1 = Process(target=producer,args=(q,))
    c1 = Process(target=consumer,args=(q,))
    p1.start()
    c1.daemon = True #守护进程,主进程中的代码执行完毕后,子进程自动结束
    c1.start()
    p1.join()

管道

  • 在进程之间传递信息(数据)
from multiprocessing import Pipe,Process

def func(conn1,conn2):
    conn1.close()
    while True:
        try:
            msg = conn2.recv()
            print(msg)
        except EOFError:
            print(‘子进程结束‘)
            conn2.close()
            break


if __name__ == ‘__main__‘:
    conn1,conn2 = Pipe()
    Process(target=func,args=(conn1,conn2)).start()
    conn2.close()
    for i in range(5):
        conn1.send(‘吃屎了你‘)
    conn1.close()

进程池

  • 创建进程池的过程
  1. 创建一个属于进程的池子
  2. 这个池子指定能存放多少个进程
  3. 开启进程
from multiprocessing import Pool,Process
import time
import os
def func(i):
    print(‘%s 进程正在运行‘%i,os.getpid())
    time.sleep(2)
    print(‘%s 进程运行结束‘%i,os.getpid())


if __name__ == ‘__main__‘:
    pool = Pool(3)
    for i in range(10):
        # pool.apply(func,args=(i,)) 同步调用
        pool.apply_async(func,args=(i,)) # 异步调用
    pool.close() # 结束进程池接收任务
    pool.join() # 感知进程池中的任务执行结束
    # apply_async必须是与close、join一起使用
  • 进程池的返回值
from multiprocessing import Pool

def func(i):

    return i*i

if __name__ == ‘__main__‘:
    p = Pool(5)
    res_lis = []
    for i in range(10):
        res = p.apply_async(func,args=(i,))
        res_lis.append(res) 

    for res in res_lis:
        print(res.get())# res.get 默认是阻塞的,因为需要接收进程处理的结果,之后接收到结果之后才可以继续执行,所以
                            # 所以将进程的返回结果放在一个列表中,然后循环这个列表,再执行get就不会阻塞了
    p.close()
    p.join()
  • 进程池的回调函数
from multiprocessing import Pool
import os
def func1(i):
    print(‘in func1‘,os.getpid())
    return i*i

def func2(ii):
    print(‘in func2‘,os.getpid())
    print(ii)

if __name__ == ‘__main__‘:
    ‘‘‘
    1. func1执行结果当作func2的参数,func1进程执行结束后返回结果,通过callback调用func2,将func1的执行返回结果当作参数传递给func2执行
    2. func1是一个新的进程
    3. func2是在主进程中执行的
    ‘‘‘
    p = Pool(5)
    p.apply_async(func1,args=(2,),callback=func2) 
    p.close()
    p.join()

以上是关于python多进程的主要内容,如果未能解决你的问题,请参考以下文章

[Python3] 043 多线程 简介

python中的多线程和多进程编程

代码片段:Shell脚本实现重复执行和多进程

线程学习知识点总结

什么是多线程,多进程?

多线程编程