Python 将任务排入队列并按顺序获取结果
Posted
技术标签:
【中文标题】Python 将任务排入队列并按顺序获取结果【英文标题】:Python enqueue tasks and get results in order 【发布时间】:2021-08-10 02:55:48 【问题描述】:我想让多个线程执行任务,但我也想按顺序获得结果。
拿一个简单的示例代码:
from threading import Thread
import queue
import time
class TaskQueue(queue.Queue):
def __init__(self, num_workers=1):
queue.Queue.__init__(self)
self.num_workers = num_workers
self.start_workers()
def add_task(self, task, *args, **kwargs):
args = args or ()
kwargs = kwargs or
self.put((task, args, kwargs))
def start_workers(self):
for i in range(self.num_workers):
t = Thread(target=self.worker)
t.daemon = True
t.start()
def worker(self):
while True:
##tupl = self.get() # REMOVED
item, args, kwargs = self.get()
item(*args, **kwargs)
self.task_done()
def task(task_num, sleep_time):
time.sleep(sleep_time)
print("Task # sleeping ".format(task_num, sleep_time))
q = TaskQueue(num_workers=2)
for t, s in zip([1,2,3,4,5,6,7,8,9], [9,8,7,6,5,4,3,2,1]):
q.add_task(task, t, s)
q.join() # block until all tasks are done
print("All Done!!")
我在哪里添加任务,具有关联的任务编号,每个任务都需要不同的执行时间(睡眠)。
我有三个问题/疑问。
1) 我什至没有得到所有的输出(甚至没有考虑顺序)。目前我只是得到输出:
Task #4 sleeping 6
Task #2 sleeping 8
Task #6 sleeping 4
Task #8 sleeping 2
似乎我没有得到奇怪的任务,也许一切都来自其他工人。为什么会这样?我怎样才能得到它们?
该程序随后挂起。我假设因为工人阻塞,直到它从队列中得到一些东西。如果队列为空,则永远等待。我怎样才能更新它,让它退出或点击“全部完成!!”一旦队列中没有更多任务。
如何让它按顺序打印任务?基本上我希望结果是:
Task #1 sleeping 9
Task #2 sleeping 8
Task #3 sleeping 7
Task #4 sleeping 6
Task #5 sleeping 5
Task #6 sleeping 4
Task #7 sleeping 3
Task #8 sleeping 2
Task #9 sleeping 1
还假设任务结果非常大并且任务本身的数量很多,因此我真的不想将它们全部保存在内存中然后进行一些排序。我应该知道添加到队列中的任务数量,并且只想将它们用于首先打印的内容。暂时在内存中保存一些是可以接受的。我知道在当前示例中,您必须先保存一些,因为第一个任务需要的时间最长。您可以假设每个任务的执行时间(或本例中的睡眠时间)是随机的。
目前使用 Python 3.7
---编辑---
从上面的代码中删除tupl = self.get()
解决了问题#1 和#2。因此只剩下第 3 个问题。欢迎任何想法/解决方案
【问题讨论】:
由于打印语句发生在异步线程中,因此无法调整它们的打印顺序。但是,您可以从线程中获取结果,然后按任务分配的顺序打印它们。这就是你想要帮助的吗? 您的第三个问题目前听起来您希望异步代码同步运行,这与这一点相悖。只需同步运行代码即可。你能澄清一下吗? @deseuler 是的,听起来我确实希望它同步运行,但我不希望。同步运行它需要更长的时间。让我们举个例子,我们有 10 个任务,每个任务运行 10 秒。如果我要同步运行它们,则需要 100 秒才能完成。如果我有 2 个线程正在运行,那么我可以在一半的时间内(约 50 秒)得到结果。这就是我想要的。也许我需要 2 个队列,一个用于任务,一个用于结果。结果队列可能只是将对象作为结果,然后它的主线程管理首先打印哪些对象或类似的东西。 是的,这样做。请注意,您必须准备好存储所有结果(内存、文件系统等)以对它们进行排序。最坏的情况:第一个任务是最后完成的。或者,使用ThreadPoolExecutor.map()
from concurrent.futures
,这相当于同一件事。
我相信他正试图对线程内非耗时操作的执行进行一些控制。经过考虑,通过传递自定义链表似乎是可能的。 concurrent.futures.Executor
似乎也很有希望。
【参考方案1】:
这是我的问题的答案。使用两个队列(用于作业和结果)。从结果队列中提取答案并保存在字典中。它们按顺序打印出来并相应地删除。
2 个工作人员需要 23 秒,其中 1 个工作人员或仅同步执行需要 45 秒:
from threading import Thread
import queue
import time
import datetime
class TaskQueue():
def __init__(self, num_workers=1):
self.num_workers = num_workers
self.total_num_jobs = 0
self.jobs_completed = 0
self.answers_sent = 0
self.jobs = queue.Queue()
self.results = queue.Queue()
self.start_workers()
def add_task(self, task, *args, **kwargs):
args = args or ()
kwargs = kwargs or
self.total_num_jobs += 1
self.jobs.put((task, args, kwargs))
def start_workers(self):
for i in range(self.num_workers):
t = Thread(target=self.worker)
t.daemon = True
t.start()
def worker(self):
while True:
item, args, kwargs = self.jobs.get()
item(*args, **kwargs)
self.jobs_completed += 1
self.jobs.task_done()
def get_answers(self):
while self.answers_sent < self.total_num_jobs or self.jobs_completed == 0:
yield self.results.get()
self.answers_sent += 1
self.results.task_done()
def task(task_num, sleep_time, q):
time.sleep(sleep_time)
ans = "Task # sleeping ".format(task_num, sleep_time)
q.put((task_num, ans))
if __name__ == "__main__":
start = datetime.datetime.now()
h = TaskQueue(num_workers=2)
q = h.results
answers =
curr_task = 1
for t, s in zip([1,2,3,4,5,6,7,8,9], [9,8,7,6,5,4,3,2,1]):
h.add_task(task, t, s, q)
for task_num, ans in h.get_answers():
answers[task_num] = ans
if curr_task in answers:
print(answers[curr_task])
del answers[curr_task]
curr_task += 1
# Print remaining items (if any)
for k, v in sorted(answers.items()):
print(v)
h.jobs.join() # block until all tasks are done
print("All done")
print("Total Execution: ".format(datetime.datetime.now() - start))
【讨论】:
以上是关于Python 将任务排入队列并按顺序获取结果的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Redis Queue 将带有参数的函数排入队列?