python 线程

Posted nonzero

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 线程相关的知识,希望对你有一定的参考价值。

导航 

1、Thread类
2、线程同步
3、threading.Condition
4、threading.Event
5、threading.Semaphore  信号量
6、queue模块,线程队列
7、cpu密集型与IO密集型
8、线程池

 

线程是cpu运行的最小单位,没有自己的内存空间,同一线程的多线程共享一个内存空间,同一线程下的多线程都可以访问全局变量,对全局变量进行操作时要注意同步问题。

threading模块是建立在 _thread 模块上的,对其进行封装,包含较多的功能。

1、Thread类

threading.Thread(group=None, target=None, name=None, args=(), kwargs=None,  daemon=None)

  • group                 线程组,预留的还没有什么用,应设置为None
  • target                目的方法,线程要执行的函数入口,这是直接通过Thread创建多线程需要指定的函数入口;通过继承Thread类创建的多线程,target应设置为None
  • name           线程名
  • args/kwargs      target函数的参数
  • daemon             是否设置为守护线程

1)创建多线程的方式

创建多线程主要有两种方式,①通过Thread类直接创建子线程,并指定子线程需要执行的函数逻辑。②通过继承Thread类创建实例对象,重写run方法,开启子线程

 1 from threading import Thread
 2 
 3 
 4 def fn(i):
 5     # 子线程逻辑
 6     print("--子线程--", i)
 7 
 8 
 9 if __name__ == "__main__":
10     t = Thread(target=fn, args=(100,))
11     t.start()
12     print("--主线程--")
13 
14 
15 # 输出结果
16 --子线程-- 100
17 --主线程--
Thread创建多线程
 1 from threading import Thread
 2 
 3 
 4 class MyThread(Thread):
 5     def __init__(self, name):
 6         super().__init__()
 7         self.name = name
 8 
 9     def run(self):
10         # 子线程逻辑
11         print("--子线程--", self.name)
12 
13 
14 if __name__ == "__main__":
15     t = MyThread("my_thread_01")
16     t.start()
17     print("--主线程--")
18 
19 
20 # 输出结果
21 --子线程-- my_thread_01
22 --主线程--
继承Thread创建多线程

2)下面来看一下使用多线程与不使用多线程的区别

 1 from threading import Thread
 2 import time
 3 
 4 
 5 def fn(i):
 6     # 子线程逻辑
 7     print("+++", i)
 8     time.sleep(1)
 9 
10 
11 if __name__ == "__main__":
12     start_time = time.time()
13     t_l = []
14     for i in range(5):
15         t = Thread(target=fn, args=(i,))
16         t.start()
17         t_l.append(t)
18     for i in range(5):
19         t.join()
20     print("运行所需时间:", time.time() - start_time)
21 
22 
23 # 输出结果
24 +++ 0
25 +++ 1
26 +++ 2
27 +++ 3
28 +++ 4
29 运行所需时间: 1.0024588108062744
多线程简单例子
 1 import time
 2 
 3 
 4 def fn(i):
 5     # 子线程逻辑
 6     print("+++", i)
 7     time.sleep(1)
 8 
 9 
10 if __name__ == "__main__":
11     start_time = time.time()
12     for i in range(5):
13         fn(i)
14     print("运行所需时间:", time.time() - start_time)
15 
16 
17 # 输出结果
18 +++ 0
19 +++ 1
20 +++ 2
21 +++ 3
22 +++ 4
23 运行所需时间: 5.0021538734436035
未使用多线程

可以看到同样的需要执行5次fn函数的逻辑,使用多线程的效率却比单线程的要高

3)daemon守护线程的设置

设置守护线程有有两种方法 ① t.daemon = True  直接设置属性  ②  t.setDaemon(True)  通过方法设置。

当子线程设置为守护线程是,主线程结束时程序将不再等待子线程是否执行完毕直接退出程序。如下边子线程设置为守护线程后,主线程结束了,子线程也将跟着结束,并未输出子线程的语句

 1 from threading import Thread
 2 import time
 3 
 4 
 5 def fn(i):
 6     # 子线程逻辑
 7     time.sleep(1)
 8     print("+++", i)
 9 
10 
11 if __name__ == "__main__":
12     for i in range(5):
13         t = Thread(target=fn, args=(i,))
14         t.setDaemon(True)
15         t.start()
16     print("---end---")
17 
18 
19 # 输出结果
20 ---end---
daemon设置守护线程

4)join(timeout=None)

join方法堵塞当前上下文环境的线程,直到调用join方法的子线程执行结束后当前线程才继续执行。在哪里调用就在哪堵塞,最多堵塞timeout秒

 1 from threading import Thread
 2 import time
 3 
 4 
 5 def fn():
 6     # 子线程逻辑
 7     time.sleep(5)
 8 
 9 
10 if __name__ == "__main__":
11     start_time = time.time()
12     t = Thread(target=fn)
13     t.start()
14     t.join()
15     print("所需时间-->", time.time() - start_time)
16 
17 
18 # 输出结果
19 所需时间--> 5.001126527786255
join方法

5)is_alive(),返回子线程存活状态

is_alive方法判断指定对象线程的存活状态。线程 start 后一直到该线程结束都返回True。线程 start 前 或线程已经结束返回False

 

2、线程同步

当多个线程的同时对一个变量进行操作时便会存在着安全的隐患问题,

如下:有2个线程同时对全局变量进行 + 1,000,000 为什么结果却不是 2,000,000。这是因为当线程一执行完11行时还未开始执行12行,全局变量a没有进行 +1,就已经切换了线程二执行;线程二从10行开始执行,取到的值依旧是还没有 +1 的值,执行完12行,切换回线程一;线程一继续上次的断点执行12行,这时就相当于两次线程总共才完成了一次 +1 操作。多次循环如此,那么全局变量最后的结果就不可能是2,000,000。这里为了看到明显的效果,我将 a += 1 变成了三条语句来实现

 1 from threading import Thread
 2 
 3 
 4 a = 0
 5 
 6 
 7 def fn():
 8     global a
 9     for i in range(1000000):
10         t = a
11         t += 1
12         a = t
13 
14 
15 if __name__ == "__main__":
16     t_l = []
17     for i in range(2):
18         t = Thread(target=fn)
19         t_l.append(t)
20         t.start()
21     for t in t_l:
22         t.join()
23     print("全局变量 a-->", a)
24 
25 
26 # 输出结果
27 全局变量 a--> 1332605
多线程对全局变量的修改
 1 from threading import Thread
 2 
 3 def fn():
 4     for i in range(5):
 5         print("1234")
 6 
 7 
 8 for i in range(2):
 9     t = Thread(target=fn)
10     t.start()
11 
12 
13 # 输出结果1234
14 1234
15 12341234
16 
17 1234
18 1234
19 1234
20 12341234
21 
22 1234
多线程同时对控制台打印

 

经过这个例子,就可以看出当多线程对共享的资源进行修改是就涉及到同步问题了,我们希望同一时刻只有一个线程t执行某段逻辑,其它线程必须等待这个线程t执行完才能执行。

1)threading.Lock()

Lock线程锁,在需要进行同步的语句块加上可以确保线程间的同步问题

 1 from threading import Thread, Lock
 2 
 3 
 4 def fn():
 5     global a
 6     for i in range(1000000):
 7         lock.acquire()
 8         t = a
 9         t += 1
10         a = t
11         lock.release()
12 
13 
14 a = 0
15 t_l = []
16 lock = Lock()
17 for i in range(2):
18     t = Thread(target=fn)
19     t_l.append(t)
20     t.start()
21 for t in t_l:
22     t.join()
23 print("全局变量 a-->", a)
24 
25 
26 # 全局变量
27 全局变量 a--> 2000000
Lock后

2)死锁

关于死锁的概念我就不再叙述,就简单的说说下边的代码,同时开启两个线程,线程t1中执行到8行,线程t2执行到20行的时候,t1发现锁lock2没有被释放便会在这一直等待,t2发现锁lock1没有被释放也在这一直等待,这样谁也不会先释放锁,就会造成死锁的现象,程序一直卡着。

 1 from threading import Thread, Lock
 2 import time
 3 
 4 def fn1():
 5     lock1.acquire()
 6     print("fn1-->lock1.acquire")
 7     time.sleep(1)  # 确保另一个线程已经执行fn2函数且lock2已经锁上
 8     lock2.acquire()
 9     print("fn1-->lock2.acquire")
10     lock2.release()
11     print("fn1-->lock2.release")
12     lock1.release()
13     print("fn1-->lock1.release")
14 
15 
16 def fn2():
17     lock2.acquire()
18     print("fn1-->lock2.acquire")
19     time.sleep(1)  # 确保另一个线程已经执行fn1函数且lock1已经锁上
20     lock1.acquire()
21     print("fn1-->lock1.acquire")
22     lock1.release()
23     print("fn1-->lock1.release")
24     lock2.release()
25     print("fn1-->lock2.release")
26 
27 
28 lock1 = Lock()
29 lock2 = Lock()
30 t1 = Thread(target=fn1)
31 t2 = Thread(target=fn2)
32 t1.start()
33 t2.start()
34 t1.join()
35 t2.join()
36 print("----end----")
37 
38 
39 # 输出结果
40 fn1-->lock1.acquire
41 fn1-->lock2.acquire
死锁

 

3、threading.Condition

  • acquire()                获取锁
  • release()                释放锁
  • wait(timeout=None)            等待notify的通知,没有等到则在此堵塞,并释放锁cdt.release();有notify则继续往下执行,并释放锁。timeout:堵塞最长时间秒,默认一直堵塞。使用这个方法之前要已经acquire(),因为这个方法会释放锁 
  • notify(n=1)              通知一个正在 wait 堵塞 的线程,n:通知线程数,默认为1
  • notify_all()               通知所有正在 wait 堵塞 的线程
 1 from threading import Thread, Condition
 2 import time
 3 
 4 
 5 def producer():
 6     global cdt
 7     while True:
 8         time.sleep(1)
 9         cdt.acquire()
10         print("+++++ 1个包子")
11         cdt.notify()
12         cdt.release()
13 
14 
15 def consumer():
16     global cdt
17     while True:
18         cdt.acquire()
19         cdt.wait()
20         print("----- 1个包子, 时间:", time.strftime("%X"))
21         # cdt.release()
22 
23 
24 cdt = Condition()
25 prd = Thread(target=producer)
26 csm = Thread(target=consumer)
27 prd.start()
28 csm.start()
29 prd.join()
30 csm.join()
31 print("----end----")
32 
33 
34 # 输出结果
35 +++++ 1个包子
36 ----- 1个包子, 时间: 10:05:30
37 +++++ 1个包子
38 ----- 1个包子, 时间: 10:05:31
39 +++++ 1个包子
40 ----- 1个包子, 时间: 10:05:32
41 +++++ 1个包子
42 ----- 1个包子, 时间: 10:05:33
43 +++++ 1个包子
44 ----- 1个包子, 时间: 10:05:34
Condition,1个生产者与1个消费者
 1 from threading import Thread, Condition
 2 import time
 3 
 4 
 5 def producer():
 6     global cdt
 7     while True:
 8         time.sleep(1)
 9         cdt.acquire()
10         print("+++++ 1个包子")
11         cdt.notify()
12         cdt.release()
13 
14 
15 def consumer(name):
16     global cdt
17     while True:
18         cdt.acquire()
19         cdt.wait()
20         print("消费者:%s----- 1个包子, 时间:%s" % (name, time.strftime("%X")))
21         # cdt.release()
22 
23 
24 cdt = Condition()
25 prd = Thread(target=producer)
26 prd.start()
27 for i in range(5):
28     csm = Thread(target=consumer, args=(i,))
29     csm.start()
30 
31 
32 # 输出结果
33 +++++ 1个包子
34 消费者:0----- 1个包子, 时间:10:7:07
35 +++++ 1个包子
36 消费者:1----- 1个包子, 时间:10:7:08
37 +++++ 1个包子
38 消费者:2----- 1个包子, 时间:10:7:09
39 +++++ 1个包子
40 消费者:3----- 1个包子, 时间:10:7:10
41 +++++ 1个包子
42 消费者:4----- 1个包子, 时间:10:7:11
43 +++++ 1个包子
44 消费者:0----- 1个包子, 时间:10:7:12
45 +++++ 1个包子
46 消费者:1----- 1个包子, 时间:10:7:13
47 +++++ 1个包子
48 消费者:2----- 1个包子, 时间:10:7:14
49 +++++ 1个包子
50 消费者:3----- 1个包子, 时间:10:7:15
51 +++++ 1个包子
52 消费者:4----- 1个包子, 时间:10:7:16
Condition,1个生产者与多个消费者

 

4、threading.Event

  • set()                               设置flag=True
  • clear()                            清除flag,即flag=False
  • wait(timeout=None)       堵塞线程等待set()后继续执行,timeout:堵塞最长时间秒,默认一直堵塞。

Event内部定义了一个flag,当flag=Flase时,wait()将会一直堵塞线程;flag=True时,wait()不会堵塞线程,继续向下执行,这时的wait()相当与pass语句

 1 from threading import Thread, Event
 2 import time
 3 
 4 
 5 def producer():
 6     global eve
 7     while True:
 8         time.sleep(1)
 9         print("+++++ 1个包子")
10         eve.set()
11 
12 
13 def consumer():
14     global eve
15     while True:
16         eve.wait()
17         print("----- 1个包子, 时间:%s" % (time.strftime("%X")))
18         eve.clear()
19 
20 
21 eve = Event()
22 prd = Thread(target=producer)
23 csm = Thread(target=python threading超线程使用简单范例的代码

[Python3] 043 多线程 简介

newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段

python中的多线程和多进程编程

常用python日期日志获取内容循环的代码片段

多线程 Thread 线程同步 synchronized