Python之进程 - multiprocessing模块

Posted 小牧牧

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python之进程 - multiprocessing模块相关的知识,希望对你有一定的参考价值。

? 我们已经了解了,运行中的程序就是一个进程。所有的进程都是通过它的父进程来创建的。因此,运行起来的python程序也是一个进程,那么我们也可以在程序中再创建进程。多个进程可以实现并发效果,也就是说,当我们的程序中存在多个进程的时候,在某些时候,就会让程序的执行速度变快。以我们之前所学的知识,并不能实现创建进程这个功能,所以我们就需要借助python中强大的模块。

? 仔细说来,multiprocess不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分进程同步部分进程池部分进程之间数据共享。重点强调:进程没有任何共享状态,进程修改的数据,改动仅限于该进程内,但是通过一些特殊的方法,可以实现进程之间数据的共享。

一、process模块介绍

process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建

使用:

Process([group [, target [, name [, args [, kwargs]]]]]),
由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

1.1 进程创建的第一种方法

程序示例:

from multiprocessing import Process


def func():
    print(12345)


if __name__ == '__main__':
    p = Process(target=func,)
    p.start()
    print('*'*10)

功能讲解

if __name__ == '__main__': 
# windows 下才需要写这个,这和系统创建进程的机制有关系,不用深究,记着windows下要写就好啦
     #首先我运行当前这个test.py文件,运行这个文件的程序,那么就产生了进程,这个进程我们称为主进程

    p = Process(target=func,) #将函数注册到一个进程中,p是一个进程对象,此时还没有启动进程,只是创建了一个进程对象。并且func是不加括号的,因为加上括号这个函数就直接运行了对吧。
  
    p.start() #告诉操作系统,给我开启一个进程,func这个函数就被我们新开的这个进程执行了,而这个进程是我主进程运行过程中创建出来的,所以称这个新创建的进程为主进程的子进程,而主进程又可以称为这个新进程的父进程。
          #而这个子进程中执行的程序,相当于将现在这个test.py文件中的程序copy到一个你看不到的python文件中去执行了,就相当于当前这个文件,被另外一个py文件import过去并执行了。
          #start并不是直接就去执行了,我们知道进程有三个状态,进程会进入进程的三个状态,就绪,(被调度,也就是时间片切换到它的时候)执行,阻塞,并且在这个三个状态之间不断的转换,等待cpu执行时间片到了。
        
    print('*' * 10) #这是主进程的程序,上面开启的子进程的程序是和主进程的程序同时运行的,我们称为异步

上面我们说过,我们通过主进程创建的子进程是异步执行的,那么我们就验证一下,并且看一下子进程和主进程(也就是父进程)的ID号(讲一下pidppid,使用pycharm举例),来看看是否是父子关系。

from multiprocessing import Process
import time
import os


def func():
    print('aaaaa')
    time.sleep(1)
    print('该子进程ID:', os.getpid())  #获取自己的进程ID号
    print('该子进程的父进程ID:', os.getppid())  #获取自己进程的父进程ID
    print(12345)


if __name__ == '__main__':
    p = Process(target=func,)
    p.start()
    print('*'*10)
    print('父进程ID>>>', os.getpid())
    print('父进程的父进程ID>>>', os.getpid())

打印结果:

**********
父进程ID>>> 57235
父进程的父进程ID>>> 57235
aaaaa
该子进程ID: 57236
该子进程的父进程ID: 57235
12345

# 首先打印出来了主进程的程序,然后打印的是子进程的,也就是子进程是异步执行的,相当于主进程和子进程同时运行着,如果是同步的话,我们先执行的是func(),然后再打印主进程最后的10个*号。

一个进程的生命周期:

如果子进程的运行时间长,那么等到子进程执行结束程序才结束,如果主进程的执行时间长,那么主进程执行结束程序才结束,实际上我们在子进程中打印的内容是在主进程的执行结果中看不出来的,但是pycharm帮我们做了优化,因为它会识别到你这是开的子进程,帮你把子进程中打印的内容打印到了显示台上。

? 如果说一个主进程运行完了之后,我们把pycharm关了,但是子进程还没有执行结束,那么子进程还存在吗?这要看你的进程是如何配置的,如果说我们没有配置说我主进程结束,子进程要跟着结束,那么主进程结束的时候,子进程是不会跟着结束的,他会自己执行完,如果我设定的是主进程结束,子进程必须跟着结束,那么就不会出现单独的子进程(孤儿进程)了,具体如何设置,看下面的守护进程的讲解。比如说,我们将来启动项目的时候,可能通过cmd来启动,那么我cmd关闭了你的项目就会关闭吗,不会的,因为你的项目不能停止对外的服务,对吧。

1.2 Process类中参数的介绍:

参数介绍:
1 group参数未使用,值始终为None
2 target表示调用对象,即子进程要执行的任务
3 args表示调用对象的位置参数元组,args=(1,2,'egon',)
4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
5 name为子进程的名称

给要执行的函数传参数:

def func(x, y):
    print(x)
    time.sleep(1)
    print(y)


if __name__ == '__main__':
    p = Process(target=func, args=('hello', 'world'))  # 这是func需要接收的参数的传送方式。
    p.start()
    print('父进程执行结束!')

# 执行结果:
父进程执行结束!
hello
world

1.3 Process类中各方法的介绍

  1. p.start():启动进程,并调用该子进程中的p.run()

  2. p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法

  3. p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁

  4. p.is_alive():如果p仍然运行,返回True

  5. p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能joinstart开启的进程,而不能join住run开启的进程

join方法的例子

让主进程加上join的地方等待(也就是阻塞住),等待子进程执行完之后,再继续往下执行我的主进程,好多时候,我们主进程需要子进程的执行结果,所以必须要等待。join感觉就像是将子进程和主进程拼接起来一样,将异步改为同步执行。

from multiprocessing import Process
import time

def func(x, y):
    print(x)
    time.sleep(1)
    print(y)

if __name__ == '__main__':
    p = Process(target=func, args=('hello', 'world'))  # 这是func需要接收的参数的传送方式。
    p.start()
    print("我这里是异步的!")
    p.join()  # 只有在join的地方才会阻塞住,将子进程和主进程之间的异步改为同步
    print('父进程执行结束!')

# 执行结果
我这里是异步的!
hello
world
父进程执行结束!

开启多个进程

for循环。并且我有个需求就是说,所有的子进程异步执行,然后所有的子进程全部执行完之后,我再执行主进程,怎么搞?看代码


1.4 Process类中自带封装的各属性的介绍

  1. p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

  2. p.name:进程的名称

  3. p.pid:进程的pid

  4. p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)

  5. p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

二、process类的使用

注意:

Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. 
If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). 
This is the reason for hiding calls to Process() inside
if __name__ == "__main__"
since statements inside this if-statement will not get called upon import.

由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。 
如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。 
这是隐藏对Process()内部调用的原,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。

在windows中Process()必须放到 if __name__ == '__main__':

2.1 进程的创建第二种方法(继承)

from multiprocessing import Process
import os

class MyProcess(Process):  # 自己写一个类,继承Process类
    # 我们通过init方法可以传参数,如果只写一个run方法,那么没法传参数,因为创建对象的是传参就是在init方法里面
    def __init__(self, person):
        super().__init__()
        self.person = person

    def run(self):
        print(os.getpid())
        print(self.pid)
        print(self.pid)
        print('%s 正在和女主播聊天' % self.person)
    # def start(self):
    #     #如果你非要写一个start方法,可以这样写,并且在run方法前后,可以写一些其他的逻辑
    #     self.run()

if __name__ == '__main__':
    p1 = MyProcess('Jedan')
    p2 = MyProcess('太白')
    p3 = MyProcess('alexDSB')
    
    p1.start()  # start内部会自动调用run方法
    p2.start()
    # p2.run()
    p3.start()

    p1.join()
    p2.join()
    p3.join()

运行结果:

57683
57683
57683
Jedan 正在和女主播聊天
57684
57684
57684
太白 正在和女主播聊天
57685
57685
57685
alexDSB 正在和女主播聊天

2.2 进程之间的数据隔离

进程之间的数据是隔离的,也就是数据不共享,下面的是验证:

from multiprocessing import Process

n = 100
# 首先定义一个全局变量,在windows系统中应该把全局变量定义在if __name__ == '__main__'之上

def work():
    global n
    n = 0
    print('子进程内: ', n)


if __name__ == '__main__':
    p = Process(target=work,)
    p.start()
    p.join()  # 等待子进程执行完毕,如果数据共享的话,我子进程是不是通过global将n改为0了,但是你看打印结果,主进程在子进程执行结束之后,仍然是n=100,子进程n=0,说明子进程对n的修改没有在主进程中生效,说明什么?说明他们之间的数据是隔离的,互相不影响的
    print('主进程内: ', n)

# 打印结果
子进程内:  0
主进程内:  100

2.3 练习:多进程实现socket多客户端通讯

我们之前学socket的时候,知道tcp协议的socket是不能同时和多个客户端进行连接的,(这里先不考虑socketserver那个模块),那我们自己通过多进程来实现一下同时和多个客户端进行连接通信:

服务端代码示例:

from socket import *
from multiprocessing import Process


def talk(conn, client_addr):
    while True:
        try:
            msg = conn.recv(1024)
            print('客户端消息>>', msg)
            if not msg: break
            conn.send(msg.upper())
            # 在这里有同学可能会想,我能不能在这里写input来自己输入内容和客户端进行对话?朋友,是这样的,按说是可以的,但是需要什么呢?需要你像我们用pycharm的是一样下面有一个输入内容的控制台,当我们的子进程去执行的时候,我们是没有地方可以显示能够让你输入内容的控制台的,所以你没办法输入,就会给你报错。
        except Exception:
            break


if __name__ == '__main__':  # windows下start进程一定要写到这下面
    server = socket(AF_INET, SOCK_STREAM)
    # server.setsockopt(SOL_SOCKET, SO_REUSEADDR,1)  # 如果你将如果你将bind这些代码写到if __name__ == '__main__'这行代码的上面,那么地址重用必须要有,因为我们知道windows创建的子进程是对整个当前文件的内容进行的copy,前面说了就像import,如果你开启了子进程,那么子进程是会执行bind的,那么你的主进程bind了这个ip和端口,子进程在进行bind的时候就会报错。
    server.bind(('127.0.0.1', 8080))
    # 有同学可能还会想,我为什么多个进程就可以连接一个server段的一个ip和端口了呢,我记得当时说tcp的socket的时候,我是不能在你这个ip和端口被连接的情况下再连接你的啊,这里是因为当时我们就是一个进程,一个进程里面是只能一个连接的,多进程是可以多连接的,这和进程之间是单独的内存空间有关系,先这样记住他,好吗?
    server.listen(5)
    while True:
        conn, client_addr = server.accept()
        p = Process(target=talk, args=(conn, client_addr))
        p.start()

(注意:通过这个是不能做qq聊天的,因为qq聊天是qq的客户端把信息发给另外一个qq的客户端,中间有一个服务端帮你转发消息,而不是我们这样的单纯的客户端和服务端对话,并且子进程开启之后咱们是没法操作的,并且没有为子进程input输入提供控制台,所有你再在子进程中写上了input会报错,EOFError错误,这个错误的意思就是你的input需要输入,但是你输入不了,就会报这个错误。而子进程的输出打印之类的,是pycharm做了优化,将所有子进程中的输出结果帮你打印出来了,但实质还是不同进程的。)

客户端代码示例:

from socket import *

client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 8080))

while True:
    msg = input('>>: ').strip()
    if not msg: continue

    client.send(msg.encode('utf-8'))
    msg = client.recv(1024)
    print(msg.decode('utf-8'))

2.4 Process对象的其他方法或属性

进程对象的其他方法一: terminateis_alive

from multiprocessing import Process
import time
import random

class Student(Process):
    def __init__(self, name):
        self.name = name
        super().__init__()

    def run(self):
        print('%s is 做作业' % self.name)
        # s = input('???') #别忘了再pycharm下子进程中不能input输入,会报错EOFError: EOF when reading a line,因为子进程中没有像我们主进程这样的在pycharm下的控制台可以输入东西的地方
        time.sleep(2)
        print('%s is 做作业结束' % self.name)

if __name__ == '__main__':
    p1 = Student('太白')
    p1.start()
    time.sleep(2)
    p1.terminate()  # 关闭进程,不会立即关闭,有个等着操作系统去关闭这个进程的时间,所以is_alive立刻查看的结果可能还是存活,但是稍微等一会,就被关掉了
    print(p1.is_alive())  # 结果为True
    print('等会。。。。')
    time.sleep(1)
    print(p1.is_alive())  # 结果为False

# 打印结果:
Student-1 is 做作业
Student-1 is 做作业结束
True
等会。。。。
False

进程对象的其他方法二:namepid

from multiprocessing import Process
import time
import random

class Study(Process):
    def __init__(self, name):
        # self.name=name
        # super().__init__() #Process的__init__方法会执行self.name=Study-1,所以加到这里,会覆盖我们的self.name=name

        # 为我们开启的进程设置名字的做法
        super().__init__()
        self.name = name

    def run(self):
        print('%s is studying' % self.name)
        time.sleep(random.randrange(1, 3))
        print('%s is study end' % self.name)

p = Study('young')
p.start()
print('开始')
print(p.pid)  # 查看pid

# 打印结果
开始
58745
young is studying
young is study end

注意:Process__init__方法会执行self.name=Study-1,所以我们的self.name=name应该加在super().__init__()之后,这样进程的名字就是我们设置的了

2.5 僵尸进程和孤儿进程

详见:

三、守护进程

? 之前我们讲的子进程是不会随着主进程的结束而结束,子进程全部执行完之后,程序才结束,那么如果有一天我们的需求是我的主进程结束了,由我主进程创建的那些子进程必须跟着结束,怎么办?守护进程就来了!

主进程创建守护进程

  • 其一:守护进程会在主进程代码执行结束后就终止

  • 其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

import os
import time
from multiprocessing import Process


class Myprocess(Process):
    def __init__(self, person):
        super().__init__()
        self.person = person

    def run(self):
        print(os.getpid(), self.person)
        print('%s正在和女主播聊天' % self.person)
        time.sleep(3)


if __name__ == '__main__':
    p = Myprocess('太白')
    p.daemon = True  # 一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
    p.start()
    # time.sleep(1)  # 在sleep时linux下查看进程id对应的进程ps -ef|grep id
    print('主')

四、进程同步(锁)

? 通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题:进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。

多进程抢占输出资源,导致打印混乱的示例:

import os
import time
import random
from multiprocessing import Process

def work(n):
    print('%s: %s is running' % (n, os.getpid()))
    time.sleep(random.random())
    print('%s:%s is done' % (n, os.getpid()))

if __name__ == '__main__':
    for i in range(5):
        p = Process(target=work, args=(i,))
        p.start()

# 打印结果:
0: 59118 is running
1: 59119 is running
2: 59120 is running
3: 59121 is running
4: 59122 is running
0:59118 is done
2:59120 is done
1:59119 is done
3:59121 is done
4:59122 is done

看结果,可以看出两个问题:

问题一:每个进程中work函数的第一个打印就不是按照我们for循环的0-4的顺序来打印的

问题二:我们发现,每个work进程中有两个打印,但是我们看到所有进程中第一个打印的顺序为0-1-2-3-4,但是第二个打印没有按照这个顺序,变成了0-2-1-3-4,说明我们一个进程中的程序的执行顺序都混乱了。

问题的解决方法: 第二个问题可以加锁来解决,第一个问题是没有办法解决的,因为进程开到了内核,有操作系统来决定进程的调度,我们自己控制不了

加锁:由并发改成了串行,牺牲了运行效率,但避免了竞争

from multiprocessing import Process, Lock
import os, time

def work(n, lock):
    # 加锁,保证每次只有一个进程在执行锁里面的程序,这一段程序对于所有写上这个锁的进程,大家都变成了串行
    lock.acquire()
    print('%s: %s is running' % (n, os.getpid()))
    time.sleep(1)
    print('%s:%s is done' % (n, os.getpid()))
    # 解锁,解锁之后其他进程才能去执行自己的程序
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(5):
        p = Process(target=work, args=(i, lock))
        p.start()
#打印结果:
0: 59138 is running
0:59138 is done
1: 59139 is running
1:59139 is done
2: 59140 is running
2:59140 is done
3: 59141 is running
3:59141 is done
4: 59142 is running
4:59142 is done

结果分析:通过结果我们可以看出,多进程刚开始去执行的时候,每次运行,首先打印出来哪个进程的程序是不固定的,但是我们解决了上面打印混乱示例代码的第二个问题,那就是同一个进程中的两次打印都是先完成的,然后才切换到下一个进程去,打印下一个进程中的两个打印结果,说明我们控制住了同一进程中的代码执行顺序,如果涉及到多个进程去操作同一个数据或者文件的时候,就不担心数据算错或者文件中的内容写入混乱了。

4.1 模拟抢票

? 上面这种情况虽然使用加锁的形式实现了顺序的执行,但是程序又重新变成串行了,这样确实会浪费了时间,却保证了数据的安全。接下来,我们以模拟抢票为例,来看看数据安全的重要性:

  • 并发运行,效率高,但是竞争同一个文件,导致数据混乱
from multiprocessing import Process, Lock
import json, random, time


# 查看剩余票数
def search():
    dic = json.load(open('db'))  # 打开文件,直接load文件中的内容,拿到文件中的包含剩余票数的字典
    print('剩余票数%s' % dic['count'])

# 抢票
def get():
    dic = json.load(open('db'))
    time.sleep(0.1)  # 模拟读数据的网络延迟,那么进程之间的切换,导致所有人拿到的字典都是{"count": 1},也就是每个人都拿到了这一票。
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(0.2)  # 模拟写数据的网络延迟
        json.dump(dic, open('db', 'w'))
        # 最终结果导致,每个人显示都抢到了票,这就出现了问题~
        print('购票成功')

def task():
    search()
    get()

if __name__ == '__main__':
    for i in range(3):  # 模拟并发100个客户端抢票
        p = Process(target=task)
        p.start()

注意:首先在当前文件目录下创建一个名为db的文件,文件db的内容为:{"count":1},只有这一行数据,并且注意,每次运行完了之后,文件中的1变成了0,你需要手动将0改为1,然后在去运行代码。注意一定要用双引号,不然json无法识别

运行结果:

剩余票数1
剩余票数1
剩余票数1
购票成功
购票成功
购票成功

结果分析:由于网络延迟等原因使得进程切换,导致每个人都抢到了这最后一张票

  • 加锁:购票行为由并发变成了串行,牺牲了效率,但是保证了数据安全
from multiprocessing import Process, Lock
import time, json

# 查看剩余票数
def search():
    dic = json.load(open('db'))  # 打开文件,直接load文件中的内容,拿到文件中的包含剩余票数的字典
    print('剩余票数%s' % dic['count'])

# 抢票
def get():
    dic = json.load(open('db'))
    time.sleep(0.1)  # 模拟读数据的网络延迟,那么进程之间的切换,导致所有人拿到的字典都是{"count": 1},也就是每个人都拿到了这一票。
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(0.2)  # 模拟写数据的网络延迟
        json.dump(dic, open('db', 'w'))
        # 最终结果导致,每个人显示都抢到了票,这就出现了问题~
        print('购票成功')
    else:
        print('sorry,没票了亲!')

def task(lock):
    search()
    # 因为抢票的时候是发生数据变化的时候,所有我们将锁加加到这里
    lock.acquire()
    get()
    lock.release()

if __name__ == '__main__':
    lock = Lock()  # 创建一个锁
    for i in range(3):  # 模拟并发100个客户端抢票
        p = Process(target=task, args=(lock,))  # 将锁作为参数传给task函数
        p.start()
        
# 打印结果:
剩余票数1
剩余票数1
剩余票数1
购票成功
sorry,没票了亲!
sorry,没票了亲!

4.2 进程锁总结

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,牺牲了速度却保证了数据安全。进程锁的问题:

  1. 效率低(共享数据基于文件,而文件是硬盘上的数据)
  2. 需要自己加锁处理

因此我们最好找寻一种解决方案能够兼顾:

1、效率高(多个进程共享一块内存的数据)

2、帮我们处理好锁问题。

这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道:

队列和管道都是将数据存放于内存中

队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

IPC通信机制(了解):IPC是intent-Process Communication的缩写,含义为进程间通信或者跨进程通信,是指两个进程之间进行数据交换的过程。IPC不是某个系统所独有的,任何一个操作系统都需要有相应的IPC机制,
比如Windows上可以通过剪贴板、管道和邮槽等来进行进程间通信,而Linux上可以通过命名共享内容、信号量等来进行进程间通信。Android它也有自己的进程间通信方式,Android建构在Linux基础上,继承了一部分Linux的通信方式。

五、队列

? 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列管道,这两种方式都是使用消息传递的。

? 进程间通信(IPC)方式一:队列。队列就像一个特殊的列表,但是可以设置固定长度,并且从前面插入数据,从后面取出数据,先进先出。

Queue([maxsize]) - 创建共享的进程队列。
参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
注意:底层队列使用管道和锁实现。

5.1 Queue的方法介绍

q = Queue([maxsize]) 
创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。 
Queue的实例q具有以下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

q.qsize() 
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。


q.empty() 
如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

q.full() 
如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)

q.close() 
关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。

q.cancel_join_thread() 
不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。

q.join_thread() 
连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。

5.2 队列的简单用法

from multiprocessing import Queue

q = Queue(3)  # 创建一个队列对象,队列长度为3

# put ,get ,put_nowait,get_nowait,full,empty
q.put(3)  # 往队列中添加数据
q.put(2)
q.put(1)
# q.put(4)   # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
# 如果队列中的数据一直不被取走,程序就会永远停在这里。
try:
    q.put_nowait(4)  # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
except:  # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
    print('队列已经满了')

# 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。
print(q.full())  # 查看是否满了,满了返回True,不满返回False

print(q.get())  # 取出数据
print(q.get())
print(q.get())
# print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。
try:
    q.get_nowait(3)  # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
except:  # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
    print('队列已经空了')

print(q.empty())  # 空了

# 打印结果:
队列已经满了
True
3
2
1
队列已经空了
True

子进程与父进程通过队列进行通信

from multiprocessing import Process, Queue
import time

# 8. q = Queue(2) #创建一个Queue对象,如果写在这里,那么在windows还子进程去执行的时候,我们知道子进程中还会执行这个代码,但是子进程中不能够再次创建了,也就是这个q就是你主进程中创建的那个q,通过我们下面在主进程中先添加了一个字符串之后,在去开启子进程,你会发现,小鬼这个字符串还在队列中,也就是说,我们使用的还是主进程中创建的这个队列。
def f(q):
    # q = Queue() #9. 我们在主进程中开启了一个q,如果我们在子进程中的函数里面再开一个q,那么你下面q.put('姑娘,多少钱~')添加到了新创建的这q里里面了
    q.put('姑娘,多少钱~')  # 4.调用主函数中p进程传递过来的进程参数 put函数为向队列中添加一条数据。
    # print(q.qsize())  #6.查看队列中有多少条数据了

def f2(q):
    print('>>>>>: ')
    print(q.get())  # 5.取数据

if __name__ == '__main__':
    q = Queue()  # 1.创建一个Queue对象
    q.put('小鬼')

    p = Process(target=f, args=(q,))  # 2.创建一个进程
    p2 = Process(target=f2, args=(q,))  # 3.创建一个进程
    p.start()
    p2.start()
    time.sleep(1)  # 7.如果阻塞一点时间,就会出现主进程运行太快,导致我们在子进程中查看qsize为1个。
    print(q.get())  #结果:小鬼
    # print(q.get())  # 结果:姑娘,多少钱~
    p.join()

批量的生产输入放入队列,再批量的获取结果

接下来看一个稍微复杂的例子:

import os
import time
import multiprocessing

# 向queue中输入数据的函数
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.asctime())
    queue.put(info)

# 向queue中输出数据的函数
def outputQ(queue):
    info = queue.get()
    print('%s%s33[32m%s33[0m' % (str(os.getpid()), '(get):', info))

# Main
if __name__ == '__main__':
    # windows下,如果开启的进程比较多的话,程序会崩溃,为了防止这个问题,使用freeze_support()方法来解决。知道就行啦
    multiprocessing.freeze_support()
    record1 = []  # store input processes
    record2 = []  # store output processes
    queue = multiprocessing.Queue(3)

    # 输入进程
    for i in range(10):
        process = multiprocessing.Process(target=inputQ, args=(queue,))
        process.start()
        record1.append(process)

    # 输出进程
    for i in range(10):
        process = multiprocessing.Process(target=outputQ, args=(queue,))
        process.start()
        record2.append(process)

    for p in record1:
        p.join()

    for p in record2:
        p.join()

输出结果:

63226(get):63216(put):Wed Apr 10 11:08:57 2019
63227(get):63217(put):Wed Apr 10 11:08:57 2019
63228(get):63218(put):Wed Apr 10 11:08:57 2019
63229(get):63219(put):Wed Apr 10 11:08:57 2019
63230(get):63220(put):Wed Apr 10 11:08:57 2019
63231(get):63221(put):Wed Apr 10 11:08:57 2019
63232(get):63222(put):Wed Apr 10 11:08:57 2019
63233(get):63223(put):Wed Apr 10 11:08:57 2019
63234(get):63224(put):Wed Apr 10 11:08:57 2019
63235(get):63225(put):Wed Apr 10 11:08:57 2019

队列是进程安全的:同一时间只能一个进程拿到队列中的一个数据,你拿到了一个数据,这个数据别人就拿不到了。

5.3 生产者消费者模型

? 在并发编程中使用生产者和消费者模式能够解决绝大数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式:

? 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题引入了生产者和消费者模式。

什么是生产者和消费者模式?

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,并且我可以根据生产速度和消费速度来均衡一下多少个生产者可以为多少个消费者提供足够的服务,就可以开多进程等,而这些进程都是到阻塞队列或者说是缓冲区中去获取或者添加数据。

基于队列实现一个生产者消费者模型

from multiprocessing import Process, Queue
import time, random, os

def consumer(q):
    while True:
        res = q.get()
        time.sleep(random.randint(1, 3))
        print('33[45m%s 吃 %s33[0m' % (os.getpid(), res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = '包子%s' % i
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' % (os.getpid(), res))

if __name__ == '__main__':
    q = Queue()
    # 生产者们:即厨师们
    p1 = Process(target=producer, args=(q,))

    # 消费者们:即吃货们
    c1 = Process(target=consumer, args=(q,))

    # 开始
    p1.start()
    c1.start()
    print('主')

通过上面基于队列的生产者消费者代码示例,我们发现一个问题:主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在了q.get()这一步。解决方式则是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环。

1.子进程生产者在生产完毕后发送结束信号None:

# 子进程生产者在生产完毕后发送结束信号
from multiprocessing import Process, Queue
import time, random, os

def consumer(q):
    while True:
        res = q.get()
        if res is None:  #收到结束信号则结束
            break
        time.sleep(random.randint(1, 3))
        print('33[45m%s 吃 %s33[0m' % (os.getpid(), res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = '包子%s' % i
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' % (os.getpid(), res))
    q.put(None)  # 在自己的子进程的最后加入一个结束信号

if __name__ == '__main__':
    q = Queue()
    # 生产者们:即厨师们
    p1 = Process(target=producer, args=(q,))

    # 消费者们:即吃货们
    c1 = Process(target=consumer, args=(q,))

    # 开始
    p1.start()
    c1.start()
    print('主')

? 注意:结束信号None,不一定由生产者发,主进程同样可以发,但主进程需要等生产者结束后才应该发送该信号

2.主进程在生产者完毕后发送结束信号None

# 主进程在生产者完毕后发送结束信号None
from multiprocessing import Process, Queue
import time, random, os

def consumer(q):
    while True:
        res = q.get()
        if res is None:
            break
        time.sleep(random.randint(1, 3))
        print('33[45m%s 吃 %s33[0m' % (os.getpid(), res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1, 3))
        res = '包子%s' % i
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' % (os.getpid(), res))

if __name__ == '__main__':
    q = Queue()
    # 生产者们:即厨师们
    p1 = Process(target=producer, args=(q,))

    # 消费者们:即吃货们
    c1 = Process(target=consumer, args=(q,))

    # 开始
    p1.start()
    c1.start()

    p1.join()  #等待生产者进程结束
    q.put(None)  #发送结束信号
    print('主')

但上述解决方式,在有多个生产者和消费者时,由于队列我们说了是进程安全的,我一个进程拿走了结束信号,另一个进程就拿不到了,还需要多发送一个结束信号,有几个取数据 的进程就要发送几个结束信号,我们则需要用一个比较low的方式去解决:

3.有多个消费者和生产者的时候需要发送多次结束信号

# 有多个消费者和生产者的时候需要发送多次结束信号
from multiprocessing import Process, Queue
import time, random, os

def consumer(q):
    while True:
        res = q.get()
        if res is None:
            break  # 收到结束信号则结束
        time.sleep(random.randint(1, 3))
        print('33[45m%s 吃 %s33[0m' % (os.getpid(), res))

def producer(name, q):
    for i in range(2):
        time.sleep(random.randint(1, 3))
        res = '%s%s' % (name, i)
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' % (os.getpid(), res))

if __name__ == '__main__':
    q = Queue()
    # 生产者们:即厨师们
    p1 = Process(target=producer, args=('包子', q))
    p2 = Process(target=producer, args=('骨头', q))
    p3 = Process(target=producer, args=('泔水', q))

    # 消费者们:即吃货们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))

    # 开始
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()  # 必须保证生产者全部生产完毕,才应该发送结束信号
    p2.join()
    p3.join()
    q.put(None)  # 有几个消费者就应该发送几次结束信号None
    q.put(None)  # 发送结束信号
    print('主')

其实我们的思路无非就是发送结束信号而已,有另外一种队列提供了这种机制。

5.4 JoinableQueue

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

参数介绍:
maxsize是队列中允许最大项数,省略则无大小限制

方法介绍:
JoinableQueue实例p除了与Queue对象相同的方法之外还具有:

  1. q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
  2. q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止,也就是队列中的数据全部被get拿走了。

JoinableQueue队列实现生产者消费者模型

# JoinableQueue队列实现生产者消费者模型
from multiprocessing import Process, JoinableQueue
import time, random, os

def consumer(q):
    while True:
        res = q.get()
        # time.sleep(random.randint(1,3))
        time.sleep(random.random())
        print('33[45m%s 吃 %s33[0m' % (os.getpid(), res))

        q.task_done()  # 向q.join()发送一次信号,证明一个数据已经被取走并执行完了


def producer(name, q):
    for i in range(10):
        # time.sleep(random.randint(1,3))
        time.sleep(random.random())
        res = '%s%s' % (name, i)
        q.put(res)
        print('33[44m%s 生产了 %s33[0m' % (os.getpid(), res))
    print('%s生产结束' % name)

    q.join()  # 生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。

    print('%s生产结束~~~~~~' % name)


if __name__ == '__main__':
    q = JoinableQueue()
    # 生产者们:即厨师们
    p1 = Process(target=producer, args=('包子', q))
    p2 = Process(target=producer, args=('骨头', q))
    p3 = Process(target=producer, args=('泔水', q))

    # 消费者们:即吃货们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.daemon = True  # 如果不加守护,那么主进程结束不了,但是加了守护之后,必须确保生产者的内容生产完并且被处理完了,所有必须还要在主进程给生产者设置join,才能确保生产者生产的任务被执行完了,并且能够确保守护进程在所有任务执行完成之后才随着主进程的结束而结束。
    c2.daemon = True

    # 开始
    p_l = [p1, p2, p3, c1, c2]
    for p in p_l:
        p.start()

    p1.join()  # 我要确保你的生产者进程结束了,生产者进程的结束标志着你生产的所有的人任务都已经被处理完了
    p2.join()
    p3.join()
    print('主')

    # 主进程等--->p1,p2,p3等---->c1,c2
    # p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
    # 因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。

5.5 生产者消费者模型总结

#程序中有两类角色
    一类负责生产数据(生产者)
    一类负责处理数据(消费者)
        
#引入生产者消费者模型为了解决的问题是:
    平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度
        
#如何实现:
    生产者<-->队列<——>消费者
    
    生产者消费者模型实现 类程序的解耦和

6.管道(了解)

进程间通信(IPC)方式二:管道(不推荐使用,了解即可),会导致数据不安全的情况出现,后面我们会说到为什么会带来数据 不安全的问题。

6.1 管道介绍

创建管道的类:
Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道

参数介绍:
dumplex:默认管道是全双工的,如果将duplex设成Falseconn1只能用于接收,conn2只能用于发送。

主要方法:

conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。

conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象

其他方法:

conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法

conn1.fileno():返回连接使用的整数文件描述符

conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。

conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

6.2 管道初使用

from multiprocessing import Process, Pipe

def f(conn):
    conn.send("Hello World")  # 子进程发送了消息
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 建立管道,拿到管道的两端,双工通信方式,两端都可以收发消息
    p = Process(target=f, args=(child_conn,))  # 将管道的一段给子进程
    p.start()  # 开启子进程
    print(parent_conn.recv())  # 主进程接受了消息
    p.join()

技术图片

应该特别注意管道端点的正确管理问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起(就是阻塞)。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道的相同一端就会能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。

引发EOFError

from multiprocessing import Process, Pipe

def f(parent_conn, child_conn):
    # parent_conn.close() #不写close将不会引发EOFError
    while True:
        try:
            print(child_conn.recv())
        except EOFError:
            child_conn.close()
            break

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(parent_conn, child_conn,))
    p.start()
    child_conn.close()
    parent_conn.send('hello')
    parent_conn.close()
    p.join()

主进程将管道的两端都传送给子进程,子进程和主进程共用管道的两种报错情况,都是在recv接收的时候报错的:
1.主进程和子进程中的管道的相同一端都关闭了,出现EOFError
2.如果你管道的一端在主进程和子进程中都关闭了,但是你还用这个关闭的一端去接收消息,那么就会出现OSError

所以你关闭管道的时候,就容易出现问题,需要将所有只用这个管道的进程中的两端全部关闭才行。当然也可以通过异常捕获(try:except EOFerror)来处理。

虽然我们在主进程和子进程中都打印了一下conn1一端的对象,发现两个不再同一个地址,但是子进程中的管道和主进程中的管道还是可以通信的,因为管道是同一套,系统能够记录。    

我们的目的就是关闭所有的管道,那么主进程和子进程进行通信的时候,可以给子进程传管道的一端就够了,并且用我们之前学到的,信息发送完之后,再发送一个结束信号None,那么你收到的消息为None的时候直接结束接收或者说结束循环,就不用每次都关闭各个进程中的管道了。

通过结束信号None来结束程序:

from multiprocessing import Pipe, Process

def func(conn):
    while True:
        msg = conn.recv()
        if msg is None: break
        print(msg)

if __name__ == '__main__':
    conn1, conn2 = Pipe()
    p = Process(target=func, args=(conn1,))
    p.start()
    for i in range(10):
        conn2.send('约吧')
    conn2.send(None)

6.3 通过管道实现生产者消费者模型

# 管道实现消费者生产者模型
from multiprocessing import Process, Pipe

def consumer(p, name):
    produce, consume = p
    produce.close()
    while True:
        try:
            baozi = consume.recv()
            print('%s 收到包子:%s' % (name, baozi))
        except EOFError:
            break

def producer(seq, p):
    produce, consume = p
    consume.close()
    for i in seq:
        produce.send(i)

if __name__ == '__main__':
    produce, consume = Pipe()

    c1 = Process(target=consumer, args=((produce, consume), 'c1'))
    c1.start()

    seq = (i for i in range(10))
    producer(seq, (produce, consume))

    produce.close()
    consume.close()

    c1.join()
    print('主进程')

关于管道会造成数据不安全问题的官方解释:

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
    
由Pipe方法返回的两个连接对象表示管道的两端。每个连接对象都有send和recv方法(除其他之外)。注意,如果两个进程(或线程)试图同时从管道的同一端读取或写入数据,那么管道中的数据可能会损坏。当然,在使用管道的不同端部的过程中不存在损坏风险。

多个消费者竞争会出现数据不安全的问题的解决方案- 加锁:

from multiprocessing import Process, Pipe, Lock


def consumer(p, name, lock):
    produce, consume = p
    produce.close()
    while True:
        lock.acquire()
        baozi = consume.recv()
        lock.release()
        if baozi:
            print('%s 收到包子:%s' % (name, baozi))
        else:
            consume.close()
            break


def producer(p, n):
    produce, consume = p
    consume.close()
    for i in range(n):
        produce.send(i)
    produce.send(None)
    produce.send(None)
    produce.close()


if __name__ == '__main__':
    produce, consume = Pipe()
    lock = Lock()
    c1 = Process(target=consumer, args=((produce, consume), 'c1', lock))
    c2 = Process(target=consumer, args=((produce, consume), 'c2', lock))
    p1 = Process(target=producer, args=((produce, consume), 10))
    c1.start()
    c2.start()
    p1.start()

    produce.close()
    consume.close()

    c1.join()
    c2.join()
    p1.join()
    print('主进程')

管道可以用于双工通信,通常利用在客户端/服务端中使用的请求/响应模型,或者远程过程调用,就可以使用管道编写与进程交互的程序,像前面将网络通信的时候,我们使用了一个叫subprocess的模块,里面有个参数是pipe管道,执行系统指令,并通过管道获取结果。

7.数据共享(了解)

展望未来,基于消息传递的并发编程是大势所趋

即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合

通过消息队列交换数据。这样极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中

进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题,应该尽量避免使用本节所讲的共享数据的方式,以后我们会尝试使用数据库来解决进程之间的数据共享问题。

7.1 Manager模块介绍

Manager模块介绍:

进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.

多进程共同去处理共享数据的时候,就和我们多进程同时去操作一个文件中的数据是一样的,不加锁就会出现错误的结果,进程不安全的,所以也需要加锁

7.2 Manager模块使用

from multiprocessing import Manager, Process, Lock

def work(d, lock):
    with lock:  # 不加锁而操作共享的数据,肯定会出现数据错乱
        d['count'] -= 1

if __name__ == '__main__':
    lock = Lock()
    with Manager() as m:
        dic = m.dict({'count': 100})
        p_l = []
        for i in range(100):
            p = Process(target=work, args=(dic, lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)

7.3 进程间通信总结

总结一下,进程之间的通信:队列、管道、数据共享也算

下面要讲的信号量和事件也相当于锁,也是全局的,所有进程都能拿到这些锁的状态,进程之间这些锁啊信号量啊事件啊等等的通信,其实底层还是socekt,只不过是基于文件的socket通信,而不是跟上面的数据共享啊空间共享啊之类的机制,我们之前学的是基于网络的socket通信,还记得socket的两个家族吗,一个文件的一个网络的,所以将来如果说这些锁之类的报错,可能你看到的就是类似于socket的错误,简单知道一下就可以啦~~~

工作中常用的是锁,信号量和事件不常用,但是信号量和事件面试的时候会问到

八、信号量(了解)

8.1 信号量介绍

互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。

假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。

实现:
信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。

信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

8.2 信号量使用

from multiprocessing import Process, Semaphore
import time, random


def go_ktv(sem, user):
    sem.acquire()
    print('%s 占到一间ktv小屋' % user)
    time.sleep(random.randint(0, 3))  # 模拟每个人在ktv中待的时间不同
    sem.release()


if __name__ == '__main__':
    sem = Semaphore(4)
    p_l = []
    for i in range(13):
        p = Process(target=go_ktv, args=(sem, 'user%s' % i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')

九、事件(了解)

9.1 事件介绍

python线程的事件(Event)用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

clear:将“Flag”设置为False
set:将“Flag”设置为True

9.2 事件方法的使用

# 事件方法的使用
from multiprocessing import Event

e = Event()  # 创建一个事件对象
print(e.is_set())  # is_set()查看一个事件的状态,默认为False,可通过set方法改为True
print('look here!')
# e.set()          #将is_set()的状态改为True。
# print(e.is_set())#is_set()查看一个事件的状态,默认为False,可通过set方法改为Tr
# e.clear()        #将is_set()的状态改为False
# print(e.is_set())#is_set()查看一个事件的状态,默认为False,可通过set方法改为Tr
e.wait()  # 根据is_set()的状态结果来决定是否在这阻塞住,is_set()=False那么就阻塞,is_set()=True就不阻塞
print('give me!!')

# set和clear  修改事件的状态 set-->True   clear-->False
# is_set     用来查看一个事件的状态
# wait       依据事件的状态来决定是否阻塞 False-->阻塞  True-->不阻塞

9.3 通过事件来模拟红绿灯

# 通过事件来模拟红绿灯示例
from multiprocessing import Process, Event
import time, random

def car(e, n):
    while True:
        if not e.is_set():  # 进程刚开启,is_set()的值是Flase,模拟信号灯为红色
            print('33[31m红灯亮33[0m,car%s等着' % n)
            e.wait()  # 阻塞,等待is_set()的值变成True,模拟信号灯为绿色
            print('33[32m车%s 看见绿灯亮了33[0m' % n)
            time.sleep(random.randint(2, 4))
            if not e.is_set():  # 如果is_set()的值是Flase,也就是红灯,仍然回到while语句开始
                continue
            print('车开远了,car', n)
            break

# def police_car(e, n):
#     while True:
#         if not e.is_set():# 进程刚开启,is_set()的值是Flase,模拟信号灯为红色
#             print('33[31m红灯亮33[0m,car%s等着' % n)
#             e.wait(0.1) # 阻塞,等待设置等待时间,等待0.1s之后没有等到绿灯就闯红灯走了
#             if not e.is_set():
#                 print('33[33m红灯,警车先走33[0m,car %s' % n)
#             else:
#                 print('33[33;46m绿灯,警车走33[0m,car %s' % n)
#         break

def traffic_lights(e, inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            print('######', e.is_set())
            e.clear()  # ---->将is_set()的值设置为False
        else:
            e.set()  # ---->将is_set()的值设置为True
            print('***********', e.is_set())

if __name__ == '__main__':
    e = Event()
    for i in range(10):
        p = Process(target=car, args=(e, i,))  # 创建10个进程控制10辆车
        time.sleep(random.randint(1, 3))  # 车不是一下子全过来
        p.start()

    # for i in range(5):
    #     p = Process(target=police_car, args=(e, i,))  # 创建5个进程控制5辆警车
    #     p.start()

    # 信号灯必须是单独的进程,因为它不管你车开到哪了,我就按照我红绿灯的规律来闪烁变换,对吧
    t = Process(target=traffic_lights, args=(e, 5))  # 创建一个进程控制红绿灯
    t.start()

    print('预备~~~~开始!!!')

以上是关于Python之进程 - multiprocessing模块的主要内容,如果未能解决你的问题,请参考以下文章

Python 之进程

python之进程

python之进程

python多进程之Pool

python之进程

Python 3 并发编程多进程之进程同步(锁)