Python并发编程之进程的玩法

Posted Aspirantlu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python并发编程之进程的玩法相关的知识,希望对你有一定的参考价值。

一、操作系统基础

1. I/O操作

IO操作是相对内存来说的。输入指往内存中输入,输出指从内存中往外输出。

文件操作:read(输入),write(输出)

网络操作:send(输出),recv(输入)

函数:print(输出),input(输入)

2. 计算机的工作分为两个状态

CPU工作:做计算(对内存中的数据进行操作)的时候工作

CPU不工作:IO操作的时候不工作

3. 多道操作系统

将原来的顺序的一个一个执行的思路变成共同存在在一台计算机中,其中一个程序执行让出CPU之后,另一个程序能继续使用CPU,来提高CPU的利用率。

原理:一个程序遇到IO就把CPU让给其他程序。

单纯的切换会占用时间,但是多道操作系统的原理整体上还是节省了时间,提高了CPU的利用率。

形成了时空复用的概念。在同一个时间点上,多个程序同时执行,一块内存条上存储了多个进程的数据。

4. 分时操作系统

时间分为很小很小的段,每一个时间都是一个时间片。每一个程序轮流执行一个时间片的时间,自己的时间片到了就轮到下一个程序执行,这个过程称之为时间片的轮转

【注意】分时操作系统没有提高CPU的利用率,但是提高了用户的体验。

5. 实时操作系统

系统能够及时响应随机发生的外部事件,并在严格的时间范围内完成对该事件的处理。

实时系统在一个特定的应用中常作为一种控制设备来使用。

实时系统可分成两类:

(1)实时控制系统。当用于飞机飞行、导弹发射等的自动控制时,要求计算机能尽快处理测量系统测得的数据,及时地对飞机或导弹进行控制,或将有关信息通过显示终端提供给决策人员。当用于轧钢、石化等工业生产过程控制时,也要求计算机能及时处理由各类传感器送来的数据,然后控制相应的执行机构。

(2)实时信息处理系统。当用于预定飞机票、查询有关航班、航线、票价等事宜时,或当用于银行系统、情报检索系统时,都要求计算机能对终端设备发来的服务请求及时予以正确的回答。此类对响应及时性的要求稍弱于第一类。

实时操作系统的主要特点

(1)及时响应。每一个信息接收、分析处理和发送的过程必须在严格的时间限制内完成。

(2)高可靠性。需采取冗余措施,双机系统前后台工作,也包括必要的保密措施等。

6. 分时系统和实时系统的比较

分时系统:现在流行的PC,服务器都是采用这种运行模式,即把CPU的运行分成若干时间片分别处理不同的运算请求,比如,linux系统
实时系统:一般用于单片机上、PLC等,比如电梯的上下控制中,对于按键等动作要求进行实时处理

7. 操作系统的作用

现代的计算机系统主要是由一个或者多个处理器,主存,硬盘,键盘,鼠标,显示器,打印机,网络接口及其他输入输出设备组成。

一般而言,现代计算机系统是一个复杂的系统。

其一:如果每位应用程序员都必须掌握该系统所有的细节,那就不可能再编写代码了(严重影响了程序员的开发效率:全部掌握这些细节可能需要一万年……)

其二:并且管理这些部件并加以优化使用,是一件极富挑战性的工作,于是,计算安装了一层软件(系统软件),称为操作系统。它的任务就是为用户程序提供一个更好、更简单、更清晰的计算机模型,并管理刚才提到的所有设备。

8. 总结

程序员无法把所有的硬件操作细节都了解到,管理这些硬件并且加以优化使用是非常繁琐的工作,这个繁琐的工作就是操作系统来干的,有了他,程序员就从这些繁琐的工作中解脱了出来,只需要考虑自己的应用软件的编写就可以了,应用软件直接使用操作系统提供的功能来间接使用硬件。

精简的说的话,操作系统就是一个协调、管理和控制计算机硬件资源和软件资源的控制程序。

细说的话,操作系统应该分成两部分功能:

一:隐藏了丑陋的硬件调用接口,为应用程序员提供调用硬件资源的更好,更简单,更清晰的模型(系统调用接口)。应用程序员有了这些接口后,就不用再考虑操作硬件的细节,专心开发自己的应用程序即可。
例如:操作系统提供了文件这个抽象概念,对文件的操作就是对磁盘的操作,有了文件我们无需再去考虑关于磁盘的读写控制(比如控制磁盘转动,移动磁头读写数据等细节)

二:将应用程序对硬件资源的竞态请求变得有序化
例如:很多应用软件其实是共享一套计算机硬件,比方说有可能有三个应用程序同时需要申请打印机来输出内容,那么a程序竞争到了打印机资源就打印,然后可能是b竞争到打印机资源,也可能是c,这就导致了无序,打印机可能打印一段a的内容然后又去打印c……,操作系统的一个功能就是将这种无序变得有序。

二、基础名词概念

1. 进程

进行中的程序就是一个进程

进程占用资源,需要操作系统调度

pid:能够唯一标识一个进程

进程是计算机中最小的资源分配单位:每个程序在运行起来的时候需要分配一些内存

2. 线程

线程是进程中的一个单位,不能脱离进程单独存在

线程是计算机中能够被CPU调度的最小单位:实际执行具体编译解释后的代码的是线程,所以CPU执行的是解释之后的线程中的代码。

3. 并发

多个程序同时执行:只有一个CPU,多个程序轮流在一个CPU上执行

宏观上:多个程序在同时执行

微观上:多个程序轮流在一个CPU上执行,本质上还是串行

4. 并行

多个程序同时执行,并且同时在多个CPU上执行

并发是多个任务交替使用CPU,同一时刻只有一个任务在跑;并行是多个任务同时跑。

5. 同步

所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态可以保持一致。

6. 异步

所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列。异步比同步效率高。

举例

比如我去银行办理业务,可能会有两种方式:
第一种 :选择排队等候;
第二种 :选择取一个小纸条上面有我的号码,等到排到我这一号时由柜台的人通知我轮到我去办理业务了;

第一种:前者(排队等候)就是同步等待消息通知,也就是我要一直在等待银行办理业务情况;

第二种:后者(等待别人通知)就是异步等待消息通知。在异步消息处理中,等待消息通知者(在这个例子中就是等待办理业务的人)往往注册一个回调机制,在所等待的事件被触发时由触发机制(在这里是柜台的人)通过某种机制(在这里是写在小纸条上的号码,喊号)找到等待该事件的人。

7. 阻塞

CPU不在工作。input accept recv recvfrom sleep connect

8. 非阻塞

CPU在工作。

9. 同步阻塞

input sleep recv recvfrom

效率最低。拿上面的例子来说,就是你专心排队,什么别的事都不做。

10. 同步非阻塞

ret = eval('1 + 2 + 3')
def func(*args):
    count = 0
    # inp = input('输入一个数字')  # 有这一行代码就是同步阻塞,没有就是同步非阻塞
    for i in args:
        count += 1
    return count
a = 1
b = 2
c = a + b
d = func(a, b, c)
print(d)

实际上是效率低下的。

想象一下你一边打着电话一边还需要抬头看到底队伍排到你了没有,如果把打电话和观察排队的位置看成是程序的两个操作的话,这个程序需要在这两种不同的行为之间来回的切换,效率可想而知是低下的。

11. 异步阻塞

如果在银行等待办理业务的人采用的是异步的方式去等待消息被触发(通知),也就是领了一张小纸条,假如在这段时间里他不能离开银行做其它的事情,那么很显然,这个人被阻塞在了这个等待的操作上面;

异步操作是可以被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。

12. 异步非阻塞

效率更高。

因为打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)的事情,程序没有在两种不同的操作中来回切换。

比如说,这个人突然发觉自己烟瘾犯了,需要出去抽根烟,于是他告诉大堂经理说,排到我这个号码的时候麻烦到外面通知我一下,那么他就没有被阻塞在这个等待的操作上面,自然这个就是异步 + 非阻塞的方式了。

三、multiprocessing模块

multiple 多元化的,processing 进程

multiprocessing 多元的处理进程的模块

1. Process类初使用

from multiprocessing import Process
import os


def func():
    print(os.getpid(), os.getppid())  # 11624 3228
    # pid process id 进程id
    # ppid parent process id 父进程id


print(123)

if __name__ == '__main__':
    print('main:', os.getpid(), os.getppid())  # main: 3228 8724
    p = Process(target=func)
    p.start()

123
main: 3228 8724
123
11624 3228

2. 传递参数

通过 args 进行参数传递,类型必须是元组

from multiprocessing import Process


def func(name, age):
    print(name, age)


if __name__ == '__main__':
    p = Process(target=func, args=('lucy', 18))
    p.start()

3. 返回值问题

主进程不能获取子进程的返回值,因为进程启动之后,内存之间相互独立。

4. 同时开启多个子进程

from multiprocessing import Process
import time


def func(name, age):
    print(f'name start')
    time.sleep(1)
    print(name, age)


if __name__ == '__main__':
    arg_list = [('lucy', 18), ('jack', 20), ('tom', 22)]
    for arg in arg_list:
        p = Process(target=func, args=arg)
        p.start()  # 异步非阻塞

jack start
tom start
lucy start
jack 20
tom 22
lucy 18

5. join的用法

5.1 low版
from multiprocessing import Process
import time


def func(name, age):
    print(f'发送一封邮件给年龄是age岁的name')
    time.sleep(1)
    print('发送完毕')


if __name__ == '__main__':
    p = Process(target=func, args=('lucy', 18))
    p.start()  # 异步非阻塞
    p.join()  # 同步阻塞:直到p进程执行完毕才能继续执行下面的代码
    p = Process(target=func, args=('jack', 20))
    p.start()
    p.join()
    print('所有的邮件已发送')

发送一封邮件给年龄是18岁的lucy
发送完毕
发送一封邮件给年龄是20岁的jack
发送完毕
所有的邮件已发送
5.2 高级版
from multiprocessing import Process
import time


def func(name, age):
    print(f'发送一封邮件给年龄是age岁的name')
    time.sleep(1)
    print('发送完毕')


if __name__ == '__main__':
    arg_list = [('lucy', 18), ('jack', 20), ('tom', 22)]
    p_list = []
    for arg in arg_list:
        p = Process(target=func, args=arg)
        p.start()  # 异步非阻塞
        p_list.append(p)
    for p in p_list:
        p.join()  # 同步阻塞:直到p进程执行完毕才能继续执行下面的代码
    print('所有的邮件已发送')

发送一封邮件给年龄是18岁的lucy
发送一封邮件给年龄是20岁的jack
发送一封邮件给年龄是22岁的tom
发送完毕
发送完毕
发送完毕
所有的邮件已发送

6. 同步阻塞和异步非阻塞

同步阻塞:join, input

调用一个函数需要等待这个函数的执行结果,并且在执行这个函数的过程中CPU不工作。

异步非阻塞:start

调用一个函数不需要等待这个函数的执行结果,并且在执行这个函数的过程中CPU一直工作。

【补充】

同步非阻塞:ret = eval(‘1 + 2 + 3’)

调用一个函数需要等待这个函数的执行结果,在执行这个函数的过程中CPU一直工作。

异步阻塞:比如,开启10个进程(异步的),获取这个进程的返回值,并且能做到哪一个进程先结束,就先获取谁的返回值。

调用一个函数不需要等待这个函数的执行结果,并且在执行这个函数的过程中CPU不工作。

7. 多进程之间的数据是隔离的

from multiprocessing import Process

n = 0


def func():
    global n
    n += 1


if __name__ == '__main__':
    p_list = []
    for i in range(100):
        p = Process(target=func)
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()
    print(n)  # 0

8. 多进程实现并发的socket的server端

8.1 server.py
import socket
from multiprocessing import Process


def talk(conn):
    while 1:
        msg = conn.recv(1024).decode('utf-8')
        ret = msg.upper().encode('utf-8')
        conn.send(ret)


if __name__ == '__main__':
    sk = socket.socket()
    sk.bind(('127.0.0.1', 9001))
    sk.listen()
    while 1:
        conn, addr = sk.accept()
        Process(target=talk, args=(conn,)).start()

8.2 client.py
import time
import socket

sk = socket.socket()
sk.connect(('127.0.0.1', 9001))

while 1:
    sk.send(b'hello')
    msg = sk.recv(1024).decode('utf-8')
    print(msg)
    time.sleep(0.5)

9. 进程的开启和关闭

父进程 开启了 子进程

父进程会等待所有的子进程结束,是为了回收子进程的资源

10. 开启进程的另一种方法(面向对象的方法)

面向对象的方法,通过继承和重写run方法完成了启动子进程;通过重写init和调用父类的init完成了给子进程传参的操作。

import os
import time
from multiprocessing import Process


class MyProcess(Process):
    def __init__(self, name_, age):
        super().__init__()  # 执行父类的初始化方法
        self.name_ = name_
        self.age = age

    # 必须重写run方法
    def run(self):
        time.sleep(1)
        print(os.getppid(), os.getpid(), self.name_, self.age)


if __name__ == '__main__':
    print('main:', os.getpid())
    for i in range(10):
        p = MyProcess('lucy', 18)
        p.start()

11. Process类的其他属性和方法

import os
import time
from multiprocessing import Process


class MyProcess(Process):
    def __init__(self, name_, age):
        super().__init__()  # 执行父类的初始化方法
        self.name_ = name_
        self.age = age

    # 必须重写run方法
    def run(self):
        time.sleep(1)
        print(os.getppid(), os.getpid(), self.name_, self.age)


if __name__ == '__main__':
    print('main:', os.getpid())
    p = MyProcess('lucy', 18)
    p.start()
    print(p.name)  # 进程的名字
    print(p.pid, p.ident)  # 进程id
    print(p.is_alive())  # 判断一个进程是否存活
    p.terminate()  # 强制结束一个子进程 异步非阻塞
    print(p.is_alive())
    time.sleep(0.01)  # 执行过快,未能即是刷新
    print(p.is_alive())

12. 守护进程

守护进程会等待主进程的代码执行结束之后立即结束,而不是等待整个主进程结束,因为主进程要回收子进程的资源。

p.daemon = True 设置p是一个守护进程,必须在p.start()之前设置。

from multiprocessing import Process
import time


def son1():
    while 1:
        print('--->in son1')
        time.sleep(1)


def son2():
    for _ in range(5):
        print('in son2')
        time.sleep(1)


if __name__ == '__main__':
    p1 = Process(target=son1)
    p1.daemon = True  # 表示设置p1是一个守护进程
    p1.start()
    p2 = Process(target=son2)
    p2.start()
    time.sleep(3)
    print('in main')

--->in son1
in son2
--->in son1
in son2
--->in son1
in son2
in main
in son2
in son2

主进程的代码什么时候结束,守护进程就什么时候结束,和其他子进程的执行进度无关。

要求:守护进程p1必须在p2进程执行结束之后才结束

from multiprocessing import Process
import time


def son1():
    while 1:
        print('--->in son1')
        time.sleep(1)


def son2():
    for _ in range(5):
        print('in son2')
        time.sleep(1)


if __name__ == '__main__':
    p1 = Process(target=son1)
    p1.daemon = True  # 表示设置p1是一个守护进程
    p1.start()
    p2 = Process(target=son2)
    p2.start()
    time.sleep(3)
    print('in main')
    p2.join()  # 等待屁p2结束之后才结束

# 等待p2结束--> 主进程的代码才结束--> 守护进程结束

四、进程同步(Lock锁)

锁:会降低程序的运行效率,但保证了数据的安全。

互斥锁

互斥锁,不能在同一个进程中连续acquire多次,必须是一个acquire对应一个release。

from multiprocessing import Lock, Process
import time


def func(i, lock):
    lock.acquire()  # 拿钥匙 加锁
    print(f'被锁起来的代码:i')
    time.sleep(1)
    lock.release()  # 还钥匙 释放锁


if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=func, args=(i, lock)).start()

抢票案例

b_ticket.py
import json
import time
from multiprocessing import Process, Lock


def search(i):
    with open('ticket', encoding='utf-8') as f:
        ticket = json.load(f)
    print(f'i:当前的余票是ticket["count"]')


def buy_ticket(i):
    with open('ticket', encoding='utf-8') as f:
        ticket = json.load(f)
    if ticket['count'] > 0:
        ticket['count'] -= 1
        print(f'i号买到了票')
    time.sleep(0.2)
    with open('ticket', mode='w', encoding='utf-8') as f:
        json.dump(ticket, f)


def get_ticket(i, lock):
    search(i)
    # lock.acquire()  # 加锁
    # buy_ticket(i)
    # lock.release()  # 释放锁
    # with lock 代替acquire和release 并且在此基础上做一些异常处理
    # 保证了即使一个进程的代码出错退出了 也会归还钥匙
    with lock:
        buy_ticket(i)


if __name__ == '__main__':
    lock = Lock()  # 互斥锁
    for i in range(10):
        Process(target=get_ticket, args=(i, lock)).start()

ticket
"count": 1

五、进程间通信

进程之间通信(IPC)Inter Process communication

  • 基于文件

同一台机器上的多个进程之间的通信,是基于socket的文件级别的通信来完成数据传递的,目前可以利用封装好的Queue来实现。

  • 基于网络

同一台机器或者多台机器上的多个进程之间的通信,是通过第三方工具(消息中间件)来完成数据的传递,第三方工具有memcache、redis、rabbitmq、kafka等。

总结

queue队列是基于socket、pickle、Lock实现的,安全

pipe管道基于socket、pickle实现的,没有锁,数据不安全

第三方工具

队列

创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

Queue([maxsize]) 
创建共享的进程队列。
参数: maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
底层队列使用管道和锁实现。
Queue([maxsize]) 
创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。 

Queue的实例q具有以下方法:

q.get([block[,timeout]]) 
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。

q.get_nowait()

以上是关于Python并发编程之进程的玩法的主要内容,如果未能解决你的问题,请参考以下文章

Python并发编程之进程的玩法

Python并发编程之线程的玩法

Python并发编程之线程的玩法

Python并发编程之线程的玩法

python并发编程之多进程

Python并发编程之进程2