线程池Python 线程进程和协程
Posted chestnut-g
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池Python 线程进程和协程相关的知识,希望对你有一定的参考价值。
Python 线程 |
Threading是用于提供线程相关的操作,线程是应用程序中工作的最小单元。线程与进程的关系下图所示:
子线程是由主线程产生的,但两者并没有关联。
利用threading创建线程:
1 ‘‘‘利用threading包创建‘‘‘
2 import threading
3 import time
4
5 def run(n):
6 time.sleep(2)
7 print("task:",n)
8
9 ‘‘‘串行:一个运行完后,再运行另外一个‘‘‘
10 run("t1") #并不是线程,只是调用方法传参数
11 run("t2")
12
13 ‘‘‘并发性‘‘‘
14 t1 = threading.Thread(target=run,args=("T1",)) #t1是线程,args为元组
15 t2 = threading.Thread(target=run,args=("T2",))
16 t1.start() #并发性地工作
17 t2.start()
18
19
20 ‘‘‘运行结果‘‘‘
21 task: t1 #t1运行后会间隔两秒,然后运行t2
22 task: t2
23
24 task: T2 #T1,T2同时运行
25 task: T1
上述创建了两个线程t1和t2,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。
更多方法:
- start 线程准备就绪,等待CPU调度;启动线程的活动,每个线程对象最多只能调用一次。
- join 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义。
- run 表示线程活动的方法。可以在子类中重写此方法。标准run()方法调用传递给对象构造函数的可调用对象作为目标参数(如果有的话),分别使用args和kwargs参数中的顺序参数和关键字参数。线程被cpu调度后自动执行线程对象的run方法
- get_ident() 获得线程地址
- setName 为线程设置名称
- getName 获取线程名称
- daemon 一个布尔值,指示此线程是否为守护线程。这必须在调用start()之前设置,否则会引发运行时错误。它的初始值继承自创建线程;主线程不是守护进程线程,因此在主线程中创建的所有线程默认为守护进程= False。当没有存活的非守护进程线程时,整个Python程序退出。
- setDaemon 设置为后台线程或前台线程(默认)如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
1 #子线程是由主线程产生的,但两者并没有关联
2 import threading
3 import time
4
5 def run(n):
6 print("task:",n)
7 time.sleep(0.1)
8 print("taskdone:",n)
9
10 Start_time = time.time()
11 for i in range(50): #共有51个线程,代码本身是一个主线程
12 t = threading.Thread(target=run,args=("t--%s" % i,))
13 t.start()
14 t.join() #join使得主线程与子线程成串行运行
15
16 print(time.time()-Start_time) #print为创建子线程所产生的时间,而非运行时间
1 import threading
2 import time
3
4 class My_Thread(threading.Thread):
5 def __init__(self,n):
6 super(My_Thread,self).__init__()
7 self.n = n
8
9 def run(self):
10 print("task:",self.n)
11 time.sleep(0.1)
12 t_obj=[]
13 start_time = time.time()
14 for i in range(50): #共有51个线程,代码本身是一个主线程
15 t = My_Thread("t--%s" % i)
16 t.setDaemon(True) #监听端口,当主程序执行完毕,将不会执行其他线程(前提是去掉join方法)
17 t.start()
18 t_obj.append(t)
19 print(time.time()-start_time)
20
21
22 ‘‘‘运行结果‘‘‘
23 task: t--0
24 task: t--1
25 task: t--2
26 task: t--3
27 task: t--4
28 task: t--5
29 task: t--6
30 task: t--7
31 task: t--8
32 task: t--9
33 task: t--10
34 task: t--11
35 task: t--12
36 task: t--13
37 task: t--14
38 task: t--15
39 task: t--16
40 task: t--17
41 task: t--18
42 task: t--19
43 task: t--20
44 task: t--21
45 task: t--22
46 task: t--23
47 task: t--24
48 task: t--25
49 task: t--26
50 task: t--27
51 task: t--28
52 task: t--29
53 task: t--30
54 task: t--31
55 task: t--32
56 task: t--33
57 task: t--34
58 task: t--35
59 task: t--36
60 task: t--37
61 task: t--38
62 task: t--39
63 task: t--40
64 task: t--41
65 task: t--42
66 task: t--43
67 task: t--44
68 task: t--45
69 task: t--46
70 task: t--47
71 task: t--48
72 task: t--49
73 0.01196908950805664
线程锁(Lock):
1 def acquire(self, blocking=True, timeout=None): 2 """Acquire a semaphore, decrementing the internal counter by one. 3 When invoked without arguments: if the internal counter is larger than 4 zero on entry, decrement it by one and return immediately. If it is zero 5 on entry, block, waiting until some other thread has called release() to 6 make it larger than zero. This is done with proper interlocking so that 7 if multiple acquire() calls are blocked, release() will wake exactly one 8 of them up. The implementation may pick one at random, so the order in 9 which blocked threads are awakened should not be relied on. There is no 10 return value in this case. 11 When invoked with blocking set to true, do the same thing as when called 12 without arguments, and return true. 13 When invoked with blocking set to false, do not block. If a call without 14 an argument would block, return false immediately; otherwise, do the 15 same thing as when called without arguments, and return true. 16 When invoked with a timeout other than None, it will block for at 17 most timeout seconds. If acquire does not complete successfully in 18 that interval, return false. Return true otherwise. 19 """ 20 #获得一个信号量,将内部计数器减1。在没有参数的情况下调用时:如果内部计数器在入口时 21 # 大于0,则将其递减1并立即返回。如果进入时为零,阻塞,等待其他线程调用release() 22 # 使其大于零。这是通过适当的联锁完成的,这样,如果多个acquire()调用被阻塞, 23 # release()就会唤醒其中一个调用。实现可以随机选择一个线程,因此不应该依赖于被阻塞 24 # 线程被唤醒的顺序。在本例中没有返回值。当阻塞集调用为true时,执行与没有参数调用 25 # 时相同的操作,并返回true。当阻塞设置为false时,不要阻塞。如果一个没有参数的 26 # 调用将阻塞,立即返回false;否则,执行与没有参数调用时相同的操作,并返回true。 27 # 当使用除None以外的超时调用时,它最多将阻塞超时秒。如果在那段时间里收购没有成功 28 # 完成,还假。否则返回true。 29 if not blocking and timeout is not None: 30 raise ValueError("can‘t specify timeout for non-blocking acquire") 31 rc = False 32 endtime = None 33 with self._cond: 34 while self._value == 0: 35 if not blocking: 36 break 37 if timeout is not None: 38 if endtime is None: 39 endtime = _time() + timeout 40 else: 41 timeout = endtime - _time() 42 if timeout <= 0: 43 break 44 self._cond.wait(timeout) 45 else: 46 self._value -= 1 47 rc = True 48 return rc 49 50 __enter__ = acquire 51 52 def release(self): 53 """Release a semaphore, incrementing the internal counter by one. 54 When the counter is zero on entry and another thread is waiting for it 55 to become larger than zero again, wake up that thread. 56 """ 57 #释放信号量,增加一个内部计数器。当进入时计数器为零,而另一个线程正在等待计数器 58 # 再次大于零时,唤醒该线程。 59 with self._cond: 60 self._value += 1 61 self._cond.notify() 62 63 def __exit__(self, t, v, tb): 64 self.release()
1 import threading
2 import time
3
4 lock = threading.Lock() #线程锁
5
6 def run(n):
7 lock.acquire() #锁定
8 global num
9 num+=1
10 lock.release() #释放锁
11 time.sleep(1)
12
13 t_obj = []
14 num = 0
15 for i in range(50):
16 t = threading.Thread(target=run,args=("t--%s" % i,))
17 t.start()
18 t_obj.append(t)
19
20 for i in t_obj:
21 i.join()
22
23 print("num:",num)
24
25
26 ‘‘‘运行结果‘‘‘
27 num: 50
‘‘‘可用来做测试‘‘‘
if __name__ == "__main__"
#表示函数的开始位置,判断自主运行与否
线程池(信号量(semaphore)):
信号量管理一个计数器,该计数器表示release()调用的数量减去acquire()调用的数量,再加上一个初始值。acquire()方法如果有必要会阻塞,直到它可以返回而不会使计数器变为负数为止。如果未指定,值默认为1。
‘‘‘信号量‘‘‘
import threading
import time
def run(n):
Semaphore.acquire()
print("task:",n)
time.sleep(1)
Semaphore.release()
if __name__ == "__main__":
Semaphore = threading.BoundedSemaphore(5)
#每五个子进程运行一次,间隔一秒后,再运行下五个
for i in range(20):
t = threading.Thread(target=run,args=(i,))
t.start()
while threading.active_count()!=1:
pass
else:
print("--all threading has done")
1 """Thread module emulating a subset of Java‘s threading model."""
2 #线程模块模拟Java线程模型的一个子集。
3 import os as _os
4 import sys as _sys
5 import _thread
6
7 from time import monotonic as _time
8 from traceback import format_exc as _format_exc
9 from _weakrefset import WeakSet
10 from itertools import islice as _islice, count as _count
11 try:
12 from _collections import deque as _deque
13 except ImportError:
14 from collections import deque as _deque
15
16 # Note regarding PEP 8 compliant names
17 # This threading model was originally inspired by Java, and inherited
18 # the convention of camelCase function and method names from that
19 # language. Those original names are not in any imminent danger of
20 # being deprecated (even for Py3k),so this module provides them as an
21 # alias for the PEP 8 compliant names
22 # Note that using the new PEP 8 compliant names facilitates substitution
23 # with the multiprocessing module, which doesn‘t provide the old
24 # Java inspired names.
25
26 __all__ = [‘get_ident‘, ‘active_count‘, ‘Condition‘, ‘current_thread‘,
27 ‘enumerate‘, ‘main_thread‘, ‘TIMEOUT_MAX‘,
28 ‘Event‘, ‘Lock‘, ‘RLock‘, ‘Semaphore‘, ‘BoundedSemaphore‘, ‘Thread‘,
29 ‘Barrier‘, ‘BrokenBarrierError‘, ‘Timer‘, ‘ThreadError‘,
30 ‘setprofile‘, ‘settrace‘, ‘local‘, ‘stack_size‘]
31
32 # Rename some stuff so "from threading import *" is safe
33 _start_new_thread = _thread.start_new_thread
34 _allocate_lock = _thread.allocate_lock
35 _set_sentinel = _thread._set_sentinel
36 get_ident = _thread.get_ident
37 ThreadError = _thread.error
38 try:
39 _CRLock = _thread.RLock
40 except AttributeError:
41 _CRLock = None
42 TIMEOUT_MAX = _thread.TIMEOUT_MAX
43 del _thread
44
45
46 # Support for profile and trace hooks
47 #支持配置文件和跟踪挂钩
48
49 _profile_hook = None
50 _trace_hook = None
51
52 def setprofile(func):
53 """Set a profile function for all threads started from the threading module.
54 The func will be passed to sys.setprofile() for each thread, before its
55 run() method is called.
56 """
57 #为从线程模块启动的所有线程设置一个配置文件函数。在调用其run()方法之前,
58 # func将被传递给每个线程的sys.setprofile()。
59
60 global _profile_hook
61 _profile_hook = func
62
63 def settrace(func):
64 """Set a trace function for all threads started from the threading module.
65 The func will be passed to sys.settrace() for each thread, before its run()
66 method is called.
67 """
68 #为从线程模块启动的所有线程设置跟踪函数。在调用其run()方法之前,
69 # func将被传递给每个线程的sys.settrace()。
70
71 global _trace_hook
72 _trace_hook = func
73
74 # Synchronization classes
75 # 同步类
76
77 Lock = _allocate_lock
78
79 def RLock(*args, **kwargs):
80 """Factory function that returns a new reentrant lock.
81 A reentrant lock must be released by the thread that acquired it. Once a
82 thread has acquired a reentrant lock, the same thread may acquire it again
83 without blocking; the thread must release it once for each time it has
84 acquired it.
85 """
86 #返回一个新的可重入锁的工厂函数。可重入锁必须由获得它的线程释放。
87 # 一旦一个线程获得了可重入锁,该线程可以在不阻塞的情况下再次获得该锁;
88 # 线程每次获得它时都必须释放它一次。
89
90 if _CRLock is None:
91 return _PyRLock(*args, **kwargs)
92 return _CRLock(*args, **kwargs)
93
94 class _RLock:
95 """This class implements reentrant lock objects.
96 A reentrant lock must be released by the thread that acquired it. Once a
97 thread has acquired a reentrant lock, the same thread may acquire it
98 again without blocking; the thread must release it once for each time it
99 has acquired it.
100 """
101 #该类实现可重入锁对象。可重入锁必须由获得它的线程释放。一旦一个线程获得了可重入锁,
102 # 该线程可以在不阻塞的情况下再次获得该锁;线程每次获得它时都必须释放它一次。
103
104 def __init__(self):
105 self._block = _allocate_lock()
106 self._owner = None
107 self._count = 0
108
109 def __repr__(self):
110 owner = self._owner
111 try:
112 owner = _active[owner].name
113 except KeyError:
114 pass
115 return "<%s %s.%s object owner=%r count=%d at %s>" % (
116 "locked" if self._block.locked() else "unlocked",
117 self.__class__.__module__,
118 self.__class__.__qualname__,
119 owner,
120 self._count,
121 hex(id(self))
122 )
123
124 def acquire(self, blocking=True, timeout=-1):
125 """Acquire a lock, blocking or non-blocking.
126 When invoked without arguments: if this thread already owns the lock,
127 increment the recursion level by one, and return immediately. Otherwise,
128 if another thread owns the lock, block until the lock is unlocked. Once
129 the lock is unlocked (not owned by any thread), then grab ownership, set
130 the recursion level to one, and return. If more than one thread is
131 blocked waiting until the lock is unlocked, only one at a time will be
132 able to grab ownership of the lock. There is no return value in this
133 case.
134 When invoked with the blocking argument set to true, do the same thing
135 as when called without arguments, and return true.
136 When invoked with the blocking argument set to false, do not block. If a
137 call without an argument would block, return false immediately;
138 otherwise, do the same thing as when called without arguments, and
139 return true.
140 When invoked with the floating-point timeout argument set to a positive
141 value, block for at most the number of seconds specified by timeout
142 and as long as the lock cannot be acquired. Return true if the lock has
143 been acquired, false if the timeout has elapsed.
144 """
145 #获得一个锁,阻塞或非阻塞。在没有参数的情况下调用时:如果这个线程已经拥有锁,
146 # 那么将递归级别增加1,并立即返回。否则,如果另一个线程拥有锁,
147 # 则阻塞直到锁被解锁。一旦锁被解锁(不属于任何线程),然后获取所有权,
148 # 将递归级别设置为1,然后返回。如果有多个线程被阻塞,等待锁被解锁,
149 # 每次只有一个线程能够获取锁的所有权。在本例中没有返回值。当阻塞参数设置
150 # 为true时,执行与没有参数时相同的操作,并返回true。当阻塞参数设置为false时,
151 # 不要阻塞。如果一个没有参数的调用将阻塞,立即返回false;否则,执行与没有
152 # 参数调用时相同的操作,并返回true。当将浮点超时参数设置为正值时,如果获得
153 # 了锁,则最多阻塞超时指定的秒数,如果超时已过,则返回true;如果超时已过,则返回false。
154
155 me = get_ident()
156 if self._owner == me:
157 self._count += 1
158 return 1
159 rc = self._block.acquire(blocking, timeout)
160 if rc:
161 self._owner = me
162 self._count = 1
163 return rc
164
165 __enter__ = acquire
166
167 def release(self):
168 """Release a lock, decrementing the recursion level.
169 If after the decrement it is zero, reset the lock to unlocked (not owned
170 by any thread), and if any other threads are blocked waiting for the
171 lock to become unlocked, allow exactly one of them to proceed. If after
172 the decrement the recursion level is still nonzero, the lock remains
173 locked and owned by the calling thread.
174 Only call this method when the calling thread owns the lock. A
175 RuntimeError is raised if this method is called when the lock is
176 unlocked.
177 There is no return value.
178 """
179 #释放锁,降低递归级别。如果减量后为零,则将锁重置为解锁(不属于任何线程),
180 # 如果任何其他线程被阻塞,等待锁解锁,则只允许其中一个线程继续执行。如果在递减
181 # 之后递归级别仍然是非零,则锁仍然被锁定,并且由调用线程拥有。只有当调用线程拥有
182 # 锁时才调用此方法。如果在解锁锁时调用此方法,将引发运行时错误。没有返回值。
183
184 if self._owner != get_ident():
185 raise RuntimeError("cannot release un-acquired lock")
186 self._count = count = self._count - 1
187 if not count:
188 self._owner = None
189 self._block.release()
190
191 def __exit__(self, t, v, tb):
192 self.release()
193
194 # Internal methods used by condition variables
195 #条件变量使用的内部方法
196
197 def _acquire_restore(self, state):
198 self._block.acquire()
199 self._count, self._owner = state
200
201 def _release_save(self):
202 if self._count == 0:
203 raise RuntimeError("cannot release un-acquired lock")
204 count = self._count
205 self._count = 0
206 owner = self._owner
207 self._owner = None
208 self._block.release()
209 return (count, owner)
210
211 def _is_owned(self):
212 return self._owner == get_ident()
213
214 _PyRLock = _RLock
215
216
217 class Condition:
218 """Class that implements a condition variable.
219 A condition variable allows one or more threads to wait until they are
220 notified by another thread.
221 If the lock argument is given and not None, it must be a Lock or RLock
222 object, and it is used as the underlying lock. Otherwise, a new RLock object
223 is created and used as the underlying lock.
224 """
225 #实现条件变量的类。条件变量允许一个或多个线程等待,直到另一个线程通知它们。
226 # 如果锁参数是给定的而不是空的,那么它必须是一个锁或RLock对象,并且它被用作底层锁。
227 # 否则,将创建一个新的RLock对象并将其用作底层锁。
228
229 def __init__(self, lock=None):
230 if lock is None:
231 lock = RLock()
232 self._lock = lock
233 # Export the lock‘s acquire() and release() methods
234 #导出锁的acquire()和release()方法
235 self.acquire = lock.acquire
236 self.release = lock.release
237 # If the lock defines _release_save() and/or _acquire_restore(),
238 # these override the default implementations (which just call
239 # release() and acquire() on the lock). Ditto for _is_owned().
240 #如果锁定义了_release_save()和/或_acquire_restore(),就会覆盖默认的实现
241 # (它只调用release()和acquire()对锁进行访问)。_is_owned同上()。
242 try:
243 self._release_save = lock._release_save
244 except AttributeError:
245 pass
246 try:
247 self._acquire_restore = lock._acquire_restore
248 except AttributeError:
249 pass
250 try:
251 self._is_owned = lock._is_owned
252 except AttributeError:
253 pass
254 self._waiters = _deque()
255
256 def __enter__(self):
257 return self._lock.__enter__()
258
259 def __exit__(self, *args):
260 return self._lock.__exit__(*args)
261
262 def __repr__(self):
263 return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))
264
265 def _release_save(self):
266 self._lock.release() # No state to save 没有状态保存
267
268 def _acquire_restore(self, x):
269 self._lock.acquire() # Ignore saved state 忽略保存的状态
270
271 def _is_owned(self):
272 # Return True if lock is owned by current_thread.
273 #如果锁属于current_thread,则返回True。
274 # This method is called only if _lock doesn‘t have _is_owned().
275 #只有当_lock没有_is_owned()时才调用该方法。
276 if self._lock.acquire(0):
277 self._lock.release()
278 return False
279 else:
280 return True
281
282 def wait(self, timeout=None):
283 """Wait until notified or until a timeout occurs.
284 If the calling thread has not acquired the lock when this method is
285 called, a RuntimeError is raised.
286 This method releases the underlying lock, and then blocks until it is
287 awakened by a notify() or notify_all() call for the same condition
288 variable in another thread, or until the optional timeout occurs. Once
289 awakened or timed out, it re-acquires the lock and returns.
290 When the timeout argument is present and not None, it should be a
291 floating point number specifying a timeout for the operation in seconds
292 (or fractions thereof).
293 When the underlying lock is an RLock, it is not released using its
294 release() method, since this may not actually unlock the lock when it
295 was acquired multiple times recursively. Instead, an internal interface
296 of the RLock class is used, which really unlocks it even when it has
297 been recursively acquired several times. Another internal interface is
298 then used to restore the recursion level when the lock is reacquired.
299 """
300 #等待直到通知或超时发生。如果调用该方法时调用的线程没有获得锁,则会引发运行时错误。
301 # 该方法释放底层锁,然后阻塞,直到它被另一个线程中的notify()或notify_all()调用
302 # 唤醒,或者直到出现可选超时为止。一旦被唤醒或超时,它会重新获得锁并返回。
303 # 当出现timeout参数而不是None时,它应该是一个浮点数,以秒(或几分之一)为单位指定
304 # 操作的超时。当底层锁是RLock时,不会使用其release()方法释放它,因为当递归地多次
305 # 获取锁时,这可能不会真正解锁它。相反,使用了RLock类的内部接口,即使递归地获得了
306 # 多次,它也会真正地解锁它。然后使用另一个内部接口在重新获得锁时恢复递归级别。
307
308 if not self._is_owned():
309 raise RuntimeError("cannot wait on un-acquired lock")
310 waiter = _allocate_lock()
311 waiter.acquire()
312 self._waiters.append(waiter)
313 saved_state = self._release_save()
314 gotit = False
315 try: # restore state no matter what (e.g., KeyboardInterrupt)
316 #无论如何都要恢复状态(例如,键盘中断)
317 if timeout is None:
318 waiter.acquire()
319 gotit = True
320 else:
321 if timeout > 0:
322 gotit = waiter.acquire(True, timeout)
323 else:
324 gotit = waiter.acquire(False)
325 return gotit
326 finally:
327 self._acquire_restore(saved_state)
328 if not gotit:
329 try:
330 self._waiters.remove(waiter)
331 except ValueError:
332 pass
333
334 def wait_for(self, predicate, timeout=None):
335 """Wait until a condition evaluates to True.
336 predicate should be a callable which result will be interpreted as a
337 boolean value. A timeout may be provided giving the maximum time to
338 wait.
339 """
340 #等待,直到条件的值为True。谓词应该是可调用的,其结果将被解释为布尔值。
341 # 可能会提供一个超时,以提供最长的等待时间。
342
343 endtime = None
344 waittime = timeout
345 result = predicate()
346 while not result:
347 if waittime is not None:
348 if endtime is None:
349 endtime = _time() + waittime
350 else:
351 waittime = endtime - _time()
352 if waittime <= 0:
353 break
354 self.wait(waittime)
355 result = predicate()
356 return result
357
358 def notify(self, n=1):
359 """Wake up one or more threads waiting on this condition, if any.
360 If the calling thread has not acquired the lock when this method is
361 called, a RuntimeError is raised.
362 This method wakes up at most n of the threads waiting for the condition
363 variable; it is a no-op if no threads are waiting.
364 """
365 #唤醒在此条件下等待的一个或多个线程(如果有的话)。如果调用该方法时调用的线程没有获得锁,
366 # 则会引发运行时错误。该方法最多唤醒n个等待条件变量的线程;如果没有线程在等待,那么
367 # 这是一个no-op。
368 if not self._is_owned():
369 raise RuntimeError("cannot notify on un-acquired lock")
370 all_waiters = self._waiters
371 waiters_to_notify = _deque(_islice(all_waiters, n))
372 if not waiters_to_notify:
373 return
374 for waiter in waiters_to_notify:
375 waiter.release()
376 try:
377 all_waiters.remove(waiter)
378 except ValueError:
379 pass
380
381 def notify_all(self):
382 """Wake up all threads waiting on this condition.
383 If the calling thread has not acquired the lock when this method
384 is called, a RuntimeError is raised.
385 """
386 #唤醒在此条件下等待的所有线程。如果调用该方法时调用的线程没有获得锁,
387 # 则会引发运行时错误。
388 self.notify(len(self._waiters))
389
390 notifyAll = notify_all
391
392
393 class Semaphore:
394 """This class implements semaphore objects.
395 Semaphores manage a counter representing the number of release() calls minus
396 the number of acquire() calls, plus an initial value. The acquire() method
397 blocks if necessary until it can return without making the counter
398 negative. If not given, value defaults to 1.
399 """
400 #这个类实现信号量对象。信号量管理一个计数器,该计数器表示release()调用的数量减去
401 # acquire()调用的数量,再加上一个初始值。acquire()方法如果有必要会阻塞,直到它可以
402 # 返回而不会使计数器变为负数为止。如果未指定,值默认为1。
403
404 # After Tim Peters‘ semaphore class, but not quite the same (no maximum)
405 #在Tim Peters的信号量类之后,但不完全相同(没有最大值)
406
407 def __init__(self, value=1):
408 if value < 0:
409 raise ValueError("semaphore initial value must be >= 0")
410 self._cond = Condition(Lock())
411 self._value = value
412
413 def acquire(self, blocking=True, timeout=None):
414 """Acquire a semaphore, decrementing the internal counter by one.
415 When invoked without arguments: if the internal counter is larger than
416 zero on entry, decrement it by one and return immediately. If it is zero
417 on entry, block, waiting until some other thread has called release() to
418 make it larger than zero. This is done with proper interlocking so that
419 if multiple acquire() calls are blocked, release() will wake exactly one
420 of them up. The implementation may pick one at random, so the order in
421 which blocked threads are awakened should not be relied on. There is no
422 return value in this case.
423 When invoked with blocking set to true, do the same thing as when called
424 without arguments, and return true.
425 When invoked with blocking set to false, do not block. If a call without
426 an argument would block, return false immediately; otherwise, do the
427 same thing as when called without arguments, and return true.
428 When invoked with a timeout other than None, it will block for at
429 most timeout seconds. If acquire does not complete successfully in
430 that interval, return false. Return true otherwise.
431 """
432 #获得一个信号量,将内部计数器减1。在没有参数的情况下调用时:如果内部计数器在入口时
433 # 大于0,则将其递减1并立即返回。如果进入时为零,阻塞,等待其他线程调用release()
434 # 使其大于零。这是通过适当的联锁完成的,这样,如果多个acquire()调用被阻塞,
435 # release()就会唤醒其中一个调用。实现可以随机选择一个线程,因此不应该依赖于被阻塞
436 # 线程被唤醒的顺序。在本例中没有返回值。当阻塞集调用为true时,执行与没有参数调用
437 # 时相同的操作,并返回true。当阻塞设置为false时,不要阻塞。如果一个没有参数的
438 # 调用将阻塞,立即返回false;否则,执行与没有参数调用时相同的操作,并返回true。
439 # 当使用除None以外的超时调用时,它最多将阻塞超时秒。如果在那段时间里收购没有成功
440 # 完成,还假。否则返回true。
441 if not blocking and timeout is not None:
442 raise ValueError("can‘t specify timeout for non-blocking acquire")
443 rc = False
444 endtime = None
445 with self._cond:
446 while self._value == 0:
447 if not blocking:
448 break
449 if timeout is not None:
450 if endtime is None:
451 endtime = _time() + timeout
452 else:
453 timeout = endtime - _time()
454 if timeout <= 0:
455 break
456 self._cond.wait(timeout)
457 else:
458 self._value -= 1
459 rc = True
460 return rc
461
462 __enter__ = acquire
463
464 def release(self):
465 """Release a semaphore, incrementing the internal counter by one.
466 When the counter is zero on entry and another thread is waiting for it
467 to become larger than zero again, wake up that thread.
468 """
469 #释放信号量,增加一个内部计数器。当进入时计数器为零,而另一个线程正在等待计数器
470 # 再次大于零时,唤醒该线程。
471 with self._cond:
472 self._value += 1
473 self._cond.notify()
474
475 def __exit__(self, t, v, tb):
476 self.release()
477
478
479 class BoundedSemaphore(Semaphore):
480 """Implements a bounded semaphore.
481 A bounded semaphore checks to make sure its current value doesn‘t exceed its
482 initial value. If it does, ValueError is raised. In most situations
483 semaphores are used to guard resources with limited capacity.
484 If the semaphore is released too many times it‘s a sign of a bug. If not
485 given, value defaults to 1.
486 Like regular semaphores, bounded semaphores manage a counter representing
487 the number of release() calls minus the number of acquire() calls, plus an
488 initial value. The acquire() method blocks if necessary until it can return
489 without making the counter negative. If not given, value defaults to 1.
490 """
491 #实现有界信号量。有界信号量检查其当前值是否不超过初始值。如果是,则会引发ValueError。
492 # 在大多数情况下,信号量被用来保护有限容量的资源。如果信号量被释放了太多次,这是错误
493 # 的信号。如果未指定,值默认为1。与常规信号量一样,有界信号量管理一个计数器,
494 # 表示release()调用的数量减去acquire()调用的数量,再加上一个初始值。acquire()方法
495 # 如果有必要会阻塞,直到它可以返回而不会使计数器变为负数为止。如果未指定,值默认为1。
496
497 def __init__(self, value=1):
498 Semaphore.__init__(self, value)
499 self._initial_value = value
500
501 def release(self):
502 """Release a semaphore, incrementing the internal counter by one.
503
504 When the counter is zero on entry and another thread is waiting for it
505 to become larger than zero again, wake up that thread.
506
507 If the number of releases exceeds the number of acquires,
508 raise a ValueError.
509 """
510 #释放信号量,增加一个内部计数器。当进入时计数器为0,而另一个线程正在等待i再次
511 # 大于0时,唤醒那个线程。如果发布的数量超过了获得的数量,则引发一个ValueError。
512 with self._cond:
513 if self._value >= self._initial_value:
514 raise ValueError("Semaphore released too many times")
515 self._value += 1
516 self._cond.notify()
517
518
519 class Event:
520 """Class implementing event objects.
521
522 Events manage a flag that can be set to true with the set() method and reset
523 to false with the clear() method. The wait() method blocks until the flag is
524 true. The flag is initially false.
525 """
526 #类实现事件对象。事件管理的标志可以用set()方法设置为true,用clear()方法重置为false。
527 # wait()方法将阻塞,直到标记为true。标志最初是假的。
528
529 # After Tim Peters‘ event class (without is_posted())
530 #在Tim Peters的事件类之后(没有is_post ())
531
532 def __init__(self):
533 self._cond = Condition(Lock())
534 self._flag = False
535
536 def _reset_internal_locks(self):
537 # private! called by Thread._reset_internal_locks by _after_fork()
538 #私人!调用线程._reset_internal_locks _after_fork()
539 self._cond.__init__(Lock())
540
541 def is_set(self):
542 """Return true if and only if the internal flag is true."""
543 #当且仅当内部标志为true时返回true。
544 return self._flag
545
546 isSet = is_set
547
548 def set(self):
549 """Set the internal flag to true.
550 All threads waiting for it to become true are awakened. Threads
551 that call wait() once the flag is true will not block at all.
552 """
553 #将内部标志设置为true。等待它成真的所有线程都被唤醒。一旦标志为true,
554 # 调用wait()的线程将不会阻塞。
555 with self._cond:
556 self._flag = True
557 self._cond.notify_all()
558
559 def clear(self):
560 """Reset the internal flag to false.
561 Subsequently, threads calling wait() will block until set() is called to
562 set the internal flag to true again.
563 """
564 #将内部标志重置为false。随后,调用wait()的线程将阻塞,直到调用set()将内部标志再次设置为true。
565 with self._cond:
566 self._flag = False
567
568 def wait(self, timeout=None):
569 """Block until the internal flag is true.
570 If the internal flag is true on entry, return immediately. Otherwise,
571 block until another thread calls set() to set the flag to true, or until
572 the optional timeout occurs.
573 When the timeout argument is present and not None, it should be a
574 floating point number specifying a timeout for the operation in seconds
575 (or fractions thereof).
576 This method returns the internal flag on exit, so it will always return
577 True except if a timeout is given and the operation times out.
578 """
579 #阻塞,直到内部标志为true。如果进入时内部标志为true,则立即返回。否则,阻塞直到
580 # 另一个线程调用set()将标志设置为true,或者直到出现可选超时。当出现timeout参数
581 # 而不是None时,它应该是一个浮点数,以秒(或几分之一)为单位指定操作的超时。这个
582 # 方法在退出时返回内部标志,因此它总是返回True,除非超时和操作超时。
583 with self._cond:
584 signaled = self._flag
585 if not signaled:
586 signaled = self._cond.wait(timeout)
587 return signaled
588
589
590 # A barrier class. Inspired in part by the pthread_barrier_* api and
591 # the CyclicBarrier class from Java. See
592 ‘‘‘一个障碍类。部分灵感来自于pthread_barrier_* api和来自Java的循环屏障类。看到‘‘‘
593 # http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and
594 # http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/
595 # CyclicBarrier.html
596 # for information. ##获取信息
597 # We maintain two main states, ‘filling‘ and ‘draining‘ enabling the barrier
598 # to be cyclic. Threads are not allowed into it until it has fully drained
599 # since the previous cycle. In addition, a ‘resetting‘ state exists which is
600 # similar to ‘draining‘ except that threads leave with a BrokenBarrierError,
601 # and a ‘broken‘ state in which all threads get the exception.
602 ‘‘‘我们维持两种主要状态,“填充”和“排水”,使屏障是循环的。线程不允许进入它,直到它从
603 上一个循环中完全耗尽为止。此外,存在一种“重置”状态,类似于“耗尽”状态,只是线程留下了
604 一个故障的barriererror错误,以及所有线程都得到异常的“中断”状态。‘‘‘
605 class Barrier:
606 """Implements a Barrier.
607 Useful for synchronizing a fixed number of threads at known synchronization
608 points. Threads block on ‘wait()‘ and are simultaneously once they have all
609 made that call.
610 """
611 #实现了一个障碍。用于在已知同步点同步固定数量的线程。线程阻塞在‘wait()‘上,
612 # 并且一旦它们都进行了该调用,就会同时阻塞。
613
614 def __init__(self, parties, action=None, timeout=None):
615 """Create a barrier, initialised to ‘parties‘ threads.
616 ‘action‘ is a callable which, when supplied, will be called by one of
617 the threads after they have all entered the barrier and just prior to
618 releasing them all. If a ‘timeout‘ is provided, it is uses as the
619 default for all subsequent ‘wait()‘ calls.
620 """
621 #创建一个障碍,初始化为“party”线程。“action”是一个可调用的线程,当它被提供时,
622 # 它将被其中一个线程在它们全部进入壁垒并释放它们之前调用。如果提供了‘timeout‘,
623 # 那么它将用作所有后续‘wait()‘调用的默认值。
624 self._cond = Condition(Lock())
625 self._action = action
626 self._timeout = timeout
627 self._parties = parties
628 self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken
629 self._count = 0
630
631 def wait(self, timeout=None):
632 """Wait for the barrier.
633 When the specified number of threads have started waiting, they are all
634 simultaneously awoken. If an ‘action‘ was provided for the barrier, one
635 of the threads will have executed that callback prior to returning.
636 Returns an individual index number from 0 to ‘parties-1‘.
637 """
638 #等待障碍。当指定数量的线程开始等待时,它们都同时被唤醒。如果为barrier提供了一个
639 # “操作”,其中一个线程将在返回之前执行该回调。返回从0到“parties-1”的单个索引号。
640
641 if timeout is None:
642 timeout = self._timeout
643 with self._cond:
644 self._enter() # Block while the barrier drains. 隔离墙排水时要进行隔离。
645 index = self._count
646 self._count += 1
647 try:
648 if index + 1 == self._parties:
649 # We release the barrier
650 self._release()
651 else:
652 # We wait until someone releases us
653 self._wait(timeout)
654 return index
655 finally:
656 self._count -= 1
657 # Wake up any threads waiting for barrier to drain.
658 #唤醒任何等待屏障耗尽的线程。
659 self._exit()
660
661 # Block until the barrier is ready for us, or raise an exception
662 # if it is broken.
663 #阻止,直到障碍为我们准备好,或提出一个例外,如果它被打破。
664 def _enter(self):
665 while self._state in (-1, 1):
666 # It is draining or resetting, wait until done正在排水或重置,等待完成
667 self._cond.wait()
668 #see if the barrier is in a broken state看看势垒是否处于破碎状态
669 if self._state < 0:
670 raise BrokenBarrierError
671 assert self._state == 0
672
673 # Optionally run the ‘action‘ and release the threads waiting
674 # in the barrier.
675 #可以选择运行“action”,并释放等待在barrier中的线程。
676
677 def _release(self):
678 try:
679 if self._action:
680 self._action()
681 # enter draining state 进入排水状态
682 self._state = 1
683 self._cond.notify_all()
684 except:
685 #an exception during the _action handler. Break and reraise
686 #_action处理程序期间的异常。打破和reraise
687 self._break()
688 raise
689
690 # Wait in the barrier until we are released. Raise an exception
691 # if the barrier is reset or broken.
692 #在障碍物里等着,直到我们被释放。如果障碍被重置或破坏,则引发异常。
693 def _wait(self, timeout):
694 if not self._cond.wait_for(lambda : self._state != 0, timeout):
695 #timed out. Break the barrier
696 self._break()
697 raise BrokenBarrierError
698 if self._state < 0:
699 raise BrokenBarrierError
700 assert self._state == 1
701
702 # If we are the last thread to exit the barrier, signal any threads
703 # # waiting for the barrier to drain.
704 #如果我们是最后一个退出屏障的线程,那么向等待屏障流出的线程发出信号。
705 def _exit(self):
706 if self._count == 0:
707 if self._state in (-1, 1):
708 #resetting or draining
709 self._state = 0
710 self._cond.notify_all()
711
712 def reset(self):
713 """Reset the barrier to the initial state.
714 Any threads currently waiting will get the BrokenBarrier exception
715 raised.
716 """
717 #将势垒重置为初始状态。当前等待的任何线程都将引发故障障碍异常。
718 with self._cond:
719 if self._count > 0:
720 if self._state == 0:
721 #reset the barrier, waking up threads 重置障碍,唤醒线程
722 self._state = -1
723 elif self._state == -2:
724 #was broken, set it to reset state 被破坏,设置为重置状态
725 #which clears when the last thread exits 最后一个线程退出时哪个线程清除
726 self._state = -1
727 else:
728 self._state = 0
729 self._cond.notify_all()
730
731 def abort(self):
732 """Place the barrier into a ‘broken‘ state.
733 Useful in case of error. Any currently waiting threads and threads
734 attempting to ‘wait()‘ will have BrokenBarrierError raised.
735 """
736 #将障碍设置为“破碎”状态。在发生错误时很有用。任何当前正在等待的线程和
737 # 试图“wait()”的线程都会出现故障障碍。
738 with self._cond:
739 self._break()
740
741 def _break(self):
742 # An internal error was detected. The barrier is set to
743 # a broken state all parties awakened.
744 #检测到内部错误。障碍被设置为一个破碎的国家,所有各方都觉醒了。
745 self._state = -2
746 self._cond.notify_all()
747
748 @property
749 def parties(self):
750 """Return the number of threads required to trip the barrier."""
751 #返回跳闸所需的线程数。
752 return self._parties
753
754 @property
755 def n_waiting(self):
756 """Return the number of threads currently waiting at the barrier."""
757 #返回阻塞处当前等待的线程数。
758 # We don‘t need synchronization here since this is an ephemeral result
759 # anyway. It returns the correct value in the steady state.
760 #我们不需要同步,因为这是一个短暂的结果。它在稳定状态下返回正确的值。
761 if self._state == 0:
762 return self._count
763 return 0
764
765 @property
766 def broken(self):
767 """Return True if the barrier is in a broken state."""
768 #如果屏障处于破坏状态,返回True。
769 return self._state == -2
770
771 # exception raised by the Barrier class
772 #由Barrier类引发的异常
773 class BrokenBarrierError(RuntimeError):
774 pass
775
776
777 # Helper to generate new thread names
778 #帮助程序生成新的线程名称
779 _counter = _count().__next__
780 _counter() # Consume 0 so first non-main thread has id 1.
781 #消耗0,所以第一个非主线程id为1。
782 def _newname(template="Thread-%d"):
783 return template % _counter()
784
785 # Active thread administration #活动线程管理
786 _active_limbo_lock = _allocate_lock()
787 _active = {} # maps thread id to Thread object 将线程id映射到线程对象
788 _limbo = {}
789 _dangling = WeakSet()
790
791 # Main class for threads
792 ‘‘‘线程的主类‘‘‘
793
794 class Thread:
795 """A class that represents a thread of control.
796 This class can be safely subclassed in a limited fashion. There are two ways
797 to specify the activity: by passing a callable object to the constructor, or
798 by overriding the run() method in a subclass.
799 """
800 #表示控制线程的类。这个类可以以有限的方式安全地子类化。有两种方法可以指定活动:
801 # 通过将可调用对象传递给构造函数,或者在子类中重写run()方法。
802
803 _initialized = False
804 # Need to store a reference to sys.exc_info for printing
805 # out exceptions when a thread tries to use a global var. during interp.
806 # shutdown and thus raises an exception about trying to perform some
807 # operation on/with a NoneType
808 #需要存储对sys的引用。exc_info用于在interp期间线程试图使用全局变量时打印异常。
809 # 关闭,因此引发了一个异常,即试图对/使用非etype执行某些操作
810 _exc_info = _sys.exc_info
811 # Keep sys.exc_clear too to clear the exception just before
812 # allowing .join() to return.
813 #Keep sys.ex_clear也可以在allowing.join()返回之前清除异常。
814 #XXX __exc_clear = _sys.exc_clear
815
816 def __init__(self, group=None, target=None, name=None,
817 args=(), kwargs=None, *, daemon=None):
818 """This constructor should always be called with keyword arguments. Arguments are:
819 *group* should be None; reserved for future extension when a ThreadGroup
820 class is implemented.
821 *target* is the callable object to be invoked by the run()
822 method. Defaults to None, meaning nothing is called.
823 *name* is the thread name. By default, a unique name is constructed of
824 the form "Thread-N" where N is a small decimal number.
825 *args* is the argument tuple for the target invocation. Defaults to ().
826 *kwargs* is a dictionary of keyword arguments for the target
827 invocation. Defaults to {}.
828 If a subclass overrides the constructor, it must make sure to invoke
829 the base class constructor (Thread.__init__()) before doing anything
830 else to the thread.
831 """
832 #这个构造函数应该总是使用关键字参数调用。论点是:*group*不应该是;在实现
833 # ThreadGroup类时为将来的扩展保留。*target*是run()方法调用的可调用对象。
834 # 默认为None,表示不调用任何东西。*name*是线程名。默认情况下,唯一的名称
835 # 是由“Thread-N”的形式构造的,其中N是一个小数。*args*是目标调用的参数元组。
836 # 默认为()。*kwargs*是目标调用的关键字参数字典。默认为{}。如果子类重写构造
837 # 函数,它必须确保在对线程执行其他操作之前调用基类构造函数(thread. __init__())。
838
839 assert group is None, "group argument must be None for now"
840 if kwargs is None:
841 kwargs = {}
842 self._target = target
843 self._name = str(name or _newname())
844 self._args = args
845 self._kwargs = kwargs
846 if daemon is not None:
847 self._daemonic = daemon
848 else:
849 self._daemonic = current_thread().daemon
850 self._ident = None
851 self._tstate_lock = None
852 self._started = Event()
853 self._is_stopped = False
854 self._initialized = True
855 # sys.stderr is not stored in the class like
856 # sys.exc_info since it can be changed between instances
857 self._stderr = _sys.stderr
858 # For debugging and _after_fork()
859 _dangling.add(self)
860
861 def _reset_internal_locks(self, is_alive):
862 # private! Called by _after_fork() to reset our internal locks as
863 # they may be in an invalid state leading to a deadlock or crash.
864 #私人!由_after_fork()调用,以重置内部锁,因为它们可能处于无效状态,导致死锁或崩溃。
865 self._started._reset_internal_locks()
866 if is_alive:
867 self._set_tstate_lock()
868 else:
869 # The thread isn‘t alive after fork: it doesn‘t have a tstate anymore.
870 #在fork之后,线程不再是活的:它不再有tstate。
871 self._is_stopped = True
872 self._tstate_lock = None
873
874 def __repr__(self):
875 assert self._initialized, "Thread.__init__() was not called"
876 status = "initial"
877 if self._started.is_set():
878 status = "started"
879 self.is_alive() # easy way to get ._is_stopped set when appropriate
880 #在适当的情况下,获得._is_stopped设置的简单方法
881 if self._is_stopped:
882 status = "stopped"
883 if self._daemonic:
884 status += " daemon"
885 if self._ident is not None:
886 status += " %s" % self._ident
887 return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status)
888
889 def start(self):
890 """Start the thread‘s activity.
891 It must be called at most once per thread object. It arranges for the
892 object‘s run() method to be invoked in a separate thread of control.
893 This method will raise a RuntimeError if called more than once on the
894 same thread object.
895 """
896 #启动线程的活动。每个线程对象最多只能调用一次。它安排在一个单独的控制线程中
897 # 调用对象的run()方法。如果在同一个线程对象上调用多次,此方法将引发运行时错误。
898 if not self._initialized:
899 raise RuntimeError("thread.__init__() not called")
900
901 if self._started.is_set():
902 raise RuntimeError("threads can only be started once")
903 with _active_limbo_lock:
904 _limbo[self] = self
905 try:
906 _start_new_thread(self._bootstrap, ())
907 except Exception:
908 with _active_limbo_lock:
909 del _limbo[self]
910 raise
911 self._started.wait()
912
913 def run(self):
914 """Method representing the thread‘s activity.
915 You may override this method in a subclass. The standard run() method
916 invokes the callable object passed to the object‘s constructor as the
917 target argument, if any, with sequential and keyword arguments taken
918 from the args and kwargs arguments, respectively.
919 """
920 #表示线程活动的方法。您可以在子类中重写此方法。标准run()方法调用传递给对象
921 # 构造函数的可调用对象作为目标参数(如果有的话),分别使用args和kwargs参数
922 # 中的顺序参数和关键字参数。
923 try:
924 if self._target:
925 self._target(*self._args, **self._kwargs)
926 finally:
927 # Avoid a refcycle if the thread is running a function with
928 # an argument that has a member that points to the thread.
929 #如果线程正在运行一个具有指向线程的成员的参数的函数,请避免使用refcycle。
930 del self._target, self._args, self._kwargs
931
932 def _bootstrap(self):
933 # Wrapper around the real bootstrap code that ignores
934 # exceptions during interpreter cleanup. Those typically
935 # happen when a daemon thread wakes up at an unfortunate
936 # moment, finds the world around it destroyed, and raises some
937 # random exception *** while trying to report the exception in
938 # _bootstrap_inner() below ***. Those random exceptions
939 # don‘t help anybody, and they confuse users, so we suppress
940 # them. We suppress them only when it appears that the world
941 # indeed has already been destroyed, so that exceptions in
942 # _bootstrap_inner() during normal business hours are properly
943 # reported. Also, we only suppress them for daemonic threads;
944 # if a non-daemonic encounters this, something else is wrong.
945 ‘‘‘包装真正的引导代码,在解释器清理期间忽略异常。这通常发生在守护进程线程
946 在一个不幸的时刻醒来,发现它周围的世界被破坏,并在试图报告***下面的异常
947 in_bootstrap_inner()时引发一些随机异常时。这些随机的异常对任何人都没有
948 帮助,而且它们混淆了用户,所以我们抑制了它们。只有当世界似乎确实已经被破坏
949 时,我们才会抑制它们,以便在正常工作时间内正确报告_bootstrap_inner()中
950 的异常。而且,我们只对daemonic线程禁止它们;如果一个非daemonic遇到了这个
951 问题,就会出现其他问题‘‘‘
952 try:
953 self._bootstrap_inner()
954 except:
955 if self._daemonic and _sys is None:
956 return
957 raise
958
959 def _set_ident(self):
960 self._ident = get_ident()
961
962 def _set_tstate_lock(self):
963 """
964 Set a lock object which will be released by the interpreter when
965 the underlying thread state (see pystate.h) gets deleted.
966 """
967 #设置一个锁对象,当底层线程状态(请参阅pystate.h)被删除时,解释器将释放这个锁对象。
968 self._tstate_lock = _set_sentinel()
969 self._tstate_lock.acquire()
970
971 def _bootstrap_inner(self):
972 try:
973 self._set_ident()
974 self._set_tstate_lock()
975 self._started.set()
976 with _active_limbo_lock:
977 _active[self._ident] = self
978 del _limbo[self]
979
980 if _trace_hook:
981 _sys.settrace(_trace_hook)
982 if _profile_hook:
983 _sys.setprofile(_profile_hook)
984
985 try:
986 self.run()
987 except SystemExit:
988 pass
989 except:
990 # If sys.stderr is no more (most likely from interpreter
991 # shutdown) use self._stderr. Otherwise still use sys (as in
992 # _sys) in case sys.stderr was redefined since the creation of
993 # self.
994 #如果系统。stderr不再使用self._stderr(很可能是由于解释器关闭)。否则,
995 # 在case sys中仍然使用sys(如in_sys)。stderr自自我创造以来被重新定义。
996 if _sys and _sys.stderr is not None:
997 print("Exception in thread %s:
%s" %
998 (self.name, _format_exc()), file=_sys.stderr)
999 elif self._stderr is not None:
1000 # Do the best job possible w/o a huge amt. of code to
1001 # approximate a traceback (code ideas from Lib/traceback.py)
1002 #尽最大的努力做最好的工作。近似回溯的代码(来自Lib/traceback.py的代码思想)
1003 exc_type, exc_value, exc_tb = self._exc_info()
1004 try:
1005 print((
1006 "Exception in thread " + self.name +
1007 " (most likely raised during interpreter shutdown):"), file=self._stderr)
1008 print((
1009 "Traceback (most recent call last):"), file=self._stderr)
1010 while exc_tb:
1011 print((
1012 ‘ File "%s", line %s, in %s‘ %
1013 (exc_tb.tb_frame.f_code.co_filename,
1014 exc_tb.tb_lineno,
1015 exc_tb.tb_frame.f_code.co_name)), file=self._stderr)
1016 exc_tb = exc_tb.tb_next
1017 print(("%s: %s" % (exc_type, exc_value)), file=self._stderr)
1018 self._stderr.flush()
1019 # Make sure that exc_tb gets deleted since it is a memory
1020 # hog; deleting everything else is just for thoroughness
1021 #确保exc_tb被删除,因为它占用内存;删除所有其他内容只是为了彻底
1022 finally:
1023 del exc_type, exc_value, exc_tb
1024 finally:
1025 # Prevent a race in
1026 # test_threading.test_no_refcycle_through_target when
1027 # the exception keeps the target alive past when we
1028 # assert that it‘s dead.
1029 #防止test_threading中的竞争。test_no_refcycle_through_target,
1030 # 当异常断言目标已死时,该异常将使目标保持存活。
1031 #XXX self._exc_clear()
1032 pass
1033 finally:
1034 with _active_limbo_lock:
1035 try:
1036 # We don‘t call self._delete() because it also
1037 # grabs _active_limbo_lock.
1038 #我们不调用self._delete(),因为它也抓取_active_limbo_lock。
1039 del _active[get_ident()]
1040 except:
1041 pass
1042
1043 def _stop(self):
1044 # After calling ._stop(), .is_alive() returns False and .join() returns
1045 # immediately. ._tstate_lock must be released before calling ._stop().
1046 #调用._stop()后,.is_alive()返回False, .join()立即返回。
1047
1048 # Normal case: C code at the end of the thread‘s life
1049 # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and
1050 # that‘s detected by our ._wait_for_tstate_lock(), called by .join()
1051 # and .is_alive(). Any number of threads _may_ call ._stop()
1052 # simultaneously (for example, if multiple threads are blocked in
1053 # .join() calls), and they‘re not serialized. That‘s harmless -
1054 # they‘ll just make redundant rebindings of ._is_stopped and
1055 # ._tstate_lock. Obscure: we rebind ._tstate_lock last so that the
1056 # "assert self._is_stopped" in ._wait_for_tstate_lock() always works
1057 # (the assert is executed only if ._tstate_lock is None).
1058 #正常情况:线程生命周期结束时的C代码(_threadmodule.c中的release_sentinel)
1059 # 释放了._tstate_lock,我们的._wait_for_tstate_lock()检测到这一点,
1060 # 它被.join()和.is_alive()调用。同时调用任意数量的线程_may_ ._stop()
1061 # (例如,如果多个线程在.join()调用中被阻塞,并且它们没有被序列化)。这是无害的,
1062 # 他们只会对._is_stopped和._tstate_lock进行冗余的重绑定。晦涩的:
1063 # 我们将._tstate_lock绑定到最后,以便“断言self”。_is_stopped()中
1064 # 的._wait_for_tstate_lock()总是有效的(只有当._tstate_lock为空时才执行断言)。
1065
1066 # Special case: _main_thread releases ._tstate_lock via this
1067 # module‘s _shutdown() function.
1068 #特殊情况:_main_thread通过这个模块的_shutdown()函数释放._tstate_lock。
1069 lock = self._tstate_lock
1070 if lock is not None:
1071 assert not lock.locked()
1072 self._is_stopped = True
1073 self._tstate_lock = None
1074
1075 def _delete(self):
1076 "Remove current thread from the dict of currently running threads."
1077 with _active_limbo_lock:
1078 del _active[get_ident()]
1079 # There must not be any python code between the previous line
1080 # and after the lock is released. Otherwise a tracing function
1081 # could try to acquire the lock again in the same thread, (in
1082 # current_thread()), and would block.
1083 #前一行和锁释放后之间不应该有任何python代码。否则,跟踪函数可以尝试在相
1084 # 同的线程(在current_thread()中)中再次获取锁,并将阻塞。
1085
1086 def join(self, timeout=None):
1087 """Wait until the thread terminates.
1088 This blocks the calling thread until the thread whose join() method is
1089 called terminates -- either normally or through an unhandled exception
1090 or until the optional timeout occurs.
1091 When the timeout argument is present and not None, it should be a
1092 floating point number specifying a timeout for the operation in seconds
1093 (or fractions thereof). As join() always returns None, you must call
1094 isAlive() after join() to decide whether a timeout happened -- if the
1095 thread is still alive, the join() call timed out.
1096 When the timeout argument is not present or None, the operation will
1097 block until the thread terminates.
1098 A thread can be join()ed many times.
1099 join() raises a RuntimeError if an attempt is made to join the current
1100 thread as that would cause a deadlock. It is also an error to join() a
1101 thread before it has been started and attempts to do so raises the same
1102 exception.
1103 """
1104 #等待直到线程终止。这将阻塞调用线程,直到调用join()方法的线程终止——通常或通过
1105 # 未处理的异常终止,或直到出现可选超时为止。当出现timeout参数而不是None时,
1106 # 它应该是一个浮点数,以秒(或几分之一)为单位指定操作的超时。因为join()总是
1107 # 返回None,所以必须在join()之后调用isAlive(),以决定是否发生超时——如果线程
1108 # 仍然活着,则join()调用超时。当timeout参数不存在或不存在时,操作将阻塞,
1109 # 直到线程终止。一个线程可以多次连接()ed。如果尝试连接当前线程,join()将引发
1110 # 一个运行时错误,因为这会导致死锁。在线程启动之前连接()线程也是一个错误,
1111 # 试图这样做会引发相同的异常。
1112 if not self._initialized:
1113 raise RuntimeError("Thread.__init__() not called")
1114 if not self._started.is_set():
1115 raise RuntimeError("cannot join thread before it is started")
1116 if self is current_thread():
1117 raise RuntimeError("cannot join current thread")
1118
1119 if timeout is None:
1120 self._wait_for_tstate_lock()
1121 else:
1122 # the behavior of a negative timeout isn‘t documented, but
1123 # historically .join(timeout=x) for x<0 has acted as if timeout=0
1124 #没有记录消极超时的行为,但是在历史上,x<0时的.join(timeout=x)就像timeout=0一样
1125 self._wait_for_tstate_lock(timeout=max(timeout, 0))
1126
1127 def _wait_for_tstate_lock(self, block=True, timeout=-1):
1128 # Issue #18808: wait for the thread state to be gone.
1129 # At the end of the thread‘s life, after all knowledge of the thread
1130 # is removed from C data structures, C code releases our _tstate_lock.
1131 # This method passes its arguments to _tstate_lock.acquire().
1132 # If the lock is acquired, the C code is done, and self._stop() is
1133 # called. That sets ._is_stopped to True, and ._tstate_lock to None.
1134 #问题#18808:等待线程状态消失。在线程生命周期结束时,在从C数据结构中删除所有
1135 # 线程知识之后,C代码释放我们的_tstate_lock。该方法将其参数
1136 # 传递给_tstate_lock.acquire()。如果获得了锁,则完成C代码,
1137 # 并调用self._stop()。这将._is_stopped设置为True,._tstate_lock设置为None。
1138 lock = self._tstate_lock
1139 if lock is None: # already determined that the C code is done 已经确定C代码已经完成
1140 assert self._is_stopped
1141 elif lock.acquire(block, timeout):
1142 lock.release()
1143 self._stop()
1144
1145 @property
1146 def name(self):
1147 """A string used for identification purposes only.
1148 It has no semantics. Multiple threads may be given the same name. The
1149 initial name is set by the constructor.
1150 """
1151 #仅用于识别目的的字符串。它没有语义。多个线程可能被赋予相同的名称。初始名称由构造函数设置。
1152 assert self._initialized, "Thread.__init__() not called"
1153 return self._name
1154
1155 @name.setter
1156 def name(self, name):
1157 assert self._initialized, "Thread.__init__() not called"
1158 self._name = str(name)
1159
1160 @property
1161 def ident(self):
1162 """Thread identifier of this thread or None if it has not been started.
1163 This is a nonzero integer. See the get_ident() function. Thread
1164 identifiers may be recycled when a thread exits and another thread is
1165 created. The identifier is available even after the thread has exited.
1166 """
1167 #此线程的线程标识符,如果没有启动,则为空。这是非零整数。请参阅get_ident()函数。
1168 # 当线程退出并创建另一个线程时,可以回收线程标识符。即使线程已经退出,标识符也是可用的。
1169 assert self._initialized, "Thread.__init__() not called"
1170 return self._ident
1171
1172 def is_alive(self):
1173 """Return whether the thread is alive.
1174 This method returns True just before the run() method starts until just
1175 after the run() method terminates. The module function enumerate()
1176 returns a list of all alive threads.
1177 """
1178 #返回线程是否存在。这个方法在run()方法开始之前返回True,直到run()方法终止之后。
1179 # 模块函数enumerate()返回一个包含所有活线程的列表。
1180 assert self._initialized, "Thread.__init__() not called"
1181 if self._is_stopped or not self._started.is_set():
1182 return False
1183 self._wait_for_tstate_lock(False)
1184 return not self._is_stopped
1185
1186 isAlive = is_alive
1187
1188 @property
1189 def daemon(self):
1190 """A boolean value indicating whether this thread is a daemon thread.
1191 This must be set before start() is called, otherwise RuntimeError is
1192 raised. Its initial value is inherited from the creating thread; the
1193 main thread is not a daemon thread and therefore all threads created in
1194 the main thread default to daemon = False.
1195 The entire Python program exits when no alive non-daemon threads are
1196 left.
1197 """
1198 #一个布尔值,指示此线程是否为守护线程。这必须在调用start()之前设置,否则会引发
1199 # 运行时错误。它的初始值继承自创建线程;主线程不是守护进程线程,因此在主线程中
1200 # 创建的所有线程默认为守护进程= False。当没有存活的非守护进程线程时,
1201 # 整个Python程序退出。
1202 assert self._initialized, "Thread.__init__() not called"
1203 return self._daemonic
1204
1205 @daemon.setter
1206 def daemon(self, daemonic):
1207 if not self._initialized:
1208 raise RuntimeError("Thread.__init__() not called")
1209 if self._started.is_set():
1210 raise RuntimeError("cannot set daemon status of active thread")
1211 self._daemonic = daemonic
1212
1213 def isDaemon(self): #Daemon:守护进程
1214 return self.daemon
1215
1216 def setDaemon(self, daemonic):
1217 self.daemon = daemonic
1218
1219 def getName(self):
1220 return self.name
1221
1222 def setName(self, name):
1223 self.name = name
1224
1225 # The timer class was contributed by Itamar Shtull-Trauring
1226 #计时器类由Itamar Shtull-Trauring贡献
1227
1228 class Timer(Thread):
1229 """Call a function after a specified number of seconds:
1230 t = Timer(30.0, f, args=None, kwargs=None)
1231 t.start()
1232 t.cancel() # stop the timer‘s action if it‘s still waiting
1233 """
1234 #在指定的秒数后调用一个函数:t = Timer(30.0, f, args=None, kwargs=None)
1235 #t.start() t.cancel()如果计时器仍在等待,则停止计时器的操作
1236
1237 def __init__(self, interval, function, args=None, kwargs=None):
1238 Thread.__init__(self)
1239 self.interval = interval
1240 self.function = function
1241 self.args = args if args is not None else []
1242 self.kwargs = kwargs if kwargs is not None else {}
1243 self.finished = Event()
1244
1245 def cancel(self):
1246 """Stop the timer if it hasn‘t finished yet."""
1247 #如果计时器还没有结束,请停止。
1248 self.finished.set()
1249
1250 def run(self):
1251 self.finished.wait(self.interval)
1252 if not self.finished.is_set():
1253 self.function(*self.args, **self.kwargs)
1254 self.finished.set()
1255
1256
1257 # Special thread class to represent the main thread
1258 ‘‘‘表示主线程的特殊线程类‘‘‘
1259
1260 class _MainThread(Thread):
1261
1262 def __init__(self):
1263 Thread.__init__(self, name="MainThread", daemon=False)
1264 self._set_tstate_lock()
1265 self._started.set()
1266 self._set_ident()
1267 with _active_limbo_lock:
1268 _active[self._ident] = self
1269
1270
1271 # Dummy thread class to represent threads not started here.
1272 # These aren‘t garbage collected when they die, nor can they be waited for.
1273 # If they invoke anything in threading.py that calls current_thread(), they
1274 # leave an entry in the _active dict forever after.
1275 # Their purpose is to return *something* from current_thread().
1276 # They are marked as daemon threads so we won‘t wait for them
1277 # when we exit (conform previous semantics).
1278 #伪线程类来表示这里没有启动的线程。它们死后不会被垃圾收集,也不会被等待。如果它们在
1279 # 线程中调用任何东西。调用current_thread()的py在_active dict中永远留下一个条目。
1280 # 它们的目的是从current_thread()返回*something*。它们被标记为守护线程,因此在退出
1281 # 时我们不会等待它们(符合前面的语义)。
1282
1283 class _DummyThread(Thread):
1284
1285 def __init__(self):
1286 Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True)
1287
1288 self._started.set()
1289 self._set_ident()
1290 with _active_limbo_lock:
1291 _active[self._ident] = self
1292
1293 def _stop(self):
1294 pass
1295
1296 def is_alive(self):
1297 assert not self._is_stopped and self._started.is_set()
1298 return True
1299
1300 def join(self, timeout=None):
1301 assert False, "cannot join a dummy thread"
1302
1303
1304 # Global API functions
1305 #全球API函数
1306
1307 def current_thread():
1308 """Return the current Thread object, corresponding to the caller‘s thread of control.
1309 If the caller‘s thread of control was not created through the threading
1310 module, a dummy thread object with limited functionality is returned.
1311 """
1312 #返回当前线程对象,对应于调用方的控制线程。如果没有通过线程模块创建调用者的控制线
1313 # 程,则返回具有有限功能的虚拟线程对象。
1314 try:
1315 return _active[get_ident()]
1316 except KeyError:
1317 return _DummyThread()
1318
1319 currentThread = current_thread
1320
1321 def active_count():
1322 """Return the number of Thread objects currently alive.
1323 The returned count is equal to the length of the list returned by
1324 enumerate().
1325 """
1326 #返回当前存活的线程对象的数量。返回的计数等于enumerate()返回的列表的长度。
1327 with _active_limbo_lock:
1328 return len(_active) + len(_limbo)
1329
1330 activeCount = active_count
1331
1332 def _enumerate():
1333 # Same as enumerate(), but without the lock. Internal use only.
1334 #与enumerate()相同,只是没有锁。内部使用。
1335 return list(_active.values()) + list(_limbo.values())
1336
1337 def enumerate():
1338 """Return a list of all Thread objects currently alive.
1339 The list includes daemonic threads, dummy thread objects created by
1340 current_thread(), and the main thread. It excludes terminated threads and
1341 threads that have not yet been started.
1342 """
1343 #返回当前所有线程对象的列表。该列表包括daemonic线程、current_thread()创建的虚拟
1344 # 线程对象和主线程。它排除终止的线程和尚未启动的线程。
1345 with _active_limbo_lock:
1346 return list(_active.values()) + list(_limbo.values())
1347
1348 from _thread import stack_size
1349
1350 # Create the main thread object,
1351 # and make it available for the interpreter
1352 # (Py_Main) as threading._shutdown.
1353 #创建主线程对象,并将其作为thread ._shutdown提供给解释器(Py_Main)。
1354
1355 _main_thread = _MainThread()
1356
1357 def _shutdown():
1358 # Obscure: other threads may be waiting to join _main_thread. That‘s
1359 # dubious, but some code does it. We can‘t wait for C code to release
1360 # the main thread‘s tstate_lock - that won‘t happen until the interpreter
1361 # is nearly dead. So we release it here. Note that just calling _stop()
1362 # isn‘t enough: other threads may already be waiting on _tstate_lock.
1363 #晦涩:其他线程可能正在等待加入_main_thread。这很可疑,但有些代码可以做到。
1364 # 我们不能等待C代码释放主线程的tstate_lock——这要等到解释器快死的时候才会发生。
1365 # 我们在这里释放它。注意,仅仅调用_stop()是不够的:其他线程可能已经在
1366 # 等待_tstate_lock了。
1367 if _main_thread._is_stopped:
1368 # _shutdown() was already called
1369 return
1370 tlock = _main_thread._tstate_lock
1371 # The main thread isn‘t finished yet, so its thread state lock can‘t have
1372 # been released.
1373 #主线程尚未完成,因此它的线程状态锁无法释放。
1374 assert tlock is not None
1375 assert tlock.locked()
1376 tlock.release()
1377 _main_thread._stop()
1378 t = _pickSomeNonDaemonThread()
1379 while t:
1380 t.join()
1381 t = _pickSomeNonDaemonThread()
1382
1383 def _pickSomeNonDaemonThread():
1384 for t in enumerate():
1385 if not t.daemon and t.is_alive():
1386 return t
1387 return None
1388
1389 def main_thread():
1390 """Return the main thread object.
1391 In normal conditions, the main thread is the thread from which the
1392 Python interpreter was started.
1393 """
1394 #返回主线程对象。在正常情况下,主线程是Python解释器启动的线程。
1395 return _main_thread
1396
1397 # get thread-local implementation, either from the thread
1398 # module, or from the python fallback
1399 #从线程模块或python回退中获取线程本地实现
1400
1401 try:
1402 from _thread import _local as local
1403 except ImportError:
1404 from _threading_local import local
1405
1406
1407 def _after_fork():
1408 """
1409 Cleanup threading module state that should not exist after a fork.
1410 """
1411 # Reset _active_limbo_lock, in case we forked while the lock was held
1412 # by another (non-forked) thread. http://bugs.python.org/issue874900
1413 #Reset _active_limbo_lock,以防我们分叉而锁被另一个(非分叉的)线程持有。
1414 global _active_limbo_lock, _main_thread
1415 _active_limbo_lock = _allocate_lock()
1416
1417 # fork() only copied the current thread; clear references to others.
1418 #fork()只复制当前线程;明确提及他人。
1419 new_active = {}
1420 current = current_thread()
1421 _main_thread = current
1422 with _active_limbo_lock:
1423 # Dangling thread instances must still have their locks reset,
1424 # because someone may join() them.
1425 #悬空线程实例必须重新设置它们的锁,因为有人可能会加入()它们。
1426 threads = set(_enumerate())
1427 threads.update(_dangling)
1428 for thread in threads:
1429 # Any lock/condition variable may be currently locked or in an
1430 # invalid state, so we reinitialize them.
1431 #任何锁/条件变量可能当前被锁定或处于无效状态,因此我们重新初始化它们。
1432 if thread is current:
1433 # There is only one active thread. We reset the ident to
1434 # its new value since it can have changed.
1435 #只有一个活动线程。我们将ident重置为它的新值,因为它可能已经更改。
1436 thread._reset_internal_locks(True)
1437 ident = get_ident()
1438 thread._ident = ident
1439 new_active[ident] = thread
1440 else:
1441 # All the others are already stopped.
1442 thread._reset_internal_locks(False)
1443 thread._stop()
1444
1445 _limbo.clear()
1446 _active.clear()
1447 _active.update(new_active)
1448 assert len(_active) == 1
1449
1450
1451 if hasattr(_os, "register_at_fork"):
1452 _os.register_at_fork(after_in_child=_after_fork)
队列:
Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。
1 ‘‘‘队列‘‘‘
2 import queue
3 q =queue.Queue() #设置队列
4 q.put("q1") #队列中放入数据
5 q.put("q2")
6 q.put("q3")
7
8 # print(q.qsize()) #获取队列大小
9
10 ‘‘‘队列中获取数据,取出的数据超出存入数据时会等待,不会报错‘‘‘
11 print(q.get())
12 print(q.get())
13 print(q.get())
14 # print(q.get())
15
16 ‘‘‘获取队列,但不会等待,超出后直接报错‘‘‘
17 print(q.get_nowait())
18 print(q.get_nowait())
19 print(q.get_nowait())
20 # print(q.get_nowait())
21
22 ‘‘‘设置优先级排序的依据‘‘‘
23 q = queue.PriorityQueue(maxsize=0)
24 q.put((3,"q1")) #当maxsizie<=0时,队列无限大,>0时,给定数据即为队列大小
25 q.put((1,"q2"))
26 q.put((-4,"q3"))
27 print(q.get()) #获取时会从小到大按顺序获取
28 print(q.get())
29 print(q.get())
上述代码只是队列的应用,下面将队列应用与线程之中:
1 import queue
2 import time
3 import threading
4
5 q = queue.Queue(maxsize=10)
6 def gave(name):
7 count = 1
8 while True:
9 q.put("--骨头--%s" % count)
10 print("%s 生产骨头 %s" % (name,count))
11 time.sleep(1)
12 count+=1
13
14 def consumer(name):
15 while q.qsize()>0:
16 # while True:
17 print("%s 吃掉 %s" % (name,q.get()))
18 # time.sleep(10)
19
20 g = threading.Thread(target=gave,args=("王二小",))
21 c = threading.Thread(target=consumer,args=("旺财",))
22 g.start()
23 c.start()
1 #print(‘ 33[41;1m--red light on--- 33[0m‘) #红灯
2 #print(‘ 33[43;1m--yellow light on--- 33[0m‘) #黄灯
3 #print(‘ 33[42;1m--green light on--- 33[0m‘) #绿灯
4 ‘‘‘主要用在数据同步上‘‘‘
5 ‘‘‘红绿灯事件‘‘‘
6
7 # import threading
8 # import time
9 # # import queue
10 # event = threading.Event()
11 # # q = queue.Queue()
12 #
13 # def light():
14 # count = 1
15 # while True:
16 # if count<=5:
17 # event.set()
18 # print(‘ 33[42;1m--green light on--- 33[0m‘)
19 # elif 5<count<=10:
20 # event.clear()
21 # print(‘ 33[43;1m--yellow light on--- 33[0m‘)
22 # else:
23 # print(‘ 33[41;1m--red light on--- 33[0m‘)
24 # if count>=15:
25 # count = 0
26 # time.sleep(1)
27 # count+=1
28 #
29 # def car(name):
30 # while True:
31 # if event.is_set():
32 # time.sleep(1)
33 # print("%s is running..." % name)
34 # else:
35 # print("car is waiting...")
36 # event.wait() #等待事件event对象发生变化
37 #
38 #
39 # Light = threading.Thread(target=light,)
40 # Light.start()
41 # Car = threading.Thread(target=car,args=("BENZ",))
42 # Car.start()
43
44
45 import threading
46 import time
47 import queue
48
49 event=threading.Event()
50 q=queue.PriorityQueue(maxsize=20)
51 #在循环之前先放入十辆车:
52 for i in range(10):
53 q.put("旧车辆,%s" % "QQ")
54
55 def light():
56 count=0
57 while True:
58 if count<10:
59 event.set()
60 print("