Python asyncio 跳过处理直到函数返回

Posted

技术标签:

【中文标题】Python asyncio 跳过处理直到函数返回【英文标题】:Python asyncio skip processing untill function return 【发布时间】:2019-01-25 17:53:12 【问题描述】:

我仍然对 asyncio 的工作原理感到非常困惑,所以我试图设置一个简单的示例但无法实现。

以下示例是一个 Web 服务器 (Quart),它接收到生成大 PDF 的请求,然后服务器在开始处理 PDF 之前返回响应,然后开始处理它并稍后将下载链接发送到电子邮件。

from quart import Quart
import asyncio
import time

app = Quart(__name__)

@app.route('/')
async def pdf():
    t1 = time.time()
    await generatePdf()
    return 'Time to execute :  seconds'.format(time.time() - t1)

async def generatePdf():
    await asyncio.sleep(5)
    #sync generatepdf
    #send pdf link to email

app.run()

我该怎么做呢?在上面的示例中,我不希望在返回之前等待 5 秒。

我什至不确定 asyncio 是否是我需要的。

而且恐怕在响应返回后阻止服务器应用程序不是应该做的事情,但也不确定。

pdf 库也是同步的,但我想那是另一天的问题......

【问题讨论】:

尝试将await generatePdf() 更改为asyncio.create_task(generatePdf()) 【参考方案1】:

评论包含响应网络请求和安排稍后生成 pdf 所需的一切。

asyncio.create_task(generatePdf())

但是,如果 pdf 处理速度很慢,则不是一个好主意,因为它会阻塞 asyncio 事件线程。即当前请求将很快得到响应,但后续请求必须等到 pdf 生成完成。

正确的方法是在执行器中运行任务(尤其是ProcessPoolExecutor)。

from quart import Quart
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

app = Quart(__name__)
executor = ProcessPoolExecutor(max_workers=5)

@app.route('/')
async def pdf():
    t1 = time.time()
    asyncio.get_running_loop().run_in_executor(executor, generatePdf)
    # await generatePdf()
    return 'Time to execute :  seconds'.format(time.time() - t1)

def generatePdf():
    #sync generatepdf
    #send pdf link to email

app.run()

需要注意的是,由于它在不同的进程中运行,generatePdf 在没有同步的情况下无法访问任何数据。因此,在调用函数时传递函数所需的所有内容。


更新

如果您可以重构 generatePdf 函数并使其异步,则效果最佳。

如果生成的 pdf 看起来像这样的示例

def generatePdf():
    image1 = downloadImage(image1Url)
    image2 = downloadImage(image2Url)
    data = queryData()
    pdfFile = makePdf(image1, image2, data)
    link = upLoadToS3(pdfFile)
    sendEmail(link)

您可以像这样使函数异步:

async def generatePdf():
    image1, image2, data = await asyncio.gather(downloadImage(image1Url), downloadImage(image2Url), queryData())
    pdfFile = makePdf(image1, image2, data)
    link = await upLoadToS3(pdfFile)
    await sendEmail(link) 

注意:downloadImagequeryData 等所有辅助函数都需要重写以支持async。这样,即使数据库或图像服务器很慢,请求也不会被阻塞。一切都在同一个异步线程中运行。

如果其中一些还不是异步的,那么它们可以与run_in_executor 一起使用,并且应该可以很好地与其他异步函数一起使用。

【讨论】:

所以说到底,一个线程是最好的解决方案?这是否意味着甚至不需要 asyncio ?这是我从烧瓶换到夸脱的唯一原因 @Mojimi asyncio 无法加速同步阻塞任务。比如pdf生成需要做多个数据库查询,而final generation又不是太慢,可以使用ayncio。 同意! asyncio 仅对 I/O 绑定任务或其他非 CPU 绑定任务有用。对 CPU 密集型任务使用 asyncio 不会实现任何并发性。正如@balki 指出的那样,当您将 asyncio 用于 I/O 绑定任务时,您必须确保 CPU 绑定任务尽快返回。在这种情况下,我同意将生成 pdf 的 CPU 繁重任务委托给单独的线程或进程 @balki 我在实现这个时遇到了一个问题,run_in_executor 需要等待,并且通过向它添加等待它再次成为阻塞,我错过了什么吗?只是打电话给run_in_executor 什么都没做 @Mojimi 使用asyncio.create_task(...run_in_executor..) 参考:docs.python.org/3/library/asyncio-task.html#asyncio.create_task【参考方案2】:
    我强烈建议您阅读 Brad Solomon 的 explanatory article,了解 Python 中的并行编程和异步。 出于异步执行任务的目的,无需阻塞请求直到任务完成 - 我认为最好的选择是使用带有“PDFGenerator”类的queue,该类从队列模式中消耗(文章中也有covered)

【讨论】:

我现在明白什么是异步 io,是关于跳过服务器请求等等待任务,对吗?所以我认为它不适合处理大型 PDF,因为这是一项 CPU 和内存密集型任务 好吧,实际上我的 PDF 生成包括下载一些图像,我想我可以优化一个循环以异步 io 方式收集这些图像 对于第二个问题,我现在最担心的不是阻止请求,我设法做到了,而是阻止了服务器应用程序。 对于 CPU 密集型任务,您最好使用多处理,由于全局解释器锁 (GIL),使用协程或线程将无济于事。但是,如果您担心阻塞服务器,您可能应该分析应用程序以更好地了解吞吐量和计算时间【参考方案3】:

对于生成大型 PDF 的任务,您可以使用异步任务/作业队列。例如,您可以使用Celery。由于您不想等待任务,而是返回类似“正在生成 PDF,请稍等/秒”的回复。因此,当请求到达“生成 PDF”端点时,您将在 Celery 中创建一个任务,Celery 将异步处理它,完成后,您可以推送到客户端或客户端可以使用任务 ID 使用“任务查找”(或在您实施时)。这是一个示例答案 - How to check task status in Celery?

Celery 和 Asyncio 的区别在于,Celery 可以在完全分离的环境中执行任务,并且与服务器的通信是通过像RabbitMQ 这样的分布式消息传递来完成的。 Asyncio 使用协程来利用阻塞 I/O 时间。它将使用您的服务器所在的相同环境和处理器。

【讨论】:

它与使用 ProcessPoolExecutor 有何不同? Celery 可以在需要时在单独的环境中使用,并且在控制和通信方面已经足够成熟,例如使用 RabbitMQ 进行消息传递、后端任务保存等。在 ProcessPoolExecutor 中,您必须在扩展或尝试时实现这些将工作人员与服务器分开。在容错的情况下,您的服务器可能会丢失任务,但如果您使用后端进行任务监控,您可以在工作人员/服务器再次在线时找到任务,并且使用 celery 很容易实现。

以上是关于Python asyncio 跳过处理直到函数返回的主要内容,如果未能解决你的问题,请参考以下文章

Python之asyncio模块的使用

asyncio的简单使用,python异步高效处理数据,asyncio.get_event_loop(),loop.run_until_complete(main()),loop.close()

python3.5+ asyncio await异步详解

Python asyncio之协程学习总结

等待在 Python 中使用 asyncio 下载

如何使用 python 的 asyncio 模块正确创建和运行并发任务?