每行文件的异步 HTTP API 调用 - Python

Posted

技术标签:

【中文标题】每行文件的异步 HTTP API 调用 - Python【英文标题】:Async HTTP API call for each line of file - Python 【发布时间】:2017-12-24 18:44:22 【问题描述】:

我正在处理一个大数据问题,但遇到了一些并发和异步 io 问题。问题如下:

1) 有多个大文件(每个 x 最多 15 个约 4gb),我正在使用来自 concurrent.futures 模块的ProcessPoolExecutor 这样处理:

def process(source):
    files = os.list(source)
    with ProcessPoolExecutor() as executor:
        future_to_url = executor.submit(process_individual_file, source, input_file):input_file for input_file in files
        for future in as_completed(future_to_url):
            data = future.result()

2)现在在每个文件中,我想逐行处理以创建特定的 json,将这样的 2K json 组合在一起,然后使用该请求访问 API 以获得响应。代码如下:

def process_individual_file(source, input_file):
    limit = 2000
    with open(source+input_file) as sf:
        for line in sf:
            json_array.append(form_json(line))
            limit -= 1

            if limit == 0:
                response = requests.post(API_URL, json=json_array)
                #check response status here
                limit = 2000

3) 现在的问题是,每个文件中的行数非常大,API 调用阻塞和响应有点慢,程序需要大量时间才能完成。

4) 我想要实现的是使该 API 调用 async 以便在该 API 调用发生时我可以继续处理下一批 2000。

5) 到目前为止我尝试过的事情:我试图使用asyncio 来实现它,但是我们需要收集一组未来的任务并使用事件循环等待完成。像这样的:

async def process_individual_file(source, input_file):
    tasks = []
    limit = 2000
    with open(source+input_file) as sf:
        for line in sf:
            json_array.append(form_json(line))
            limit -= 1

            if limit == 0:
               tasks.append(asyncio.ensure_future(call_api(json_array)))
               limit = 2000

    await asyncio.wait(tasks)

ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(process_individual_file(source, input_file))
ioloop.close()

6)我真的不明白这一点,因为这与之前的间接相同,因为它在启动它们之前等待收集所有任务。有人可以帮我解决这个问题的正确架构吗?如何调用 API 异步方式,而不收集所有任务并能够并行处理下一批?

【问题讨论】:

【参考方案1】:

我真的不明白这一点,因为这是间接的 与之前相同,因为它在启动前等待收集所有任务 他们。

不,你错了。当您使用asyncio.ensure_future 创建asyncio.Task 时,它会立即开始执行call_api 协程。这就是 asyncio 中任务的工作方式:

import asyncio


async def test(i):
    print(f'i started')
    await asyncio.sleep(i)


async def main():
    tasks = [
        asyncio.ensure_future(test(i))
        for i
        in range(3)
    ]

    await asyncio.sleep(0)
    print('At this moment tasks are already started')

    await asyncio.wait(tasks)


if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

0 started
1 started
2 started
At this moment tasks are already started

您的方法的问题是 process_individual_file 实际上不是异步的:它执行大量与 CPU 相关的工作,而不会将控制权返回给您的 asyncio 事件循环。这是一个问题 - 功能块事件循环使得不可能执行的任务。

我认为您可以使用的非常简单但有效的解决方案 - 是在执行几次 process_individual_file 之后,使用 asyncio.sleep(0) 手动将控制返回到事件循环,例如,在读取每一行时:

async def process_individual_file(source, input_file):
    tasks = []
    limit = 2000
    with open(source+input_file) as sf:
        for line in sf:
            await asyncio.sleep(0)  # Return control to event loop to allow it execute tasks

            json_array.append(form_json(line))
            limit -= 1

            if limit == 0:
               tasks.append(asyncio.ensure_future(call_api(json_array)))
               limit = 2000

    await asyncio.wait(tasks)

更新:

将有超过数百万个请求完成,因此我是 将所有未来对象存储在一个 列表

这很有意义。如果您运行数百万个并行网络请求,则不会发生任何好事。在这种情况下,通常设置限制的方法是使用同步原语,如 asyncio.Semaphore。

我建议您制作生成器以从文件中获取json_array,并在添加新任务之前获取Semaphore,并在任务准备好时释放它。您将获得不受许多并行运行任务的保护的干净代码。

这看起来像这样:

def get_json_array(input_file):
    json_array = []
    limit = 2000

    with open(input_file) as sf:
        for line in sf:
            json_array.append(form_json(line))

            limit -= 1
            if limit == 0:
                yield json_array  # generator will allow split file-reading logic from adding tasks

                json_array = []
                limit = 2000


sem = asyncio.Semaphore(50)  # don't allow more than 50 parallel requests

async def process_individual_file(input_file):
    for json_array in get_json_array(input_file):
        await sem.acquire()  # file reading wouldn't resume until there's some place for newer tasks
        task = asyncio.ensure_future(call_api(json_array))
        task.add_done_callback(lambda t: sem.release())  # on task done - free place for next tasks
        task.add_done_callback(lambda t: print(t.result()))  # print result on some call_api done

【讨论】:

感谢@Mikhail Gerasimov 纠正我。事实上,我没有正确理解它,因此被卡住了,因为预期的结果没有到来。你说的是有道理的,但我再次怀疑的是,由于文件的大小,将有超过数百万个请求需要完成,因此我对将所有这些未来的对象存储在一个列表。 这是我同时阅读的另一篇文章link。似乎我可以在另一个线程中触发事件循环并将我的未来委托给该线程并触发 API 响应的回调。现在在同一个 poc 上工作。让我也试试你的建议并得到结果。非常感谢:) @ShubhamPatil 我更新了答案,展示了如何避免许多并行请求。

以上是关于每行文件的异步 HTTP API 调用 - Python的主要内容,如果未能解决你的问题,请参考以下文章

Node中并行/异步的多个分页GET API调用

Node中并行/异步的多个分页GET API调用

React Jest:如何模拟返回数据的异步 API 调用?

如何在 Meteor 中批量运行异步调用?

如何同步节点中的两个异步调用?

对 api 服务器的多个异步请求(即发即弃)