多线程并发、包括线程池,是操作系统控制的并发。如果是单线程,可以通过协程实现单线程下的并发。
协程 又称微线程,是一种用户态的轻量级线程,由用户程序自己控制调度。
python的线程属于内核级别的,由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
而单线程里开启协程,一旦遇到io,由用户自己控制调度。
特点:
1、单线程里并发
2、修改共享数据不需枷锁
3、用户程序里保存多个控制流的上下文栈
4、附加:一个协程遇到IO操作自动切换到其他协程(yield,greenlet都无法实现检测IO,用gevent模块的select机制可以)
并发:
# 注意到consumer函数是一个generator(生成器): # 任何包含yield关键字的函数都会自动成为生成器(generator)对象 def consumer(): r = ‘‘ while True: # 3、consumer通过yield拿到消息,处理,又通过yield把结果传回; # yield指令具有return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。 # 当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时, # 就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。通过这种方式,迭代器可以实现无限序列和惰性求值。 n = yield r if not n: return print(‘[CONSUMER] ←← Consuming %s...‘ % n) time.sleep(1) r = ‘200 OK‘ def produce(): # 1、首先调用c.next()启动生成器 c = consumer() next(c) n = 0 while n < 5: n = n + 1 print(‘[PRODUCER] →→ Producing %s...‘ % n) # 2、然后,一旦生产了东西,通过c.send(n)切换到consumer执行; cr = c.send(n) # 4、produce拿到consumer处理的结果,继续生产下一条消息; print(‘[PRODUCER] Consumer return: %s‘ % cr) # 5、produce决定不生产了,通过c.close()关闭consumer,整个过程结束。 c.close() if __name__ == ‘__main__‘: start_time = time.time() # 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,在一个线程里协作完成。 res = produce() # consumer(res) end_time = time.time() print(end_time-start_time)
串行:
def produce(): n = 0 res = [] while n < 5: n = n + 1 print(‘[PRODUCER] →→ Producing %s...‘ % n) res.append(n) return res def consumer(res): pass if __name__ == ‘__main__‘: start_time = time.time() # 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,在一个线程里协作完成。 res = produce() consumer(res) end_time = time.time() print(end_time-start_time)
greenlet模块,方便切换,但遇到IO时,不会自动切换到其他任务
from greenlet import greenlet def eat(name): print(‘%s is eating 1‘%name) gr2.switch(‘zhanwu‘) print(‘%s is eating 2‘ % name) gr2.switch() def play(name): print(‘%s is playing 1‘ % name) gr1.switch() print(‘%s is playing 2‘ % name) gr1 = greenlet(eat) gr2 = greenlet(play) gr1.switch(‘egon‘) #切换的时候传参
gevent模块,异步提交任务,遇到IO时可实现自动切换。
异步提交任务后,主线程结束的话,会导致子线程任务完不成,通过sleep或join实现主线程不死直到子线程运行结束。
from gevent import monkey monkey.patch_all() #将下面的阻塞操作变为非阻塞操作,这是用gevent必须的
def eat(name): print(‘%s is eating 1‘%name) gevent.sleep(3) print(‘%s is eating 2‘ % name) def play(name): print(‘%s is playing 1‘ % name) gevent.sleep(4) print(‘%s is playing 2‘ % name) gevent.joinall( [gevent.spawn(eat,‘egon‘), gevent.spawn(play,‘egon‘)] )
基于gevent实现单线程下的并发。
如果开多个进程,每个进程里开多个线程,每个线程里再开协程,会大大提升效率。 server端 from gevent import monkey,spawn;monkey.patch_all() from socket import * def talk(conn): while True: try: data = conn.recv(1024) if not data: break conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip,port): # 来一个客户端,起一个conn server = socket(AF_INET, SOCK_STREAM) server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) server.bind((ip, port)) server.listen(5) while True: conn,addr = server.accept() spawn(talk,conn) server.close() if __name__ == ‘__main__‘: g = spawn(server,‘127.0.0.1‘,8087) g.join() client端: from socket import * from threading import Thread,currentThread def client(): client = socket(AF_INET, SOCK_STREAM) client.connect((‘127.0.0.1‘,8087)) while True: client.send((‘%s say hello‘%currentThread().getName()).encode(‘utf8‘)) data = client.recv(1024) print(data.decode(‘utf-8‘)) if __name__ == ‘__main__‘: for i in range(500): t = Thread(target=client) t.start()