龙卷风发生器在列表中的任何未来恢复

Posted

技术标签:

【中文标题】龙卷风发生器在列表中的任何未来恢复【英文标题】:Tornado generator resume on any future in list 【发布时间】:2014-03-08 12:44:28 【问题描述】:

龙卷风(或异步)中是否存在等待 any 而不是 list 中的所有 Futures 的行为/模式?

yield any_of([future1, future2, future3])

future2 准备好了,那么结果应该是:

[None, <result>, None]

【问题讨论】:

你可以继承tornado.gen.Multi,特别是get_result。这就是 Tornado 将您的未来清单传递到“幕后”的原因。 【参考方案1】:

更新:Tornado 现在有了tornado.gen.WaitIterator,请按照其文档中的示例使用它,而不是我下面的想法。

您可以创建一个继承自 Future 的 Any 类,并包装一个期货列表。 Any 类等到它的一个期货解决后,然后给你结果列表:

import time
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.concurrent import Future


@gen.coroutine
def delayed_msg(seconds, msg):
    yield gen.Task(IOLoop.current().add_timeout,
                   time.time() + seconds)
    raise gen.Return(msg)


class Any(Future):
    def __init__(self, futures):
        super(Any, self).__init__()
        self.futures = futures
        for future in futures:
            future.add_done_callback(self.done_callback)

    def done_callback(self, future):
        try:
            self.set_result(self.make_result())
        except Exception as e:
            self.set_exception(e)

    def make_result(self):
        """A list of results: None for each pending future, a result for
        each resolved future. Raises an exception for the first future
        that has an exception.
        """
        return [f.result() if f.done() else None
                for f in self.futures]

    def clear(self):
        """Break reference cycle with any pending futures."""
        self.futures = None


@gen.coroutine
def f():
    start = time.time()
    future1 = delayed_msg(2, '2')
    future2 = delayed_msg(3, '3')
    future3 = delayed_msg(1, '1')
    results = yield Any([future1, future2, future3])
    end = time.time()
    print "finished in %.1f sec: %r" % (end - start, results)

    results = yield Any([future1, future2])
    end = time.time()
    print "finished in %.1f sec: %r" % (end - start, results)

IOLoop.current().run_sync(f)

正如预期的那样,这会打印:

finished in 1.0 sec: [None, None, '1']
finished in 2.0 sec: ['2', None]

但是您可以看到这种方法存在一些复杂性。一方面,如果您想在第一个解决之后等待期货的 rest,那么构建仍待处理的期货列表会很复杂。我想你可以这样做:

results = yield Any(f for f in [future1, future2, future3] if not f.done())

不漂亮,甚至不正确!有一个比赛条件。如果未来在yield Any(...) 的连续执行之间 得到解决,那么您将永远不会收到它的结果。第一个yield 没有得到future 的结果,因为它仍然处于未决状态,但是第二个yield 也没有得到它的结果,因为到那时future 已经“完成”并且它不包含在列表传递给Any

另一个复杂之处是 Any 指的是每个未来,它指的是一个回调,该回调指回 Any。为了及时进行垃圾收集,您应该调用 Any.clear()。

此外,您无法区分待处理的 future 和解析为 None 的 future。您需要一个不同于 None 的特殊标记值来表示未决的未来。

最后的并发症是最糟糕的。如果解决了多个期货并且其中一些有例外,那么 Any 没有明显的方法可以将所有这些信息传达给您。将异常和结果混合在一个列表中是不恰当的。

我认为有一个更简单的方法。我们可以让 Any 返回第一个解析的未来,而不是结果列表:

class Any(Future):
    def __init__(self, futures):
        super(Any, self).__init__()
        for future in futures:
            future.add_done_callback(self.done_callback)

    def done_callback(self, future):
        self.set_result(future)

引用循环消失了,异常处理问题得到了解答:Any 类将整个未来返回给您,而不是其结果或异常。您可以随意检查它。在一些解决后等待剩余的期货也很容易:

@gen.coroutine
def f():
    start = time.time()
    future1 = delayed_msg(2, '2')
    future2 = delayed_msg(3, '3')
    future3 = delayed_msg(1, '1')

    futures = set([future1, future2, future3])
    while futures:
        resolved = yield Any(futures)
        end = time.time()
        print "finished in %.1f sec: %r" % (end - start, resolved.result())
        futures.remove(resolved)

根据需要,打印:

finished in 1.0 sec: '1'
finished in 2.0 sec: '2'
finished in 3.0 sec: '3'

我们可以通过添加一个新的全局函数来测试异常处理行为:

@gen.coroutine
def delayed_exc(seconds, msg):
    yield gen.Task(IOLoop.current().add_timeout,
                   time.time() + seconds)
    raise Exception(msg)

并产生它而不是 delay_msg:

@gen.coroutine
def f():
    start = time.time()
    future1 = delayed_msg(2, '2')
    future2 = delayed_exc(3, '3')  # Exception!
    future3 = delayed_msg(1, '1')

    futures = set([future1, future2, future3])
    while futures:
        resolved = yield Any(futures)
        end = time.time()
        try:
            outcome = resolved.result()
        except Exception as e:
            outcome = e

        print "finished in %.1f sec: %r" % (end - start, outcome)
        futures.remove(resolved)

现在,脚本将打印“1”,然后是“2”,然后是“Exception('3',)”。

【讨论】:

感谢您的深入回答!我实际上是在将它与您的电机库一起使用 :) 另外,您如何看待 tornado.gen.Multi 的子类化(如评论中所建议的)与 Furture 的子类化? 我看了一眼 Multi,子类 Multi 似乎比编写我展示的非常简单的 Any 类(第二个版本)更复杂。我可能是错的,但我没有看到 Multi 方法的优势。 我认为 done_callback 应该检查未来是否已经设置,因为文档说多次调用 set_result 是未定义的。

以上是关于龙卷风发生器在列表中的任何未来恢复的主要内容,如果未能解决你的问题,请参考以下文章

高中必修3Module 3 Period 1 Reading— What Is a Tornado?翻译!急!急!!急!!!

python 龙卷风使未来

龙卷风框架。 TypeError:“未来”对象不可调用

带有 AssertionError 的龙卷风 websocket 崩溃

大数据之高级分析如何从天气中获取洞察力

顶级域名中的数字?