并发编程

Posted daizongqi

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程相关的知识,希望对你有一定的参考价值。

操作系统介绍

计算机三大组成:应用程序、操作系统、硬件。

执行程序结构:硬盘、内存、CPU。

操作系统:协调、管理和控制计算机软硬件资源的控制程序。

操作系统作用:

  • 隐藏复杂的硬件接口,提供良好的抽象接口。
  • 管理、调度接口,并且将多个进程对硬件的竞争变得有序。

第一代计算机:真空管和穿孔卡片

特点:

  • 没有操作系统的概念
  • 所有程序设计都是直接操控硬件

优点:程序员独占资源,即时调试程序

缺点:浪费计算机资源,一个时间段只有一个人使用

第二代计算机:晶体管和批处理系统

技术图片

特点:

  • 各人员和计算机明确分工
  • 有了操作系统概念

优点:批处理节省机时

缺点:

  • 需要人参与,将磁带搬来搬去
  • 计算过程仍是串行
  • 不利于调试

第三代计算机:集成电路芯片和多道技术

多道技术:时间多路复用和空间多路复用+硬件上(物理层)支持隔离,以达到将一个单独的CPU变成多个虚拟的CPU。

  • 产生背景:针对单核,实现并发。现在的主机一般是多核,那么每个核都会利用多道技术,有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度到4个cpu中的任意一个,具体由操作系统调度算法决定。
  • 空间多路复用:内存中同时有多道程序
  • 时间多路复用:复用一个CPU的时间片。但遇到IO切换(提升效率)或占用CPU时间过长(降低效率)也切换,核心在于切换之前将进程的状态保存下来,这样才能保证下次切换回来时,能基于上次切走的位置继续运行。

分时操作系统:多个联机终端+多道技术

多进程

进程理论

进程:一个程序的执行过程,而负责执行任务是CPU。

并发:伪并行,即看起来是同时运行,单个CPU+多道技术就可实现。

串行:一个进程执行完才执行下一个。

并行:同时运行,只有具备多个CPU才能实现。

阻塞:进程在等待输入(即I/O)时的状态。

进程的状态:运行、阻塞、就绪。

句柄:在创建进程时,父进程得到一个特别的令牌,该令牌可控制子进程,但是父进程有权把该句柄传给其他子进程。

进程的层次结构:无论UNIX还是windows,进程只有一个父进程,不同的是

  • 在UNIX中所有的进程,都是以init进程为根,组成树形结构。
  • 在windows中,没有进程层次的概念,所有的进程地位相同。

开启子进程的两种方式

# 方式一:利用Process类
from multiprocessing import Process
import time
import os
def task(name):
    print(f'name is running,子进程号:os.getpid(),子父进程号:os.getppid()')
    time.sleep(2)
    print(f'name is done')
if __name__ == '__main__':
    p = Process(target=task, args=('allen',))  # 实例化
    p.start()  # 仅仅给操作系统发送一个开启子进程信号
    print(f'主,主进程号:os.getpid(),主父进程号:os.getppid()')
# 利用终端查看pycharm进程号
# E:\\练习>tasklist |findstr pycharm
    
# 方式二:继承Process类
from multiprocessing import Process
import time
class MyProcess(Process):
    def __init__(self, name):  # 重写init方法
        super().__init__()  # 继承父类init方法
        self.name = name
    def run(self) -> None:  # 固定写法
        print(f'self.name is running')
        time.sleep(2)
        print(f'self.name is done')
if __name__ == '__main__':
    p = MyProcess('allen')  # 实例化
    p.start()  # 仅仅给操作系统发送一个开启子进程信号
    print('主')

僵尸进程和孤儿进程

僵尸进程:当子进程比父进程先结束,而父进程又没有回收子进程,释放子进程占用的资源,此时子进程将成为一个僵尸进程。

  • 危害:导致系统不能产生新的进程,因pid号有限。
  • 解决方式:可用kill-SIGKILL父进程ID来解决。

孤儿进程:当父进程退出,而它的一个或多个子进程还在运行,那么子进程将成为孤儿进程。

进程操作

开启多进程(multiprocess.process)

from multiprocessing import Process
import time
def task(name):
    print(f'name is running')
    time.sleep(2)
    print(f'name is done')
if __name__ == '__main__':
    p1 = Process(target=task, args=('allen',), name='子进程1')  # 让关键参数来指定进程名
    p2 = Process(target=task, args=('nick',))
    p3 = Process(target=task, args=('tank',))
    p_list = [p1, p2, p3]
    for p in p_list:
        p.start()
    p1.terminate()  # 给操作系统发信号关闭进程,不会立即关闭
    for p in p_list:
        p.join()  # 让主进程等待p的结束,这是并发并非串行
    print(p1.is_alive())  # 判断子进程是否存活
    print('主')
    print(p1.pid)  # 查看子进程号
    print(p1.name)  # 查看子进程名

基于多进程实现并发的套接字通信

# server.py
import socket
from multiprocessing import Process
def talk(conn):  # 通信循环函数
    while True:
        try:
            data = conn.recv(1024)
            print(data)
            conn.send(data.upper())
        except Exception:
            break
    conn.close()
def server(ip, port):  # 连接循环函数
    server = socket.socket()
    server.bind((ip, port))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        p = Process(target=talk, args=(conn,))  # 实例化进程对象
        p.start()  # 开启进程
    server.close()
if __name__ == '__main__':
    server('127.0.0.1', 8000)
    
# client.py
import socket
client = socket.socket()
client.connect(('127.0.0.1', 8000))
while True:
    msg = input('>>>:').strip()
    if not msg: continue
    client.send(bytes(msg, encoding='utf-8'))
    data = client.recv(1024)
    print(data)
# 每来一个客户端,服务端就开启一个新的进程来服务它,这种实现方式导致系统越来越卡。
# 进程之间的内存空间是隔离的。

守护进程

定义:主进程创建子进程,然后将该进程设置成守护自己的进程。守护进程会在主进程代码执行结束后就终止。

from multiprocessing import Process
import time
def talk(name):
    print(f'name is running')
    time.sleep(2)
    print(f'name is done')
if __name__ == '__main__':
    p = Process(target=talk, kwargs='name': 'allen')
    p.daemon = True  # 一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
    p.start()
    print('主')  # 主 #只要终端打印出这一行内容,那么守护进程p也就跟着结束掉了

进程同步(multiprocess.Lock)

互斥锁

由来:进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱。

# 并发运行,效率高,但竞争同一打印终端,带来了打印错乱
from multiprocessing import Process
import time, os
def task():
    print(f'os.getpid() is running')
    time.sleep(1)
    print(f'os.getpid() is done')
if __name__ == '__main__':
    for i in range(3):
        p = Process(target=task)
        p.start()

解决方案:加锁处理。互斥锁的工作原理就是把并发改成串行,降低了效率,但保证了数据安全不错乱。如果把多个进程比喻为多个人,多个人都要去争抢同一个资源:卫生间,一个人抢到卫生间后上一把锁,其他人都要等着,等到这个完成任务后释放锁。

# 由并发变成了串行,牺牲了运行效率,但避免了竞争
from multiprocessing import Process, Lock
import time, os
def task(lock):
    lock.acquire()  # 加锁
    print(f'os.getpid() is running')
    time.sleep(1)
    print(f'os.getpid() is done')
    lock.release()  # 释放锁
if __name__ == '__main__':
    lock = Lock()  # 锁对象
    for i in range(3):
        p = Process(target=task, args=(lock,))
        p.start()

模拟抢票

方案一:多个进程共享同一文件,我们可以把文件当数据库,用多个进程模拟多个人执行抢票任务,并发运行,效率高,但竞争写同一文件,数据写入错乱,只有一张票,卖成功给了10个人,故不成立。

#文件db.txt的内容为:"count":1
#注意一定要用双引号,不然json无法识别
from multiprocessing import Process
import time
import json
def search(name):
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    time.sleep(1)  # 模拟网络延迟
    print(f'路人name查询了还有dic["count"]张余票')
def get(name):
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    time.sleep(1)
    if dic["count"] > 0:
        dic["count"] -= 1
        time.sleep(1)
        json.dump(dic, open('db.txt', 'w', encoding='utf-8'))
        print(f'name抢票成功')
def task(name):
    search(name)
    get(name)
if __name__ == '__main__':
    for i in range(8):
        p = Process(target=task, args=(f'路人i',))
        p.start()

方案二:加锁处理:购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全

from multiprocessing import Process, Lock
import time
import json
def search(name):
    """查票函数"""
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    time.sleep(1)  # 模拟网络延迟
    print(f'路人name查询了还有dic["count"]张余票')
def get(name):
    """抢票函数"""
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    time.sleep(1)
    if dic["count"] > 0:
        dic["count"] -= 1
        time.sleep(1)
        json.dump(dic, open('db.txt', 'w', encoding='utf-8'))
        print(f'name抢票成功')
def task(name, lock):
    search(name)
    lock.acquire()  # 加锁
    get(name)
    lock.release()  # 释放锁
if __name__ == '__main__':
    lock = Lock()  # 在抢票环节加锁,而非查票加锁
    for i in range(8):
        p = Process(target=task, args=(f'路人i', lock))
        p.start()

方案三:join方法处理,虽然保证了数据安全,但问题是连查票操作也变成只能一个一个人去查了,很明显大家查票时应该是并发地去查询而无需考虑数据准确与否。

from multiprocessing import Process, Lock
import time
import json
def search(name):
    """查票函数"""
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    time.sleep(1)  # 模拟网络延迟
    print(f'路人name查询了还有dic["count"]张余票')
def get(name):
    """抢票函数"""
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    time.sleep(1)
    if dic["count"] > 0:
        dic["count"] -= 1
        time.sleep(1)
        json.dump(dic, open('db.txt', 'w', encoding='utf-8'))
        print(f'name抢票成功')
def task(name):
    search(name)
    get(name)
if __name__ == '__main__':
    for i in range(8):
        p = Process(target=task, args=(f'路人i', ))
        p.start()
        p.join()

综上所述:join是将一个任务整体串行,而互斥锁的好处则是可以将一个任务中的某一段代码串行,比如只让task函数中的get任务串行。

def task(name,):
    search(name) # 并发执行
    lock.acquire()
    get(name) #串行执行
    lock.release()

虽然可以用文件共享数据实现进程间通信,但问题是:效率低(共享数据基于文件,而文件是硬盘上的数据)和需要自己加锁处理

因此我们最好找寻一种解决方案能够兼顾:效率高(多个进程共享一块内存的数据)和帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC(inter-process-communication)通信机制:队列和管道。

队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,因而队列才是进程间通信的最佳选择。我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

进程间通信IPC(multiprocess.Queue)

队列Queue

定义:可创建共享的进程队列,可保证多进程安全的队列,实现多进程之间的数据传递。

from multiprocessing import Queue
q = Queue(3)  # 创建共享的进程队列
q.put(1)  # 插入数据到队列中
q.put('hello')
q.put([1, 2, 3])
print(q.full())  # True
# q.put(4)  # 再放就阻塞了
q.get()  # 从队列读取并且删除一个元素
q.get()
q.get()
print(q.empty())  # True
q.get()  # 再取就阻塞了

生产者和消费者模型

定义:生产者是生产数据的任务,消费者是处理数据的任务,而生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。

# 基于Queue实现消费者和生产者模型
from multiprocessing import Process, Queue
import time
def producer(q):
    for i in range(3):
        res = f'包子i'
        time.sleep(1)
        print(f'生产者生产了res')
        q.put(res)
def costumer(q):
    while True:
        res = q.get()
        if res is None: break
        time.sleep(1)
        print(f'消费者吃了res包子')
if __name__ == '__main__':
    # 容器
    q = Queue()
    # 生产者们
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    # 消费者们
    c1 = Process(target=costumer, args=(q,))
    p1.start()
    p2.start()
    c1.start()
    p1.join()
    p2.join()
    q.put(None)
    q.put(None)#有几个消费者就需要发送几次结束信号:相当low
    print('主')
# 基于JoinableQueue实现生产者消费者模型
from multiprocessing import Process, JoinableQueue
import time
def producer(q):
    for i in range(3):
        res = f'包子i'
        time.sleep(1)
        print(f'生产者生产了res')
        q.put(res)
    q.join()  # 等到消费者把自己放入队列中的所有的数据都取走之后,生产者才结束
def costumer(q):
    while True:
        res = q.get()
        if res is None: break
        time.sleep(1)
        print(f'消费者吃了res包子')
        q.task_done()  # 发送信号给q.join(),说明已经从队列中取走一个数据并处理完毕了
if __name__ == '__main__':
    # 容器
    q = JoinableQueue()
    # 生产者们
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    # 消费者们
    c1 = Process(target=costumer, args=(q,))
    c1.daemon = True
    p1.start()
    p2.start()
    c1.start()
    p1.join()
    p2.join()
    # 1、主进程等生产者p1、p2结束
    # 2、而p1、p2是在消费者把所有数据都取干净之后才会结束
    # 3、所以一旦p1、p2结束了,证明消费者也没必要存在了,应该随着主进程一块死掉,因而需要将生产者们设置成守护进程
    print('主')

生产者消费者模型总结:

  • 程序中有两类角色
    • 一类负责生产数据(生产者)
    • 一类负责处理数据(消费者)
  • 引入生产者消费者模型为了解决的问题是
    • 平衡生产者与消费者之间的速度差
    • 程序解开耦合
  • 如何实现生产者消费者模型
    • 生产者<--->队列<--->消费者

以上是关于并发编程的主要内容,如果未能解决你的问题,请参考以下文章

Go语言学习之旅--并发编程

并发编程路线

java并发编程看啥书比较好

JAVA并发编程:并发编程的认识

并发编程的基础

Java并发编程之美