并发编程之多进程2

Posted liuxiaolu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程之多进程2相关的知识,希望对你有一定的参考价值。

一:multiprocessing模块介绍

  用来开启子进程,并在子进程中执行定制的任务(比如函数)。该模块功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue|Pipe、Lock等组件。

  需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

二:Process类的介绍 

技术分享图片
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

参数介绍:
1 group参数未使用,值始终为None
2 
3 target表示调用对象,即子进程要执行的任务
4 
5 args表示调用对象的位置参数元组,args=(1,2,egon,)
6 
7 kwargs表示调用对象的字典,kwargs={name:egon,age:18}
8 
9 name为子进程的名称

方法介绍:
 1 p.start():启动进程,并调用该子进程中的p.run() 
 2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
 3  p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
 4 p.is_alive():如果p仍然运行,返回True
 5 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程 

属性介绍:
1 p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置 
2 p.name:进程的名称
3 p.pid:进程的pid
4 p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
5 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
具体介绍

三:僵尸进程与孤儿进程

  1.僵尸进程(有害):一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。

  简单来说就是指子进程执行完所有任务,已经终止了,但是还残留一些信息(进程id,进程名),但是父进程没有去处理这些残留信息,导致残留信息占用系统资源。

  当出现大量的僵尸进程时,会占用系统资源,可以把它父进程杀掉,僵尸就成了孤儿,操作系统会负责回收。

技术分享图片
import  time
from multiprocessing import  Process
def task1():
    print("子进程 run")

if __name__ == __main__:
    for i in range(10):
        p = Process(target=task1)
        p.start()

    time.sleep(100000)
僵尸进程

  2.孤儿进程(无害):一个父进程结束,而它的一个或者多个子进程还在运行,那么那些进程将成为孤儿进程。孤儿进程将被init进程所收养,并且由init进程对它们完成状态收集工作。

技术分享图片
import os
import sys
import time

pid = os.getpid()
ppid = os.getppid()
print im father, pid, pid, ppid, ppid
pid = os.fork()
#执行pid=os.fork()则会生成一个子进程
#返回值pid有两种值:
#    如果返回的pid值为0,表示在子进程当中
#    如果返回的pid值>0,表示在父进程当中
if pid > 0:
    print father died..
    sys.exit(0)

# 保证主线程退出完毕
time.sleep(1)
print im child, os.getpid(), os.getppid()

执行文件,输出结果:
im father pid 32515 ppid 32015
father died..
im child 32516 1
孤儿进程

四:守护进程

  主进程创建守护进程:

    其一:守护进程会在主进程代码执行完毕后就终止

    其二:守护进程内无法再开启子进程,否则抛异常(AssertionError: daemonic processes are not allowed to have children)

注意:进程之间是相互独立的,主进程代码运行结束,守护进程随机终止

技术分享图片
from multiprocessing import Process
import time

def task():
    print(小主的一生)
    time.sleep(2)
    print(小主凉了)   #守护进程运行的话,此行代码就不会运行

if __name__ == __main__:
    xiaozhu=Process(target=task)
    xiaozhu.daemon=True    #守护进程,默认为False,意味着不守护,改为True表示是守护进程
    xiaozhu.start()
    print(皇帝登基)
    time.sleep(1)
    print(hello)
    print(皇帝薨)
守护进程

五:进程同步

  进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端是没有问题的。但是共享带来的就是竞争,竞争带来的结果就是错乱,如何控制,加锁处理。

  使用锁将需要共享的数据加锁,在执行代码之前会先判断这个值。要注意:在使用锁时,必须保证锁是同一个。

  互斥锁:保证了每次只有一个进程进入改都拿程序的操作,从而保证了多进程情况下数据的正确性。

技术分享图片
from multiprocessing import Process,Lock
import random
import time

def task1(lock):
    lock.acquire()
    print(hello  1)
    print(1 How are you!)
    time.sleep(random.randint(1,2))
    print(bye  1)
    lock.release()

def task2(lock):
    lock.acquire()
    print(hello 2)
    print(2 How are you!)
    print(bye 2)
    lock.release()

def task3(lock):
    lock.acquire()
    print(hello 3)
    print(3 How are you!)
    print(bye 3)
    lock.release()

if __name__ == __main__:
    lock=Lock()
    p1=Process(target=task1,args=(lock,))
    p1.start()
    p2=Process(target=task2,args=(lock,))
    p2.start()
    p3=Process(target=task3,args=(lock,))
    p3.start()
加锁
技术分享图片
import json
from multiprocessing import Process,Lock
import time
import random

"""
join和锁的区别
1.join中顺序是固定的  不公平  
2.join是完全串行  而 锁可以使部分代码串行 其他代码还是并发 

"""

# 查看剩余票数
def check_ticket(usr):
    time.sleep(random.randint(1,3))
    with open("ticket.json","r",encoding="utf-8") as f:
        dic = json.load(f)
        print("%s查看 剩余票数:%s" % (usr,dic["count"]))

def buy_ticket(usr):
    with open("ticket.json","r",encoding="utf-8") as f:
        dic = json.load(f)
        if dic["count"] > 0:
            time.sleep(random.randint(1,3))
            dic["count"] -= 1
            with open("ticket.json", "w", encoding="utf-8") as f2:
                json.dump(dic,f2)
                print("%s 购票成功!" % usr)

def task(usr,lock):

    check_ticket(usr)
    lock.acquire()
    buy_ticket(usr)
    lock.release()

if __name__ == __main__:
    lock = Lock()

    for i in range(10):
        p = Process(target=task,args=("用户%s" % i,lock))
        p.start()
        #p.join() # 只有第一个整个必须完毕 别人才能买 这是不公平的
互斥锁抢票

  加锁可以保证多个进程修改同一个数据时,同一时间只能有一个任务可以进行修改,即串行的修改,降低了速度,但是保证了数据安全。

  死锁:指的是锁无法打开导致程序卡死,首先要明确有一把锁的时候是不会卡死的,正常开发时一把锁足够使用,不要打开多把锁。

技术分享图片
from multiprocessing import Process,Lock
import time
def task1(l1,l2,i):
    l1.acquire()
    print("盘子被%s抢走了" % i)
    time.sleep(1)

    l2.acquire()
    print("筷子被%s抢走了" % i)
    print("吃饭..")
    l1.release()
    l2.release()
    pass

def task2(l1,l2,i):

    l2.acquire()
    print("筷子被%s抢走了" % i)

    l1.acquire()
    print("盘子被%s抢走了" % i)

    print("吃饭..")
    l1.release()
    l2.release()


if __name__ == __main__:
    l1 = Lock()
    l2 = Lock()
    Process(target=task1,args=(l1,l2,1)).start()
    Process(target=task2,args=(l1,l2,2)).start()
=============================================
运行结果卡住>>>:
盘子被1抢走了
筷子被2抢走了
死锁

六:IPC(进程间的通讯)

  由于进程之间的内存都是相互独立的,所以需要对应的解决方案,能够使得进程之间可以相互传递数据。

  有三种方案:

    1.使用共享文件,多个进程同时读写同一个文件(I/O速度慢,传输数据大小不受限制)

    2.管道是基于内存的,速度快,但是是单向的,用起来麻烦(了解 ) 

    3.申请共享内存空间,多个进程可以共享这个内存区域(重点)

      速度快,但是数据量不能太大

技术分享图片
from multiprocessing import Manager,Process,Lock
def work(d):
    # with lock:
        d[count]-=1

if __name__ == __main__:

    with Manager() as m:
        dic=m.dict({count:100}) #创建一个共享的字典
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,))
            p_l.append(p)
            p.start()

        for p in p_l:
            p.join()
        print(dic)
IPC

七:队列(推荐使用)

  进程之间彼此隔离,要实现进程间通信,multiprocessing模块支持两种形式:队列和管道。

  队列特点:先进先出

    优点:可以保证数据不会错乱,即使在多进程下,因为其put和get默认都是阻塞的。

Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。maxsize是队列中允许最大项数,省略则无大小限制。 
技术分享图片
1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
3 q.get_nowait():同q.get(False)
4 q.put_nowait():同q.put(False)
5 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
6 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
7 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
主要方法介绍
技术分享图片
from multiprocessing import Queue
# 例1:
# q = Queue(1)  # 创建一个队列 最多可以存一个数据
#
# q.put("张三")
# print(q.get())
#
# q.put("李四") # put默认会阻塞 当容器中已经装满了
#
# print(q.get())
# print(q.get()) # get默认会阻塞 当容器中已经没有数据了
#
# print("over")

# 例2:
q = Queue(1)  # 创建一个队列 最多可以存一个数据
#
q.put("张三")
# q.put("李四",False) # 第二个参数 设置为False表示不会阻塞 无论容器是满了 都会强行塞 如果满了就抛异常

print(q.get())
print(q.get(timeout=3)) # timeout 仅用于阻塞时

# q.put("李四") # put默认会阻塞 当容器中已经装满了
#
# print(q.get())
# print(q.get()) # get默认会阻塞 当容器中已经没有数据了
#
# print("over")
队列

八:生产者消费者模型

  1.什么是生产者消费者模型?

  生产者消费者模式是通过一个容器来解决生产者与消费者的强耦合问题,生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者与消费者的处理能力。

技术分享图片
import random
from multiprocessing import Process,Queue
import time
# 爬数据
def get_data(q):

    for num in range(5):
        print("正在爬取第%s个数据" % num)
        time.sleep(random.randint(1,2))
        print("第%s个数据 爬取完成" % num)
        # 把数据装到队列中
        q.put("第%s个数据" % num)

def parse_data(q):
    for num in range(5):
        # 取出数据
        data = q.get()
        print("正在解析%s" % data)
        time.sleep(random.randint(1, 2))
        print("%s 解析完成" % data)

if __name__ == __main__:
    # 共享数据容器
    q = Queue(5)
    #生产者进程
    produce =  Process(target=get_data,args=(q,))
    produce.start()
    #消费者进程
    customer = Process(target=parse_data,args=(q,))
    customer.start()
生产者消费者模型
技术分享图片
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print(33[45m%s 吃 %s33[0m %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res=包子%s %i
        q.put(res)
        print(33[44m%s 生产了 %s33[0m %(os.getpid(),res))

if __name__ == __main__:
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()
    print()
例二

总结:程序中有两类角色

    一类负责生产数据(生产者)

    一类负责处理数据(消费者)

解决的问题:平衡生产者与消费者之间的工作能力,从而提高程序整体出口ishuju的速度。

 

以上是关于并发编程之多进程2的主要内容,如果未能解决你的问题,请参考以下文章

并发编程之多进程2

python并发编程之多进程

并发编程之多进程

并发编程之多进程

python并发编程之多线程基础知识点

并发编程之多进程