等效于带有工作“线程”的 asyncio.Queues
Posted
技术标签:
【中文标题】等效于带有工作“线程”的 asyncio.Queues【英文标题】:Equivalent of asyncio.Queues with worker "threads" 【发布时间】:2014-05-26 06:53:37 【问题描述】:我试图弄清楚如何移植线程程序以使用asyncio
。我有很多代码同步几个标准库Queues
,基本上是这样的:
import queue, random, threading, time
q = queue.Queue()
def produce():
while True:
time.sleep(0.5 + random.random()) # sleep for .5 - 1.5 seconds
q.put(random.random())
def consume():
while True:
value = q.get(block=True)
print("Consumed", value)
threading.Thread(target=produce).start()
threading.Thread(target=consume).start()
一个线程创建值(可能是用户输入),另一个线程对它们进行处理。关键是这些线程在有新数据之前处于空闲状态,此时它们会醒来并对其进行处理。
我正在尝试使用 asyncio 来实现这种模式,但我似乎无法弄清楚如何让它“运行”。
我的尝试或多或少看起来像这样(并且根本不做任何事情)。
import asyncio, random
q = asyncio.Queue()
@asyncio.coroutine
def produce():
while True:
q.put(random.random())
yield from asyncio.sleep(0.5 + random.random())
@asyncio.coroutine
def consume():
while True:
value = yield from q.get()
print("Consumed", value)
# do something here to start the coroutines. asyncio.Task()?
loop = asyncio.get_event_loop()
loop.run_forever()
我尝试过使用协程的变体,不使用协程,在任务中包装东西,试图让它们创建或返回期货等。
我开始认为我对应该如何使用 asyncio 有错误的想法(也许这种模式应该以我不知道的不同方式实现)。 任何指针将不胜感激。
【问题讨论】:
你为什么要放弃线程以支持 asyncio? @dmmd - 不离开,我一直在使用线程。然而,Asyncio 对于某些类型的问题很方便,特别是当它们涉及大量阻塞 I/O 时。在没有无限资源的系统(树莓派,“云”机器)上,有时 asyncio 可以用更少的努力完成同样的事情。 很高兴知道,谢谢。 【参考方案1】:是的,没错。任务是你的朋友:
import asyncio, random
q = asyncio.Queue()
@asyncio.coroutine
def produce():
while True:
yield from q.put(random.random())
yield from asyncio.sleep(0.5 + random.random())
@asyncio.coroutine
def consume():
while True:
value = yield from q.get()
print("Consumed", value)
loop = asyncio.get_event_loop()
loop.create_task(produce())
loop.create_task(consume())
loop.run_forever()
asyncio.ensure_future
也可以用于创建任务。
请记住:q.put()
是一个协程,所以你应该使用yield from q.put(value)
。
UPD
例如从asyncio.Task()
/asyncio.async()
切换到新品牌APIloop.create_task()
和asyncio.ensure_future()
。
【讨论】:
我很好奇“while True”循环是否适合用于生产代码?还有推荐的测试方法吗? @user772401 通常取决于生产者。生成随机序列while True
是一个很好的例子,从套接字读取——也许,你的代码无论如何都应该等待来自套接字的一部分新数据。对于测试 my 示例,我建议将随机种子固定为恒定值并分析消耗的结果。更复杂的场景需要另一种技术。
但在生产者/消费者模式的情况下,在“while True”循环上运行的任务可能占我程序(也称为事件循环)的 50%。因此(在我正在玩的当前项目中)如果我的生产者或消费者任务发生故障,它会使事件循环与运行无关。我想我想要的是一种类似于supervisord类型的魔法,它说'如果这个任务出错,重新安排它'......我觉得确保消费者任务永远不会因错误输入而中断的唯一方法(输入你无法控制) 是将其关键代码包装在 try/except
不错的答案。我只有一个小怪癖。引用文档:"Don’t directly create Task
instances: use the ensure_future()
function or the BaseEventLoop.create_task()
method."
@FrederikAalund 无论如何都更新了答案【参考方案2】:
这是我在生产中使用的,转移到要点:https://gist.github.com/thehesiod/7081ab165b9a0d4de2e07d321cc2391d
【讨论】:
谢谢,你帮了我很多!我需要一个带有添加和处理项目选项的队列。但是,在我的情况下,我添加了以下内容:async def finish(self): await self._queue.join() await self.join()
然后使用这样的 AsyncWorkerPool 实例:loop.run_until_complete(pool.finish())
在没有要处理的内容时完成。【参考方案3】:
稍后,也许是 OT,请记住,您可以从多个任务中消费 Queue
,因为它们是独立的消费者。
以下 sn-p 示例显示了如何使用 asyncio
任务实现相同的线程池模式。
q = asyncio.Queue()
async def sum(x):
await asyncio.sleep(0.1) # simulates asynchronously
return x
async def consumer(i):
print("Consumer started".format(i))
while True:
f, x = await q.get()
print("Consumer procesing ".format(i, x))
r = await sum(x)
f.set_result(r)
async def producer():
consumers = [asyncio.ensure_future(consumer(i)) for i in range(5)]
loop = asyncio.get_event_loop()
tasks = [(asyncio.Future(), x) for x in range(10)]
for task in tasks:
await q.put(task)
# wait until all futures are completed
results = await asyncio.gather(*[f for f, _ in tasks])
assert results == [r for _, r in tasks]
# destroy tasks
for c in consumers:
c.cancel()
asyncio.get_event_loop().run_until_complete(producer())
【讨论】:
以上是关于等效于带有工作“线程”的 asyncio.Queues的主要内容,如果未能解决你的问题,请参考以下文章
Xcode 故事板等效于带有 View.GONE 的 Android LinearLayout
等效于 CUDA 中的 async_work_group_copy
Excel 等效于 Python“Not In”,以返回去除了不需要的字符的字符串