3.2.5 线程池
Posted infinitecodes
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了3.2.5 线程池相关的知识,希望对你有一定的参考价值。
queue队列
日,这篇好长啊….
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
多线程编程环境下,当在多条线程中,信息必须被安全的交换时,queue尤其有用。
class queue.
Queue
(maxsize=0) #先入先出
class queue.
LifoQueue
(maxsize=0) #last in fisrt out 后入先出
class queue.
PriorityQueue
(maxsize=0) #存储数据时可设置优先级的队列
Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be
优先级队列的构造函数,maxsize是一个指针,该指针设定了可以放在队列中的条目的数量上限
placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is
一旦达到这个上限,插入会被阻止,直到队列中的条目被消耗掉。
less than or equal to zero, the queue size is infinite.
如果maxsize(条目上限指针)小于等于0,队列大小会变成无限。
The lowest valued entries are retrieved first (the lowest valued entry is the one returned by
sorted(list(entries))[0]
).最低的进入值会最先被检索(最低进入值是一个由sorted(list(entries)[0]返回的值)
A typical pattern for entries is a tuple in the form:
(priority_number, data)
.关于entries,一个典型的样板是表格中的一个元组:
(priority_number, data)
- exception
queue.
Empty
Exception raised when non-blocking
get()
(orget_nowait()
) is called on aQueue
object which is empty.Queue对象为空时调用非阻塞方法get()或get_nowait()时将触发异常
- exception
queue.
Full
Exception raised when non-blocking
put()
(orput_nowait()
) is called on aQueue
object which is full.满的时候调用
put()
或put_nowait()
也会触发异常Queue.
qsize
()Queue.
empty
() #return True if empty 如果队列为空,返回TrueQueue.
full
() # return True if full 如果队列为满,返回TrueQueue.
put
(item, block=True, timeout=None)Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot
放条目到队列里的时候。如果可选参数block为True并且timeout为None(默认),必要时会阻塞,直到有空位可用
is available. If timeout is a positive number, it blocks at most timeout seconds and raises the
Full
exception if no free slot如果timeout是正值,在timeout设定的最大阻塞时间内,没有空位可用,会阻塞并引发Full异常。
was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available,
否则(block为False),如果立即有可用的空位,则将一个条目放入队列
else raise the
Full
exception (timeout is ignored in that case).否则引发Full异常(此时忽略timeout)
Queue.
put_nowait
(item)Equivalent to
put(item, False)
.Queue.
get
(block=True, timeout=None)Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the
Empty
exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise theEmpty
exception (timeout is ignored in that case).Queue.
get_nowait
()Equivalent to
get(False)
.
Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.
Queue.
task_done
()Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each
get()
used to fetch a task, a subsequent call totask_done()
tells the queue that the processing on the task is complete.If a
join()
is currently blocking, it will resume when all items have been processed (meaning that atask_done()
call was received for every item that had beenput()
into the queue).Raises a
ValueError
if called more times than there were items placed in the queue.
Queue.
join
() block直到queue被消费完毕
下面是一些简单的示例,主要介绍queue的几种用法
import queue q = queue.Queue() #先入先出 q.put(‘生产iphone1‘) q.put(‘生产iphone2‘) q.put(‘生产iphone3‘) print(‘生产线: ‘, q.qsize()) #qsize()查看队列大小 print(q.get()) print(q.get()) print(q.get()) #再次执行print(q.get()),程序会挂起等待
运行结果
生产线: 3 生产iphone1 生产iphone2 生产iphone3
当队列被取空后,继续执行get(),程序会挂起等待。不想等待可以用get_nowait()方法
import queue q = queue.Queue() #先入先出 q.put(‘生产iphone1‘) q.put(‘生产iphone2‘) q.put(‘生产iphone3‘) print(‘生产线: ‘, q.qsize()) #qsize()查看队列大小 print(q.get()) print(q.get()) print(q.get()) #再次执行print(q.get()),程序会挂起等待 q.get_nowait()
结果
生产线: 3 Traceback (most recent call last): 生产iphone1 生产iphone2 File "E:/Python_Study/Process_Thread/Thread_ThreadsPool.py", line 18, in <module> 生产iphone3 q.get_nowait() File "D:PythonPython37libqueue.py", line 198, in get_nowait return self.get(block=False) File "D:PythonPython37libqueue.py", line 167, in get raise Empty _queue.Empty
此时会触发Empty异常,可使用try Exception处理即可。
也可以通过判断qsize()返回值,如果为0,就停止取值。还有以下方法:
Queue.
get
(block=True, timeout=None)import queue q = queue.Queue() #先入先出 q.put(‘生产iphone1‘) q.put(‘生产iphone2‘) q.put(‘生产iphone3‘) print(‘生产线: ‘, q.qsize()) #qsize()查看队列大小 print(q.get()) print(q.get()) print(q.get()) #再次执行print(q.get()),程序会挂起等待 q.get(block=False)
结果
Traceback (most recent call last): File "E:/Python_Study/Process_Thread/Thread_ThreadsPool.py", line 18, in <module> q.get(block=False) File "D:PythonPython37libqueue.py", line 167, in get raise Empty _queue.Empty 生产线: 3 生产iphone1 生产iphone2 生产iphone3
或者
q.get(timeout=1)
结果也是queue.Empty异常
import queue q = queue.Queue(maxsize=3) #先入先出 q.put(‘生产iphone1‘) q.put(‘生产iphone2‘) q.put(‘生产iphone3‘) q.put(‘生产iphone4‘) print(‘生产线: ‘, q.qsize()) #qsize()查看队列大小 print(q.get()) print(q.get()) print(q.get())
设定队列最大值,此时程序会挂起等待(等待客户端取值)
后入先出
import queue q = queue.LifoQueue() #后入先出 q.put(‘生产iphone1‘) q.put(‘生产iphone2‘) q.put(‘生产iphone3‘) print(‘生产线: ‘, q.qsize()) #qsize()查看队列大小 print(q.get()) print(q.get()) print(q.get())
结果
生产线: 3 生产iphone3 生产iphone2 生产iphone1
优先级队列
import queue q = queue.PriorityQueue() #优先级队列,entry value越小,优先级越高 #优先级队列的参数是一个元组,格式(priority number, data) q.put((15, ‘局长下发给各处处长‘)) q.put((-1, ‘省长收到中央文件‘)) q.put((3, ‘省长传达给市长‘)) q.put((20, ‘各处处长召集各处开会,传达精神‘)) q.put((9, ‘市长传达给局长‘)) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get())
结果
(-1, ‘省长收到中央文件‘) (3, ‘省长传达给市长‘) (9, ‘市长传达给局长‘) (15, ‘局长下发给各处处长‘) (20, ‘各处处长召集各处开会,传达精神‘)
自动排序,多牛,哈哈!
生产者消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
下面是一个生产者消费者模型示例
import threading import queue import time q = queue.Queue(maxsize=10) #设定队列最大容量,先入先出 def Producer(name): count = 1 while True: #无限循环,不断生产 q.put(‘石油%s‘ % count) print(‘%s生产了桶石油%s‘ %(name, count)) count += 1 time.sleep(0.2) #生产慢 def Consumer(name): while True: print(‘%s消费了%s‘ %(name, q.get())) #无限循环,不断消费 time.sleep(0.1) #消费快 p1 = threading.Thread(target=Producer, args=(‘沙特‘, )) p2 = threading.Thread(target=Producer, args=(‘伊朗‘, )) p1.start() p2.start() c1 = threading.Thread(target=Consumer, args=(‘中国‘, )) c2 = threading.Thread(target=Consumer, args=(‘美国‘, )) c1.start() c2.start()
结果
可以看到,程序可以不断运行。上面代码是生产慢,消费快,队列不会积压。如果生产快,消费慢,队列最多积压10个,sleep()时间可自由调整。
以上是关于3.2.5 线程池的主要内容,如果未能解决你的问题,请参考以下文章