如何在异步函数中并行化 for 循环并跟踪 for 循环执行状态?

Posted

技术标签:

【中文标题】如何在异步函数中并行化 for 循环并跟踪 for 循环执行状态?【英文标题】:How to parallelize the for loop inside a async function and track for loop execution status? 【发布时间】:2020-12-03 18:50:13 【问题描述】:

最近,我问了一个关于如何在部署的 API 中跟踪 for 循环的进度的问题。这是link。

对我有用的solution code 是,

from fastapi import FastAPI, UploadFile
from typing import List
import asyncio
import uuid


context = 'jobs': 

app = FastAPI()


async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    for file, file_number in enumerate(iter_over):
        jobs = context['jobs']
        job_info = jobs[job_key]
        job_info['iteration'] = file_number
        job_info['status'] = 'inprogress'
        await asyncio.sleep(1)
    jobs[job_key]['status'] = 'done'


@app.get('/')
async def get_testing():
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = 
    asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())

    return "identifier": identifier


@app.get('/status/identifier')
async def status(identifier):
    return 
        "status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
    

这样,我可以通过调用status method 使用标识符跟踪do_work 内for 循环的进度

现在,我正在寻找一种方法来并行化 do_work 方法中的 for 循环。

但是如果我使用joblib那么我不知道如何跟踪正在处理的每个文件,迭代计数将毫无意义,因为所有文件都将并行处理。

注意:我只是举了一个joblib的例子,因为我对其他库不是很熟悉。对文件的处理是基于 CPU 的繁重工作。我正在预处理文件并加载 4 个 tensorflow 模型并在文件上进行预测并写入 sql 数据库。

如果有人知道我可以做到的任何方法,请分享并帮助我。

【问题讨论】:

是否有理由需要使用 joblib 而不是 asyncio.run_coroutine_threadsafe 运行它?有了这样的功能,您可以共享变量,在我看来,这可能是一个伟大而简单的想法 不,我只是举了一个joblib 的例子,因为我熟悉那个库,我正在寻找的只是让我的 for 循环执行并行,而不会丢失继续跟踪哪个迭代的功能继续,所以我也跟踪进度 根据joblib 的文档,看起来你可以通过共享变量来实现。这可能会导致竞争条件,但如果该函数是唯一正在处理 id 并且 ids 保证是 uniuqe 的人,则情况不应该如此。见joblib.readthedocs.io/en/latest/auto_examples/… 您能否与我上面给出的代码分享一个工作示例,说明您将如何做到这一点? id 是什么意思?你是说状态标识符吗? 【参考方案1】:

我不是 100% 确定我理解了,这样的方法有用吗?

async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    jobs = context['jobs']
    job_info = jobs[job_key]
    job_info['iteration'] = 0

    async def do_work_inner(file):
        # do the work on the file here
        job_info['iteration'] += 1
        await asyncio.sleep(0.5)

    tasks = [do_work_inner(file) for file in iter_over]
    job_info['status'] = 'inprogress'
    await asyncio.gather(*tasks)
    jobs[job_key]['status'] = 'done'

这将并行运行文件上的所有工作*,请记住,在这种情况下,job_info['iteration'] 几乎没有意义,因为它们都是一起开始的,它们会一起增加值。

这是异步并行,这意味着它不是并行的,但事件循环会不断地从一个任务跳到另一个任务。

请注意,如果它是与 cpu 相关的工作(计算、分析等)而不是主要与 IO 相关的工作(如 web 调用),那么您想要对文件执行的实际工作是什么,这非常重要,那么这是错误的解决方案,应该稍微调整一下,如果有,请告诉我,我会尝试更新它。

编辑:cpu 相关工作的更新版本,进度显示文件已完成

这是一个比较完整的例子,只是没有实际的服务器

import time
import asyncio
import random
from concurrent.futures import ProcessPoolExecutor



jobs = 
context = 
executor = ProcessPoolExecutor()


def do_work_per_file(file, file_number):
    # CPU related work here, this method is not async
    # do the work on the file here
    print(f'Starting work on file file_number')
    time.sleep(random.randint(1,10) / 10)
    return file_number


async def do_work(job_key, files=None):
    iter_over = files if files else range(15)
    jobs = context['jobs']
    job_info = jobs[job_key]
    job_info['completed'] = 0

    loop = asyncio.get_running_loop()
    tasks = [loop.run_in_executor(executor,do_work_per_file, file, file_number) for file,file_number in enumerate(iter_over)]
    job_info['status'] = 'inprogress'
    for completed_job in asyncio.as_completed(tasks):
        print(f'Finished work on file await completed_job')
        job_info['completed'] += 1
        print('Current job status is ', job_info)
        

    jobs[job_key]['status'] = 'done'
    print('Current job status is ', job_info)

if __name__ == '__main__':
    context['jobs'] = jobs
    jobs['abc'] = 
    asyncio.run(do_work('abc'))

输出是

Starting work on file 0
Starting work on file 1
Starting work on file 2
Starting work on file 3
Starting work on file 4
Starting work on file 5
Starting work on file 6
Starting work on file 7
Starting work on file 8
Starting work on file 9
Starting work on file 10
Starting work on file 11
Starting work on file 12
Starting work on file 13
Starting work on file 14
Finished work on file 1
Current job status is  'completed': 1, 'status': 'inprogress'
Finished work on file 7
Current job status is  'completed': 2, 'status': 'inprogress'
Finished work on file 9
Current job status is  'completed': 3, 'status': 'inprogress'
Finished work on file 12
Current job status is  'completed': 4, 'status': 'inprogress'
Finished work on file 11
Current job status is  'completed': 5, 'status': 'inprogress'
Finished work on file 13
Current job status is  'completed': 6, 'status': 'inprogress'
Finished work on file 4
Current job status is  'completed': 7, 'status': 'inprogress'
Finished work on file 14
Current job status is  'completed': 8, 'status': 'inprogress'
Finished work on file 0
Current job status is  'completed': 9, 'status': 'inprogress'
Finished work on file 6
Current job status is  'completed': 10, 'status': 'inprogress'
Finished work on file 2
Current job status is  'completed': 11, 'status': 'inprogress'
Finished work on file 3
Current job status is  'completed': 12, 'status': 'inprogress'
Finished work on file 8
Current job status is  'completed': 13, 'status': 'inprogress'
Finished work on file 5
Current job status is  'completed': 14, 'status': 'inprogress'
Finished work on file 10
Current job status is  'completed': 15, 'status': 'inprogress'
Current job status is  'completed': 15, 'status': 'done'

基本上改变的是现在您正在打开一个处理文件工作的新进程池,作为一个新进程也意味着 CPU 密集型工作不会阻塞您的事件循环并阻止您查询作业的状态。

【讨论】:

对我来说最重要的是一种持续跟踪迭代的方法,我想要一个可以定期调用的方法或 API 端点,它会返回我正在处理的文件。您提到job_info['iteration'] 几乎没有意义,但我需要知道正在进行哪个迭代的方法。 @user_12 但如果您同时运行它们,这是否意味着所有迭代都在同时进行?除非您想显示完成了多少次迭代,在这种情况下,您只需将 +=1 移动到工作的结尾而不是开始 我的意思是,循环并行运行 4 次,所以现在我将处理 4 个文件而不是 1 个文件,所以当我调用状态 API 时,它应该返回 4 个处理的文件。 我需要一些方法来跟踪进度。 但是就像我说的,重要的是要知道你在文件上做了什么样的工作,因为这不是处理 CPU 密集型工作的正确方法【参考方案2】:

编辑

似乎joblibParallel 函数正在阻塞响应请求的线程。 一个可能更好的解决方案是@Ron Serruya,他们设法不阻塞主线程。

旧答案

这是一个潜在的解决方案。请注意,我没有测试,但它应该足以给你一个粗略的想法。另外,我不能 100% 确定您需要什么,因此在运行之前肯定需要您的审核。

尽管如此,我不明白您为什么不使用数据库来保持迭代的状态。这样,您可以避免遇到并发(或 Rails)问题,并且即使在电源故障的情况下也可以保持迭代/训练的状态。

from fastapi import FastAPI, UploadFile
from typing import List
import asyncio
import uuid
from joblib import Parallel, delayed


context = 'jobs': 

app = FastAPI()

def parallelize(iterate_count):
    # Function part that needs to be run in parallel
    iterate_count += 1
    

async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    jobs = context['jobs'][job_key]
    jobs["iteration"] = 0
    jobs['status'] = 'inprogress'
    Parallel()(delayed(parallelize)(jobs["iteration"]) for file, file_number in enumerate(iter_over))
    jobs['status'] = 'done'


@app.get('/')
async def get_testing():
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = 
    asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())

    return "identifier": identifier


@app.get('/status/identifier')
async def status(identifier):
    return 
        "status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
    

【讨论】:

我想你还没有理解我的问题,你能再看看我更新了一点问题。我在哪里处理每个文件? 我发布的代码是在函数do_work 中并行化迭代。由于你必须调用一个函数来并行运行,我不得不编写一个新函数,即parallelize,它执行并行计算。问题是,当您将工作转移到其他线程时,我试图考虑到一个 rails 条件,因此必须更改一些上下文的结构,以尽可能避免这个问题 我认为您仍然没有得到我想要的东西,如果您使用Parallel(),那么iteration 计数的意义就不那么大了吧?它不会反映正在处理的真实文件。但我需要一种方法来跟踪正在处理的迭代/文件并能够并行化它。 我不明白。在这段代码中,我展示了如何跟踪文件的读取和处理(尽管,我写了一条注释来指示代码的放置位置)。这正是您所要求的。既然Parallel()没用,那你还问什么?我不明白。先试试代码(之前调整过,因为我没有测试过)然后回来告诉你必须做的调整以及它是否解决了你的问题 它不起作用,我尝试了上面的代码,它一直说迭代为 0。它没有改变。你能验证一次吗?【参考方案3】:

如果您收到许多请求并且处理时间很长,则跨多个线程并行工作可能会使 API 客户端饿死。因此,请确保将每次调用的线程数(或进程/执行程序 - 见下文)限制为一个小数字。

您可以使用pyspark 将文件路径分配给将完成工作的执行程序(每个都作为一个进程运行),您可以在每台机器上拥有多个执行程序,并且可以跨多台机器进行分发。

另一种选择是通过concurrent.futures 使用线程池,使用max_workers 来限制每个请求的线程数。

并在启动时将并发集合传递给线程,因此他们可以通过写入此集合来“报告”他们的进度(您可以用锁包装常规集合,因为 Python 不提供基于自旋锁的并发集合)。

【讨论】:

您能否根据我在我的问题中的示例提供一个可行的示例,以便我可以用作基础?我对工作流程不是很熟悉?

以上是关于如何在异步函数中并行化 for 循环并跟踪 for 循环执行状态?的主要内容,如果未能解决你的问题,请参考以下文章

在 OpenMP 中并行化嵌套循环并使用更多线程执行内部循环

使用 CUDA 在 python 中展开一个可并行化的 for 循环

使用 Numba 时如何并行化此 Python for 循环

如何使用CUDA并行化嵌套for循环以在2D数组上执行计算

CUDA:并行化具有嵌套循环的函数调用的多个嵌套for循环

如何将python for循环从顺序转换为并行运行