Python asyncio 协议竞争条件

Posted

技术标签:

【中文标题】Python asyncio 协议竞争条件【英文标题】:Python asyncio protocol race conditions 【发布时间】:2016-02-24 19:17:42 【问题描述】:

考虑以下代码(解释如下):

import asyncio

class MyClass(object):
    def __init__(self):
        self.a = 0
    def incr(self, data):
        self.a += 1
        print(self.a)

class GenProtocol(asyncio.SubprocessProtocol):
    def __init__(self, handler, exit_future):
        self.exit_future = exit_future
        self.handler = handler

    def pipe_data_received(self, fd, data):
        if fd == 1:
            self.handler(data)
        else:
            print('An error occurred')

    def process_exited(self):
        self.exit_future.set_result(True)

def start_proc(stdout_handler, *command):
    loop = asyncio.get_event_loop()
    exit_f = asyncio.Future(loop=loop)
    subpr = loop.subprocess_exec(lambda: GenProtocol(stdout_handler, exit_f),
                                 *command,
                                 stdin=None)
    transport, protocol = loop.run_until_complete(subpr)

    @asyncio.coroutine
    def waiter(exit_future):
        yield from exit_future

    return waiter, exit_f

def main():
    my_instance = MyClass()
    loop = asyncio.get_event_loop()
    waiter, exit_f = start_proc(my_instance.incr, 'bash', 'myscript.sh')
    loop.run_until_complete(waiter(exit_f))
    loop.close()

if __name__ == '__main__':
    main()

组件的简要说明如下:

    MyClass 很直接 GenProtocol 是一个允许为在子进程的标准输出上接收到的数据指定自定义处理程序的类。 start_proc 允许您启动一个自定义进程,为通过GenProtocol 在标准输出上接收到的数据指定一个自定义处理程序 my_proc 是一个永远运行的进程,在任意时间向管道发送数据

现在我的问题如下:由于我使用方法作为处理程序,并且由于该方法以非原子方式更改实例属性,这是否有潜在危险?例如,当我在子进程的管道上异步接收数据时,处理程序是同时调用两次(因此有损坏 MyClass.a 中的数据的风​​险)还是序列化(即第二次调用处理程序时它不会执行,直到第一个完成)?

【问题讨论】:

您可以使用同步原语(例如锁)docs.python.org/3/library/asyncio-sync.html 感谢指点!所以答案是,是的,可能存在竞争条件,对吧? 请参阅这篇关于 Python 中的多线程、异步编程和局部推理的精彩文章:Unyielding。 【参考方案1】:

协议方法是常规函数,而不是协程。 它们内部没有屈服点

所以执行顺序非常简单:所有调用都是序列化的,不可能出现竞争条件。

UPD

在示例中,pipe_data_received() 不是协程,而只是一个没有 await / yield from 内部的函数。

asyncio 总是一次全部执行,中间没有任何上下文切换。

您可能认为pipe_data_received() 是受锁保护的,但实际上这种情况不需要任何锁。

当你有这样的协程时,锁是强制性的:

async def incr(self):
    await asyncio.sleep(0.1)
    self.counter +=1

在后者中,incr() 是一个协程,此外,在sleep() 调用上非常可能进行上下文切换。如果你想保护并行递增,你可以使用asyncio.Lock():

def __init__(self):
    self.counter = 0
    self._lock = asyncio.Lock()

async def incr(self):
    async with self._lock:
        await asyncio.sleep(0.1)
        self.counter +=1

【讨论】:

非常感谢您的回答!您能否对此进行一些扩展或提供一些外部参考来更详细地解释其工作原理? 我试着再解释几句。随意再问一遍,答案还是不够清楚。

以上是关于Python asyncio 协议竞争条件的主要内容,如果未能解决你的问题,请参考以下文章

python通过管道进行进程通信:竞争条件

输出 asyncio 子进程调用的命令行?

Python3 asyncio“任务已被破坏,但处于待处理状态”,具有某些特定条件

异步等待子处理?

Asyncio 协议Protocol 与 传输Transport

Python多处理竞争条件