Python 将任务排入队列并按顺序获取结果

Posted

技术标签:

【中文标题】Python 将任务排入队列并按顺序获取结果【英文标题】:Python enqueue tasks and get results in order 【发布时间】:2021-08-10 02:55:48 【问题描述】:

我想让多个线程执行任务,但我也想按顺序获得结果。

拿一个简单的示例代码:

from threading import Thread
import queue
import time


class TaskQueue(queue.Queue):
    def __init__(self, num_workers=1):
        queue.Queue.__init__(self)
        self.num_workers = num_workers
        self.start_workers()

    def add_task(self, task, *args, **kwargs):
        args = args or ()
        kwargs = kwargs or 
        self.put((task, args, kwargs))

    def start_workers(self):
        for i in range(self.num_workers):
            t = Thread(target=self.worker)
            t.daemon = True
            t.start()

    def worker(self):
        while True:
            ##tupl = self.get()  # REMOVED
            item, args, kwargs = self.get()
            item(*args, **kwargs)
            self.task_done()


def task(task_num, sleep_time):
    time.sleep(sleep_time)
    print("Task # sleeping ".format(task_num, sleep_time))

q = TaskQueue(num_workers=2)

for t, s in zip([1,2,3,4,5,6,7,8,9], [9,8,7,6,5,4,3,2,1]):
    q.add_task(task, t, s)

q.join()  # block until all tasks are done
print("All Done!!")

我在哪里添加任务,具有关联的任务编号,每个任务都需要不同的执行时间(睡眠)。

我有三个问题/疑问。

1) 我什至没有得到所有的输出(甚至没有考虑顺序)。目前我只是得到输出:

   Task #4 sleeping 6
   Task #2 sleeping 8
   Task #6 sleeping 4
   Task #8 sleeping 2

似乎我没有得到奇怪的任务,也许一切都来自其他工人。为什么会这样?我怎样才能得到它们?

    该程序随后挂起。我假设因为工人阻塞,直到它从队列中得到一些东西。如果队列为空,则永远等待。我怎样才能更新它,让它退出或点击“全部完成!!”一旦队列中没有更多任务。

    如何让它按顺序打印任务?基本上我希望结果是:

    Task #1 sleeping 9
    Task #2 sleeping 8
    Task #3 sleeping 7
    Task #4 sleeping 6
    Task #5 sleeping 5
    Task #6 sleeping 4
    Task #7 sleeping 3
    Task #8 sleeping 2
    Task #9 sleeping 1
    

还假设任务结果非常大并且任务本身的数量很多,因此我真的不想将它们全部保存在内存中然后进行一些排序。我应该知道添加到队列中的任务数量,并且只想将它们用于首先打印的内容。暂时在内存中保存一些是可以接受的。我知道在当前示例中,您必须先保存一些,因为第一个任务需要的时间最长。您可以假设每个任务的执行时间(或本例中的睡眠时间)是随机的。

目前使用 Python 3.7

---编辑---

从上面的代码中删除tupl = self.get() 解决了问题#1 和#2。因此只剩下第 3 个问题。欢迎任何想法/解决方案

【问题讨论】:

由于打印语句发生在异步线程中,因此无法调整它们的打印顺序。但是,您可以从线程中获取结果,然后按任务分配的顺序打印它们。这就是你想要帮助的吗? 您的第三个问题目前听起来您希望异步代码同步运行,这与这一点相悖。只需同步运行代码即可。你能澄清一下吗? @deseuler 是的,听起来我确实希望它同步运行,但我不希望。同步运行它需要更长的时间。让我们举个例子,我们有 10 个任务,每个任务运行 10 秒。如果我要同步运行它们,则需要 100 秒才能完成。如果我有 2 个线程正在运行,那么我可以在一半的时间内(约 50 秒)得到结果。这就是我想要的。也许我需要 2 个队列,一个用于任务,一个用于结果。结果队列可能只是将对象作为结果,然后它的主线程管理首先打印哪些对象或类似的东西。 是的,这样做。请注意,您必须准备好存储所有结果(内存、文件系统等)以对它们进行排序。最坏的情况:第一个任务是最后完成的。或者,使用ThreadPoolExecutor.map() from concurrent.futures,这相当于同一件事。 我相信他正试图对线程内非耗时操作的执行进行一些控制。经过考虑,通过传递自定义链表似乎是可能的。 concurrent.futures.Executor 似乎也很有希望。 【参考方案1】:

这是我的问题的答案。使用两个队列(用于作业和结果)。从结果队列中提取答案并保存在字典中。它们按顺序打印出来并相应地删除。

2 个工作人员需要 23 秒,其中 1 个工作人员或仅同步执行需要 45 秒:

from threading import Thread
import queue
import time
import datetime

class TaskQueue():
    def __init__(self, num_workers=1):
        self.num_workers = num_workers
        self.total_num_jobs = 0
        self.jobs_completed = 0
        self.answers_sent = 0
        self.jobs = queue.Queue()
        self.results = queue.Queue()
        self.start_workers()

    def add_task(self, task, *args, **kwargs):
        args = args or ()
        kwargs = kwargs or 
        self.total_num_jobs += 1
        self.jobs.put((task, args, kwargs))

    def start_workers(self):
        for i in range(self.num_workers):
            t = Thread(target=self.worker)
            t.daemon = True
            t.start()

    def worker(self):
        while True:
            item, args, kwargs = self.jobs.get()
            item(*args, **kwargs)
            self.jobs_completed += 1
            self.jobs.task_done()

    def get_answers(self):
        while self.answers_sent < self.total_num_jobs or self.jobs_completed == 0:
            yield self.results.get()
            self.answers_sent += 1
            self.results.task_done()


def task(task_num, sleep_time, q):
    time.sleep(sleep_time)
    ans = "Task # sleeping ".format(task_num, sleep_time)
    q.put((task_num, ans))


if __name__ == "__main__":
    start = datetime.datetime.now()
    h = TaskQueue(num_workers=2)
    q = h.results
    answers = 
    curr_task = 1

    for t, s in zip([1,2,3,4,5,6,7,8,9], [9,8,7,6,5,4,3,2,1]):
        h.add_task(task, t, s, q)

    for task_num, ans in h.get_answers():
        answers[task_num] = ans
        if curr_task in answers:
            print(answers[curr_task])
            del answers[curr_task]
            curr_task += 1

    # Print remaining items (if any)
    for k, v in sorted(answers.items()):
        print(v)

    h.jobs.join()  # block until all tasks are done

    print("All done")
    print("Total Execution: ".format(datetime.datetime.now() - start))

【讨论】:

以上是关于Python 将任务排入队列并按顺序获取结果的主要内容,如果未能解决你的问题,请参考以下文章

线程顺序同步

如何使用 Redis Queue 将带有参数的函数排入队列?

Python - 欺骗库中的单个方法 - 处理死队列 - Djangoq/qcluster/SQS

celery定时任务

芹菜中持久的长时间运行的任务

POST请求并按顺序获取响应(NodeJS,python客户端)