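Queues are used mostly in multithreaded programs: when several threads access shared data, a queue is thread-safe because it does its own locking.
The self.not_empty condition variable: after a thread adds an item to the queue, it calls self.not_empty.notify() to notify the other threads; a waiting thread is woken, acquires the mutex again, and then reads from the queue.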
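The notify/wait handshake described above can be sketched with a single mutex and a single condition. This is a minimal illustration, not the module's own code: the TinyBuffer class and its attribute names are hypothetical, and it is written for Python 3.

```python
import threading
from collections import deque


class TinyBuffer:
    """Hypothetical unbounded buffer guarded by one mutex and one condition."""

    def __init__(self):
        self.items = deque()
        self.mutex = threading.Lock()
        # The condition wraps the same mutex, so wait() releases it while
        # sleeping and re-acquires it before returning.
        self.not_empty = threading.Condition(self.mutex)

    def put(self, item):
        with self.not_empty:            # acquires self.mutex
            self.items.append(item)
            self.not_empty.notify()     # wake one thread blocked in get()

    def get(self):
        with self.not_empty:
            while not self.items:       # re-check the predicate after each wakeup
                self.not_empty.wait()
            return self.items.popleft()


buf = TinyBuffer()
results = []
consumer = threading.Thread(target=lambda: results.append(buf.get()))
consumer.start()        # waits inside get() if no item has arrived yet
buf.put("hello")
consumer.join()
print(results)          # ['hello']
```

The while loop around wait() matters: a woken thread has to check again that an item is really there, because another consumer may have taken it first.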
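import Queue  # Python 2 module name; in Python 3 the module is named queue
q = Queue.Queue(maxsize=5)  # Capacity of 5: a put() on a full queue blocks until another thread removes an item, then continues.
q.put(5)  # put() inserts an item at the tail of the queue.
q.get()  # get() removes and returns the item at the head of the queue.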
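To see the blocking behaviour of a bounded queue, here is a small sketch, assuming Python 3 (where the module is imported as queue). The slow_consumer function and the 0.2 second delay are made up for the demonstration.

```python
import queue
import threading
import time

q = queue.Queue(maxsize=2)
q.put("a")
q.put("b")              # the queue is now full


def slow_consumer():
    time.sleep(0.2)     # give the main thread time to block in put()
    q.get()             # frees one slot and wakes the blocked producer


t = threading.Thread(target=slow_consumer)
start = time.monotonic()
t.start()
q.put("c")              # blocks until slow_consumer removes "a"
waited = time.monotonic() - start
t.join()

remaining = [q.get(), q.get()]
print(remaining)        # ['b', 'c']
```

The third put() returns only after the consumer thread has taken an item, so waited ends up close to the consumer's delay.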
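(1) FIFO queue: first in, first out. class queue.Queue(maxsize)
(2) LIFO queue: behaves like a stack, last in, first out. class queue.LifoQueue(maxsize)
(3) Priority queue: the entry with the lowest value (smallest number) comes out first. class queue.PriorityQueue(maxsize)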
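The difference between the three classes is easiest to see by putting the same items into each and draining them. A short sketch, assuming Python 3; the drain helper is a hypothetical name, and relying on empty() is only safe here because a single thread is involved.

```python
import queue


def drain(q):
    """Remove every item from q and return them in retrieval order."""
    out = []
    while not q.empty():
        out.append(q.get())
    return out


fifo, lifo, prio = queue.Queue(), queue.LifoQueue(), queue.PriorityQueue()
for n in (3, 1, 2):
    fifo.put(n)
    lifo.put(n)
    prio.put(n)

fifo_order = drain(fifo)    # [3, 1, 2]  insertion order
lifo_order = drain(lifo)    # [2, 1, 3]  most recent first
prio_order = drain(prio)    # [1, 2, 3]  smallest value first
print(fifo_order, lifo_order, prio_order)
```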
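★ Common methods (queue = Queue.Queue()):
queue.qsize()  Returns the approximate number of items in the queue
queue.empty()  Returns True if the queue is empty, otherwise False
queue.full()  Returns True if the queue is full, otherwise False
queue.full() is judged against maxsize; with maxsize <= 0 the queue is unbounded and never reports full
queue.get(block=True, timeout=None)  Removes and returns one item from the queue; timeout is the maximum time to wait, in seconds
queue.get_nowait()  Equivalent to queue.get(False); a non-blocking get that does not wait when the queue is empty and raises the Empty exception immediately
queue.put(item, block=True, timeout=None)  Adds an item to the queue; timeout is the maximum time to wait, in seconds
queue.put_nowait(item)  Equivalent to queue.put(item, False); a non-blocking put that does not wait when the queue is full and raises the Full exception immediately
queue.task_done()  Called by a consumer after it finishes one item, to signal the queue that this task is complete
queue.join()  Blocks until every item in the queue has been retrieved and processed, then lets the caller continue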
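task_done() and join() are normally used together in a worker-pool pattern. The following is a minimal sketch, assuming Python 3. The worker function, the None sentinel used for shutdown, and the doubling "work" are all illustrative choices, not part of the module.

```python
import queue
import threading

q = queue.Queue()
processed = []
lock = threading.Lock()


def worker():
    while True:
        item = q.get()
        if item is None:            # hypothetical shutdown sentinel
            q.task_done()
            break
        with lock:                  # protect the shared result list
            processed.append(item * 2)
        q.task_done()               # one task_done() per successful get()


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()

for n in range(10):
    q.put(n)
q.join()                            # returns once all 10 items are marked done

for _ in threads:
    q.put(None)                     # one sentinel per worker
for t in threads:
    t.join()

print(sorted(processed))            # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

Calling task_done() more times than there were put() calls raises ValueError, so each get() should be paired with exactly one task_done().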
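(1) The queue.put(item, block=True, timeout=None) function: with block true and timeout None (the default), it blocks until a free slot is available; with a non-negative timeout, it blocks for at most timeout seconds and then raises Full; with block false, it raises Full immediately if no slot is free, and timeout is ignored.
(2) The queue.get(block=True, timeout=None) function: with block true and timeout None (the default), it blocks until an item is available; with a non-negative timeout, it blocks for at most timeout seconds and then raises Empty; with block false, it raises Empty immediately if no item is available, and timeout is ignored.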
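The combinations of block and timeout can be exercised in a few lines. A sketch assuming Python 3, where the exceptions are available as queue.Full and queue.Empty; the events list is only there to record what happened.

```python
import queue
import time

q = queue.Queue(maxsize=1)
q.put("only slot")                  # the queue is now full

events = []
try:
    q.put("extra", block=False)     # same as q.put_nowait("extra")
except queue.Full:
    events.append("Full (non-blocking)")

start = time.monotonic()
try:
    q.put("extra", timeout=0.1)     # waits up to 0.1 s for a free slot
except queue.Full:
    events.append("Full (timeout)")
put_wait = time.monotonic() - start

q.get()                             # the queue is now empty
try:
    q.get(block=False)              # same as q.get_nowait()
except queue.Empty:
    events.append("Empty (non-blocking)")
try:
    q.get(timeout=0.1)              # waits up to 0.1 s for an item
except queue.Empty:
    events.append("Empty (timeout)")
try:
    q.get(timeout=-1)               # a negative timeout is rejected
except ValueError:
    events.append("ValueError")

print(events)
```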

# Names the listing relies on (imports and exceptions it assumes are in scope)
from time import time as _time
from collections import deque
import threading as _threading

class Empty(Exception):
    "Exception raised by Queue.get(block=0)/get_nowait()."
    pass

class Full(Exception):
    "Exception raised by Queue.put(block=0)/put_nowait()."
    pass

class Queue:
    """Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.
    """
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)
        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = _threading.Lock()
        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = _threading.Condition(self.mutex)
        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = _threading.Condition(self.mutex)
        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = _threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        """
        self.all_tasks_done.acquire()
        try:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished
        finally:
            self.all_tasks_done.release()

    def join(self):
        """Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        self.all_tasks_done.acquire()
        try:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()
        finally:
            self.all_tasks_done.release()

    def qsize(self):
        """Return the approximate size of the queue (not reliable!)."""
        self.mutex.acquire()
        n = self._qsize()
        self.mutex.release()
        return n

    def empty(self):
        """Return True if the queue is empty, False otherwise (not reliable!)."""
        self.mutex.acquire()
        n = not self._qsize()
        self.mutex.release()
        return n

    def full(self):
        """Return True if the queue is full, False otherwise (not reliable!)."""
        self.mutex.acquire()
        n = 0 < self.maxsize == self._qsize()
        self.mutex.release()
        return n

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        """
        self.not_full.acquire()
        try:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() == self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() == self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = _time() + timeout
                    while self._qsize() == self.maxsize:
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()
        finally:
            self.not_full.release()

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        """
        return self.put(item, False)

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        """
        self.not_empty.acquire()
        try:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = _time() + timeout
                while not self._qsize():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item
        finally:
            self.not_empty.release()

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        """
        return self.get(False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()

    def _qsize(self, len=len):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()
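The comment near the end of the listing says that _init, _qsize, _put and _get can be overridden to get a different queue organization. Here is a small sketch of that idea, assuming Python 3's queue.Queue, which keeps the same four hooks. StackQueue is a hypothetical name; the standard library already provides LifoQueue for real use.

```python
import queue


class StackQueue(queue.Queue):
    """Hypothetical LIFO variant built only from the four hook methods."""

    def _init(self, maxsize):
        self.queue = []             # a plain list instead of a deque

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.pop()     # take from the end that _put() appends to


s = StackQueue(maxsize=3)
for n in (1, 2, 3):
    s.put(n)
was_full = s.full()                 # True: three items, maxsize is 3
popped = [s.get() for _ in range(3)]
print(popped)                       # [3, 2, 1]
```

Since put() and get() call the hooks only while holding the mutex, the subclass needs no locking of its own; blocking, timeouts and task counting are inherited unchanged.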