Python 将任务排入队列并按顺序获取结果(多进程)

Posted

技术标签:

【中文标题】Python 将任务排入队列并按顺序获取结果(多进程)【英文标题】:Python enqueue tasks and get results in order (multiprocess) 【发布时间】:2021-08-10 21:34:54 【问题描述】:

我在enqueue tasks and get results in order 之前提出了这个问题,并使用多线程开发了一个答案。由于一些性能问题(在我的真实场景中——在这个简单的例子中没有看到),我想尝试使用多处理。不幸的是,我无法提出一个可行的解决方案。为了快速参考,这里是多线程的工作解决方案:

from threading import Thread
import queue
import time
import datetime

class TaskQueue():
    def __init__(self, num_workers=1):
        self.num_workers = num_workers
        self.total_num_jobs = 0
        self.jobs_completed = 0
        self.answers_sent = 0
        self.jobs = queue.Queue()
        self.results = queue.Queue()
        self.start_workers()

    def add_task(self, task, *args, **kwargs):
        args = args or ()
        kwargs = kwargs or 
        self.total_num_jobs += 1
        self.jobs.put((task, args, kwargs))

    def start_workers(self):
        for i in range(self.num_workers):
            t = Thread(target=self.worker)
            t.daemon = True
            t.start()

    def worker(self):
        while True:
            item, args, kwargs = self.jobs.get()
            item(*args, **kwargs)
            self.jobs_completed += 1
            self.jobs.task_done()

    def get_answers(self):
        while self.answers_sent < self.total_num_jobs or self.jobs_completed == 0:
            yield self.results.get()
            self.answers_sent += 1
            self.results.task_done()


def task(task_num, sleep_time, q):
    time.sleep(sleep_time)
    ans = "Task # sleeping ".format(task_num, sleep_time)
    q.put((task_num, ans))


if __name__ == "__main__":
    start = datetime.datetime.now()
    h = TaskQueue(num_workers=2)
    q = h.results
    answers = 
    curr_task = 1

    for t, s in zip([1,2,3,4,5,6,7,8,9], [9,8,7,6,5,4,3,2,1]):
        h.add_task(task, t, s, q)

    for task_num, ans in h.get_answers():
        answers[task_num] = ans
        if curr_task in answers:
            print(answers[curr_task])
            del answers[curr_task]
            curr_task += 1

    # Print remaining items (if any)
    for k, v in sorted(answers.items()):
        print(v)

    h.jobs.join()  # block until all tasks are done

    print("All done")
    print("Total Execution: ".format(datetime.datetime.now() - start))

期望的输出:

Task #1 sleeping 9
Task #2 sleeping 8
Task #3 sleeping 7
Task #4 sleeping 6
Task #5 sleeping 5
Task #6 sleeping 4
Task #7 sleeping 3
Task #8 sleeping 2
Task #9 sleeping 1
All done

仅在多线程时需要 23 秒,但如果作业是同步执行则需要 45 秒。

【问题讨论】:

【参考方案1】:

您可以使用 asycnio 使您的程序更快。 然而,Asyncio 很难理解,但经过一些经验,它可能会非常有益。

Area Covered by Asyncio

这是一个异步代码示例。

import time
import asyncio

async def main_process(sleepTime):
    await asyncio.sleep(sleepTime)
    print("Done sleep for",sleepTime)
    return sleepTime
    
async def main():
    startTime = time.perf_counter()
    tasks = []
    for sleepTime in [9,8,7,6,5,4,3,2,1]:
        task = asyncio.ensure_future(main_process(sleepTime))
        tasks.append(task)
    responses = await asyncio.gather(*tasks)
    endTime = time.perf_counter()
    print('Total Time taken',endTime-startTime)
    print("these are the responses gathered",responses)
    
loop=asyncio.get_event_loop()
future = asyncio.ensure_future(main())
loop.run_until_complete(future)

在上面的 asyncio.gather 也以相同的顺序收集了所有响应。

这将给出以下输出:

Done sleep for 1
Done sleep for 2
Done sleep for 3
Done sleep for 4
Done sleep for 5
Done sleep for 6
Done sleep for 7
Done sleep for 8
Done sleep for 9
Total Time taken 9.015136200000143
these are the responses gathered [9, 8, 7, 6, 5, 4, 3, 2, 1]

它可能会给你一些想法

【讨论】:

这并没有真正给我想要的解决方案,但绝对值得一提。将来可能有用。谢谢 实际上我使用上述方法来解决您要解决的问题。在我的场景中,我必须先收集请求,然后再执行处理任务。为此,我等待主要请求任务,其他任务也相应执行。 在 asyncio 中,您可以收集多个任务,正如我提到的 asyncio.gather 这样我们可以同时制作 2 个或更多 asyncio.gather,它还将同时执行其他 asyncio.gather 任务时间。如果您需要更多解释,我可以更新我的代码,以便您了解如何使用 asyncio 实现您想要的目标 不,它没有做我想要解决的问题。如果您使用“for sleepTime in [9,8,7,6,5,4,3,2,1]:”更新您的代码,您将首先获得最低睡眠。我的示例应该首先获得最长的示例。基本上它应该按照添加到队列中的作业的顺序打印出来。查看所需的输出 哦,我想我知道你想说什么。您是说这只是一个示例,但并不是我的问题的真正解决方案。我需要对您的答案进行进一步更新,以使其适用于我的具体问题。

以上是关于Python 将任务排入队列并按顺序获取结果(多进程)的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Redis Queue 将带有参数的函数排入队列?

线程顺序同步

POST请求并按顺序获取响应(NodeJS,python客户端)

保留多线程API中的传入请求顺序并按顺序处理

Python - 欺骗库中的单个方法 - 处理死队列 - Djangoq/qcluster/SQS

php 通过版本更新将样式排入队列