python之路 -- 并发编程之线程
Posted aberwang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python之路 -- 并发编程之线程相关的知识,希望对你有一定的参考价值。
进程 是 最小的内存分配单位
线程 是 操作系统调度的最小单位
线程直接被CPU执行,进程内至少含有一个线程,也可以开启多个线程
开启一个线程所需要的时间要远远小于开启一个进程
GIL锁(即全局解释器锁) 锁的是线程
在Cpython解释器下的python程序 在同一时刻 多个线程中只能有一个线程被CPU执行
1.创建线程的两中方式:
import time from threading import Thread def func(args): time.sleep(1) print(args) t = Thread(target=func,args=(10,)) # 传入的参数也必须以元组的形式传 t.start() # 创建线程的另一种方式(使用面向对象创建线程) import time from threading import Thread class MyTread(Thread): def __init__(self,arg): super().__init__() # 调用父类的__init__ self.arg = arg def run(self): # 方法名必须是run time.sleep(1) print(self.arg) t = MyTread(10) t.start()
2.线程与进程效率的比较
import time from threading import Thread from multiprocessing import Process def func(n): n + 1 if __name__ == ‘__main__‘: start1 = time.time() t_lst1 = [] for i in range(100): t = Thread(target=func,args=(i,)) t.start() t_lst1.append(t) for t in t_lst1:t.join() t1 = time.time() - start1 start2 = time.time() t_lst2 = [] for i in range(100): t = Process(target=func, args=(i,)) t.start() t_lst2.append(t) for t in t_lst2: t.join() t2 = time.time() - start2 print(t1,t2) 输出的结果为: 0.02698826789855957 10.160863876342773 # 可见开启一个线程所需要的时间要远远小于开启一个进程
多线程用于IO密集型,如socket,爬虫,处理web请求,读写数据库;
多进程用于计算密集型,如金融分析。
3.多进程与多线程数据共享的比较:
多个线程内部有自己的数据栈,线程内部数据不共享
全局变量在多个线程之间是共享的(线程之间资源共享)
多进程修改全局变量 import os,time from multiprocessing import Process from threading import Thread def func(): global g g -= 10 print(g,os.getpid()) g = 100 # 全局变量g t_lst = [] if __name__ == "__main__": for i in range(5): t = Process(target=func) t.start() t_lst.append(t) for t in t_lst: t.join() print(g) """输出结果为: 90 143648 90 160300 90 143316 90 160804 90 159348 100 """ 多线程修改全局变量 import os,time from multiprocessing import Process from threading import Thread for i in range(5): t = Thread(target=func) t.start() t_lst.append(t) for t in t_lst : t.join() print(g) """输出的结果为; 90 161716 80 161716 70 161716 60 161716 50 161716 50 """
4.线程模块中的其他方法:
1、Thread实例对象的方法(t为实例化的一个线程对象)
t.isAlive():返回线程是否存活。
t.getName():返回线程名。
t.setName():设置线程名。
2、threading模块提供的一些方法:
threading.active_count():查看程序中存在多少个活着的线程,与len(threading.enumerate())有相同的结果。
threading.enumerate(): 查看程序中的所有线程的线程名和线程号(列表的形式列出每个线程)。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.current_thread():查看当前线程的线程名和线程号
3.实例:
例子: import time import threading def func(n): time.sleep(0.5) print(n,t.getName(),threading.get_ident(),t.isAlive()) # 打印线程名,线程号,判断线程是否在活动 # 当一个线程执行完成func之后就结束了,t.isAlive()查看为False for i in range(5): t = threading.Thread(target=func,args=(i,)) t.start() t.join() print(threading.active_count()) # 查看程序中存在多少个活着的线程 print(threading.current_thread()) # 查看当前线程的线程名和线程号 print(threading.enumerate()) # 查看程序中的所有线程的线程名和线程号(列表的形式列出每个线程) """执行结果为: 0 Thread-1 33164 True 1 Thread-2 57836 True 2 Thread-3 165280 True 3 Thread-4 171196 True 4 Thread-5 119008 True 1 <_MainThread(MainThread, started 165276)> [<_MainThread(MainThread, started 165276)>] "
5.守护线程
守护进程与守护线程的区别:
1.守护进程随着主进程代码的执行结束而结束
2.守护线程会在主线程结束之后再等待其他子线程结束之后才结束
from threading import Thread import time def func(): print(‘我是守护线程‘) time.sleep(0.5) print(‘守护线程结束了‘) def func2(): print(‘我是线程2,非守护线程‘) time.sleep(2) print(‘func2执行完了‘) t = Thread(target=func) t.daemon = True # 仍是加在start()前 t.start() t2 = Thread(target=func2) t2.start() print(‘我是主线程‘) t2.join() print(‘主线程结束了。‘) """执行结果为: 我是守护线程 我是线程2,非守护线程 我是主线程 守护线程结束了 func2执行完了 主线程结束了。 """ # 主进程在执行完自己的代码之后不会立即结束 而是等待子进程结束之后 再结束。然后回收子进程的资源
6.线程锁
1.不加锁
from threading import Thread import time def func(): global n time.sleep(0.5) n-=1 n = 10 t_list = [] for i in range(10): t = Thread(target=func) t.start() t_list.append(t) [t.join() for i in t_list] print(n) # 输出结果n为9 # 创建线程非常快,创建的10个线程都拿到了global的n,此时n为10,然后各线程都做-1操作,得到的结都为9
2.加锁
from threading import Thread from threading import Lock def func(lock): global n lock.acquire() n-=1 lock.release() lock = Lock() n = 10 t_list = [] for i in range(10): t = Thread(target=func,args=(lock,)) t.start() t_list.append(t) [t.join() for i in t_list] print(n) # 输出的结果为0 通过加锁后保证了多线程数据安全,但损失了效率
互斥锁Lock:在同一个线程或者进程之间,当有两个acquire的时候,就会产生阻塞(死锁)
递归锁RLock:在同一个线程或则进程之间,无论acquire多少次都不会产生阻塞(死锁)
3.多人吃面事例
多人吃面事例: # 有多个人在一个桌子上吃面,只有一份面和一个叉子,当一个人同时拿到了这两样才能完成一次吃面的动作。 from threading import Thread,Lock import time def eat1(name): noodle_lock.acquire() print(‘%s拿到面条了‘%name) fork_lock.acquire() print(‘%s拿到叉子了‘%name) print(‘%s吃面‘%name) noodle_lock.release() fork_lock.release() def eat2(name): fork_lock.acquire() print(‘%s拿到叉子了‘%name) time.sleep(1) noodle_lock.acquire() print(‘%s拿到面条啦‘%name) print(‘%s吃面‘%name) noodle_lock.release() fork_lock.release() # 实例化2个锁,给面条和叉子加上锁 noodle_lock = Lock() fork_lock = Lock() Thread(target=eat1,args=(‘peo1‘,)).start() Thread(target=eat2,args=(‘peo2‘,)).start() Thread(target=eat1,args=(‘peo3‘,)).start() Thread(target=eat2,args=(‘peo4‘,)).start() """执行结果为: peo1拿到面条了 peo1拿到叉子了 peo1吃面 peo2拿到叉子了 peo3拿到面条了 “陷入阻塞”………… """
# 加递归锁 from threading import Thread,RLock import time def eat1(name): noodle_lock.acquire() # 一把钥匙 print(‘%s拿到面条啦‘%name) fork_lock.acquire() print(‘%s拿到叉子了‘%name) print(‘%s吃面‘%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print(‘%s拿到叉子了‘%name) time.sleep(1) noodle_lock.acquire() print(‘%s拿到面条啦‘%name) print(‘%s吃面‘%name) noodle_lock.release() fork_lock.release() fork_lock = noodle_lock = RLock() Thread(target=eat1,args=(‘peo1‘,)).start() Thread(target=eat2,args=(‘peo2‘,)).start() Thread(target=eat1,args=(‘peo3‘,)).start() Thread(target=eat2,args=(‘peo4‘,)).start() # 通过加递归锁实现了数据安全 # 递归锁一般用于有多个数据需要加锁的时候,不会出现数据安全问题
对于有多个数据需要加锁的时候,用互斥锁仍然会出现数据不安全问题
当多线程是对一个数据处理的时候通过给这个数据加上互斥锁实现了数据安全。
但是当有多个数据被多线程调用处理的时候,加互斥锁仍然会出现数据安全问题。这时候需要用到递归锁,给多个数据加锁。
7.线程信号量
# 线程信号量 # 相当于加一个锁,但此锁可以根据自己需要设置有多个钥匙,此时可以允许有多个线程同时去做操作 from threading import Thread,Semaphore import time def func(sem,n): sem.acquire() time.sleep(0.5) print(n,end=‘ ‘) sem.release() sem = Semaphore(4) for i in range(10): t = Thread(target=func,args=(sem,i)) t.start() """输出的结果为: 2 3 1 0 5 4 7 6 8 9 # (4个一组基本同时输出) """
8.线程事件
# 事件被创建的时候 # False状态 # wait() 阻塞 # True状态 # wait() 非阻塞 # clear 设置状态为False # set 设置状态为True
# 起两个线程 # 第一个线程 : 连接数据库 # 等待一个信号 告诉我我们之间的网络是通的 # 连接数据库 # 第二个线程 : 检测与数据库之间的网络是否连通 # time.sleep(0,2) 2 # 将事件的状态设置为True from threading import Thread,Event import time,random def connect_db(e): count = 0 while count < 3: e.wait(0.5) # 状态为False的时候,只等待0.5秒就结束 if e.is_set() == True: print(‘连接数据库‘) break else: count += 1 print(‘第%s次连接失败‘ %count) else: raise TimeoutError(‘数据库连接超时‘) def check_web(e): time.sleep(random.randint(0, 3)) e.set() e = Event() t1 = Thread(target=connect_db, args=(e,)) t2 = Thread(target=check_web, args=(e,)) t1.start() t2.start()
9.线程条件
# notify(int数据类型) 造钥匙 from threading import Thread from threading import Condition def func(con,i): con.acquire() con.wait() # 等钥匙 print(‘在第%s个循环里‘%i) con.release() con = Condition() for i in range(10): Thread(target=func,args = (con,i)).start() while True: num = int(input(‘>>>‘)) con.acquire() con.notify(num) # 造钥匙 con.release()
输出结果:
10.线程定时器
定时器,即指定n秒后执行某操作 import time from threading import Timer def func(): print(‘时间同步‘) #1-3 while True: time.sleep(1) t = Timer(5,func).start() # 非阻塞的 # 定时等待5秒之后就调用func # time.sleep(5)
# 自动更新验证码 from threading import Thread,Timer import random class Check_number: def __init__(self): self.cash_code() # 程序一开始便实例化一个验证码 def make_code(self,n=4): res = ‘‘ for i in range(n): s1 = str(random.randint(0,9)) # 0到9间的任意自然数 s2 = chr(random.randint(65,90)) # 24个小写字母 res += random.choice([s1,s2]) # 字符和数字的任意组合 return res def cash_code(self,interval=3): self.code = self.make_code() # 实例化一个验证码 print(self.code) # 打印验证码 self.t = Timer(interval,self.make_code) # 定时器,等待指定时间再运行 self.t.start() def check(self): while True: mes = input(‘输入验证码>>>:‘).strip() if self.code == mes.upper(): print(‘输入正确!‘) self.t.cancel() # 关闭定时器 break obj = Check_number() obj.check()
11.线程队列
import queue q1 = queue.Queue() # 队列 先进先出 q2 = queue.LifoQueue() # 栈 先进后出 q3 = queue.PriorityQueue() # 优先级队列 q.put() # 往队列中放入值,当队列满的时候陷入等待状态,直到队列可以放入值的时候放值 q.get() # 从队列取值,当队列为空的时候陷入等待状态,直到队列中有值的时候取值 q.put_nowait() # 往队列中放入值,当队列满的时候,直接报错 q.get_nowait() # 从队列取值,当队列为空的时候,直接报错
优先级队列事例:
优先级队列事例: q = queue.PriorityQueue() # 优先级队列 q.put((20,‘a‘)) q.put((10,‘b‘)) q.put((-5,‘d‘)) q.put((1,‘e‘)) q.put((1,‘f‘)) print(q.get()) print(q.get()) print(q.get()) print(q.get()) """执行结果为: (-5, ‘d‘) (1, ‘e‘) (1, ‘f‘) (10, ‘b‘) """ # 此队列按照值的优先级的顺序来取值,优先级数越小越先取值, # 当优先级一样的时候,就比较值的ASCII值的大小,小的先取
12.线程池
import time from concurrent.futures import ThreadPoolExecutor def func(n): time.sleep(1) print(n,end=" ") return 2*n def call_back(m): # m接收的为func中return的值 print(‘结果是 %s‘%m.result()) # 从对象中获取值的方法.result(),进程池中是.get() tpool = ThreadPoolExecutor(max_workers=5) # 默认 不要超过cpu个数*5 for i in range(10): tpool.submit(func,i).add_done_callback(call_back) """执行结果为: 4 结果是 8 2 结果是 4 3 结果是 6 1 结果是 2 0 结果是 0 9 结果是 18 8 结果是 16 6 结果是 12 5 结果是 10 7 结果是 14 """ # tpool.map(func,range(10)) # 拿不到返回值 t_lst = [] for i in range(10): t = tpool.submit(func,i) t_lst.append(t) tpool.shutdown() # shutdown相当于close+join print(‘主线程‘) for t in t_lst:print(‘*‘,t.result(),end=‘ ‘) """执行结果为: 4 2 3 1 0 9 8 7 6 5 主线程 * 0 * 2 * 4 * 6 * 8 * 10 * 12 * 14 * 16 * 18 """
13.多线程实现socket聊天
server 端
import socket from threading import Thread def chat(sk): conn, addr = sk.accept() conn.send("hello,我是服务端".encode(‘utf-8‘)) msg = conn.recv(1024).decode(‘utf-8‘) print(msg) conn.close() sk = socket.socket() sk.bind((‘127.0.0.1‘,8080)) sk.listen() while True: t = Thread(target=chat,args=(sk,)) t.start() sk.close
client 端
# client端 import socket sk = socket.socket() sk.connect((‘127.0.0.1‘,8080)) msg = sk.recv(1024).decode(‘utf-8‘) print(msg) info = input(‘>>>‘).encode(‘utf-8‘) sk.send(info) sk.close()
以上是关于python之路 -- 并发编程之线程的主要内容,如果未能解决你的问题,请参考以下文章
python 闯关之路四(下)(并发编程与数据库编程) 并发编程重点
JUC并发编程 共享模式之工具 JUC CountdownLatch(倒计时锁) -- CountdownLatch应用(等待多个线程准备完毕( 可以覆盖上次的打印内)等待多个远程调用结束)(代码片段