网络编程进阶及并发编程
Posted sxy-blog
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了网络编程进阶及并发编程相关的知识,希望对你有一定的参考价值。
并发编程之多进程
进程理论
进程
进程:正在运行的一个过程或一个任务。负责执行任务的是cpu。
程序与进程的区别:程序只是一堆代码,而进程指的是程序的运行过程。
注意同一个程序执行两次,是两个进程。比如打开两个QQ,登陆的是不同人的QQ号。
并行与并发
无论是并行还是并发,在用户看来都是‘同时‘运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,
cpu来做这些任务,而一个cpu同一时刻只能执行一个任务。
并发:并发是看起来像是一起执行,实际上是通过在不同人物之间快速切换,使任务看起来在同时进行。
并行:真正意义上的同时执行,需要具备多个cpu。
进程的创建
但凡是硬件,都需要有操作系统去管理,只要有操作系统,就有进程的概念,就需要有创建进程的方式。
而对于通用系统(跑很多应用程序),需要有系统运行过程中创建或撤销进程的能力,主要分为4中形式创建新的进程
-
系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关)
-
一个进程在运行过程中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)
-
用户的交互式请求,而创建一个新进程(如用户双击暴风影音)
-
一个批处理作业的初始化(只在大型机的批处理系统中应用)
无论哪一种,新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的:
-
在UNIX中该系统调用是:fork,fork会创建一个与父进程一模一样的副本,二者有相同的存储映像、
-
同样的环境字符串和同样的打开文件(在shell解释器进程中,执行一个命令就会创建一个子进程)
-
在windows中该系统调用是:CreateProcess,CreateProcess既处理进程的创建,也负责把正确的程序装入新进程。
关于创建的子进程,UNIX和windows
1.相同的是:进程创建后,父进程和子进程有各自不同的地址空间,任何一个进程的在其地址空间中的修改都不会影响到另外一个进程。
2.不同的是:在UNIX中,子进程的初始地址空间是父进程的一个副本,将父进程地址空间的数据作为子进程的起始状态。
但是对于windows系统来说,从一开始父进程与子进程的地址空间就是不同的,数据也是来自于父进程,但并不完全相同。
进程的终止
-
正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)
-
出错退出(自愿,python a.py中a.py不存在)
-
严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try...except...)
-
被其他进程杀死(非自愿,如kill -9)
进程的状态
其实在两种情况下会导致一个进程在逻辑上不能运行,
-
进程挂起是自身原因,遇到I/O阻塞,便要让出CPU让其他进程去执行,这样保证CPU一直在工作。
-
与进程无关,是操作系统层面,可能会因为一个进程占用时间过多,或者优先级等原因,而调用其他的进程去使用CPU。
因而一个进程由三种状态
当一个处于运行态的进程占用时间过多就可能进入就绪态,过一段时间再进入运行态。
当程序在执行过程中遇到IO就会进入阻塞状态,IO完成后会先进入就绪状态,再进入运行态。
进程并发的实现
进程并发的实现在于,硬件中断一个正在运行的进程,把此时进程运行的所有状态保存下来,从而保证进程再次运行时像从未中断过一样。
multiprocessing模块
python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。
Python提供了multiprocessing。 multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。
multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。
Process类
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,可用来开启一个子进程
强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
参数:
group参数未使用,值始终为None
target表示调用对象,即子进程要执行的任务
args表示调用对象的位置参数元组,args=(1,2,)
kwargs表示调用对象的字典,kwargs={‘name‘:‘alex‘,‘age‘:18}
name为子进程的名称
创建进程的两种方式
注:开启多线程多进程一定要将主进程或主线程放在 if __name__ == ‘__main__‘: 下
from multiprocessing import Process import time # 方式一 def task(name): print(‘%s is running‘ % name) time.sleep(3) print(‘work is done‘) if __name__ == ‘__main__‘: # Windows系统一定要放在main下 p = Process(target=task, args=(‘子进程1‘,)) # 创建了一个对象 以位置传参 # p = Process(target=task, kwargs={‘name‘: ‘子进程1‘}) # 字典的形式传参 p.start() # 仅给操作系统发送了一个信号 告诉操作系统开启一个子进程 申请开辟一片内存空间 # 将父进程地址空间的数据作为子进程的起始状态 # 两个进程运行状态独立 不影响主进程的执行 print("主") # 方式二 面向对象 class MyProcess(Process): def __init__(self, name): super().__init__() # 继承父类原有方法 self.name = name def run(self): # 子进程的执行方法一定要叫run print("%s is running" % self.name) time.sleep(3) print(‘task is done‘) if __name__ == ‘__main__‘: p = MyProcess(‘子进程1‘) p.start() # 会执行run的方法 print(‘主‘)
进程的pid和ppid
每一个进程都有自己的pid,相当于自身的编号,通过这些编号我们可以区分不同的进程。
查看进程的pid可用os模块下的os.getpid()方法。
ppid是查看当前进程的父进程的id号,可用os.getppid()查看。
from multiprocessing import Process import time, os def task(): print("%s id is running, parent id is %s" % (os.getpid(), os.getppid())) time.sleep(3) print(‘%s is done‘ % (os.getpid())) # os.getpid() 拿到当前进程的编号 os.getppid()拿到父进程的id if __name__ == ‘__main__‘: p = Process(target=task) p.start() print(‘主‘, os.getpid(), os.getppid()) # 如果在Pycharm上运行py文件 当前主进程的父进程id就是Pycharm的id # windows 可在终端使用 tasklist | findstr pycharm 查看pycharm的pid # 如果在终端执行这个py文件,那么ppid就是终端的pid
僵尸进程与孤儿进程
当一个父进程开启了多个子进程 父进程想要查看子进程 但是子进程此时已经执行完毕 在内存中消失了 那么应该怎么去看?
当子进程死亡,系统会把子进程的一些状态信息保留,即保留子进程的尸体,以便父进程可以查看到自己所有子进程的状态,这种进程叫做僵尸进程。
太多的僵尸进程会占用系统的pid 如果父进程一直不死,就会一直占用这些pid 那么当系统开启新的进程时,就可能起不来。
当父进程先执行完成,子进程还在执行,就相当于没有了爹,这时候此进程就叫做孤儿进程,在linux下 所有进程的父亲是Init进程。
孤儿进程直接由init监管、回收,孤儿进程是无害的。
Process对象的join()方法
在主进程运行过程中如果想并发地执行其他的任务,我们可以开启子进程,此时主进程的任务与子进程的任务分两种情况。
情况一:在主进程的任务与子进程的任务彼此独立的情况下,主进程的任务先执行完毕后,主进程还需要等待子进程执行完毕,然后统一回收资源。
情况二:如果主进程的任务在执行到某一个阶段时,需要等待子进程执行完毕后才能继续执行,就需要有一种机制能够让主进程检测子进程是否运行完毕,在子进程执行完毕后才继续执行,否则一直在原地阻塞,这就是join方法的作用。
from multiprocessing import Process import time import random import os def task(): print(‘%s is piaoing‘ %os.getpid()) time.sleep(random.randrange(1,3)) print(‘%s is piao end‘ %os.getpid()) if __name__ == ‘__main__‘: p=Process(target=task) p.start() p.join() #等待p停止,才执行下一行代码 print(‘主‘)
那么有了join()程序是否就变成了串行执行?
from multiprocessing import Process import time import random def task(name): print(‘%s is piaoing‘ %name) time.sleep(random.randint(1,3)) print(‘%s is piao end‘ %name) if __name__ == ‘__main__‘: p1=Process(target=task,args=(‘egon‘,)) p2=Process(target=task,args=(‘alex‘,)) p3=Process(target=task,args=(‘yuanhao‘,)) p4=Process(target=task,args=(‘wupeiqi‘,)) p1.start() p2.start() p3.start() p4.start() # 有的人有疑问: 既然join是等待进程结束, 那么我像下面这样写, 进程不就又变成串行的了吗? # 当然不是了, 必须明确:p.join()是让谁等? # 很明显p.join()是让主线程等待p的结束,卡住的是主进程而绝非子进程p p1.join() p2.join() p3.join() p4.join() print(‘主‘)
p1,p2,p3,p4四个子进程都向操作系统发出了请求,执行的时候四个进程是并发执行,当四个进程都执行完毕,主进程才可以执行,join方法卡住的是主进程,
子进程仍然是并发的。
Process对象的其他属性和方法
p.is_alive(): 查看进程是否处于存活状态。
p.terminate(): 向操作系统发出请求关闭进程。
p.pid: 查看进程的pid。
p.name: 查看进程的名字,默认为Process-进程数。
from multiprocessing import Process import time def task(name, n): """进程要执行的任务""" print("%s is running" % name) time.sleep(n) print(‘%s is done‘ % name) if __name__ == ‘__main__‘: p1 = Process(target=task, args=(‘子进程1‘, 1)) p2 = Process(target=task, args=(‘子进程2‘, 3), name=‘子进程2‘) p3 = Process(target=task, args=(‘子进程3‘, 5)) p_list = [p1, p2, p3] for p in p_list: p.start() # 向操作系统发出开启子进程的请求 # for p in p_list: # p.join() # 卡住主进程,等到所有的子进程执行完,再执行主进程,子进程之间是并发的,卡的时间取决于执行时间最久的子进程 print(‘p1‘, p1.is_alive()) # True p1.join() print(‘p1‘, p1.is_alive()) # False p.is_alive()判断子进程是否处于执行的状态 print(‘p2‘, p2.is_alive()) p2.terminate() # 关闭进程,向操作系统发送请求 操作系统需要反应一段时间 不会立刻关闭,因此可能之后查看时,进程可能仍然存活 time.sleep(1) # 经过一小段时间,操作系统执行,进程被关闭 print(‘p2‘, p2.is_alive()) # False print(‘主‘) print(p1.name, p1.pid) # 查看子进程的pid可直接用p.pid方法 # p.name 查看进程的名字 默认就是开启的子进程数Process-1 也可以规定 print(p2.name, p2.pid) # 由此也可验证僵尸进程 子进程执行完毕 保留状态信息
进程之间的空间是隔离的
from multiprocessing import Process def task(): global n n = 0 print("子进程:%s" % n) # n=0 if __name__ == ‘__main__‘: n = 100 p = Process(target=task) p.start() print("主进程:%s" % n) #n = 100
上面的程序在主进程中定义了一个n=100,开启了一个子进程声明全局变量对n进行修改,在子进程中n=0,但是在主进程中n=100,
这就说明了主进程与子进程之间的空间是隔离的,数据不共享。
基于多进程的套接字通信
可以接收多个客户端连接,实现与多个客户端通信。
将连接和通信的活分开,就相当于前台,每来一个客户,就安排一个服务员单独服务。
# server from multiprocessing import Process from socket import * def task(conn): while True: try: data = conn.recv(1024).decode(‘utf-8‘) if not data: break conn.send(data.upper().encode(‘utf-8‘)) except ConnectionResetError: break def connect(host, port): server = socket(AF_INET, SOCK_STREAM) server.bind((host, port)) server.listen(5) while True: conn, addr = server.accept() p = Process(target=task, args=(conn,)) p.start() server.close() if __name__ == ‘__main__‘: connect(‘127.0.0.1‘, 8080) # client from socket import * client = socket(AF_INET, SOCK_STREAM) client.connect((‘127.0.0.1‘, 8080)) while True: msg = input(‘>>>‘) client.send(msg.encode(‘utf-8‘)) data = client.recv(1024).decode(‘utf-8‘) print(data)
守护进程
主进程创建子进程,然后将该进程设置成守护自己的进程,守护进程就好比崇祯皇帝身边的老太监,崇祯皇帝已死老太监就跟着殉葬了。
关于守护进程需要强调两点:
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
如果守护进程有子进程,那么主进程结束,守护进程也会结束,那么守护进程的子进程就成为了孤儿进程,孤儿进程统一有init进程回收,如果孤儿进程有很多,会降低效率。
如果我们有两个任务需要并发执行,那么开一个主进程和一个子进程分别去执行就ok了,如果子进程的任务在主进程任务结束后就没有存在的必要了,那么该子进程应该在开启前就被设置成守护进程。主进程代码运行结束,守护进程随即终止。
将子进程设置为守护进程的语法,p.daemon = True
from multiprocessing import Process import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__ == ‘__main__‘: p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True # 设置为守护进程 主进程代码运行结束 子进程也结束 p1.start() p2.start() print("main-------") # 守护进程不可以开启子进程 # 此程序当打印出main时p1就会结束执行 不可能看到end123
互斥锁
进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如下
#并发运行,效率高,但竞争同一打印终端,带来了打印错乱 from multiprocessing import Process import os,time def work(): print(‘%s is running‘ %os.getpid()) time.sleep(2) print(‘%s is done‘ %os.getpid()) if __name__ == ‘__main__‘: for i in range(3): p=Process(target=work) p.start()
如果我想要实现的效果是 xxx is running xxx is done,那么这种方式一定是无法完成的,因为在time.sleep的过程中,cpu会把执行权限交给别的进程不会一直等待。
那如何实现这种效果?那就是加锁处理。
从multiprocess模块中导入Lock类,实例化,然后加锁。
加锁的语法,实例化一把锁之后,在程序开头mutex.acquire(),释放锁时 mutex.release()。
from multiprocessing import Process,Lock import time def task(name,mutex): mutex.acquire() # 得到锁 print(‘%s-1 is running‘ % name) time.sleep(1) print(‘%s-2 is running‘ % name) time.sleep(1) print(‘%s-3 is running‘ % name) time.sleep(1) mutex.release() # 释放锁 if __name__ == ‘__main__‘: mutex = Lock() # 互斥锁 要保证传入的是同一把锁 互斥锁就是把并行变为串行 for i in range(1, 4): p = Process(target=task, args=((‘进程%s‘ % i),mutex)) p.start()
互斥锁的原理就是把并行变为串行,虽然降低了效率,但是提高了数据的安全性。
互斥锁和join()的区别
使用join可以将并发变成串行,互斥锁的原理也是将并发变成穿行,那我们直接使用join就可以了啊,为何还要互斥锁?
我们来看一个模拟抢票的程序。
from multiprocessing import Process,Lock import json import time def search(name): time.sleep(1) data = json.load(open(‘db.json‘, ‘r‘, encoding=‘utf-8‘)) print("%s查看剩余票数<%s>" % (name, data[‘count‘])) def get(name, mutex): mutex.acquire() data = json.load(open(‘db.json‘, ‘r‘, encoding=‘utf-8‘)) time.sleep(3) if int(data[‘count‘]) > 0: print(‘乘客%s购票成功‘ % name) dic = {‘count‘: int(data[‘count‘]) - 1} json.dump(dic, open(‘db.json‘, ‘w‘, encoding=‘utf-8‘)) # db.json是票的数据库 mutex.release() def task(name, mutex): search(name) get(name, mutex) if __name__ == ‘__main__‘: mutex = Lock() for i in range(10): p = Process(target=task, args=((‘路人%s‘ % i), mutex)) p.start() # p.join()
如果我们不加锁,在启动进程时直接p.join(),那么乘客在查票时也就只能一个一个去查,这显然不符合实际。我们需要保证的只是在买票时,一个人只能买到一张票。
join()将整个程序都变成了串行执行,但是我们需要的只是在买票时串行就可以了,这时锁的优势就体现出来了。
进程与进程之间 内存空间是隔离的,但可以共享硬盘上的空间(文件)。
我们可以只在需要的地方即修改共享数据的地方加锁,而其他不需要的地方我们可以不作修改。
总结
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行地修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1、效率低(共享数据基于文件,而文件是硬盘上的数据),从硬盘上读取文件效率很低。
2、需要自己加锁处理,需要注意在适当的地方加锁解锁,否则可能会形成死锁(之后会提到),使整个程序卡住。
因此我们最好找寻一种解决方案能够兼顾:
1、效率高(多个进程共享一块内存的数据)
2、帮我们处理好锁问题
这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,因而队列才是进程间通信的最佳选择。
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
队列
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。
创建队列的类:
Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
maxsize是队列中允许最大项数,省略则无大小限制,可以是任意的数据类型。
但需要明确:
1、队列内存放的是消息而非大数据
2、队列占用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小
主要方法:
q.put方法用以插入数据到队列中。
q.get方法可以从队列读取并且删除一个元素。
from multiprocessing import Process,Queue q = Queue(3) # 实例化产生一个队列 内部参数是最大的数据数 可以是任意数据类型 q.put(1) # 放入数据 q.put({‘a‘: 2}) q.put([123]) # q.put() 此时已取完 会卡住等待有人取走数据 print(q.full()) # 判断队列是否已满 print(q.get()) # 取出数据 print(q.get()) print(q.get()) print(q.empty()) # 判断队列是否为空 # q.get() 此时会卡住 等待放入数据 # 队列用于进程间的通信 # 进程间的通信有两种方式 一种是管道 一种是队列 # 队列就是在管道的基础上加上锁 因此用队列比较好 # 队列内存放的数据不应该过大 队列占用的是内存空间 因此速度快 但最大的数据数也受制于内存空间
生产者消费者模型
为什么要用生产者消费者模型?
生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,
那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。
为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模型?
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,
所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,
阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的。
下面用吃包子这个例子来解释
from multiprocessing import Process,Queue import time,random,os def consumer(q,name): while True: res=q.get() time.sleep(random.randint(1,3)) print(‘