网络编程进阶及并发编程

Posted sxy-blog

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了网络编程进阶及并发编程相关的知识,希望对你有一定的参考价值。

并发编程之多进程

进程理论

进程

进程:正在运行的一个过程或一个任务。负责执行任务的是cpu。

程序与进程的区别:程序只是一堆代码,而进程指的是程序的运行过程。

注意同一个程序执行两次,是两个进程。比如打开两个QQ,登陆的是不同人的QQ号。

 

并行与并发

无论是并行还是并发,在用户看来都是‘同时‘运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,

cpu来做这些任务,而一个cpu同一时刻只能执行一个任务。

并发:并发是看起来像是一起执行,实际上是通过在不同人物之间快速切换,使任务看起来在同时进行。

并行:真正意义上的同时执行,需要具备多个cpu。

 

进程的创建

但凡是硬件,都需要有操作系统去管理,只要有操作系统,就有进程的概念,就需要有创建进程的方式。

而对于通用系统(跑很多应用程序),需要有系统运行过程中创建或撤销进程的能力,主要分为4中形式创建新的进程

  1. 系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关)

  2. 一个进程在运行过程中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)

  3. 用户的交互式请求,而创建一个新进程(如用户双击暴风影音)

  4. 一个批处理作业的初始化(只在大型机的批处理系统中应用)

无论哪一种,新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的

  1. 在UNIX中该系统调用是:fork,fork会创建一个与父进程一模一样的副本,二者有相同的存储映像、

  2. 同样的环境字符串和同样的打开文件(在shell解释器进程中,执行一个命令就会创建一个子进程)

  3. 在windows中该系统调用是:CreateProcess,CreateProcess既处理进程的创建,也负责把正确的程序装入新进程。

关于创建的子进程,UNIX和windows

1.相同的是:进程创建后,父进程和子进程有各自不同的地址空间,任何一个进程的在其地址空间中的修改都不会影响到另外一个进程。

2.不同的是:在UNIX中,子进程的初始地址空间是父进程的一个副本,将父进程地址空间的数据作为子进程的起始状态。

但是对于windows系统来说,从一开始父进程与子进程的地址空间就是不同的,数据也是来自于父进程,但并不完全相同。

 

进程的终止

 

  1. 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)

  2. 出错退出(自愿,python a.py中a.py不存在)

  3. 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try...except...)

  4. 被其他进程杀死(非自愿,如kill -9)

 

进程的状态

其实在两种情况下会导致一个进程在逻辑上不能运行,

  1. 进程挂起是自身原因,遇到I/O阻塞,便要让出CPU让其他进程去执行,这样保证CPU一直在工作。

  2. 与进程无关,是操作系统层面,可能会因为一个进程占用时间过多,或者优先级等原因,而调用其他的进程去使用CPU。

因而一个进程由三种状态

技术图片

当一个处于运行态的进程占用时间过多就可能进入就绪态,过一段时间再进入运行态。

当程序在执行过程中遇到IO就会进入阻塞状态,IO完成后会先进入就绪状态,再进入运行态。

 

进程并发的实现

进程并发的实现在于,硬件中断一个正在运行的进程,把此时进程运行的所有状态保存下来,从而保证进程再次运行时像从未中断过一样。

 

multiprocessing模块

python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。

Python提供了multiprocessing。 multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。

multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内

 

Process类

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,可用来开启一个子进程

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

 

参数:

group参数未使用,值始终为None

target表示调用对象,即子进程要执行的任务

args表示调用对象的位置参数元组,args=(1,2,)

kwargs表示调用对象的字典,kwargs={‘name‘:‘alex‘,‘age‘:18}

name为子进程的名称

 

创建进程的两种方式

注:开启多线程多进程一定要将主进程或主线程放在 if __name__ == ‘__main__‘: 下

from multiprocessing import Process
import time
# 方式一


def task(name):
    print(%s is running % name)
    time.sleep(3)
    print(work is done)


if __name__ == __main__:  # Windows系统一定要放在main下
    p = Process(target=task, args=(子进程1,))  # 创建了一个对象  以位置传参
    # p = Process(target=task, kwargs={‘name‘: ‘子进程1‘})  # 字典的形式传参
    p.start()  # 仅给操作系统发送了一个信号 告诉操作系统开启一个子进程 申请开辟一片内存空间
           # 将父进程地址空间的数据作为子进程的起始状态
               # 两个进程运行状态独立 不影响主进程的执行
    print("")


# 方式二 面向对象
class MyProcess(Process):
    def __init__(self, name):
        super().__init__()  # 继承父类原有方法
        self.name = name

    def run(self):  # 子进程的执行方法一定要叫run
        print("%s is running" % self.name)
        time.sleep(3)
        print(task is done)


if __name__ == __main__:
    p = MyProcess(子进程1)
    p.start()  # 会执行run的方法
    print()

 

进程的pid和ppid

每一个进程都有自己的pid,相当于自身的编号,通过这些编号我们可以区分不同的进程。

查看进程的pid可用os模块下的os.getpid()方法。

ppid是查看当前进程的父进程的id号,可用os.getppid()查看。

from multiprocessing import Process
import time, os


def task():
    print("%s id is running, parent id is %s" % (os.getpid(), os.getppid()))
    time.sleep(3)
    print(%s is done % (os.getpid()))  # os.getpid() 拿到当前进程的编号 os.getppid()拿到父进程的id


if __name__ == __main__:
    p = Process(target=task)
    p.start()
    print(, os.getpid(), os.getppid())  
    # 如果在Pycharm上运行py文件 当前主进程的父进程id就是Pycharm的id
   # windows 可在终端使用 tasklist | findstr pycharm 查看pycharm的pid
    # 如果在终端执行这个py文件,那么ppid就是终端的pid

 

僵尸进程与孤儿进程

当一个父进程开启了多个子进程 父进程想要查看子进程 但是子进程此时已经执行完毕 在内存中消失了 那么应该怎么去看?

当子进程死亡,系统会把子进程的一些状态信息保留,即保留子进程的尸体,以便父进程可以查看到自己所有子进程的状态,这种进程叫做僵尸进程。
太多的僵尸进程会占用系统的pid 如果父进程一直不死,就会一直占用这些pid 那么当系统开启新的进程时,就可能起不来。

当父进程先执行完成,子进程还在执行,就相当于没有了爹,这时候此进程就叫做孤儿进程,在linux下 所有进程的父亲是Init进程。
孤儿进程直接由init监管、回收,孤儿进程是无害的。

 

Process对象的join()方法

在主进程运行过程中如果想并发地执行其他的任务,我们可以开启子进程,此时主进程的任务与子进程的任务分两种情况。

情况一:在主进程的任务与子进程的任务彼此独立的情况下,主进程的任务先执行完毕后,主进程还需要等待子进程执行完毕,然后统一回收资源。

情况二:如果主进程的任务在执行到某一个阶段时,需要等待子进程执行完毕后才能继续执行,就需要有一种机制能够让主进程检测子进程是否运行完毕,在子进程执行完毕后才继续执行,否则一直在原地阻塞,这就是join方法的作用。

from multiprocessing import Process
import time
import random
import os

def task():
    print(%s is piaoing %os.getpid())
    time.sleep(random.randrange(1,3))
    print(%s is piao end %os.getpid())

if __name__ == __main__:
    p=Process(target=task)
    p.start()
    p.join() #等待p停止,才执行下一行代码
    print()

那么有了join()程序是否就变成了串行执行?

from multiprocessing import Process
import time
import random

def task(name):
    print(%s is piaoing %name)
    time.sleep(random.randint(1,3))
    print(%s is piao end %name)

if __name__ == __main__:
    p1=Process(target=task,args=(egon,))
    p2=Process(target=task,args=(alex,))
    p3=Process(target=task,args=(yuanhao,))
    p4=Process(target=task,args=(wupeiqi,))

    p1.start()
    p2.start()
    p3.start()
    p4.start()

    # 有的人有疑问: 既然join是等待进程结束, 那么我像下面这样写, 进程不就又变成串行的了吗?
    # 当然不是了, 必须明确:p.join()是让谁等?
    # 很明显p.join()是让主线程等待p的结束,卡住的是主进程而绝非子进程p
    p1.join()
    p2.join()
    p3.join()
    p4.join()

    print()

p1,p2,p3,p4四个子进程都向操作系统发出了请求,执行的时候四个进程是并发执行,当四个进程都执行完毕,主进程才可以执行,join方法卡住的是主进程,

子进程仍然是并发的。

 

 

Process对象的其他属性和方法

p.is_alive(): 查看进程是否处于存活状态。

p.terminate(): 向操作系统发出请求关闭进程。

p.pid: 查看进程的pid。

p.name: 查看进程的名字,默认为Process-进程数。

from multiprocessing import Process
import time


def task(name, n):
    """进程要执行的任务"""
    print("%s is running" % name)
    time.sleep(n)
    print(%s is done % name)


if __name__ == __main__:
    p1 = Process(target=task, args=(子进程1, 1))
    p2 = Process(target=task, args=(子进程2, 3), name=子进程2)
    p3 = Process(target=task, args=(子进程3, 5))


    p_list = [p1, p2, p3]
    for p in p_list:
        p.start()  # 向操作系统发出开启子进程的请求

    # for p in p_list:
    #     p.join()  # 卡住主进程,等到所有的子进程执行完,再执行主进程,子进程之间是并发的,卡的时间取决于执行时间最久的子进程
    print(p1, p1.is_alive())  # True

    p1.join()

    print(p1, p1.is_alive())  # False  p.is_alive()判断子进程是否处于执行的状态
    print(p2, p2.is_alive())

    p2.terminate()  # 关闭进程,向操作系统发送请求 操作系统需要反应一段时间 不会立刻关闭,因此可能之后查看时,进程可能仍然存活

    time.sleep(1)  # 经过一小段时间,操作系统执行,进程被关闭
    print(p2, p2.is_alive())  # False
    print()

    print(p1.name, p1.pid)  # 查看子进程的pid可直接用p.pid方法
    # p.name 查看进程的名字 默认就是开启的子进程数Process-1  也可以规定
    print(p2.name, p2.pid)  # 由此也可验证僵尸进程 子进程执行完毕 保留状态信息

 

进程之间的空间是隔离的

from multiprocessing import Process
def task():
    global n
    n = 0
    print("子进程:%s" % n)  # n=0


if __name__ == __main__:
    n = 100
    p = Process(target=task)
    p.start()
    print("主进程:%s" % n)  #n = 100

上面的程序在主进程中定义了一个n=100,开启了一个子进程声明全局变量对n进行修改,在子进程中n=0,但是在主进程中n=100,

这就说明了主进程与子进程之间的空间是隔离的,数据不共享。

 

基于多进程的套接字通信

可以接收多个客户端连接,实现与多个客户端通信。

将连接和通信的活分开,就相当于前台,每来一个客户,就安排一个服务员单独服务。

# server
from multiprocessing import Process
from socket import *

def task(conn):
    while True:
        try:
            data = conn.recv(1024).decode(utf-8)
            if not data:
                break
            conn.send(data.upper().encode(utf-8))
        except ConnectionResetError:
            break


def connect(host, port):
    server = socket(AF_INET, SOCK_STREAM)
    server.bind((host, port))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        p = Process(target=task, args=(conn,))
        p.start()


    server.close()

if __name__ == __main__:
    connect(127.0.0.1, 8080)


# client
from socket import *
client = socket(AF_INET, SOCK_STREAM)
client.connect((127.0.0.1, 8080))
while True:
    msg = input(>>>)
    client.send(msg.encode(utf-8))
    data = client.recv(1024).decode(utf-8)
    print(data)

 

守护进程

主进程创建子进程,然后将该进程设置成守护自己的进程,守护进程就好比崇祯皇帝身边的老太监,崇祯皇帝已死老太监就跟着殉葬了。

关于守护进程需要强调两点:

其一:守护进程会在主进程代码执行结束后就终止

其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

如果守护进程有子进程,那么主进程结束,守护进程也会结束,那么守护进程的子进程就成为了孤儿进程,孤儿进程统一有init进程回收,如果孤儿进程有很多,会降低效率。

如果我们有两个任务需要并发执行,那么开一个主进程和一个子进程分别去执行就ok了,如果子进程的任务在主进程任务结束后就没有存在的必要了,那么该子进程应该在开启前就被设置成守护进程。主进程代码运行结束,守护进程随即终止。

将子进程设置为守护进程的语法,p.daemon = True

from multiprocessing import Process

import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == __main__:
    p1=Process(target=foo)
    p2=Process(target=bar)

    p1.daemon=True  # 设置为守护进程 主进程代码运行结束 子进程也结束
    p1.start()
    p2.start()
    print("main-------")

# 守护进程不可以开启子进程
# 此程序当打印出main时p1就会结束执行 不可能看到end123

 

互斥锁

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如下

#并发运行,效率高,但竞争同一打印终端,带来了打印错乱
from multiprocessing import Process
import os,time
def work():
    print(%s is running %os.getpid())
    time.sleep(2)
    print(%s is done %os.getpid())

if __name__ == __main__:
    for i in range(3):
        p=Process(target=work)
        p.start()

如果我想要实现的效果是 xxx is running  xxx is done,那么这种方式一定是无法完成的,因为在time.sleep的过程中,cpu会把执行权限交给别的进程不会一直等待。

那如何实现这种效果?那就是加锁处理。

从multiprocess模块中导入Lock类,实例化,然后加锁。

加锁的语法,实例化一把锁之后,在程序开头mutex.acquire(),释放锁时 mutex.release()。

from multiprocessing import Process,Lock
import time
def task(name,mutex):
    mutex.acquire()  # 得到锁
    print(%s-1 is running % name)
    time.sleep(1)
    print(%s-2 is running % name)
    time.sleep(1)
    print(%s-3 is running % name)
    time.sleep(1)
    mutex.release()  # 释放锁

if __name__ == __main__:
    mutex = Lock()  # 互斥锁 要保证传入的是同一把锁 互斥锁就是把并行变为串行
    for i in range(1, 4):
        p = Process(target=task, args=((进程%s % i),mutex))
        p.start()

互斥锁的原理就是把并行变为串行,虽然降低了效率,但是提高了数据的安全性。

 

互斥锁和join()的区别

使用join可以将并发变成串行,互斥锁的原理也是将并发变成穿行,那我们直接使用join就可以了啊,为何还要互斥锁?

我们来看一个模拟抢票的程序。

from multiprocessing import Process,Lock
import json
import time


def search(name):
    time.sleep(1)
    data = json.load(open(db.json, r, encoding=utf-8))
    print("%s查看剩余票数<%s>" % (name, data[count]))


def get(name, mutex):
    mutex.acquire()
    data = json.load(open(db.json, r, encoding=utf-8))
    time.sleep(3)
    if int(data[count]) > 0:
        print(乘客%s购票成功 % name)
        dic = {count: int(data[count]) - 1}
        json.dump(dic, open(db.json, w, encoding=utf-8))  # db.json是票的数据库
    mutex.release()


def task(name, mutex):
    search(name)
    get(name, mutex)


if __name__ == __main__:
    mutex = Lock()
    for i in range(10):
        p = Process(target=task, args=((路人%s % i), mutex))
        p.start()
   # p.join()

 

如果我们不加锁,在启动进程时直接p.join(),那么乘客在查票时也就只能一个一个去查,这显然不符合实际。我们需要保证的只是在买票时,一个人只能买到一张票。

join()将整个程序都变成了串行执行,但是我们需要的只是在买票时串行就可以了,这时锁的优势就体现出来了。

进程与进程之间 内存空间是隔离的,但可以共享硬盘上的空间(文件)。

我们可以只在需要的地方即修改共享数据的地方加锁,而其他不需要的地方我们可以不作修改。

总结

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行地修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

虽然可以用文件共享数据实现进程间通信,但问题是:

1、效率低(共享数据基于文件,而文件是硬盘上的数据),从硬盘上读取文件效率很低。

2、需要自己加锁处理,需要注意在适当的地方加锁解锁,否则可能会形成死锁(之后会提到),使整个程序卡住。

因此我们最好找寻一种解决方案能够兼顾:

1、效率高(多个进程共享一块内存的数据)

2、帮我们处理好锁问题

这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,因而队列才是进程间通信的最佳选择

我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

 

队列

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。

 

创建队列的类:

Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

maxsize是队列中允许最大项数,省略则无大小限制,可以是任意的数据类型
但需要明确:
1、队列内存放的是消息而非大数据
2、队列占用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小

 

主要方法:

q.put方法用以插入数据到队列中。
q.get方法可以从队列读取并且删除一个元素。

 

from multiprocessing import Process,Queue
q = Queue(3)  # 实例化产生一个队列 内部参数是最大的数据数 可以是任意数据类型
q.put(1)  # 放入数据
q.put({a: 2})
q.put([123])
# q.put() 此时已取完 会卡住等待有人取走数据

print(q.full())  # 判断队列是否已满

print(q.get())  # 取出数据
print(q.get())
print(q.get())

print(q.empty())  # 判断队列是否为空

#  q.get() 此时会卡住 等待放入数据
# 队列用于进程间的通信
# 进程间的通信有两种方式 一种是管道 一种是队列
# 队列就是在管道的基础上加上锁 因此用队列比较好

# 队列内存放的数据不应该过大 队列占用的是内存空间 因此速度快 但最大的数据数也受制于内存空间

 

生产者消费者模型

 

为什么要用生产者消费者模型?

生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,

那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。

为了解决这个问题于是引入了生产者和消费者模式。

 

什么是生产者消费者模型?

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,

所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,

阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。

 

下面用吃包子这个例子来解释

from multiprocessing import Process,Queue
import time,random,os
def consumer(q,name):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print(33[43m%s 吃 %s33[0m %(name,res))

def producer(q,name,food):
    for i in range(3):
        time.sleep(random.randint(1,3))
        res=%s%s %(food,i)
        q.put(res)
        print(%s 生产了 %s %(name,res))

if __name__ == __main__:
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,egon,包子))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,alex))

    #开始
    p1.start()
    c1.start()
    print()

此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环。

from multiprocessing import Process,Queue
import time,random,os
def consumer(q,name):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(random.randint(1,3))
        print(33[43m%s 吃 %s33[0m %(name,res))

def producer(q,name,food):
    for i in range(3):
        time.sleep(random.randint(1,3))
        res=%s%s %(food,i)
        q.put(res)
        print(%s 生产了 %s %(name,res))

if __name__ == __main__:
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,egon,包子))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,alex))

    #开始
    p1.start()
    c1.start()

    p1.join()
    q.put(None) # 结束信号
    print()

但是当有多个生产者和有多个消费者时,这种方法就非常low,因为每一个生产者生产完毕都要加一个结束信号,而且也有可能形成错乱。

我们的思路其实就是提供一个结束信号而已,有一种队列提供了这种机制。

 

JoinableQueue

这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。

 

JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:

q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。‘

如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常

 

q.join():生产者调用此方法进行阻塞,直到队列中所有的数据均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

 

from multiprocessing import Process, JoinableQueue
import time


def make(name, q):
    for i in range(5):
        time.sleep(1)
        res = 包子%s % i
        print(生产者%s做好了%s % (name, res))
        q.put(res)
    q.join()  # 队列中的数据被取完 结束


def consumer(name, q):
    while True:
        a = q.get()
        time.sleep(0.5)
        print(消费者%s吃了包子%s % (name, a))
        q.task_done()  # 封装的方法 告诉q队列中的数据处理了一个


if __name__ == __main__:
    q = JoinableQueue()
    c1 = Process(target=make, args=(生产者1, q))
    c2 = Process(target=make, args=(生产者2, q))
    c3 = Process(target=make, args=(生产者3, q))

    p1 = Process(target=consumer, args=(消费者1, q))
    p2 = Process(target=consumer, args=(消费者2, q))
    p1.daemon = True
    p2.daemon = True  # 当数据处理完 消费者就没有必要存在了 因此设为守护进程

    c1.start()
    c2.start()
    c3.start()
    p1.start()
    p2.start()

    c1.join()
    c2.join()
    c3.join()  # 当c1 c1 c3进程结束 也表示队列中的数据被处理完了
    print()

 

生产者消费者模型总结

1、程序中有两类角色

一类负责生产数据(生产者)
一类负责处理数据(消费者)


2、引入生产者消费者模型为了解决的问题是

平衡生产者与消费者之间的速度差,使程序解开耦合,


3、如何实现生产者消费者模型

生产者<--->队列<--->消费者

 

并发编程之多线程

线程

在传统操作系统中,每个进程有一个地址空间,而且默认就有一个线程。

线程顾名思义,就是一条流水线工作的过程是一个执行单位

而一条流水线必须属于一个车间,一个车间的工作过程是一个进程,车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一条流水线。

所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位。

多线程(即多个控制线程)的概念是,在一个进程中存在多个线程,多个线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。

 

进程与线程的区别

1.一个进程默认有一个线程

2.同一个进程内的多个线程共享该进程内的地址资源

3.创建线程的开销要远小于创建进程的开销(创建进程相当于开设一个部门,创建线程相当于在部门里找人,因此,创建线程的开销远小于创建进程的开销)

创建进程时,向操作系统发出请求,申请开辟一片内存空间,一段时间后(很短的时间)操作系统响应。

创建线程时,向操作系统发出请求,几乎同时线程就创建完毕。

 

创建线程的两种方式

multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性,因而不再详细介绍。

from threading import Thread
import time

# 方式1
def task(name):
    time.sleep(2)
    print(%s say hello  % name)


if __name__ == __main__:
    t = Thread(target=task, args=(线程,))
    t.start()
    print()


# 方式二
class MyThread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(%s is say hello % self.name)


if __name__ == __main__:
    t = MyThread(线程)
    t.start()
    print()  # 在资源角度 是主进程  在执行角度 是主线程

 

多进程与多线程的区别

一 开启速度

1、在主进程下开启线程

from threading import Thread

def work():
    print(hello)

if __name__ == __main__:
    t=Thread(target=work)
    t.start()
    print(主线程/主进程)

# 结果:
 hello
主线程/主进程

执行结果如上,几乎是t.start ()的同时就将线程开启了,然后先打印出了hello,证明线程的创建开销极小。

2、在主进程下开启子进程

from multiprocessing import Process

def work():
    print(hello)

if __name__ == __main__:
    #在主进程下开启子进程
    p=Process(target=work)
    p.start()
    print(主线程/主进程)

执行结果如下,p.start ()将开启进程的信号发给操作系统后,操作系统要申请内存空间,让好拷贝父进程地址空间到子进程,开销远大于线程。

二 pid

 

 1、在主进程下开启多个线程,每个线程都跟主进程的pid一样

from threading import Thread
import os

def work():
    print(hello,os.getpid())

if __name__ == __main__:
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print(主线程/主进程pid,os.getpid())

# 执行结果
#hello 7939
#hello 7939
#主线程/主进程 7939

2、开多个进程,每个进程都有不同的pid

from multiprocessing import Process
import os

def work():
    print(hello,os.getpid())

if __name__ == __main__:
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print(主线程/主进程,os.getpid())

# 执行结果
#主线程/主进程 7951
#hello 7952
#hello 7953

三 同一进程内的多线程间共享该进程的数据

from multiprocessing import Process
import os

def work():
    global n
    n=0

if __name__ == __main__:
    n=100
    p=Process(target=work)
    p.start()
    p.join()
    print(,n)

毫无疑问子进程p已经将自己的全局的n改成了0,但改的仅仅是它自己的,查看父进程的n仍然为100。

from threading import Thread
import os

def work():
    global n
    n=0

if __name__ == __main__:
    n=100
    t=Thread(target=work)
    t.start()
    t.join()
    print(,n)

查看结果为0,因为同一进程内的线程之间共享进程内的数据。

 

Thread对象的其他属性和方法

Thread实例对象的方法
# isAlive(): 返回线程是否活动的。
# getName(): 返回线程名。
# setName(): 设置线程名。

threading模块提供的一些方法:
# threading.currentThread(): 返回当前的线程变量。即下面代码的t
# threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
# threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

from threading import Thread, currentThread, active_count,enumerate


def work():
    print(currentThread().getName())  # 获得当前线程名
    # currentThead 相当于 t


if __name__ == __main__:
    t =Thread(target=work, name=子线程)
    t.start()

    t.setName(儿子线程)  # 设置线程名
    # print(t.is_alive())

    print(t.getName())

    t.join()  # 等待子线程执行完

    print(t.is_alive())  # 查看当前线程是否存活

    print(currentThread().getName())

    print(active_count())  # 正在运行的线程数量
    print(enumerate())  # 获取当前运行的线程对象的列表

 

多线程下的套接字通信

# server
from socket import *
from concurrent.futures import ThreadPoolExecutor


def server(ip, port):
    ser = socket(AF_INET, SOCK_STREAM)
    ser.bind((ip, port))
    ser.listen(3)
    while True:
        conn, addr = ser.accept()
        pool.submit(communicate, conn)


def communicate(conn):
    while True:
        try:
            data = conn.recv(1024).decode(utf-8)
            if not data:
                break
            conn.send(data.upper().encode(utf-8))
        except ConnectionResetError:
            conn.close()
            break


if __name__ == __main__:
    pool = ThreadPoolExecutor(2)
    server(127.0.0.1, 8090)




# client

from socket import *
client = socket(AF_INET, SOCK_STREAM)
client.connect((127.0.0.1, 8090))
while True:
    msg = input(>>: )
    client.send(msg.encode(utf-8))
    data = client.recv(1024).decode(utf-8)
    print(data)

 

 

 

守护线程

无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁。

需要强调的是:运行完毕并非终止运行

1、对主进程来说,运行完毕指的是主进程代码运行完毕

2、对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕

详细解释:

1、主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束。

2、主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。

从执行角度来说 主线程的执行时间就是该进程的生命周期。

from threading import Thread
import time

def foo():
    print(123)
    time.sleep(3)
    print("end123")  # 执行前主线程已执行完毕

def bar():
    print(456)
    time.sleep(1)
    print("end456")  # 主线程代码执行完毕后 等待非守护的子线程执行 然后该进程结束

if __name__ == __main__:
    t1=Thread(target=foo)
    t2=Thread(target=bar)

    t1.daemon=True
    t1.start()
    t2.start()
    print("main-------")

运行结果:

123  # 线程开启的速度很快

456   

main-------

end456

end123不会出现,因为t1是守护线程,在t1 sleep的过程中主线程已经执行完毕。

 

互斥锁

一个进程下的多个线程本身就是共享数据的,共享数据意味着竞争,竞争就会带来混乱。

from threading import Thread, Lock
import time
n = 100

def task():
    global n
    temp = n
    time.sleep(0.1)
    n = temp-1


if __name__ == __main__:
    t_l = []
    for i in range(100):
        t = Thread(target=task)
        t.start()
        t_l.append(t)

    for t in t_l:
        t.join()
    print(主进程, n)  # n= 99 在time.sleep(0.1)时 剩下的所有进程都已执行

我们想要实现的操作是开启100个线程,每一个线程实现减1操作,但是因为线程开启的速度很快,

在time.sleep(0.1)的过程中,所有的线程都已经开启,所以效果就是100个线程都拿到100减1,最后n=99。

 

接下来我们加上锁

from threading import Thread, Lock
import time
n = 100


def task():
    global n
    mutex.acquire()
    temp = n
    time.sleep(0.1)
    n = temp-1
    mutex.release()

if __name__ == __main__:
    mutex = Lock()
    t_l = []
    for i in range(100):
        t = Thread(target=task)
        t.start()
        t_l.append(t)

    for t in t_l:
        t.join()
    print(主进程, n)

线程中的互斥锁和进程中的互斥锁原理相同,都是将并行转换为串行,虽然降低了效率,但是提高了数据的安全性。

但是要注意,修改不同的数据需要加不同的锁。

 

GIL全局解释器锁

引子

首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,

但是可以用不同的编译器来编译成可执行代码。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。

像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。

所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL

 

GIL介绍

 

GIL本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。

 

可以肯定的一点是:保护不同的数据的安全,就应该加不同的锁。

 

 

要想了解GIL,首先确定一点:每次执行python程序,都会产生一个独立的进程。

每执行一个python文件 都会产生一个独立的进程,首先会向操作系统申请一片内存空间,然后加载python解释器的代码,

将所写的python代码作为参数,传递给Python解释器,然后执行。

所有线程的任务,都是将线程的代码,作为参数传递给解释器,然后执行,因此线程想要执行自己的任务就要先访问到解释器的代码

 

在一个进程, 除了python的主线程以及开启的子线程,python还有自动回收机制,即有一个垃圾回收的线程,垃圾回收线程是解释器级别的线程。

垃圾回收线程和开启的线程在同一个进程中,因此数据是共享的,那么当垃圾回收线程回收一个数据,而这个数据恰好被使用时就会产生矛盾。

技术图片

 

解决这种矛盾的办法就是加锁处理,GIL锁的作用就是使一个进程中的多个线程在同一时间只有一个线程可以执行

 

Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock?

首先,我们需要达成共识:锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据

然后,我们可以得出结论:保护不同的数据就应该加不同的锁。

最后,问题就很明朗了,GIL 与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),

后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock,如下图

技术图片

 

1、100个线程去抢GIL锁,即抢执行权限
2、肯定有一个线程先抢到GIL(暂且称为线程1),然后开始执行,一旦执行就会拿到lock.acquire()
3、极有可能线程1还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,但线程2发现互斥锁lock还未被线程1释放,于是阻塞,被迫交出执行权限,即释放GIL
4、直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的线程再重复2 3 4的过程。

 

总结一下就是以下几句话:

GIL锁就相当于程序的执行权限

GIL锁是解释器级别的锁,保证了垃圾回收线程数据的安全。

锁的目的是保护共享数据,同一时间只能有一个线程来修改共享数据。

不同的数据需要加不同的锁。

GIL保护的是垃圾回收有关的数据,自己的数据要另外加锁,因此需要Lock。

 

 

GIL与多线程

进程可以利用多核,但是开销大,而python的多线程开销小,但GIL锁的出现使多线程不能利用多核优势。

我们明确以下几点:

1、cpu到底是用来做计算的,还是用来做I/O的?

2、多cpu,意味着可以有多个核并行完成计算,所以多核提升的是计算性能

3、每个cpu一旦遇到I/O阻塞,仍然需要等待,所以多核对I/O操作没什么用处

 

因此对计算来说,cpu越多越好,但是对于I/O来说,再多的cpu也没用。

当然对运行一个程序来说,随着cpu的增多执行效率肯定会有所提高(不管提高幅度多大,总会有所提高),这是因为一个程序基本上不会是纯计算或者纯I/O,

所以我们只能相对的去看一个程序到底是计算密集型还是I/O密集型,从而进一步分析python的多线程到底有无用武之地。

 

对于计算密集型的任务 可以用多进程 利用多核优势,提升计算效率。

对于I/O密集型的任务,再多的核用处也不大,而且创建进程的开销很大,因此应该考虑多线程,多线程在多个任务间来回切换,实现并发。

总结

多线程用于IO密集型,如socket,爬虫,web

多进程用于计算密集型,如金融分析

 

死锁与递归锁

死锁现象

所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。

此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

 

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print(%s 拿到A锁 %self.name)

        mutexB.acquire()
        print(\\%s 拿到B锁 %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print(%s 拿到B锁 %self.name)
        time.sleep(2)

        mutexA.acquire()
        print(%s 拿到A锁 %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == __main__:
    for i in range(10):
        t=MyThread()
        t.start()

 

第一个线程执行完func1后,执行func2时,拿到了B锁。此时线程2,执行func1,拿到了A锁,当线程二往下执行想要拿B锁时,发现B锁被线程1拿了;

同样线程1想拿A锁时发现A锁被线程2拿了,所以程序就卡在了这里,形成了死锁。

因此以后在写程序时,一定要注意在适当的时候加锁、释放锁,否则很容易造成程序卡住不动。

 

递归锁

解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

这个RLock内部维护着一个Lock和一个counter计数器变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。

直到一个线程所有的acquire都被release,其他的线程才能获得资源。

上面的例子如果使用RLock代替Lock,则不会发生死锁,二者的区别是:递归锁可以连续acquire多次,而互斥锁只能acquire一次。

 

from threading import Thread,RLock
import time

mutexA=mutexB=RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print(%s 拿到A锁 %self.name)

        mutexB.acquire()
        print(拿到B锁 %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print(拿到B锁 %self.name)
        time.sleep(2)

        mutexA.acquire()
        print(%s 拿到A锁 %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == __main__:
    for i in range(10):
        t=MyThread()
        t.start()

 

信号量、Event、定时器

信号量

信号量也是一把锁,和互斥锁的区别是互斥锁同一时间只能由一个任务执行,信号量可以指定多个。

from threading import Thread, Semaphore
import threading
import time, random
sm = Semaphore(3)  # 同意时间可以执行三个任务


def task():
    with sm:  # 简写 相当于sm.acquire()  sm.release()
        print(%s is in  % threading.current_thread().getName())
        time.sleep(random.randint(1, 3))


if __name__ == __main__:
    for i in range(10):
        t = Thread(target=task)
        t.start()

 

Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

 

Event事件

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。

为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。

在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。

一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。

from threading import Event

event.isSet():返回event的状态值;

event.wait():如果 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False。

技术图片

 

import threading

# 事件用于线程之间的通信
def connect():
    n = 0
    while not event.is_set():
        if n == 3:
            print(try too many times)
            return
        print(%s is try % threading.currentThread().getName())
        event.wait(0.5)  # 线程将会阻塞 也可以设置超时时间 超过这个时间线程进行下一步操作
        n += 1
    print( 连接成功 )


def check():
    print(%s is checking % threading.currentThread().getName())
    time.sleep(1)
    event.set()  # 发送信号 使阻塞的线程继续运行

 

定时器

定时器,指定n秒后执行某操作,每开启一个定时器就相当于开启了一个线程。

我们来写一个生成验证码,输入验证的小程序。

from threading import Timer
import random


class Code:
    def __init__(self):
        self.make_cache()  

    def make_code(self, n=4):
        res = ‘‘
        for i in range(n):
            s1 = str(random.randint(0, 9))  # 随机产生0-9的数字
            s2 = chr(random.randint(65, 90)) # 随机产生65-90的数字并转换成ascii码对应的英文字符
            res += random.choice([s1, s2])  # 随机产生一位产生验证码
        return res

    def make_cache(self, interval=5):
        """生成验证码放入缓存"""
        self.cache = self.make_code()
        print(self.cache)
        self.t = Timer(interval, self.make_cache)  # Timer参数 (间隔时间,执行代码)
        self.t.start()

    def check(self):
        while True:
            msg = input(>>>: )
            if msg.upper() == self.cache:
                print(密码输入正确)
                break
            else:
                print(密码错误)

c = Code()  # 实例化对象
c.check()

 

线程queue

queue有三种不同的用法

class queue.Queue(maxsize=0) #队列:先进先出

class queue.LifoQueue(maxsize=0) #堆栈:last in fisrt out

class queue.PriorityQueue(maxsize=0) #优先级队列:存储数据时可设置优先级的队列

# 进程中的queue是为实现进程间的通信并解决了加锁的问题
# 线程中的数据是共享的 但是仍要处理加锁问题 queue提供了很多功能
import queue
# 队列 先进先出
q = queue.Queue(3)  # 设置最大数据量
q.put(first)
q.put(2)
q.put(two)
# q.put(1)  # 队列已满 阻塞
# q.put(1, block=True, timeout=3)  # block参数控制是否阻塞 ;当block为False时,不阻塞,直接报错;也可以设置时间,超过指定时间报错。
# q.put_nowait() 这种写法就是如果队列满了不阻塞,直接报错


print(q.get())
print(q.get())
print(q.get())
# q.get()  # 队列为空阻塞
# q.get_nowait()

q1 = queue.LifoQueue()  # 堆栈 后进先出 用法与上面相同

q2 = queue.PriorityQueue(3) # 优先级队列 优先级高的会先出 q2.put((10, first)) q2.put((20, 2)) # 以元组的形式放入 第一个数字代表优先级 数字越小优先级越高 第二个是放入的数据 q2.put((5, two)) print(q2.get()) print(q2.get()) print(q2.get())

 

进程池与线程池

 在刚开始学多进程或多线程时,我们迫不及待地基于多进程或多线程实现并发的套接字通信,然而这种实现方式的致命缺陷是:

服务的开启的进程数或线程数都会随着并发的客户端数目地增多而增多,这会对服务端主机带来巨大的压力,甚至于不堪重负而瘫痪,

于是我们必须对服务端开启的进程数或线程数加以控制,让机器在一个自己可以承受的范围内运行,这就是进程池或线程池的用途,

例如进程池,就是用来存放进程的池子,本质还是基于多进程,只不过是对开启进程的数目加上了限制。

 

介绍

官网:https://docs.python.org/dev/library/concurrent.futures.html

concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用

基本方法

1、submit(fn, *args, **kwargs)
     异步提交任务

2、map(func, *iterables, timeout=None, chunksize=1) 
  取代for循环submit的操作

3、shutdown(wait=True) 
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前

4、result(timeout=None)
取得结果

5、add_done_callback(fn)
回调函数  在任务完成后会回调这个函数

 

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor  # 进程池,线程池
import os, time, random
def task():
    print("%s is running" % os.getpid())
    time.sleep(random.randint(1, 3))


if __name__ == __main__:
    # pool = ProcessPoolExecutor(4) 进程池
    pool = ThreadPoolExecutor(4)  #  线程池 指定任务数 因为一台计算机上不可能无限制开线程或者进程 需要控制在安全的数量
    for i in range(10):
        pool.submit(task)  # submit(代码段,参数) 异步提交任务 将任务提交给池就不再管了 继续往下执行

    pool.shutdown(wait=True)  # 相当于将进程池/线程池关闭 不允许再添加任务 再join()
    # 默认wait=True 等池内所有任务执行完毕回收资源后才继续执行
    print()

 

map方法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print(%s is runing %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == __main__:

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12)) #map取代了for+submit

 

 

 提交任务的两种方式

同步调用: 提交任务后原地等待任务执行完毕,拿到结果,再执行下一步操作,导致程序串行执行。

from concurrent.futures import ThreadPoolExecutor
import time
import random


def la(name):
    print(%s is laing % name)
    time.sleep(random.randint(1, 3))
    res = random.randint(8, 13)*#
    return {name: name, res: res}


def weigh(result):
    weight = len(result[res])
    print(%s ----%s kg% (result[name], weight))


if __name__ == __main__:
    pool = ThreadPoolExecutor(3)
    s = pool.submit(la, (alex,)).result()  # 拿到返回结果
    weigh(s)
    s = pool.submit(la, (egon,)).result()
    weigh(s)
    s = pool.submit(la, (yuanhao,)).result()
    weigh(s)

 

同步调用的等待与阻塞不同,阻塞是只程序遇到I/O,被剥夺cpu的执行权限,程序等待。

在遇到I/O会产生阻塞,但如果是纯计算任务就不会产生阻塞,但同步调用仍然会等待计算结果。

同步调用无论是计算密集型还是I/O密集型都会等待结果。

 

异步调用: 提交任务后,不原地等待任务执行完毕。

from concurrent.futures import ThreadPoolExecutor
import time
import random


def la(name):
    print(%s is laing % name)
    time.sleep(random.randint(1, 3))
    res = random.randint(8, 13)*#
    return {name: name, res: res}


def weigh(result):
    result = result.result()
    name = result[name]
    weight = len(result[res])
    print(%s ----%s kg % (name, weight))


if __name__ == __main__:
    pool = ThreadPoolExecutor(13)
    pool.submit(la, (alex,)).add_done_callback(weigh)  # 回调函数 任务完成后调用这个函数
    # 加入回调机制可以和上面的原理其实相同 但实现了程序的解耦合

    pool.submit(la, (egon,)).add_done_callback(weigh)

    pool.submit(la, (yuanhao,)).add_done_callback(weigh)

 

并发变成之协程

 

本节的主题是基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发,为此我们需要先回顾下并发的本质:切换+保存状态

 

cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞,

另外一种情况是该任务计算的时间过长或有一个优先级更高的程序替代了它

技术图片

在介绍进程理论时,提及进程的三种执行状态,而线程才是执行单位,所以也可以将上图理解为线程的三种状态。

一:其中第二种情况并不能提升效率,只是为了让cpu能够雨露均沾,实现看起来所有任务都被“同时”执行的效果,如果多个任务都是纯计算的,这种切换反而会降低效率。

为此我们可以基于yield来验证。yield本身就是一种在单线程下可以保存任务运行状态的方法,我们来简单复习一下 :

1 yiled可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级

2 send可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换

 

 

#串行执行
import time
def consumer(res):
    ‘‘‘任务1:接收数据,处理数据‘‘‘
    pass

def producer():
    ‘‘‘任务2:生产数据‘‘‘
    res=[]
    for i in range(10000000):
        res.append(i)
    return res

start=time.time()
#串行执行
res=producer()
consumer(res) #写成consumer(producer())会降低执行效率
stop=time.time()
print(stop-start) #1.5536692142486572



#基于yield并发执行
import time
def consumer():
    ‘‘‘任务1:接收数据,处理数据‘‘‘
    while True:
        x=yield

def producer():
    ‘‘‘任务2:生产数据‘‘‘
    g=consumer()
    next(g)
    for i in range(10000000):
        g.send(i)

start=time.time()
#基于yield保存状态,实现两个任务直接来回切换,即并发的效果
#PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
producer()

stop=time.time()
print(stop-start) #2.0272178649902344

二:第一种情况的切换。在任务一遇到io情况下,切到任务二去执行,这样就可以利用任务一阻塞的时间完成任务二的计算,效率的提升就在于此。

yield并不能实现遇到io切换

import time
def consumer():
    ‘‘‘任务1:接收数据,处理数据‘‘‘
    while True:
        x=yield

def producer():
    ‘‘‘任务2:生产数据‘‘‘
    g=consumer()
    next(g)
    for i in range(10000000):
        g.send(i)
        time.sleep(2)

start=time.time()
producer() #并发执行,但是任务producer遇到io就会阻塞住,并不会切到该线程内的其他任务去执行

stop=time.time()
print(stop-start)

对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)

控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另外一个任务去计算,这样就保证了该线程能够最大限度地处于就绪态,

即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,从而可以迷惑操作系统,

让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给我们的线程。

 

协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。

为了实现它,我们需要找寻一种可以同时满足以下条件的解决方案:

1. 可以控制多个任务之间的切换,切换之前将任务的状态保存下来,以便重新运行时,可以基于暂停的位置继续执行。

2. 作为1的补充:可以检测io操作,在遇到io操作的情况下才发生切换

 

协程介绍

协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。

一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)

2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(非io操作的切换与效率无关

 

对比操作系统控制线程的切换,用户在单线程内控制协程的切换

优点如下:

1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级

2. 单线程内就可以实现并发的效果,最大限度地利用cpu


缺点如下:

1. 协程的本质是单线程下,无法利用多核。可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程。

2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

 

协程特点总结

必须在只有一个单线程里实现并发

修改共享数据不需加锁(本质上是一个线程)

附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制)

 

greenlet模块

如果我们在单个线程内有20个任务,要想实现在多个任务之间切换,使用yield生成器的方式过于麻烦(需要先得到初始化一次的生成器,然后再调用send。。。非常麻烦),

而使用greenlet模块可以非常简单地实现这20个任务直接的切换

from greenlet import greenlet
import time


def eat(name):
    print(%s is eat1 % name)
    # time.sleep(10)  # 无法实现监测IO的功能
    g2.switch(egon)
    print(%s is eat2 % name)
    g2.switch()


def play(name):
    print(%s is play1 % name)
    g1.switch()
    print(%s is play2 % name)


g1 = greenlet(eat)
g2 = greenlet(play)

g1.switch(egon)  # 第一次要传参

greenlet只是提供了一种比generator更加便捷的切换方式,当切到一个任务执行时如果遇到io,那就原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。

单线程里的多个任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。。如此,才能提高效率,这就用到了Gevent模块。

 

gevent模块

Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。

#用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的

g2=gevent.spawn(func2)

g1.join() #等待g1结束

g2.join() #等待g2结束

#或者上述两步合作一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

 

gevent遇到io时会自动切换任务

import gevent
def eat(name):
    print(%s eat 1 %name)
    gevent.sleep(2)
    print(%s eat 2 %name)

def play(name):
    print(%s play 1 %name)
    gevent.sleep(1)
    print(%s play 2 %name)


g1=gevent.spawn(eat,egon)
g2=gevent.spawn(play,name=egon)
g1.join()
g2.join()
#或者gevent.joinall([g1,g2])
print()

上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞,

而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了

from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前

或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头

 

import gevent
from gevent import monkey
monkey.patch_all()  # 将所有的IO操作都标记 使gevent 能够识别
import time


def eat(name):
    print(%s is eat1 % name)
    time.sleep(3)
    # gevent.sleep(3)
    print(%s is eat2 % name)


def play(name):
    print(%s is play1 % name)
    time.sleep(4)  # gevent原本无法识别time.sleep
    # gevent.sleep(4)
    print(%s is play2 % name)


g1 = gevent.spawn(eat, egon)
g2 = gevent.spawn(play, alex)  # 异步提交任务 提交后不管任务是否执行


# g1.join()  # 需要等待任务执行完毕后 再结束线程  否则任务还没起来 主线程的代码就运行完毕 主线程就结束了
# g2.join()

gevent.joinall([g1, g2])  # 和分别join 一样的效果

 

 

用gevent实现并发的套接字通信

#server
import gevent
from gevent import monkey; monkey.patch_all()
# 一定要放在socket模块以前 不然无法识别socket的阻塞
from socket import *

def server(ip, port):
    serve = socket(AF_INET, SOCK_STREAM)
    serve.bind((ip, port))
    serve.listen(5)
    while True:
        conn, addr = serve.accept()
        gevent.spawn(communicate, conn)  # 运行后线程不会结束因此可以不用join


def communicate(conn):
    while True:
        try:
            data = conn.recv(1024).decode(utf-8)
            if not data:
                break
            conn.send(data.upper().encode(utf-8))
        except ConnectionResetError:
            break
    conn.close()


if __name__ == __main__:
    g = gevent.spawn(server, 127.0.0.1, 8900)
    g.join()


#client
from socket import *
from threading import Thread, currentThread


def connect():
    client = socket(AF_INET, SOCK_STREAM)
    client.connect((127.0.0.1, 8900))

    while True:
        client.send((%s is say hello%currentThread().getName()).encode(utf-8))
        data = client.recv(1024).decode(utf-8)
        print(data)
    client.close()


if __name__ == __main__:
    for i in range(500):
        t = Thread(target=connect)
        t.start()

 

IO模型

 

比较五种IO Model:  
* blocking IO   阻塞IO
* nonblocking IO   非阻塞IO
* IO multiplexing   IO多路复用
* signal driven IO    信号驱动IO
* asynchronous IO   异步IO
由signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model。 

 

 

 

再说一下IO发生时涉及的对象和步骤。对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,

一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,该操作会经历两个阶段:

 

1)等待数据准备 (Waiting for the data to be ready)

2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)

记住这两点很重要,因为这些IO模型的区别就是在两个阶段上各有不同的情况。

 

阻塞IO

在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:

技术图片

 

 

 

 

 

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。

对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。

 

而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,

然后kernel返回结果,用户进程才解除block的状态,重新运行起来。

 

所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。

 

实际上,除非特别指定,几乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,

如在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。

 

非阻塞IO

Linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

 

技术图片

 

 

 从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。

从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,

于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,

那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),然后返回。

 

 

 

也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,

此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,

循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,

进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。

所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。

# server
from socket import *

serve = socket(AF_INET, SOCK_STREAM)
serve.bind((127.0.0.1, 8903))
serve.listen(5)
serve.setblocking(False)  # 设置为非阻塞IO 当遇到io时不会阻塞 而是直接报错
r_list = []
w_list = []
print(starting...)
while True:
    try:
        conn, addr = serve.accept()
        r_list.append(conn)
        print(r_list)

    except BlockingIOError:  # 捕捉错误
        # 收消息
        del_rlist = []
        for sock in r_list:
            try:
                data = sock.recv(1024)
                if not data:
                    del_rlist.append(sock)  # 没有消息就断开连接  在遍历的过程中不能更改迭代的对象
                w_list.append((sock, data.upper()))  # 将收发消息功能分隔开
            except BlockingIOError:
                continue
            except Exception:
                sock.close()
                del_rlist.append(sock)

        # 发消息
        del_wlist = []
        for item in w_list:
            try:
                sock = item[0]  # 套接字链接
                data = item[1]  # 要发送的数据
                sock.send(data)
                del_wlist.append(item)  # 发送成功删除
            except BlockingIOError:  # 数据会先从应用程序拷贝到缓存 如果数据比较大 缓存没有足够空间就会阻塞
                pass  # 数据未发送下一次继续发送

        for d in del_rlist:
            r_list.remove(d)

        for i in del_wlist:
            w_list.remove(i)

serve.close()


# client
from socket import *
client = socket(AF_INET, SOCK_STREAM)
client.connect((127.0.0.1, 8903))

while True:
    msg = input(>>>: )
    if not msg: continue
    client.send(msg.encode(utf-8))
    data = client.recv(1024).decode(utf-8)
    print(data)
client.close()

但非阻塞IO不被推荐。

我们不能否则其优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在“”同时“”执行)。

但是也难掩其缺点:

1. 循环调用recv()将大幅度推高CPU占用率;

2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询(即一个一个去查看是否收到数据)一次read操作,而任务可能在两次轮询之间的任意时间完成。

 

IO多路复用

IO multiplexing,有些地方也称这种IO方式为事件驱动IO

select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,

当某个socket有数据到达了,就通知用户进程。它的流程如图:

技术图片

 

 

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,

当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。 

这个图和blocking IO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select和recvfrom),

而blocking IO只调用了一个系统调用(recvfrom)。但是,用select的优势在于它可以同时处理多个connection

 

 

 

强调:

 

1. 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。

select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

 

2. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

 

结论: select的优势在于可以处理多个连接,不适用于单个连接

# server
from socket import *
import select
server = socket(AF_INET, SOCK_STREAM)
server.bind((127.0.0.1, 8903))
server.listen(5)
server.setblocking(False)
print(starting...)

rlist = [server, ]  # IO可以分为两种 一种与收有关 accept recv
wlist = []  # 一种yu发有关 send
wl_data = {}

while True:
    rl, wl, xl = select.select(rlist, wlist, [], 0.5)  # 每隔0.5秒询问一次
    print(rl)
    for sock in rl:
        if sock == server:  # [<socket.socket fd=444, family=AddressFamily.AF_INET,
            # type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8903)>]  # rl中是已经发送数据了的套接字对象
            conn, addr = sock.accept()
            rlist.append(conn)
        else:
            try:
                data = sock.recv(1024)
                if not data:
                    sock.close()
                    rlist.remove(sock)
                    continue
                wlist.append(sock)  # 收到消息后 该套接字准备发送信息
                wl_data[sock] = data.upper()  # 将套接字和接收的数据存入字典中
            except Exception:  # 处理中途客户端断开连接的情况
                sock.close()
                rlist.remove(sock)

    for sock in wl:
        sock.send(wl_data[sock])  # 发送成功删除  先有收的数据 才能把数据发回去
        wlist.remove(sock)
        wl_data.pop(sock)



#client
from socket import *
client = socket(AF_INET, SOCK_STREAM)
client.connect((127.0.0.1, 8903))

while True:
    msg = input(>>>: )
    if not msg: continue
    client.send(msg.encode(utf-8))
    data = client.recv(1024).decode(utf-8)
    print(data)
client.close()

该模型的优点:

相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。

如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。


该模型的缺点:

首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。

很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。

如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,

所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。

其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。

 

异步IO

技术图片

 

 用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。

然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。

异步IO是这四种IO模型里效率最高的。

 

socketserver模块实现并发

 

# server
import socketserver
# 利用socketserver实现并发


# 1. 先定义一个功能类
# class MyServer(socketserver.BaseRequestHandler):
#     def handle(self):  必须是handle方法


# 2.server = socketserver.ThreadingTCPServer((‘127.0.0.1‘, 9000), MyServer)  传入ip 端口 功能类

# server.serve_forever()

class MyServer(socketserver.BaseRequestHandler):
    def handle(self):
        while True:
            data = self.request.recv(1024).decode(utf-8)
            print(收到: , data)
            if data == q:
                break
            msg = input(>>>: )
            self.request.send(msg.encode(utf-8))
        self.request.close()


server = socketserver.ThreadingTCPServer((127.0.0.1, 9000), MyServer)
server.serve_forever()

# client
from socket import *
client = socket(AF_INET, SOCK_STREAM)
client.connect((127.0.0.1, 9000))

while True:
    msg = input(>>>: )
    if not msg: continue
    client.send(msg.encode(utf-8))
    data = client.recv(1024).decode(utf-8)
    print(data)
client.close()

 

以上是关于网络编程进阶及并发编程的主要内容,如果未能解决你的问题,请参考以下文章

并发编程进阶学习篇

程序员进阶架构师必备架构基础技能:并发编程+JVM+网络+Tomcat等

go语言学习笔记 — 进阶 — 并发编程:同步sync,竞态检测 —— 检测代码在并发环境下出现的问题

并发编程之线程进阶

全栈编程系列SpringBoot整合Shiro(含KickoutSessionControlFilter并发在线人数控制以及不生效问题配置启动异常No SecurityManager...)(代码片段

Python 8 - Socket编程进阶