Python 协程
Posted tuerlueur
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python 协程相关的知识,希望对你有一定的参考价值。
迭代器
- 可迭代 (Iterable):直接作用于for循环变量
- 迭代器 (Iterator):直接作用于for循环变量,并且可以用next调用
- 鉴别,可用
isinstancle()
生成器
不用for,占内存小,一边循环一边计算——时间换空间
next函数调用,到最后一个,报
StopIteration
异常生成:
- 直接使用
g = (x * x for x in range(10)) # 中括号是列表生成器,小括号是生成器
- 函数包含yield,则叫生成器,next调用函数,遇yield返回
def odd(): print('step 1') yield 1 print('step 2') yield(3) print('step 3') yield(5) g = odd() one = next(g) print(one) two = next(g) print(two) three = next(g) print(three)
Step 1
1
Step 2
2
Step 3
3注意:此时g是一个generator, next调用从上次yield后开始
- for循环调用生成器
def fib(max): n, a, b = 0, 0, 1 while n < max: yield b a, b = b, a+b n += 1 return 'Done' g = fib(5) for i in g: print(i)
1
1
2
3
5
协程
定义:为非抢占式多任务产生子程序,可以暂停执行——像generator一样
关键字:
yield
和send
def simple_coroutine(a): print('-> start') b = yield a print('-> recived', a, b) c = yield a + b print('-> recived', a, b, c) # runc sc = simple_coroutine(5) aa = next(sc) # 预激 print(aa) bb = sc.send(6) # 5, 6 print(bb) cc = sc.send(7) # 5, 6, 7 print(cc)
-> start
5
-> recived 5 6
11
-> recived 5 6 7分析:动脑子
协程终止:向上冒泡,发送哨符值,让协程退出
yield from:相当于加一个通道(协程与主线程间)
def gen(): for c in 'AB': yield c print(list(gen())) def gen_new(): yield from 'AB' print(list(gen_new()))
委派生成器:包含yield from的生成器函数
from collections import namedtuple ResClass = namedtuple('Res', 'count average') # 子生成器 def averager(): total = 0.0 count = 0 average = None while True: term = yield if term is None: break total += term count += 1 average = total / count return ResClass(count, average) # 委派生成器 def grouper(storages, key): while True: # 获取averager()返回的值 storages[key] = yield from averager() # 客户端代码 def client(): process_data = { 'boys_2': [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3], 'boys_1': [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46] } storages = {} for k, v in process_data.items(): # 获得协程 coroutine = grouper(storages, k) # 预激协程 next(coroutine) # 发送数据到协程 for dt in v: coroutine.send(dt) # 终止协程 coroutine.send(None) print(storages) # run client()
{‘boys_2‘: Res(count=9, average=40.422222222222224), ‘boys_1‘: Res(count=9, average=1.3888888888888888)}
解释:
client()
函数开始,for k, v 循环内,每次创建一个新的grouper实例coroutinenext(coroutine)
预激协程,进入while True循环,调用averager()
,yield from处暂停- 内层
for dt in v
结束后,grouper实例仍暂停,所以storages[key]的赋值还未完成 coroutine.send(None)
后,term变为None,averager子生成器中止,抛出StopIteration,并将返回的数据包含在异常对象的value中,yield from 直接抓取 StopItration ,将异常对象的 value 赋值给 storages[key]
asyncio
步骤:创建消息循环(解决异步IO,有中转:相当于信箱,消息queue)-> 导入协程-> 关闭
import threading import asyncio @asyncio.coroutine def hello(): print('Hello world! (%s)' % threading.currentThread()) print('Starting......(%s)' % threading.currentThread()) yield from asyncio.sleep(3) print('Done......(%s)' % threading.currentThread()) print('Hello again! (%s)' % threading.currentThread()) loop = asyncio.get_event_loop() tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) loop.close()
async & await
更简洁,不用装饰器
import threading import asyncio async def hello(): print('Hello world! (%s)' % threading.currentThread()) print('Starting......(%s)' % threading.currentThread()) await asyncio.sleep(3) print('Done......(%s)' % threading.currentThread()) print('Hello again! (%s)' % threading.currentThread()) loop = asyncio.get_event_loop() tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) loop.close()
aiohttp
介绍:
- 用asyncio和coroutine配合——http是io操作
例:
import asyncio from aiohttp import web async def index(request): await asyncio.sleep(0.5) return web.Response(body=b'<h1>Index</h1>') async def hello(request): await asyncio.sleep(0.5) text = '<h1>hello, %s!</h1>' % request.match_info['name'] return web.Response(body=text.encode('utf-8')) async def init(loop): app = web.Application(loop=loop) app.router.add_route('GET', '/', index) app.router.add_route('GET', '/hello/{name}', hello) srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv loop = asyncio.get_event_loop() loop.run_until_complete(init(loop)) loop.run_forever()
注:查+理解
concurrent.futures
类似线程池
用multiprocessing实现真正并行计算——运行多个解释器
concurrent.furtures.Executor
- ThreadPoolExecutor
- ProcessPoolExecutor
例子:
from concurrent.futures import ThreadPoolExecutor import time def return_future(msg): time.sleep(3) return msg # 创建一个线程池 pool = ThreadPoolExecutor(max_workers=2) # 往线程池加入2个task f1 = pool.submit(return_future, 'hello') f2 = pool.submit(return_future, 'world') print(f1.done()) time.sleep(3) print(f2.done()) print(f1.result()) print(f2.result())
map(fn, *iterables, timeout=None):
map和submit用一个就行
import time import re import os import datetime from concurrent import futures data = ['1', '2'] def wait_on(argument): print(argument) time.sleep(2) return "ok" ex = futures.ThreadPoolExecutor(max_workers=2) for i in ex.map(wait_on, data): print(i)
Future
- future实例由
Executor.submit
创建
from concurrent.futures import ThreadPoolExecutor as Pool from concurrent.futures import as_completed import requests URLS = ['http://qq.com', 'http://sina.com', 'http://www.baidu.com', ] def task(url, timeout=10): return requests.get(url, timeout=timeout) with Pool(max_workers=3) as executor: future_tasks = [executor.submit(task, url) for url in URLS] for f in future_tasks: if f.running(): print('%s is running' % str(f)) for f in as_completed(future_tasks): try: ret = f.done() if ret: f_ret = f.result() print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content))) except Exception as e: f.cancel() print(str(e))
- future实例由
以上是关于Python 协程的主要内容,如果未能解决你的问题,请参考以下文章