如何在 python 3 中将队列与并发未来的 ThreadPoolExecutor 一起使用?

Posted

技术标签:

【中文标题】如何在 python 3 中将队列与并发未来的 ThreadPoolExecutor 一起使用?【英文标题】:How to use queue with concurrent future ThreadPoolExecutor in python 3? 【发布时间】:2013-05-30 15:53:19 【问题描述】:

我正在使用简单的线程模块来执行并发作业。现在我想利用并发期货模块。有人能给我举一个使用队列和并发库的例子吗?

我收到 TypeError: 'Queue' object is not iterable 我不知道如何迭代队列

代码sn-p:

 def run(item):
      self.__log.info(str(item))
      return True
<queue filled here>

with concurrent.futures.ThreadPoolExecutor(max_workers = 100) as executor:
        furtureIteams =  executor.submit(run, item): item for item in list(queue)
        for future in concurrent.futures.as_completed(furtureIteams):
            f = furtureIteams[future]
            print(f)

【问题讨论】:

通常你会使用队列来解决消费者生产者问题en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem 我正在寻找一些示例代码来使用 threadpoolexecutor 读取队列 【参考方案1】:

我会建议这样的事情:

def run(queue):
      item = queue.get()
      self.__log.info(str(item))
      return True
<queue filled here>
workerThreadsToStart = 10
with concurrent.futures.ThreadPoolExecutor(max_workers = 100) as executor:
        furtureIteams =  executor.submit(run, queue): index for intex in range(workerThreadsToStart)
        for future in concurrent.futures.as_completed(furtureIteams):
            f = furtureIteams[future]
            print(f)

您将遇到的问题是,队列被认为是无止境的,并且作为一种媒介来解耦将某些内容放入队列的线程和将项目从队列中取出的线程。

    您的项目数量有限或 您一次计算所有项目

然后并行处理它们,队列没有意义。 在这些情况下,ThreadPoolExecutor 会使队列过时。

我查看了 ThreadPoolExecutor 源代码:

def submit(self, fn, *args, **kwargs): # line 94
    self._work_queue.put(w) # line 102

里面使用了一个队列。

【讨论】:

+1 队列在这里可能是多余的。通常,您可以使用两个参数 iter() 函数将队列转换为可迭代对象:for item in iter(queue.get, sentinel): # get items until sentinel found【参考方案2】:

如上所述,您可以使用iter() 函数在队列对象上执行线程池。一个非常通用的代码如下所示:

with concurrent.futures.ThreadPoolExecutor() as executor:
    executor.map(run, iter(queue.get, None))

run 方法在队列中的项目上执行期望的工作。

【讨论】:

以上是关于如何在 python 3 中将队列与并发未来的 ThreadPoolExecutor 一起使用?的主要内容,如果未能解决你的问题,请参考以下文章

进程队列补充socket实现服务器并发线程完结

Python 3 并发编程多进程之队列(推荐使用)

消息队列,高并发的救火员

asyncio--python3未来并发编程主流充满野心的模块

Python并发编程03/僵尸孤儿进程,互斥锁,进程之间的通信

Python并发编程-生产消费模型