并发编程之多进程

Posted zrxu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程之多进程相关的知识,希望对你有一定的参考价值。

进程:
进程是正在进行的一个过程或者一个任务。负责执行这个任务的是cpu

进程与程序的区别:
程序:仅仅只是一堆代码
进程:是程序运行的过程

并发与并行
并发只能达到伪并行,看上去是同时进行的,其实并不是,单个cpu+多道技术就可以实现并发
并行:是同时运行,必须具备多个cpu,才能实现并行的功能

进程的状态
其实在两种情况下会导致一个进程在逻辑上不能运行,
 1、进程挂起是自身原因,遇到I/O阻塞,便要让出CPU让其他进程去执行,这样保证CPU一直在工作
 2、与进程无关,是操作系统层面,可能会因为一个进程占用时间过多,或者优先级等原因,而调用其他的进程去使用CPU。

 开启进程的两种方式
 

# 1、第一种方式
 from multiprocessing import Process,current_process


def task(name):
    print("%s is running, 进程名:%s" % (name, current_process().name))

if __name__ == ‘__main__‘:
    p = Process(target=task, args=(1,) ,name="子进程")
    p.start() # 向操作系统发送一个开启一个进程的请求

    print("主")
‘‘‘
打印结果:
主
1 is running, 进程名:子进程
‘‘‘

 

# 2、第二种方式
class MyProcess(Process):
    def __init__(self, process, name):
        super().__init__()
        self.process = process
        self.name = name

    def run(self): # 将需要调用进程的方法写入run中, 函数名run不能修改
        print("%s is running, 进程名:%s" % (self.process, current_process().name))


if __name__ == ‘__main__‘:
    p = MyProcess("xu","子进程")
    p.start()
    print("主")

‘‘‘
打印结果:
主
1 is running, 进程名:子进程
‘‘‘

 
进程的join方法:

# join方法
from multiprocessing import Process, current_process
import time


def task(name):
    print("%s is running, 进程名:%s" % (name, current_process().name))
    time.sleep(1)
‘‘‘
执行结果:
1 is running, 进程名:子进程1
1 is end, 进程名:子进程1
主
‘‘‘

进程的p.join()方法是当p进程结束之后,才会执行join方法后面的程序

# Process的其他属性和方法

1、terminate和is_alive
from multiprocessing import Process
import time
import random

def task(name):
    print(‘%s is piaoing‘ %name)
    time.sleep(random.randrange(1,5))
    print(‘%s is piao end‘ %name)

if __name__ == ‘__main__‘:
    p1=Process(target=task,args=(‘egon‘,))
    p1.start()

    p1.terminate()#关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
    print(p1.is_alive()) #结果为True

    # is_alive()为判断进程是否还存活

    print(‘主‘)
    print(p1.is_alive()) # 此时p1进程已经关闭,所以结果为False

‘‘‘
执行结果:
True
主
False
‘‘‘
2、name与pid
from multiprocessing import Process
import time
import random
import os

def task(name):
    print(‘%s is running‘ %name)
    time.sleep(random.randrange(1,5))
    print("父进程pid:%s, 子进程pid:%s" %(os.getppid(), os.getpid()))
    print(‘%s is end‘ % name)


if __name__ == ‘__main__‘:
    p1=Process(target=task,args=(‘xu‘,), name="子进程1") # name属性设置进程的名称
    p1.start()
    print(p1.name) # 打印p1进程的名称
    print(‘主‘, os.getpid())

‘‘‘
打印结果:
子进程1
主 15752
xu is running
父进程pid:15752, 子进程pid:15753
xu is end
‘‘‘


守护进程
1、守护进程会在主进程执行结束后结束。
2、守护进程内无法再开启子进程,否则会抛出异常。

from multiprocessing import Process,current_process
import time


def task():
    print("%s is running" % current_process().name)
    time.sleep(2)
    print("%s is end" % current_process().name)

if __name__ == ‘__main__‘:
    p1 = Process(target=task, name="进程1")
    p1.daemon = True # 开启守护进程,开启的动作必须要在start之前
    p1.start()

    print("主")  # 当此程序执行后,主进程便执行结束了,那么守护线程也需要结束

‘‘‘
打印结果:
主

‘‘‘

 
互斥锁
进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱

from multiprocessing import Process, current_process
import time


def task():
    print("%s is running" % current_process().name)
    time.sleep(2)
    print("%s is end" % current_process().name)

if __name__ == ‘__main__‘:
    for i in range(3):
        p = Process(target=task, name="进程%s" %i)
        p.start()
‘‘‘
打印结果:
进程0 is running
进程1 is running
进程2 is running
进程0 is end
进程1 is end
进程2 is end
‘‘‘


如果没有发生错乱,那么应该打印结果为:
进程0 is running
进程0 is end
进程1 is running
进程1 is end
进程2 is running
进程2 is end
如何控制其行为呢,就需要加锁,而这把锁就为互斥锁

from multiprocessing import Process, current_process, Lock
import time


def task(mutex):
    mutex.acquire()
    print("%s is running" % current_process().name)
    time.sleep(2)
    print("%s is end" % current_process().name)
    mutex.release()

if __name__ == ‘__main__‘:
    mutex = Lock() # 创建互斥锁对象,需要保证每个进程里面的锁是同一把,所以需要将此对象传给task
    for i in range(3):
        p = Process(target=task, args=(mutex, ), name="进程%s" %i)
        p.start()
‘‘‘
打印结果:
进程0 is running
进程0 is end
进程1 is running
进程1 is end
进程2 is running
进程2 is end
‘‘‘


互斥锁模拟抢票功能
# 模拟抢票功能

from multiprocessing import Process, Lock
import time
import json

def show_ticket():
    time.sleep(1)
    ticket_dict = json.load(open("ticket_num.json", "r", encoding="utf-8"))
    print("票数为:%s" %(ticket_dict["count"]))


def pay_ticket(mutex):
    show_ticket()
    mutex.acquire()
    time.sleep(1)
    dict = json.load(open("ticket_num.json", "r", encoding="utf-8"))
    if dict["count"] > 0:
        print("购票成功")
        dict["count"] -= 1
        json.dump(dict, open("ticket_num.json", "w", encoding="utf-8"))
    else:
        print("没有票了, 购票失败")
    mutex.release()


if __name__ == ‘__main__‘:
    mutex = Lock()
    for i in range(10):
        p = Process(target=pay_ticket, args=(mutex, ))
        p.start()

 

队列
from multiprocessing import Process,Queue

q=Queue(3) # 3为队列中允许最大项数,省略则无大小限制

#put ,get ,put_nowait,get_nowait,full,empty
q.put(1) # 在队列中放置数据
q.put(2)
q.put(3)
print(q.full()) #满了
# q.put(4) #再放就阻塞住了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了
# print(q.get()) #再取就阻塞住了

ps:
1、队列内存放的是消息而非大数据
2、队列占用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小

# 两个队列形成一个栈
‘‘‘
队列特性:先进先出
堆栈特性:先进后出

实现思路:
# 存数据阶段
1、先创建两个队列
2、将数据存入第一个队列中
# 取数据阶段
1、判断第一个队列是否为空, 若为空,则返回None
2、将第一个队列中的数据与挨个取出存入第二个队列,当第一个队列中的数量为最后一个的时候,这最后一个数据便是最后存储进队列中的,将其取出,然后返回,然后将第一个队列和第二个队列对调,一直循环,这样就能形成先进后出

‘‘‘
import queue

class Stack(object,):
    def __init__(self):
        self.one_queue = queue.Queue()
        self.two_queue = queue.Queue()


    def push(self, data):
        """
        将数据放入第一个队列中
        :return:
        """
        self.one_queue.put(data)

    def getdata(self):
        """
        将数据取出
        :return:
        """
        if self.one_queue.qsize() == 0:
            return None

        while 1:
            if self.one_queue.qsize() == 1:
                value = self.one_queue.get()
                break
            self.two_queue.put(self.one_queue.get())
     # 将队列对换 temp
= self.one_queue self.one_queue = self.two_queue self.two_queue = temp return value if __name__ == __main__: stack = Stack() stack.push("xu") stack.push("sy") stack.push("songy") print(stack.getdata()) print(stack.getdata()) print(stack.getdata())


生产者消费者模型
用多进程和队列实现生产者消费者模型

from multiprocessing import Process, Queue
import time


def producer(name, q):
    for i in range(3):
        print("生产者%s生产了%s包子" %(name, i))
        q.put({name: i})
        time.sleep(0.5)


def consumer(name, q):
    while True:
        package = q.get()
        print("消费者%s吃了包子%s" %(name, package))
        time.sleep(1)

if __name__ == ‘__main__‘:
    q = Queue()
    producers = ["xu", "sy", "songyuan"]
    consumers = ["guang", "chong",]
    # 生产者们
    for one_producer in producers:
        p = Process(target=producer, args=(one_producer, q, ))
        p.start()

    # 消费者们

    for one_consumer in consumers:
        p = Process(target=consumer, args=(one_consumer, q, ))
        p.start()


这就产生了一个问题, 如果消费者将队列中的包子取完了, 那么消费者就会一直阻塞在q.get()阶段,无发退出。
所以就有了一个模块JoinableQueue

from multiprocessing import Process, JoinableQueue
import time


def producer(name, q):
    for i in range(3):
        print("生产者%s生产了%s包子" %(name, i))
        q.put({name: i})
        time.sleep(0.5)
    q.join() # q没取完, 则进程便会在此阻塞住

def consumer(name, q):
    while True:
        package = q.get()
        print("消费者%s吃了包子%s" %(name, package))
        time.sleep(1)
        q.task_done() # 给q.join()发送一个已经取走一个包子的消息

if __name__ == ‘__main__‘:
    q = JoinableQueue()
    producers = ["xu", "sy", "songyuan"]
    consumers = ["guang", "chong",]
    # 生产者们

    p1 = Process(target=producer, args=("xu", q, ))
    p2 = Process(target=producer, args=("sy", q, ))
    p3 = Process(target=producer, args=("songy", q, ))

    # 消费者们


    c1 = Process(target=consumer, args=("guang", q, ))
    c2 = Process(target=consumer, args=("chong", q, ))
    c1.daemon = True
    c2.daemon = True
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

 



以上是关于并发编程之多进程的主要内容,如果未能解决你的问题,请参考以下文章

并发编程之多进程

并发编程之多进程

并发编程之多进程

并发编程之多进程理论

python并发编程之多线程

python并发编程之多进程