线程,协程,IO模型
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程,协程,IO模型相关的知识,希望对你有一定的参考价值。
理论:
1.每创造一个进程,默认里面就有一个线程 2.进程是一个资源单位,而进程里面的线程才是CPU上的一个调度单位 3.一个进程里面的多个线程,是共享这个进程里面的资源的 4.线程创建的开销比进程要小,不用申请空间,只要基于现在的进程所在的空间,开一条流水线 就可以,所以创造线程的速度比创造进程的速度快 5.进程之间更多的是竞争关系,因为他们彼此之间是互相隔离的,而同一个进程的线程之间是合作关系 线程与进程的区别 1.同一进程内的线程共享创建它们的进程的地址空间,也就是同一进程内的多个线程共享资源,进程拥有自己的地址空间,也就是说父进程 和子进程是完全独立的地址空间 2.线程可以直接访问进程的数据。在Linux系统下,主进程造一个子进程,子进程会把父进程的状态完整的拷贝一份当作子进程的初始状态,但是当子进程 在运行过程中再产生的数据或者把数据更改了就和父进程无关了 3.同一进程的线程可以相互通信。进程彼此之间内存空间是相互隔离的,若通信需要找一块共享的内存空间,共享意味着竞争, 所以需要加锁处理,那么就需要寻找既是共享的内存空间,而且还自动处理了锁,使用队列。队列就是ipc机制的一种进程之间 通信的方式,与它相类似的还有管道,只不过管道需要自己加锁处理,所以还是使用队列更方便。 线程是没有必要使用ipc机制的,因为默认就是共享同一进程的内存空间,但存在竞争的问题,所以只能加锁,使用线程自己的队列 4.同等资源情况下,能开的线程数量多于开的进程数量,线程开销小,创建速度快,意味着能创建更多线程
开启线程的两种方式
1.
from threading import Thread def task(): print(‘is running‘) if __name__==‘__main__‘: t=Thread(target=task,) t.start() print(‘主‘) #is running #主 t.start() 发信号给操作系统,但是是基于当前进程已经有了空间的基础之上直接开线程 就可以了,当开始运行第一行代码的时候,进程就已经产生了,等到运行t.stat()的时候, 进程的空间早就开启了好长时间了,所以start的时候不用申请空间了,直接开一个流水线 就好了,开销小,所以就先看到‘is running‘ from multiprocessing import Process def task(): print(‘is running‘) if __name__ == ‘__main__‘: t=Process(target=task,) t.start() print(‘主‘) #主 #is running 开进程的开销大,要拷贝父进程的状态,需要的时间长,在 t.start() 给操作系统发出 申请后,操作系统要申请空间把这个进程造出来,还要再造一个线程,在这段时间内, print(‘主‘)已经执行了。子进程造出来后就打印 is running
2.
from threading import Thread class MyThread(Thread): def run(self): print(‘is running‘) if __name__ == ‘__main__‘: t=MyThread() t.start() print(‘主‘) 如果要传参数 from threading import Thread class MyThread(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): print(‘%s is running‘ % self.name) if __name__ == ‘__main__‘: t=MyThread(‘egon‘) t.start() print(‘主‘)
线程与进程
from threading import Thread from multiprocessing import Process import os def task(): print(‘%s is running‘ % os.getpid()) if __name__==‘__main__‘: t1=Thread(target=task,) t2=Thread(target=task,) t1.start() t2.start() print(‘主‘,os.getpid()) #1376 is running #1376 is running #主 1376 线程和主线程看到的pid都是一样的,因为这多个线程都是在一个进程里面 from threading import Thread from multiprocessing import Process import os def task(): print(‘%s is running‘ % os.getpid()) if __name__==‘__main__‘: t1=Process(target=task,) t2=Process(target=task,) t1.start() t2.start() print(‘主‘,os.getpid()) #主 4136 主进程pid #5588 is running 子进程pid #6532 is running 子进程pid
多线程共享同一个进程内的资源
from threading import Thread from multiprocessing import Process n=100 def work(): global n n=0 if __name__==‘__main__‘: p=Process(target=work,) p.start() p.join() print(‘主‘,n) #主 100 主进程看n,主进程的n没有被改过 在开子进程的时候,数据会被拷贝到子进程,改全局变量是改的子进程的全局变量, 子进程的n改为0,但是主进程的n仍然是100 子进程与主进程是完全独立的内存空间 from threading import Thread from multiprocessing import Process n=100 def work(): global n n=0 if __name__==‘__main__‘: t=Thread(target=work,) t.start() t.join() print(‘主‘,n) # 主 0 线程是共享同一个进程的地址空间,改全局变量的n,这个n就来自进程的n,直接就改掉了
多线程共享同一进程内地址空间练习
from threading import Thread msg_l=[] format_l=[] def talk(): ‘‘‘用户输入后添加到列表‘‘‘ while True: msg=input(‘>>: ‘).strip() msg_l.append(msg) def format(): ‘‘‘弹出数据并且改为大写后添加到新列表‘‘‘ while True: if msg_l: data=msg_l.pop() format_l.append(data.upper()) def save(): while True: if format_l: # 如果有数据 data=format_l.pop() # 数据拿出来后保存到文件中 with open(‘db.txt‘,‘a‘) as f: f.write(‘%s\n‘%data) #因为中间需要共享数据所以需要多线程 if __name__==‘__main__‘: t1=Thread(target=talk,) t2=Thread(target=format,) t3=Thread(target=save,) t1.start() t2.start() t3.start()
所以只要是涉及到共享数据的多个并发任务可以用多线程实现
Thread对象其他相关的属性或方法
from threading import Thread def talk(): print(‘is running‘) if __name__==‘__main__‘: t=Thread(target=task,) t.start() t.join() # 主进程等待子线程执行完 print(t.is_alive()) # 判断线程是否存活 print(‘主‘) #is running #False #主
from threading import Thread def talk(): print(‘is running‘) if __name__==‘__main__‘: t=Thread(target=task,) t.start() print(t.is_alive()) print(‘主‘) print(t.is_alive()) #is running #True #主 #True
from threading import Thread def talk(): print(‘is running‘) if __name__==‘__main__‘: t=Thread(target=task,) t.start() print(t.is_alive()) print(t.getName()) print(‘主‘) print(t.is_alive()) #is runnning #False #Thread-1 #主 #False
所以在没有join方法的情况下,True和False是说不准的,
取决于操作系统什么时候回收它,它才什么时候会死掉
from threading import Thread,activeCount def talk(): print(‘is running‘) if __name__==‘__main__‘: t=Thread(target=task,) t.start() print(‘主‘) print(activeCount()) #is running #主 #2 -----> 活着的线程数,一个主线程,和主线程开启的线程
from threading import Thread,activeCount,enumerate def talk(): print(‘is running‘) if __name__==‘__main__‘: t=Thread(target=task,) t.start() print(‘主‘) print(activeCount()) print(enumerate()) # --->显示当前活跃的线程对象 #is running #主 #1 #[<_MainThread(MainThread, started 5588)>]
from threading import Thread,activeCount,enumerate import time def talk(): print(‘is running‘) time.sleep(2) # 保证2s内线程死不掉 if __name__==‘__main__‘: t=Thread(target=task,) t.start() print(enumerate()) print(‘主‘) #is running #[<_MainThread(MainThread, started 1060)>, <Thread(Thread-1, start 4496)>] #主 一个主线程和一个Thread-1线程
#加入一个join方法 from threading import Thread,activeCount,enumerate import time def talk(): print(‘is running‘) time.sleep(2) # 保证2s内线程死不掉 if __name__==‘__main__‘: t=Thread(target=task,) t.start() t.join() print(enumerate()) print(‘主‘) #is running #[<_MainThread(MainThread, started 6172)>] #主 只有主线程
from threading import Thread,activeCount,enumerate,current_thread import time def talk(): print(‘%s is running‘%current_thread().getName()) # 当前的线程对象 time.sleep(2) if __name__==‘__main__‘: t=Thread(target=task,) t.start() t.join() print(‘主‘) #Thread-1 is running
验证开一个进程默认就有一个主线程
from threading import Thread,current_thread
from multiprocessing import Process
import time
print(current_thread())
#<_MainThread(MainThread, started 6192)>
右键一运行就会产生一个进程,进程不是一个执行单位,只是一个资源单位
主进程执行其实是主进程中的主线程在执行,所以谈到执行一定往线程上靠
from threading import Thread,current_thread from multiprocessing import Process import time def task(): print(‘%s is running‘% current_thread().getName()) # 子进程的Main_Thread time.sleep(2) if __name__==‘__main__‘: p=Process(target=task,) # 这个进程中的主线程开始执行代码了 p.start() # 开一个子进程,里面还有一个主线程 print(current_thread()) # 父进程的主线程 #<_MainThread(MainThread, started 5056)> #MainThread is running
主线程从执行层面代表了其所在进程的执行过程
from threading import Thread,current_thread from multiprocessing import Process import time def task(): print(‘%s is running‘% current_thread().getName()) time.sleep(2) if __name__==‘__main__‘: t1=Thread(target=task,) t2=Thread(target=task,) t3=Thread(target=task,) t1.start() t2.start() t3.start() print(current_thread().getName()) #Thread-1 is running #Thread-2 is running #Thread-3 is running #MainThread 在一个进程里面,主线程只有一个,其余的都是它开启的一些线程
守护线程
主线程挂掉,守护线程也会挂掉
#先看守护进程 from multiprocessing import Process import time def task(): print(‘123‘) time.sleep(2) print(‘123done‘) if __name__ == ‘__main__‘: p=Process(target=task,) p.start() print(‘主‘) #主 #123 #123done
主进程即使运行完了也要一直等待子进程运行完毕才结束掉
from multiprocessing import Process import time def task(): print(‘123‘) time.sleep(2) print(‘123done‘) if __name__ == ‘__main__‘: p=Process(target=task,) p.daemon=True p.start() print(‘主‘) #主
只要主进程运行完毕守护进程就死掉,那么主怎么算运行完毕,代码运行完了就算完了
‘主‘出来,子进程还没来得及开启就已经被干掉了
from multiprocessing import Process import time def task1(): print(‘123‘) time.sleep(1) print(‘123done‘) def task2(): print(‘456‘) time.sleep(10) print(‘456done‘) if __name__ == ‘__main__‘: p1=Process(target=task1,) p2=Process(target=task2,) p1.daemon=True p1.start() p2.start() print(‘主‘) #主 #456 #456done
10s的话 ‘123‘‘123done‘应该也出来了,为什么没有出来,
因为主进程代码结束就把p1干掉了,还没有来得及开启,
虽然代码运行完毕了,但是还要等待子进程p2运行完毕
如果机器的性能非常高,在p1.start()h和p2.start()的时候就已经
运行起来了,有可能‘123‘就打印在屏幕上了,但1s的时间对于操作系统
已经足够长了,足够可以打印出‘主‘,然后p1就被干掉了
对主进程来说,运行完毕指的是主进程代码运行完毕
对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
from threading import Thread import time def task1(): print(‘123‘) time.sleep(1) print(‘123done‘) if __name__ == ‘__main__‘: t=Thread(target=task1,) t.start() print(‘主‘) #123 #主 #123done
from threading import Thread import time def task1(): print(‘123‘) time.sleep(1) print(‘123done‘) if __name__ == ‘__main__‘: t=Thread(target=task1,) t.daemon=True t.start() print(‘主‘) #123 #主
from threading import Thread import time def task1(): print(‘123‘) time.sleep(1) print(‘123done‘) def task2(): print(‘456‘) time.sleep(1) print(‘456done‘) if __name__ == ‘__main__‘: t1=Thread(target=task1,) t2=Thread(target=task2,) t1.daemon=True t1.start() t2.start() print(‘主‘) #123 #456 #主 #123done #456done
进程中除了有主线程,还有其他非守护线程,主线程要等着非守护线程task2结束,要等
10s,但是10s也够task1运行完毕了,所以也会打印‘123done‘
from threading import Thread import time def task1(): print(‘123‘) time.sleep(10) print(‘123done‘) def task2(): print(‘456‘) time.sleep(1) print(‘456done‘) if __name__ == ‘__main__‘: t1=Thread(target=task1,) t2=Thread(target=task2,) t1.daemon=True t1.start() t2.start() print(‘主‘) #123 #456 #主 #456done
线程的互斥锁
1 from threading import Thread,Lock 2 import time 3 4 n=100 5 def work(): 6 global n 7 mutex.acquire() 8 temp=n 9 time.sleep(0.1) 10 n=temp-1 11 mutex.release() 12 13 if __name__ == ‘__main__‘: 14 mutex=Lock() 15 l=[] 16 start=time.time() 17 for i in range(100): 18 t=Thread(target=work,) 19 l.append(t) 20 t.start() 21 22 for t in l: 23 t.join() 24 print(‘run time:%s value:%s‘%(time.time()-start,n))
互斥锁与join的区别
from threading import Thread,Lock import time n=100 def work(): global n temp=n time.sleep(0.1) n=temp-1 if __name__ == ‘__main__‘: start=time.time() for i in range(100): t=Thread(target=work,) l.append(t) t.start() t.join() print(‘run time:%s value:%s‘%(time.time()-start,n))
死锁与递归锁
#死锁 from threading import Thread,Lock import time mutexA=Lock() mutexB=Lock() class Mythread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print(‘抢到了A锁‘%self.name) mutexB.acquire() print(‘抢到了B锁‘%self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print(‘抢到了B锁‘%self.name) time.sleep(1) mutexA.acquire() print(‘抢到了A锁‘%self.name) mutexA.release() mutexB.release() if __name__ == ‘__main__‘: for i in range(20): t=Mythread() t.start() #Thread-1 抢到了A锁 #Thread-1 抢到了B锁 #Thread-1 抢到了B锁 #Thread-2 抢到了A锁
#递归锁 from threading import Thread,Lock import time mutexA=Lock() mutexB=Lock() class Mythread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print(‘抢到了A锁‘%self.name) mutexB.acquire() print(‘抢到了B锁‘%self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print(‘抢到了B锁‘%self.name) time.sleep(1) mutexA.acquire() print(‘抢到了A锁‘%self.name) mutexA.release() mutexB.release() if __name__ == ‘__main__‘: for i in range(20): t=Mythread() t.start()
IO模型介绍
同步,异步指的是提交任务或调用任务的方式
阻塞指的是线程的执行状态
1.等待数据的准备
2.将数据从内核拷贝到进程中
1.阻塞IO 阻塞io服务端 from socket import * server=socket(AF_INET,SOCK_STREAM) server.bind((‘127.0.0.1‘,8080)) server.listen(5) while True: conn,addr = server.accept() print(addr) while True: try: data=conn.recv(1024) if not data:break conn.send(data.upper()) except Exception: break conn.close() server.close()
非阻塞IO 服务端 from socket import * server=socket(AF_INET,SOCK_STREAM) server.bind((‘127.0.0.1‘,8080)) server.listen(5) server.setblocking(False) # 默认为True, 改为False就是非阻塞 import time conns=[] del_l=[] while True: try: print(conns) conn,addr = server.accept() conns.append(conn) except BlockingIOError: for conn in conns: try: data=conn.recv(1024) conn.send(data.upper()) except BlockingIOError: pass except ConnectionResetError: conn.close() del_l.append(conn) for conn in del_l: conns.remove(conn) del_l=[] 客户端 from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect((‘127.0.0.1‘,8080)) while True: msg=input(‘>>: ‘).strip() if not msg:continue client.send(msg.encode(‘utf-8‘)) data=client.recv(1024) print(data.decode(‘utf-8‘))
IO多路复用(推荐) 实现select IO多路复用模型 服务端 from socket import * import select import time server=socket(AF_INET,SOCK_STREAM) server.bind((‘127.0.0.1‘,8080)) server.listen(5) server.setblocking(False) reads=[server,] while True: rl,_,_=select.select(reads,[],[]) for obj in rl: if obj == server: conn,addr=obj,accept() reads.append(conn) else: try: data=obj.recv(1024) if not data: obj.close() reads.remove(obj) continue obj.send(data.upper()) except Exception: obj.close() reads.remove(obj)
协程
在单线程下实现并发
1.基于yield实现并发 import time def consumer(): while True: res=yield def producer(): g=consumer() next(g) for i in range(1000000): g.send(i) start=time.time() producer() print(time.time()-start) 没有遇到io就乱切换,只是单纯的切换,反而会降低运行效率
import time def consumer(res): print(‘consumer‘) time.sleep(10) def producer(): res=[] for i in range(1000000): res.append(i) return res start=time.time() res=producer() consumer(res) print(time.time()-start)
import time def consumer(): while True: res=yield print(‘consumer‘,res) time.sleep(10) def producer(): g=consumer() next(g) for i in range(10000): print(‘producer‘,i) g.send(i) start=time.time() producer() print(time.timet()-start) yield只是单纯的切换,跟效率无关
greenlet模块
只是单纯意义上的切换,唯一的好处是切换起来比yield方便,仍然没有解决遇到IO就切换
from greenlet import greenlet import time def eat(name): print(‘%s eat 1‘% name) g2.switch(‘tom‘) # 暂停然后切到play() print(‘%s eat 2‘% name) g2.switch() def play(name): print(‘%s play 1‘% name) time.sleep(10) # 睡的时候并没有切换到别的函数,如eat(),而是继续等待 g1.switch() # 第一次传参数就可以了。暂停然后切换到eat()剩余的部分 print(‘%s play 2‘% name) g1=greenlet(eat) g2=greenlet(play) g1.switch(‘tom‘) # 切到eat() # tom eat 1 # tom play 1 # tom eat 2 # tom play 2
gevent模块
import gevent def eat(name): print(‘%s eat 1‘ % name) gevent.sleep(3) print(‘%s eat 2‘ % name) def play(name): print(‘%s play 1‘ % name) gevent.sleep(2) print(‘%s play 2‘ % name) g1=gevent.spawn(eat, ‘tom‘) # spawn() 是异步提交任务,只管提交任务,不管执行没执行 g2=gevent.spawn(play, ‘tom‘) # 想要看到执行过程,就需要等(join()) gevent.joinall([g1,g2]) # tom eat 1 # tom play 1 # tom play 2 先睡完(2s),所以就是play 2打印 # tom eat 2
这个函数的io是gevent.sleep()模拟的,如果是time.sleep()呢?,time.sleep()是不能被gevent识别的
from gevent import monkey;monkey.patch_all() #把这句代码之下的所有io操作都打上能被gevent识别的io操作的补丁,否则 import gevent #在用time.sleep()时,就会串行运行 import time def eat(name): print(‘%s eat 1‘ % name) time.sleep(1) print(‘%s eat 2‘ % name) def play(name): print(‘%s play 1‘ % name) time.sleep(2) print(‘%s play 2‘ % name) g1=gevent.spawn(eat, ‘tom‘) g2=gevent.spawn(play, ‘tom‘) gevent.joinall([g1,g2]) #tom eat 1 #tom play 1 #tom eat 2 #tom play 2
这就是单线程下的并发,也就是协程了,协程是用户程序自己控制调度的,操作系统是看不到的,我们通过gevent模块把io操作
隐藏了起来。协程的切换开销更小。
基于协程实现并发的套接字通信
#客户端 from socket import * client=socket(AF_INET, SOCK_STREAM) client.connect((‘127.0.0.1‘,8080)) while True: msg=input(‘>>: ‘).strip() if not msg:continue client.send(msg.encode(‘utf-8‘)) msg=client.recv(1024) print(msg.decode(‘utf-8‘))
#服务端 from socket import * import gevent def server(server_ip, port): s=socket(AF_INET, SOCK_STREAM) s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) s.bind((server_ip, port)) s.listen(5) while True: conn,addr = s.accept() gevent.spawn(talk,conn,addr) def talk(conn, addr): try: while True: res = conn.recv(1024) print(‘client %s:%s msg:%s‘% (addr[0],addr[1],res)) conn.send(res.upper()) except Exception as e: print(e) finally: conn.close() if __name__ == ‘__main__‘: server(‘127.0.0.1‘, 8080)
#客户端 from threading import Thread from socket import * def client(): client=socket(AF_INET, SOCK_STREAM) client.connect((‘127.0.0.1‘, 8080)) while True: client.send(‘hello‘, encode(‘utf-8‘)) msg=client.recv(1024) print(msg.decode(‘utf-8‘)) if __name__ == ‘__main__‘: for i in range(500): t=Thread(target=client,) t.start()
协程的缺点:
本质上是单线程,无法利用多核,所以如果想把程序最大效率的提升,就应该把
程序的io操作最大限度地降到最低。
协程指的是单个线程,一旦协程出现阻塞,将会阻塞整个线程
进程池和线程池
concurrent.futures模块提供了进程池和线程池,并且提供了更高级别的接口,
为的是异步执行调用
#进程池 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os def work(n): print(‘%s is running‘%os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == ‘__main__‘: p=ProcessPoolExecutor() #默认开4个进程 objs=[] for i in range(10): obj=p.submit(work, i) objs.append(obj) p.shutdown() for obj in objs: print(obj.result())
#线程池 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os from threading import current_thread def work(n): print(‘%s is running‘%current_thread().getName()) time.sleep(random.randint(1,3)) return n**2 if __name__ == ‘__main__‘: p=ThreadPoolExecutor() #默认为cpu的个数*5 objs=[] for i in range(21): obj=p.submit(work, i) objs.append(obj) p.shutdown() for obj in objs: print(obj.result())
事件Event
from threading import Thread,current_thread,Event import time event=Event() def conn_mysql(): count=1 while not event.is_set(): if count > 3: raise ConnectionError(‘连接失败‘) print(‘%s 等待第%s次链接mysql‘%(current_thread().getName(),count)) event.wait(0.5) #全局变量默认为False,在这里等变为True,超时时间一过就不再等待 count+=1 print(‘%s 链接ok‘ % current_thread().getName()) def check_mysql(): print(‘%s 正在检查mysql状态‘ % current_thread().getName()) time.sleep(1) event.set() #把全局变量变为True if __name__ == ‘__main__‘: t1 = Thread(target=conn_mysql) t2 = Thread(target=conn_mysql) check = Thread(target=check_mysql) t1.start() t2.start() check.start()
定时器
from threading import Timer def hello(n): print(‘hello,world‘,n) t = Timer(3, hello,args=(11,)) # 3s后运行 t.start() #hello,world 11
线程queue(了解)
import queue q=queue.Queue(3) #模拟对列,先进先出 q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) #1 #2 #3
import queue q=queue.LifoQueue(3) # 模拟堆栈,后进先出 q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) #3 #2 #1
import queue q=queue.PriorityQueue(3) #数字越小,优先级越高 q.put((10, ‘data1‘)) # (优先级,数据) q.put((11, ‘data2‘)) q.put((9, ‘data3‘)) print(q.get()) print(q.get()) print(q.get()) #(9, ‘data3‘) #(10, ‘data1‘) #(11, ‘data2‘)
以上是关于线程,协程,IO模型的主要内容,如果未能解决你的问题,请参考以下文章