Python 将任务排入队列并按顺序获取结果(多进程)
Posted
技术标签:
【中文标题】Python 将任务排入队列并按顺序获取结果(多进程)【英文标题】:Python enqueue tasks and get results in order (multiprocess) 【发布时间】:2021-08-10 21:34:54 【问题描述】:我在enqueue tasks and get results in order 之前提出了这个问题,并使用多线程开发了一个答案。由于一些性能问题(在我的真实场景中——在这个简单的例子中没有看到),我想尝试使用多处理。不幸的是,我无法提出一个可行的解决方案。为了快速参考,这里是多线程的工作解决方案:
from threading import Thread
import queue
import time
import datetime
class TaskQueue():
def __init__(self, num_workers=1):
self.num_workers = num_workers
self.total_num_jobs = 0
self.jobs_completed = 0
self.answers_sent = 0
self.jobs = queue.Queue()
self.results = queue.Queue()
self.start_workers()
def add_task(self, task, *args, **kwargs):
args = args or ()
kwargs = kwargs or
self.total_num_jobs += 1
self.jobs.put((task, args, kwargs))
def start_workers(self):
for i in range(self.num_workers):
t = Thread(target=self.worker)
t.daemon = True
t.start()
def worker(self):
while True:
item, args, kwargs = self.jobs.get()
item(*args, **kwargs)
self.jobs_completed += 1
self.jobs.task_done()
def get_answers(self):
while self.answers_sent < self.total_num_jobs or self.jobs_completed == 0:
yield self.results.get()
self.answers_sent += 1
self.results.task_done()
def task(task_num, sleep_time, q):
time.sleep(sleep_time)
ans = "Task # sleeping ".format(task_num, sleep_time)
q.put((task_num, ans))
if __name__ == "__main__":
start = datetime.datetime.now()
h = TaskQueue(num_workers=2)
q = h.results
answers =
curr_task = 1
for t, s in zip([1,2,3,4,5,6,7,8,9], [9,8,7,6,5,4,3,2,1]):
h.add_task(task, t, s, q)
for task_num, ans in h.get_answers():
answers[task_num] = ans
if curr_task in answers:
print(answers[curr_task])
del answers[curr_task]
curr_task += 1
# Print remaining items (if any)
for k, v in sorted(answers.items()):
print(v)
h.jobs.join() # block until all tasks are done
print("All done")
print("Total Execution: ".format(datetime.datetime.now() - start))
期望的输出:
Task #1 sleeping 9
Task #2 sleeping 8
Task #3 sleeping 7
Task #4 sleeping 6
Task #5 sleeping 5
Task #6 sleeping 4
Task #7 sleeping 3
Task #8 sleeping 2
Task #9 sleeping 1
All done
仅在多线程时需要 23 秒,但如果作业是同步执行则需要 45 秒。
【问题讨论】:
【参考方案1】:您可以使用 asycnio 使您的程序更快。 然而,Asyncio 很难理解,但经过一些经验,它可能会非常有益。
Area Covered by Asyncio
这是一个异步代码示例。
import time
import asyncio
async def main_process(sleepTime):
await asyncio.sleep(sleepTime)
print("Done sleep for",sleepTime)
return sleepTime
async def main():
startTime = time.perf_counter()
tasks = []
for sleepTime in [9,8,7,6,5,4,3,2,1]:
task = asyncio.ensure_future(main_process(sleepTime))
tasks.append(task)
responses = await asyncio.gather(*tasks)
endTime = time.perf_counter()
print('Total Time taken',endTime-startTime)
print("these are the responses gathered",responses)
loop=asyncio.get_event_loop()
future = asyncio.ensure_future(main())
loop.run_until_complete(future)
在上面的 asyncio.gather 也以相同的顺序收集了所有响应。
这将给出以下输出:
Done sleep for 1
Done sleep for 2
Done sleep for 3
Done sleep for 4
Done sleep for 5
Done sleep for 6
Done sleep for 7
Done sleep for 8
Done sleep for 9
Total Time taken 9.015136200000143
these are the responses gathered [9, 8, 7, 6, 5, 4, 3, 2, 1]
它可能会给你一些想法
【讨论】:
这并没有真正给我想要的解决方案,但绝对值得一提。将来可能有用。谢谢 实际上我使用上述方法来解决您要解决的问题。在我的场景中,我必须先收集请求,然后再执行处理任务。为此,我等待主要请求任务,其他任务也相应执行。 在 asyncio 中,您可以收集多个任务,正如我提到的 asyncio.gather 这样我们可以同时制作 2 个或更多 asyncio.gather,它还将同时执行其他 asyncio.gather 任务时间。如果您需要更多解释,我可以更新我的代码,以便您了解如何使用 asyncio 实现您想要的目标 不,它没有做我想要解决的问题。如果您使用“for sleepTime in [9,8,7,6,5,4,3,2,1]:”更新您的代码,您将首先获得最低睡眠。我的示例应该首先获得最长的示例。基本上它应该按照添加到队列中的作业的顺序打印出来。查看所需的输出 哦,我想我知道你想说什么。您是说这只是一个示例,但并不是我的问题的真正解决方案。我需要对您的答案进行进一步更新,以使其适用于我的具体问题。以上是关于Python 将任务排入队列并按顺序获取结果(多进程)的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Redis Queue 将带有参数的函数排入队列?
POST请求并按顺序获取响应(NodeJS,python客户端)