等效于带有工作“线程”的 asyncio.Queues

Posted

技术标签:

【中文标题】等效于带有工作“线程”的 asyncio.Queues【英文标题】:Equivalent of asyncio.Queues with worker "threads" 【发布时间】:2014-05-26 06:53:37 【问题描述】:

我试图弄清楚如何移植线程程序以使用asyncio。我有很多代码同步几个标准库Queues,基本上是这样的:

import queue, random, threading, time

q = queue.Queue()

def produce():
    while True:
        time.sleep(0.5 + random.random())  # sleep for .5 - 1.5 seconds
        q.put(random.random())

def consume():
    while True: 
        value = q.get(block=True)
        print("Consumed", value)

threading.Thread(target=produce).start()
threading.Thread(target=consume).start()

一个线程创建值(可能是用户输入),另一个线程对它们进行处理。关键是这些线程在有新数据之前处于空闲状态,此时它们会醒来并对其进行处理。

我正在尝试使用 asyncio 来实现这种模式,但我似乎无法弄清楚如何让它“运行”。

我的尝试或多或少看起来像这样(并且根本不做任何事情)。

import asyncio, random

q = asyncio.Queue()

@asyncio.coroutine
def produce():
    while True: 
        q.put(random.random())
        yield from asyncio.sleep(0.5 + random.random())

@asyncio.coroutine
def consume():
    while True:
        value = yield from q.get()
        print("Consumed", value)

# do something here to start the coroutines. asyncio.Task()? 

loop = asyncio.get_event_loop()
loop.run_forever()

我尝试过使用协程的变体,不使用协程,在任务中包装东西,试图让它们创建或返回期货等。

我开始认为我对应该如何使用 asyncio 有错误的想法(也许这种模式应该以我不知道的不同方式实现)。 任何指针将不胜感激。

【问题讨论】:

你为什么要放弃线程以支持 asyncio? @dmmd - 不离开,我一直在使用线程。然而,Asyncio 对于某些类型的问题很方便,特别是当它们涉及大量阻塞 I/O 时。在没有无限资源的系统(树莓派,“云”机器)上,有时 asyncio 可以用更少的努力完成同样的事情。 很高兴知道,谢谢。 【参考方案1】:

是的,没错。任务是你的朋友:

import asyncio, random

q = asyncio.Queue()

@asyncio.coroutine
def produce():
    while True:
        yield from q.put(random.random())
        yield from asyncio.sleep(0.5 + random.random())

@asyncio.coroutine
def consume():
    while True:
        value = yield from q.get()
        print("Consumed", value)


loop = asyncio.get_event_loop()
loop.create_task(produce())
loop.create_task(consume())
loop.run_forever()

asyncio.ensure_future也可以用于创建任务。

请记住:q.put() 是一个协程,所以你应该使用yield from q.put(value)

UPD

例如从asyncio.Task()/asyncio.async()切换到新品牌APIloop.create_task()asyncio.ensure_future()

【讨论】:

我很好奇“while True”循环是否适合用于生产代码?还有推荐的测试方法吗? @user772401 通常取决于生产者。生成随机序列while True 是一个很好的例子,从套接字读取——也许,你的代码无论如何都应该等待来自套接字的一部分新数据。对于测试 my 示例,我建议将随机种子固定为恒定值并分析消耗的结果。更复杂的场景需要另一种技术。 但在生产者/消费者模式的情况下,在“while True”循环上运行的任务可能占我程序(也称为事件循环)的 50%。因此(在我正在玩的当前项目中)如果我的生产者或消费者任务发生故障,它会使事件循环与运行无关。我想我想要的是一种类似于supervisord类型的魔法,它说'如果这个任务出错,重新安排它'......我觉得确保消费者任务永远不会因错误输入而中断的唯一方法(输入你无法控制) 是将其关键代码包装在 try/except 不错的答案。我只有一个小怪癖。引用文档:"Don’t directly create Task instances: use the ensure_future() function or the BaseEventLoop.create_task() method." @FrederikAalund 无论如何都更新了答案【参考方案2】:

这是我在生产中使用的,转移到要点:https://gist.github.com/thehesiod/7081ab165b9a0d4de2e07d321cc2391d

【讨论】:

谢谢,你帮了我很多!我需要一个带有添加和处理项目选项的队列。但是,在我的情况下,我添加了以下内容:async def finish(self): await self._queue.join() await self.join() 然后使用这样的 AsyncWorkerPool 实例:loop.run_until_complete(pool.finish()) 在没有要处理的内容时完成。【参考方案3】:

稍后,也许是 OT,请记住,您可以从多个任务中消费 Queue,因为它们是独立的消费者。

以下 sn-p 示例显示了如何使用 asyncio 任务实现相同的线程池模式。

q = asyncio.Queue()

async def sum(x):
    await asyncio.sleep(0.1)  # simulates asynchronously
    return x

async def consumer(i):
    print("Consumer  started".format(i))
    while True:
        f, x = await q.get()
        print("Consumer  procesing ".format(i, x))
        r = await sum(x)
        f.set_result(r)

async def producer():
    consumers = [asyncio.ensure_future(consumer(i)) for i in range(5)]
    loop = asyncio.get_event_loop()
    tasks = [(asyncio.Future(), x) for x in range(10)]
    for task in tasks:
        await q.put(task)

    # wait until all futures are completed
    results = await asyncio.gather(*[f for f, _ in tasks])
    assert results == [r for _, r in tasks]

    # destroy tasks
    for c in consumers:
        c.cancel()


asyncio.get_event_loop().run_until_complete(producer())

【讨论】:

以上是关于等效于带有工作“线程”的 asyncio.Queues的主要内容,如果未能解决你的问题,请参考以下文章

Xcode 故事板等效于带有 View.GONE 的 Android LinearLayout

等效于 CUDA 中的 async_work_group_copy

Excel 等效于 Python“Not In”,以返回去除了不需要的字符的字符串

iOS/Objective-C 等效于 Android 的 AsyncTask

Windows 挂钩的 Linux 等效项

Python 等效于 Perl 的 HTTP::Async->next_response