如何在 Python 中的多个异步进程之间进行同步?

Posted

技术标签:

【中文标题】如何在 Python 中的多个异步进程之间进行同步?【英文标题】:How to synchronize between multiple async processes in Python? 【发布时间】:2020-11-18 18:12:59 【问题描述】:

我有一个使用 fastapi 的异步 http web 服务。我在不同端口上的服务器上运行同一服务的多个实例,并且前面有一个 nginx 服务器,因此我可以全部使用它们。我有一个特定的资源,我需要保护它只有一个客户在访问它。

@app.get("/do_something")
async def do_something():
     critical_section_here()

我尝试使用这样的文件锁来保护这个关键部分:

@app.get("/do_something")
async def do_something():
    with FileLock("dosomething.lock"):
        critical_section()

这将防止多个进程同时进入临界区。但我发现这实际上会死锁。考虑以下事件:

    客户端1连接到8000端口并进入临界区 当客户端 1 仍在使用资源时,客户端 2 被路由到同一个端口 8000,然后它会尝试获取文件锁,它不能,所以它会继续尝试,这将阻止客户端 1 和客户端的执行1 将永远无法释放文件锁,这意味着不仅此进程被锁定,所有其他服务器实例也将被锁定。

有没有办法让我协调这些进程,使它们中只有一个访问关键部分?我想过给文件锁加一个超时,但我真的不想拒绝用户,我只想等到轮到他/她进入临界区。

【问题讨论】:

【参考方案1】:

你可以试试这样的:

import fcntl

from contextlib import asynccontextmanager

from fastapi import FastAPI

app = FastAPI()


def acquire_lock():
    f = open("/tmp/test.lock", "w")
    fcntl.flock(f, fcntl.LOCK_EX)
    return f


@asynccontextmanager
async def lock():
    loop = asyncio.get_running_loop()
    f = await loop.run_in_executor(None, acquire_lock)
    try:
        yield
    finally:
        f.close()


@app.get("/test/")
async def test():
    async with lock():
        print("Enter critical section")
        await asyncio.sleep(5)
        print("End critical section")

它基本上会序列化你的所有请求。

【讨论】:

遗憾的是,这是唯一的方法。遗憾的是它必须涉及线程,但是没有线程或忙循环根本无法等待flock()。 (非阻塞flock()只能用于测试;不能将锁与文件描述符相关联,以便将其与事件循环集成。)【参考方案2】:

您可以通过使用多处理queue 来避免该问题。每次新客户端连接时,您都会将请求(可以表示为新对象)添加到队列中。然后,您创建一个服务器进程,一次处理一个请求。 Here你可以找到这样一个场景的简单实现。

【讨论】:

OP 在使用多处理队列时会遇到完全相同的问题,因为在这样的队列上等待是一个阻塞操作,会导致死锁,如问题中所述。此外,multiprocessing.Queue 在由一个多处理实例启动的进程之间工作,而不是在完全不相关的 Python 进程之间工作,我理解 OP 正在拥有这些进程。【参考方案3】:

你可以使用aioredlock。

它允许您在工作人员(进程)之间创建分布式锁。有关其用法的更多信息,请点击上面的链接。

redlock 算法是 Redis 的分布式锁实现。它有多种语言的许多实现。在这种情况下,这是 python 3.5+ 的 asyncio 兼容实现。

使用示例:

# Or you can use the lock as async context manager:
try:
    async with await lock_manager.lock("resource_name") as lock:
        assert lock.valid is True
        # Do your stuff having the lock
        await lock.extend()  # alias for lock_manager.extend(lock)
        # Do more stuff having the lock
    assert lock.valid is False # lock will be released by context manager
except LockError:
    print('Lock not acquired')
    raise

【讨论】:

以上是关于如何在 Python 中的多个异步进程之间进行同步?的主要内容,如果未能解决你的问题,请参考以下文章

111 python程序中的进程操作-多进程同步(mulitProcessing Lock锁)

论并行,并发,同步,异步之间的联系与区别

GIL 线程池 进程池 同步 异步

30分钟读懂进程线程同步异步阻塞非阻塞并发并行

为你揭秘 Python 中的进程线程协程同步异步回调

python-并发并行同步异步同步锁