当前位置: 代码迷 >> python >> 龙卷风发电机可在列表中的任何将来恢复
  详细解决方案

龙卷风发电机可在列表中的任何将来恢复

热度:46   发布时间:2023-06-19 09:21:10.0

是否有龙卷风行为/模式(或ASYNCIO)等待所有 ,而不是在所有的期货list

yield any_of([future1, future2, future3])

future2准备好了,那么结果应该是:

[None, <result>, None]

更新 :Tornado现在具有 ,请根据其文档中的示例使用它,而不是下面的想法。

您可以创建一个从Future继承并包装期货列表的Any类。 Any类等待其期货之一解决,然后为您提供结果列表:

import time
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.concurrent import Future


@gen.coroutine
def delayed_msg(seconds, msg):
    yield gen.Task(IOLoop.current().add_timeout,
                   time.time() + seconds)
    raise gen.Return(msg)


class Any(Future):
    def __init__(self, futures):
        super(Any, self).__init__()
        self.futures = futures
        for future in futures:
            future.add_done_callback(self.done_callback)

    def done_callback(self, future):
        try:
            self.set_result(self.make_result())
        except Exception as e:
            self.set_exception(e)

    def make_result(self):
        """A list of results: None for each pending future, a result for
        each resolved future. Raises an exception for the first future
        that has an exception.
        """
        return [f.result() if f.done() else None
                for f in self.futures]

    def clear(self):
        """Break reference cycle with any pending futures."""
        self.futures = None


@gen.coroutine
def f():
    start = time.time()
    future1 = delayed_msg(2, '2')
    future2 = delayed_msg(3, '3')
    future3 = delayed_msg(1, '1')
    results = yield Any([future1, future2, future3])
    end = time.time()
    print "finished in %.1f sec: %r" % (end - start, results)

    results = yield Any([future1, future2])
    end = time.time()
    print "finished in %.1f sec: %r" % (end - start, results)

IOLoop.current().run_sync(f)

如预期的那样,将打印:

finished in 1.0 sec: [None, None, '1']
finished in 2.0 sec: ['2', None]

但是您会看到这种方法存在一些复杂性。 一方面,如果您想在第一个期货结算后等待其余的期货,那么构造仍在等待中的期货清单会很复杂。 我想你可以做:

results = yield Any(f for f in [future1, future2, future3] if not f.done())

不漂亮,甚至不正确! 有比赛条件。 如果 yield Any(...)连续执行之间解决了future,那么您将永远不会收到结果。 第一个yield不会得到未来的结果,因为它仍在等待中,但是第二个yield也不会得到它的结果,因为到那时为止,未来是“完成”的,并且不包含在传递给Any的列表中。

另一个复杂之处是,Any引用每个Future,这是指回调到Any的回调。 为了及时进行垃圾收集,您应该调用Any.clear()。

此外,您无法区分未决的将来和解析为“无”的将来。 您需要一个不同于None的特殊哨兵值,以表示一个未决的未来。

最后的并发症是最糟糕的。 如果解决了多个期货问题,并且其中的一些例外情况,则Any显然无法将所有信息传达给您。 将异常和结果混合在列表中将是错误的。

我认为有一种更简单的方法。 我们可以使Any return成为第一个可以解决的未来,而不是结果列表:

class Any(Future):
    def __init__(self, futures):
        super(Any, self).__init__()
        for future in futures:
            future.add_done_callback(self.done_callback)

    def done_callback(self, future):
        self.set_result(future)

参考周期已经过去,并且异常处理问题得到了回答:Any类将整个将来返回给您,而不是其结果或异常。 您可以根据需要对其进行检查。 在某些问题解决之后,等待剩余的期货也很容易:

@gen.coroutine
def f():
    start = time.time()
    future1 = delayed_msg(2, '2')
    future2 = delayed_msg(3, '3')
    future3 = delayed_msg(1, '1')

    futures = set([future1, future2, future3])
    while futures:
        resolved = yield Any(futures)
        end = time.time()
        print "finished in %.1f sec: %r" % (end - start, resolved.result())
        futures.remove(resolved)

根据需要,将打印:

finished in 1.0 sec: '1'
finished in 2.0 sec: '2'
finished in 3.0 sec: '3'

我们可以通过添加新的全局函数来测试异常处理行为:

@gen.coroutine
def delayed_exc(seconds, msg):
    yield gen.Task(IOLoop.current().add_timeout,
                   time.time() + seconds)
    raise Exception(msg)

并产生它而不是delay_msg:

@gen.coroutine
def f():
    start = time.time()
    future1 = delayed_msg(2, '2')
    future2 = delayed_exc(3, '3')  # Exception!
    future3 = delayed_msg(1, '1')

    futures = set([future1, future2, future3])
    while futures:
        resolved = yield Any(futures)
        end = time.time()
        try:
            outcome = resolved.result()
        except Exception as e:
            outcome = e

        print "finished in %.1f sec: %r" % (end - start, outcome)
        futures.remove(resolved)

现在,脚本将打印“ 1”,然后是“ 2”,然后是“ Exception('3',)”。

  相关解决方案