python并发编程

Posted fqh202

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python并发编程相关的知识,希望对你有一定的参考价值。

操作系统基础

  • 操作系统的两大功能
    1. 封装好硬件复杂的接口,提供良好的抽象接口,运行应用程序只需要调用这些接口即可启动相应的硬件服务,例如启动暴风音影
      • 双击执行文件
      • 获取应用软件在硬盘上的存储地址
      • 操作系统会直接将硬盘上的数据读取到内存中
      • 交给cpu运行
    2. 管理、调度进程,并且将多个进程对硬件的竞争变得有序;
  • 系统演化

    • 第二代计算机:批处理系统
      • 1701主要是负责I/O操作,批量输入和批量输出
      • 7094主要是负责计算, 运行程序
    • 第三代计算机: 多个联机终端 + 多道技术(针对单核

      • 多道技术:多道指的是多个程序,多道技术的实现是为了解决多个程序竞争或者说共享同一个资源(cpu)的有序调度问题,解决方式为多路复用,复用分为时间上的复用和空间上的复用

        1. 时间上的复用:

          当一个程序在等待I/O时,另一个程序可以使用cpu,如果内存中可以存放足够多的作业,则cpu利用率可以达到100%

          操作系统采用了多道技术后,可以控制进程的切换,或者说进程之间去争抢cpu的执行权限。这种切换不仅会在一个进程遇到io时进行,一个进程占用cpu时间过长也会切换,或者说被操作系统夺走cpu的执行权限

        2. 空间上的复用

          程序之间的内存必须分割,这种分割需要在硬件层面实现,由操作系统控制。如果内存彼此不分割,则一个程序可以访问另外一个程序的内存,

          首先丧失的是安全性,比如你的qq程序可以访问操作系统的内存,这意味着你的qq可以拿到操作系统的所有权限。

          其次丧失的是稳定性,某个程序崩溃时有可能把别的程序的内存也给回收了,比方说把操作系统的内存给回收了,则操作系统崩溃。


开启子进程

  • 方法1(基础版本)
from multiprocessing import Process
import time


def task(name):
    print(%s starts running‘ % name)
    time.sleep(3)
    print(%s finish‘ % name)

if __name__ == ‘__main__‘:
    p = Process(target=task, kwargs={‘name‘:‘子进程1‘})
    p.start()
    print(‘主进程‘)

join方法

  • 在主进程的任务与子进程的任务彼此独立的情况下,主进程的任务先执行完毕后,主进程还需要等待子进程执行完毕,然后统一回收资源。

  • join作用 :如果主进程的任务在执行到某个阶段时,需要等待子进程执行完毕后才能继续执行,就需要有一种机制能让主进程检测子进程是否运行完毕,在子进程执行完毕后才能继续执行, 否则一直在原地阻塞


查看子进程

  • 可以通过os.getpid()查看当前进程代号, os.getppid()查看父进程代号;
from multiprocessing import Process
import time
import os


def task():
    print(%s is running, parent id <%s>‘ % (os.getpid(), os.getppid()))
    time.sleep(4)
    print(%s finish, parent id <%s>‘ % (os.getpid(), os.getppid()))
    print("==========")

if __name__ == ‘__main__‘:
    p = Process(target=task)
    p.start()
    print(‘主进程<%s> is running, 我的父进程<%s>‘ % (os.getpid(), os.getppid()))
  • 方法2:(升级版本)
from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self):
        """方法名称必须为run"""
        print(%s starts running‘ % self.name)
        time.sleep(3)
        print(%s finish‘ % self.name)


if __name__ == ‘__main__‘:
    p = MyProcess(‘子进程1‘)
    p.start()  # 会调用run方法
    print(‘主进程!‘)

子进程和父进程是存储在不同的内存空间的

  • 验证实例
from multiprocessing import Process
import time
import os

def task():
    print(%s is running, parent id <%s>‘ % (os.getpid(), os.getppid()))
    time.sleep(4)
    print(%s finish, parent id <%s>‘ % (os.getpid(), os.getppid()))
    print("==========")

if __name__ == ‘__main__‘:
    p = Process(target=task)
    p.start()
    print(‘主进程<%s> is running, 我的父进程<%s>‘ % (os.getpid(), os.getppid()))

socket多进程代码

# server服务器
import socket
from multiprocessing import Process
import os

def run(conn):
    """开始等待,接收数据"""
    while True:
        data = conn.recv(1024)
        if not data: break
        res = ‘来自服务端<%s>的反馈:%s %(os.getpid(), data)
        conn.send(res.encode(‘utf-8‘))

def server(ip, port):
    """启动服务器模版,并生成服务器"""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind((ip, port))
    server.listen(5)

    while True:
        conn, addr = server.accept()
        # 开启子进程服务,可以同时服务多个客户端
        p = Process(target=run, args=(conn,))
        p.start()

server(‘127.0.0.1‘, 8800)
# 客户端
import socket
import time
import time


client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((‘127.0.0.1‘, 8800))
msg = ‘hello from fqh‘
client.send(msg.encode((‘utf8‘)))

data = client.recv(1024)
print(data.decode(‘utf-8‘))
time.sleep(5)

守护进程

  • 守护进程在主进程代码执行完之后会立即断开
  • 设置守护进程方法:p.daemon=True, 且必须在p.start()之前设置
from multiprocessing import Process
import time

def task(name):
    print(%s starts‘ % name)
    time.sleep(2)
    print(%s ends‘ % name)

if __name__ == ‘__main__‘:
    p=Process(target=task,args=(‘egon‘,))
    # 一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
    p.daemon=True
    p.start()
    time.sleep(1)
    # 只要终端打印出这一行内容,那么守护进程p也就跟着结束掉了
    print(‘主‘)

互斥锁

  • 只需要将对共享数据修改的那一段代码加上锁就可以,其余的部分还是可以并行!

  • 模拟购票系统, 运用互锁功能实现抢票,代码如下:

from multiprocessing import Process, Lock
import time,json


def search(name):
    """查询剩余的票数"""
    dic=json.load(open(‘db.txt‘))
    time.sleep(1)
    print(\033[43m%s 查到剩余票数%s\033[0m‘ %(name,dic[‘count‘]))


def get(name):
    """购票"""
    dic=json.load(open(‘db.txt‘))
    time.sleep(1) #模拟读数据的网络延迟
    if dic[‘count‘] >0:
        dic[‘count‘]-=1
        time.sleep(1) #模拟写数据的网络延迟
        json.dump(dic,open(‘db.txt‘,‘w‘))
        print(\033[46m%s 购票成功!\033[0m‘ %name)
    else:
        print(\033[46m%s 对不起,抢票失败!\033[0m‘ %name)


def task(name, mutex):
    """购票流程"""
    search(name)
    mutex.acquire()
    get(name)
    mutex.release()

if __name__ == ‘__main__‘:
    # 主进程,模拟并发10个客户端抢票
    mutex = Lock()
    for i in range(10):
        name=‘<路人%s>‘ % i
        p=Process(target=task,args=(name, mutex))
        p.start()

队列Queue

  • Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

  • maxsize是队列中允许最大项数,省略则无大小限制。但需要明确:
    1. 存放的是消息而非大数据
    2. 用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小
  • q.put()方法用于插入数据,q.get()从队列读取并删除一个元素

  • 在存储数据时候, 若队列已经满了再存入数据就会出现进程阻塞状态, 可以用q.full()判断队列是否为存满数据

  • 同样,在取出数据时候,若队列为空时,取数据也会出现进程阻塞状态, 可以用q.empty()判断队列是否为空状态


生产消费者模式

  • 简单版(单生产者及消费者模式)
from multiprocessing import Process,Queue
import time,random,os

def producer(q, name, food):
    """生产者"""
    for i in range(3):
        time.sleep(1)
        res = %s%s %(food, i)
        q.put(res)
        print(\033[45m%s 生产了 %s\033[0m‘ %(name,res))

def consumer(q, name):
    """消费者"""
    while True:
        res = q.get()
        if res == None:  # 判断是否生产完毕
            print(‘序列已经清空!‘)
            break
        time.sleep(2)
        print(\033[43m%s%s\033[0m‘ % (name, res))

if __name__ == ‘__main__‘:
    q = Queue()  # 生成容器对象
    # 生产者们
    p1 = Process(target=producer,args=(q, ‘egon‘, ‘包子‘))
    # 消费者们
    c1 = Process(target=consumer,args=(q, ‘alex‘))
    # 开始
    p1.start()
    c1.start()
    p1.join()
    q.put(None)  # 当生产进程执行完毕后再放进None表示结束
    print(‘主进程执行结束====‘)
  • 升级版(多消费者多生产者模式)
from multiprocessing import Process,Queue
import time,random,os


def producer(q, name, food):
    """生产者"""
    for i in range(3):
        time.sleep(1)
        res = %s%s %(food, i)
        q.put(res)
        print(\033[45m%s 生产了 %s\033[0m‘ %(name,res))


def consumer(q, name):
    """消费者"""
    while True:
        res = q.get()
        if res == "_a_":  # 判断是否生产完毕
            print(%s结束消费‘%name)
            break
        time.sleep(2)
        print(\033[43m%s%s\033[0m‘ % (name, res))


if __name__ == ‘__main__‘:
    q = Queue()  # 生成容器对象
    # 生产者们
    p1 = Process(target=producer, args=(q, ‘egon‘, ‘包子‘))
    p2 = Process(target=producer, args=(q, ‘kate‘, ‘袜子‘))
    p3 = Process(target=producer, args=(q, ‘steven‘, ‘iphone‘))
    # 消费者们
    c1 = Process(target=consumer, args=(q, ‘alex‘))
    c2 = Process(target=consumer, args=(q, ‘frank‘))
    # 生产者开始生产
    p1.start()
    p2.start()
    p3.start()
    # 消费者开始消费
    c1.start()
    c2.start()
    # 等待生产完毕后添加结束生产标识符, 否则消费者进程会阻塞主进程,
    # 注意有几个消费者就要添加几个结束标识符,必须结束所有的消费进程,否则依然阻塞
    p1.join()
    p3.join()
    p2.join()
    q.put(‘_a_‘)
    q.put(‘_a_‘)
    print(‘主进程‘)

  • 最高级版本, 利用joinable_queue和守护进程
from multiprocessing import Process, JoinableQueue
import time


def producer(q, name, food):
    """生产者"""
    for i in range(3):
        time.sleep(1)
        res = %s%s %(food, i)
        q.put(res)
        print(\033[45m%s 生产了 %s\033[0m‘ %(name,res))
    q.join()  # 当q为空的时候才会继续主进程


def consumer(q, name):
    """消费者"""
    while True:
        res = q.get()
        if res == "_a_":  # 判断是否生产完毕
            print(%s结束消费‘%name)
            break
        time.sleep(2)
        print(\033[43m%s%s\033[0m‘ % (name, res))
        q.task_done()  # 每取走一个数据都会向生产者反馈信息


if __name__ == ‘__main__‘:
    q = JoinableQueue()  # 生成容器对象
    # 生产者们
    p1 = Process(target=producer, args=(q, ‘egon‘, ‘包子‘))
    p2 = Process(target=producer, args=(q, ‘kate‘, ‘袜子‘))
    p3 = Process(target=producer, args=(q, ‘steven‘, ‘iphone‘))
    # 消费者们
    c1 = Process(target=consumer, args=(q, ‘alex‘))
    c2 = Process(target=consumer, args=(q, ‘frank‘))
    # 设置消费者进程为守护进程,当主程序执行完毕会自动回收
    c1.daemon = True
    c2.daemon = True
    # 生产者开始生产
    p1.start()
    p2.start()
    p3.start()
    # 消费者开始消费
    c1.start()
    c2.start()
    # 等待生产者进程结束
    p1.join()
    p3.join()
    p2.join()
    # 此时队列一定为空
    print(‘主进程‘)

以上是关于python并发编程的主要内容,如果未能解决你的问题,请参考以下文章

golang代码片段(摘抄)

《java并发编程实战》

python中的多线程和多进程编程

Java并发编程实战 04死锁了怎么办?

Java并发编程实战 04死锁了怎么办?

Python:并发编程