threadpool和Queue

Posted 仰望的花鼻子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了threadpool和Queue相关的知识,希望对你有一定的参考价值。

线程池可以看作容纳线程的容器,一个应用程序最多只能有一个线程池。

ThreadPool中的线程不用手动开始,也不能手动取消,你要做的只是把工作函数排入线程池,剩下的工作将由系统自动完成,也就是说我们不能控制线程池中的线程。如果想对线程进行更多的控制,那就不适合使用线程池。在以下情况中不宜使用ThreadPool类而应该使用单独的Thread类:

  1.线程执行需要很长时间;

  2.需要为线程指定详细的优先级;

  3.在执行过程中需要对线程进行操作,比如睡眠,挂起等

所以ThreadPool适合于并发运行若干个运行时间不长且互不干扰的函数。

ThreadPool的作用:

   现在我们逐步向线程池中排入100个任务,看看线程池中的线程数目是如何变化的。从而加深对线程池的理解。为了叙述方便,我们假设下限位5,上限位25.

  1.当线程池被创建后,里面就会创建5个空线程(和下限值相同)

  2.当我们向线程池中排入一个任务后,就会有一个空线程接手该任务,然后运行起来。随着我们不断向线程池中排入任务,线程池中的空线程逐一接手任务并被执行。

  3.随着任务的增加,在某一时刻任务数量会超出下限。当任务请求数量超出下限时,线程池并不会立即创建新线程,而是等待大约500ms左右,这么做的目的时看看在这段时间内是否有其他线程完成任务来接手这个请求,这样就可以避免因创建新线程而造成的消耗。如果这段时间内没有线程完成任务,就创建一个新线程去执行新任务。

  4.当任务数量超过下限后,每排入一个新任务,就会增加一个新线程,这段期间,任务和线程数量都持续增加,直至线程数量达到上线值为止。

  5.当线程数量达到上限时,继续增加任务,线程数量将不再增加,比如你向线程池中排入100个任务,则只有25个进入线程池(和上限相同),另外75个在线程池外排队等待。当线程池中的某个线程完成任务后,并不会被终止,而是从等待队列中选择一个任务继续执行,这样就减少了因创建和销毁线程而消耗的时间。

  6.当排入所有的任务后,随着线程池内的任务被逐步完成,线程池外部等待的任务被逐步调入线程池,任务的数量逐步减少,但线程的数量保持恒定,始终和上限值相同。

  7.随着任务被逐步完成,总有某一刻,任务数量会小于上限值,这时线程池内多余的线程会在空闲2min后被释放并回收相关资源。线程数目逐步减小,直到达到下限值为止。

  8.当任务数量减小到下限值之下时,线程池中的线程数目保持不变(始终和下限值相同),其中一部分在执行任务,另一部分处于空运行状态。

  9.当所有任务都完成后,线程池恢复初始状态,运行5个空线程。

由上面的论述可以看出线程池提高效率的关键是一个线程完成任务后可以继续为其他任务服务,这样就可以使用有限的几个固定线程轮流为大量的任务服务,从而减少了因频繁创建和销毁线程锁造成的消耗。

 

安装库:pip install threadpool

pool=ThreadPool(poolsize)   #定义了一个线程池,最多可以创建poolsize这么多的线程

requests=makeRequests(some_callable,list_of_args,callback)  #要开启多线程的函数,参数,回调函数(可不写)

[pool.putRequests(req) for req in requests]   #将所有要运行多线程的请求扔进线程池

pool.wait() #等待所有的线程完成工作后退出

makeRequests(callable,args_list,callback=None,exc_callback=None)
    创建多个计算请求,并允许有不同的参数
  参数列表中的每一个元素是两个元素的元祖,分别是位置参数列表和关键字参数字典
class ThreadPool
  线程池类,发布工作请求并收集结果
  __init__(self,num_workers,q_size)
      构造函数,设置线程池工作线程数量和最大任务队列长度,num_workers是初始化时的线程数量。如果q_size>0则会限制工作队列的长度,并且在工作队列满时阻塞继续插入工作请求的任务。
  createWorkers(self,num_workers)
    增加工作线程数量
  dismissWorkers(self,num_workers)
    减少工作线程数量
  pool(self,block)
    处理队列中的新结果。也就是循环的调用各个线程结果中的回调和错误回调。不过,当请求队列为空时会抛出NoResultPending异常,以表示所有的结果都处理完了,这个特点对于依赖线程执行结果继续加入请求队列的方式不太适合。
  putRequest(self,request,block=True,timeout=0)
    加入一个任务请求到工作队列
  wait(self)
    等待执行结果,知道所有任务完成。
class WorkerThread
  工作者线程,供ThreadPool内部使用,不必关注,其自定义方法也只有一个
class WorkRequest
  任务请求类
  __init__(self,callable,args=None,kwds=None,requestID=None,callback=None,exc_callback=None)
     创建一个工作请求

 

-----------------------------------------------------------

例子:

import time
def sayhello(str):
print "Hello ",str
time.sleep(2)

name_list =[\'xiaozi\',\'aa\',\'bb\',\'cc\']
start_time = time.time()
for i in range(len(name_list)):
sayhello(name_list[i])
print \'%d second\'% (time.time()-start_time)

输出时间为8 second

-----------------------------------------------------------------

import time
import threadpool
def sayhello(str):
print "Hello ",str
time.sleep(2)

name_list =[\'xiaozi\',\'aa\',\'bb\',\'cc\']
start_time = time.time()
pool = threadpool.ThreadPool(10)
requests = threadpool.makeRequests(sayhello, name_list)
[pool.putRequest(req) for req in requests]
pool.wait()
print \'%d second\'% (time.time()-start_time)

输出时间为2 second

以上是关于threadpool和Queue的主要内容,如果未能解决你的问题,请参考以下文章

使用Task代替ThreadPool和Thread

ThreadPool.QueueUserWorkItem 和 Parallel.ForEach 的区别?

threadpool和Queue

ThreadPool.QueueUserWorkItem的用法,带参数和不带参数

多线程异步编程示例和实践-Thread和ThreadPool

Percona 8.0 ThreadPool源码解析