同步和异步实现的代码重复

Posted

技术标签:

【中文标题】同步和异步实现的代码重复【英文标题】:Duplication of code for synchronous and asynchronous implementations 【发布时间】:2019-03-14 00:04:24 【问题描述】:

在实现同时用于同步和异步应用程序的类时,我发现自己为这两个用例维护了几乎相同的代码。

举个例子,考虑一下:

from time import sleep
import asyncio


class UselessExample:
    def __init__(self, delay):
        self.delay = delay

    async def a_ticker(self, to):
        for i in range(to):
            yield i
            await asyncio.sleep(self.delay)

    def ticker(self, to):
        for i in range(to):
            yield i
            sleep(self.delay)


def func(ue):
    for value in ue.ticker(5):
        print(value)


async def a_func(ue):
    async for value in ue.a_ticker(5):
        print(value)


def main():
    ue = UselessExample(1)
    func(ue)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(a_func(ue))


if __name__ == '__main__':
    main()

在这个例子中,还不错,UselessExampleticker 方法很容易串联维护,但是你可以想象异常处理和更复杂的功能可以快速增长一个方法,使其更像一个问题,即使这两种方法几乎可以保持相同(仅用它们的异步对应物替换某些元素)。

假设没有实质性的区别值得完全实现,那么维护这样的类并避免不必要的重复的最佳(也是最 Pythonic)的方法是什么?

【问题讨论】:

【参考方案1】:

要使基于异步协程的代码库可以从传统的同步代码库中使用,没有一种万能的方法。您必须根据代码路径做出选择。

从一系列工具中挑选:

使用asyncio.run()的同步版本

为协程提供同步包装器,它会一直阻塞直到协程完成。

即使是像ticker() 这样的异步生成器函数也可以这样处理,在一个循环中:

class UselessExample:
    def __init__(self, delay):
        self.delay = delay

    async def a_ticker(self, to):
        for i in range(to):
            yield i
            await asyncio.sleep(self.delay)

    def ticker(self, to):
        agen = self.a_ticker(to)
        try:
            while True:
                yield asyncio.run(agen.__anext__())
        except StopAsyncIteration:
            return

可以使用辅助函数生成这些同步包装器:

from functools import wraps

def sync_agen_method(agen_method):
    @wraps(agen_method)
    def wrapper(self, *args, **kwargs):
        agen = agen_method(self, *args, **kwargs)   
        try:
            while True:
                yield asyncio.run(agen.__anext__())
        except StopAsyncIteration:
            return
    if wrapper.__name__[:2] == 'a_':
        wrapper.__name__ = wrapper.__name__[2:]
    return wrapper
    

然后在类定义中使用ticker = sync_agen_method(a_ticker)

直接的协程方法(不是生成器协程)可以用:

def sync_method(async_method):
    @wraps(async_method)
    def wrapper(self, *args, **kwargs):
        return async.run(async_method(self, *args, **kwargs))
    if wrapper.__name__[:2] == 'a_':
        wrapper.__name__ = wrapper.__name__[2:]
    return wrapper

分解出常见的组件

将同步部分重构为生成器、上下文管理器、实用函数等。

对于您的具体示例,将 for 循环拉出到单独的生成器中可以最大限度地减少重复代码,使其成为两个版本休眠的方式:

class UselessExample:
    def __init__(self, delay):
        self.delay = delay

    def _ticker_gen(self, to):
        yield from range(to)

    async def a_ticker(self, to):
        for i in self._ticker_gen(to):
            yield i
            await asyncio.sleep(self.delay)

    def ticker(self, to):
        for i in self._ticker_gen(to):
            yield i
            sleep(self.delay)

虽然这在这里并没有太大的区别,但它可以在其他情况下工作。

抽象语法树转换

使用 AST 重写和映射将协程转换为同步代码。如果您不注意如何识别 asyncio.sleep()time.sleep() 等实用函数,这可能会非常脆弱:

import inspect
import ast
import copy
import textwrap
import time

asynciomap = 
    # asyncio function to (additional globals, replacement source) tuples
    "sleep": ("time": time, "time.sleep")



class AsyncToSync(ast.NodeTransformer):
    def __init__(self):
        self.globals = 

    def visit_AsyncFunctionDef(self, node):
        return ast.copy_location(
            ast.FunctionDef(
                node.name,
                self.visit(node.args),
                [self.visit(stmt) for stmt in node.body],
                [self.visit(stmt) for stmt in node.decorator_list],
                node.returns and ast.visit(node.returns),
            ),
            node,
        )

    def visit_Await(self, node):
        return self.visit(node.value)

    def visit_Attribute(self, node):
        if (
            isinstance(node.value, ast.Name)
            and isinstance(node.value.ctx, ast.Load)
            and node.value.id == "asyncio"
            and node.attr in asynciomap
        ):
            g, replacement = asynciomap[node.attr]
            self.globals.update(g)
            return ast.copy_location(
                ast.parse(replacement, mode="eval").body,
                node
            )
        return node


def transform_sync(f):
    filename = inspect.getfile(f)
    lines, lineno = inspect.getsourcelines(f)
    ast_tree = ast.parse(textwrap.dedent(''.join(lines)), filename)
    ast.increment_lineno(ast_tree, lineno - 1)

    transformer = AsyncToSync()
    transformer.visit(ast_tree)
    tranformed_globals = **f.__globals__, **transformer.globals
    exec(compile(ast_tree, filename, 'exec'), tranformed_globals)
    return tranformed_globals[f.__name__]

虽然以上内容可能远不足以满足所有需求,并且转换 AST 树可能令人生畏,但以上内容可以让您只维护异步版本并将该版本直接映射到同步版本:

>>> import example
>>> del example.UselessExample.ticker
>>> example.main()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../example.py", line 32, in main
    func(ue)
  File "/.../example.py", line 21, in func
    for value in ue.ticker(5):
AttributeError: 'UselessExample' object has no attribute 'ticker'
>>> example.UselessExample.ticker = transform_sync(example.UselessExample.a_ticker)
>>> example.main()
0
1
2
3
4
0
1
2
3
4

【讨论】:

第一个解决方案在从异步代码调用同步方法的情况下不起作用,因为如果另一个事件循环已经在运行,async.run 将失败。如果您想支持在 Jupyter notebook 中使用,这一点非常重要(因为内核中一直在运行一个后台循环)。 谢谢-那里没有神奇的修复,但我也没有真正期待它;我认为你以一种有意义的方式解决了这个问题,它有一些我将使用的有用建议。希望其他为此苦苦挣扎的人也是如此。 @gdlmx:是的,我已经更新了答案以使用辅助函数,该函数回退到使用现有循环。 @MartijnPieters 您的原始包装没有任何问题。如果事件循环已经在运行,run_until_complete 将失败并出现与run 相同的错误。实际上,不可能将包装器写入await 同步函数内的协程、未来或任务。虽然submit the coroutine to the existing event loop 是可能的,但是协程只会在同步函数返回后 被调用。反正只有一个线程在运行。 @gdlmx: 啊,是的,你说的很对。使用run_until_complete() 是一个愚蠢的想法,因为它也会在完成时停止循环。在这种情况下可能需要使用新线程。【参考方案2】:

async/await 在设计上具有传染性。

接受您的代码将有不同的用户 - 同步和异步,并且这些用户会有不同的要求,随着时间的推移,实现会出现分歧。

发布单独的库

例如,比较 aiohttpaiohttp-requestsrequests

同样,比较 asyncpgpsycopg2

如何到达

选项 1。 (简单)克隆实现,允许它们发散。

选项 2。 (明智的)部分重构,例如异步库依赖并导入同步库。

选项 3。 (激进)创建一个可以在同步和异步程序中使用的“纯”库。例如,请参阅https://github.com/python-hyper/hyper-h2。

从好的方面来说,测试更容易、更彻底。考虑强制测试框架评估异步程序中所有可能的并发执行顺序有多难(或不可能)。纯库不需要那个:)

不利的一面是,这种编程风格需要不同的思维方式,并不总是直截了当,而且可能不是最理想的。例如,您可以编写 for event in fsm.push(data): ... 而不是 await socket.read(2**20),并依靠您的库用户以大块的形式为您提供数据。

有关上下文,请参阅https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/ 中的backpressure 参数

【讨论】:

我不同意这个原则,但它并没有改变这样一个事实,即这些库最终可能具有极其相似的代码并且必须并排维护。问题是限制此类库之间的复制量的最佳做法是什么 - 无论是全部在一个文件中,还是拆分为单独的库(这不是一个坏建议)。

以上是关于同步和异步实现的代码重复的主要内容,如果未能解决你的问题,请参考以下文章

为啥我不能将异步代码作为同步运行 [重复]

C#有异步函数调用同步函数或同步函数调用异步函数

同步加载和异步加载的原生态实现

javascript同步和异步的区别与实现方式

异步和同步术语

异步和同步的区别?