python 线程
Posted nonzero
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 线程相关的知识,希望对你有一定的参考价值。
导航
1、Thread类
2、线程同步
3、threading.Condition
4、threading.Event
5、threading.Semaphore 信号量
6、queue模块,线程队列
7、cpu密集型与IO密集型
8、线程池
线程是cpu运行的最小单位,没有自己的内存空间,同一线程的多线程共享一个内存空间,同一线程下的多线程都可以访问全局变量,对全局变量进行操作时要注意同步问题。
threading模块是建立在 _thread 模块上的,对其进行封装,包含较多的功能。
1、Thread类
threading.Thread(group=None, target=None, name=None, args=(), kwargs=None, daemon=None)
- group 线程组,预留的还没有什么用,应设置为None
- target 目的方法,线程要执行的函数入口,这是直接通过Thread创建多线程需要指定的函数入口;通过继承Thread类创建的多线程,target应设置为None
- name 线程名
- args/kwargs target函数的参数
- daemon 是否设置为守护线程
1)创建多线程的方式
创建多线程主要有两种方式,①通过Thread类直接创建子线程,并指定子线程需要执行的函数逻辑。②通过继承Thread类创建实例对象,重写run方法,开启子线程
1 from threading import Thread 2 3 4 def fn(i): 5 # 子线程逻辑 6 print("--子线程--", i) 7 8 9 if __name__ == "__main__": 10 t = Thread(target=fn, args=(100,)) 11 t.start() 12 print("--主线程--") 13 14 15 # 输出结果 16 --子线程-- 100 17 --主线程--
1 from threading import Thread 2 3 4 class MyThread(Thread): 5 def __init__(self, name): 6 super().__init__() 7 self.name = name 8 9 def run(self): 10 # 子线程逻辑 11 print("--子线程--", self.name) 12 13 14 if __name__ == "__main__": 15 t = MyThread("my_thread_01") 16 t.start() 17 print("--主线程--") 18 19 20 # 输出结果 21 --子线程-- my_thread_01 22 --主线程--
2)下面来看一下使用多线程与不使用多线程的区别
1 from threading import Thread 2 import time 3 4 5 def fn(i): 6 # 子线程逻辑 7 print("+++", i) 8 time.sleep(1) 9 10 11 if __name__ == "__main__": 12 start_time = time.time() 13 t_l = [] 14 for i in range(5): 15 t = Thread(target=fn, args=(i,)) 16 t.start() 17 t_l.append(t) 18 for i in range(5): 19 t.join() 20 print("运行所需时间:", time.time() - start_time) 21 22 23 # 输出结果 24 +++ 0 25 +++ 1 26 +++ 2 27 +++ 3 28 +++ 4 29 运行所需时间: 1.0024588108062744
1 import time 2 3 4 def fn(i): 5 # 子线程逻辑 6 print("+++", i) 7 time.sleep(1) 8 9 10 if __name__ == "__main__": 11 start_time = time.time() 12 for i in range(5): 13 fn(i) 14 print("运行所需时间:", time.time() - start_time) 15 16 17 # 输出结果 18 +++ 0 19 +++ 1 20 +++ 2 21 +++ 3 22 +++ 4 23 运行所需时间: 5.0021538734436035
可以看到同样的需要执行5次fn函数的逻辑,使用多线程的效率却比单线程的要高
3)daemon守护线程的设置
设置守护线程有有两种方法 ① t.daemon = True 直接设置属性 ② t.setDaemon(True) 通过方法设置。
当子线程设置为守护线程是,主线程结束时程序将不再等待子线程是否执行完毕直接退出程序。如下边子线程设置为守护线程后,主线程结束了,子线程也将跟着结束,并未输出子线程的语句
1 from threading import Thread 2 import time 3 4 5 def fn(i): 6 # 子线程逻辑 7 time.sleep(1) 8 print("+++", i) 9 10 11 if __name__ == "__main__": 12 for i in range(5): 13 t = Thread(target=fn, args=(i,)) 14 t.setDaemon(True) 15 t.start() 16 print("---end---") 17 18 19 # 输出结果 20 ---end---
4)join(timeout=None)
join方法堵塞当前上下文环境的线程,直到调用join方法的子线程执行结束后当前线程才继续执行。在哪里调用就在哪堵塞,最多堵塞timeout秒
1 from threading import Thread 2 import time 3 4 5 def fn(): 6 # 子线程逻辑 7 time.sleep(5) 8 9 10 if __name__ == "__main__": 11 start_time = time.time() 12 t = Thread(target=fn) 13 t.start() 14 t.join() 15 print("所需时间-->", time.time() - start_time) 16 17 18 # 输出结果 19 所需时间--> 5.001126527786255
5)is_alive(),返回子线程存活状态
is_alive方法判断指定对象线程的存活状态。线程 start 后一直到该线程结束都返回True。线程 start 前 或线程已经结束返回False
2、线程同步
当多个线程的同时对一个变量进行操作时便会存在着安全的隐患问题,
如下:有2个线程同时对全局变量进行 + 1,000,000 为什么结果却不是 2,000,000。这是因为当线程一执行完11行时还未开始执行12行,全局变量a没有进行 +1,就已经切换了线程二执行;线程二从10行开始执行,取到的值依旧是还没有 +1 的值,执行完12行,切换回线程一;线程一继续上次的断点执行12行,这时就相当于两次线程总共才完成了一次 +1 操作。多次循环如此,那么全局变量最后的结果就不可能是2,000,000。这里为了看到明显的效果,我将 a += 1 变成了三条语句来实现
1 from threading import Thread 2 3 4 a = 0 5 6 7 def fn(): 8 global a 9 for i in range(1000000): 10 t = a 11 t += 1 12 a = t 13 14 15 if __name__ == "__main__": 16 t_l = [] 17 for i in range(2): 18 t = Thread(target=fn) 19 t_l.append(t) 20 t.start() 21 for t in t_l: 22 t.join() 23 print("全局变量 a-->", a) 24 25 26 # 输出结果 27 全局变量 a--> 1332605
1 from threading import Thread 2 3 def fn(): 4 for i in range(5): 5 print("1234") 6 7 8 for i in range(2): 9 t = Thread(target=fn) 10 t.start() 11 12 13 # 输出结果1234 14 1234 15 12341234 16 17 1234 18 1234 19 1234 20 12341234 21 22 1234
经过这个例子,就可以看出当多线程对共享的资源进行修改是就涉及到同步问题了,我们希望同一时刻只有一个线程t执行某段逻辑,其它线程必须等待这个线程t执行完才能执行。
1)threading.Lock()
Lock线程锁,在需要进行同步的语句块加上可以确保线程间的同步问题
1 from threading import Thread, Lock 2 3 4 def fn(): 5 global a 6 for i in range(1000000): 7 lock.acquire() 8 t = a 9 t += 1 10 a = t 11 lock.release() 12 13 14 a = 0 15 t_l = [] 16 lock = Lock() 17 for i in range(2): 18 t = Thread(target=fn) 19 t_l.append(t) 20 t.start() 21 for t in t_l: 22 t.join() 23 print("全局变量 a-->", a) 24 25 26 # 全局变量 27 全局变量 a--> 2000000
2)死锁
关于死锁的概念我就不再叙述,就简单的说说下边的代码,同时开启两个线程,线程t1中执行到8行,线程t2执行到20行的时候,t1发现锁lock2没有被释放便会在这一直等待,t2发现锁lock1没有被释放也在这一直等待,这样谁也不会先释放锁,就会造成死锁的现象,程序一直卡着。
1 from threading import Thread, Lock 2 import time 3 4 def fn1(): 5 lock1.acquire() 6 print("fn1-->lock1.acquire") 7 time.sleep(1) # 确保另一个线程已经执行fn2函数且lock2已经锁上 8 lock2.acquire() 9 print("fn1-->lock2.acquire") 10 lock2.release() 11 print("fn1-->lock2.release") 12 lock1.release() 13 print("fn1-->lock1.release") 14 15 16 def fn2(): 17 lock2.acquire() 18 print("fn1-->lock2.acquire") 19 time.sleep(1) # 确保另一个线程已经执行fn1函数且lock1已经锁上 20 lock1.acquire() 21 print("fn1-->lock1.acquire") 22 lock1.release() 23 print("fn1-->lock1.release") 24 lock2.release() 25 print("fn1-->lock2.release") 26 27 28 lock1 = Lock() 29 lock2 = Lock() 30 t1 = Thread(target=fn1) 31 t2 = Thread(target=fn2) 32 t1.start() 33 t2.start() 34 t1.join() 35 t2.join() 36 print("----end----") 37 38 39 # 输出结果 40 fn1-->lock1.acquire 41 fn1-->lock2.acquire
3、threading.Condition
- acquire() 获取锁
- release() 释放锁
- wait(timeout=None) 等待notify的通知,没有等到则在此堵塞,并释放锁cdt.release();有notify则继续往下执行,并释放锁。timeout:堵塞最长时间秒,默认一直堵塞。使用这个方法之前要已经acquire(),因为这个方法会释放锁
- notify(n=1) 通知一个正在 wait 堵塞 的线程,n:通知线程数,默认为1
- notify_all() 通知所有正在 wait 堵塞 的线程
1 from threading import Thread, Condition 2 import time 3 4 5 def producer(): 6 global cdt 7 while True: 8 time.sleep(1) 9 cdt.acquire() 10 print("+++++ 1个包子") 11 cdt.notify() 12 cdt.release() 13 14 15 def consumer(): 16 global cdt 17 while True: 18 cdt.acquire() 19 cdt.wait() 20 print("----- 1个包子, 时间:", time.strftime("%X")) 21 # cdt.release() 22 23 24 cdt = Condition() 25 prd = Thread(target=producer) 26 csm = Thread(target=consumer) 27 prd.start() 28 csm.start() 29 prd.join() 30 csm.join() 31 print("----end----") 32 33 34 # 输出结果 35 +++++ 1个包子 36 ----- 1个包子, 时间: 10:05:30 37 +++++ 1个包子 38 ----- 1个包子, 时间: 10:05:31 39 +++++ 1个包子 40 ----- 1个包子, 时间: 10:05:32 41 +++++ 1个包子 42 ----- 1个包子, 时间: 10:05:33 43 +++++ 1个包子 44 ----- 1个包子, 时间: 10:05:34
1 from threading import Thread, Condition 2 import time 3 4 5 def producer(): 6 global cdt 7 while True: 8 time.sleep(1) 9 cdt.acquire() 10 print("+++++ 1个包子") 11 cdt.notify() 12 cdt.release() 13 14 15 def consumer(name): 16 global cdt 17 while True: 18 cdt.acquire() 19 cdt.wait() 20 print("消费者:%s----- 1个包子, 时间:%s" % (name, time.strftime("%X"))) 21 # cdt.release() 22 23 24 cdt = Condition() 25 prd = Thread(target=producer) 26 prd.start() 27 for i in range(5): 28 csm = Thread(target=consumer, args=(i,)) 29 csm.start() 30 31 32 # 输出结果 33 +++++ 1个包子 34 消费者:0----- 1个包子, 时间:10:7:07 35 +++++ 1个包子 36 消费者:1----- 1个包子, 时间:10:7:08 37 +++++ 1个包子 38 消费者:2----- 1个包子, 时间:10:7:09 39 +++++ 1个包子 40 消费者:3----- 1个包子, 时间:10:7:10 41 +++++ 1个包子 42 消费者:4----- 1个包子, 时间:10:7:11 43 +++++ 1个包子 44 消费者:0----- 1个包子, 时间:10:7:12 45 +++++ 1个包子 46 消费者:1----- 1个包子, 时间:10:7:13 47 +++++ 1个包子 48 消费者:2----- 1个包子, 时间:10:7:14 49 +++++ 1个包子 50 消费者:3----- 1个包子, 时间:10:7:15 51 +++++ 1个包子 52 消费者:4----- 1个包子, 时间:10:7:16
4、threading.Event
- set() 设置flag=True
- clear() 清除flag,即flag=False
- wait(timeout=None) 堵塞线程等待set()后继续执行,timeout:堵塞最长时间秒,默认一直堵塞。
Event内部定义了一个flag,当flag=Flase时,wait()将会一直堵塞线程;flag=True时,wait()不会堵塞线程,继续向下执行,这时的wait()相当与pass语句
1 from threading import Thread, Event 2 import time 3 4 5 def producer(): 6 global eve 7 while True: 8 time.sleep(1) 9 print("+++++ 1个包子") 10 eve.set() 11 12 13 def consumer(): 14 global eve 15 while True: 16 eve.wait() 17 print("----- 1个包子, 时间:%s" % (time.strftime("%X"))) 18 eve.clear() 19 20 21 eve = Event() 22 prd = Thread(target=producer) 23 csm = Thread(target=python threading超线程使用简单范例的代码newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段