python3多线程实战(python3经典编程案例)
Posted cui_yonghua
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python3多线程实战(python3经典编程案例)相关的知识,希望对你有一定的参考价值。
总结:
- python多线程适用在I/O密集型的任务中。对于I/O密集型任务来说,较少的时间用在cpu计算上,较多的时间用在I/O上,如文件读写,web请求,数据库请求等;
- 对于计算密集型任务,应该使用多进程。
一. 多线程任务对比
线程也是轻量级进程,是操作系统能够进行运算调度的最小单位。它被包含在进程中,是进程的实际运作单位
一个线程可以创建和撤销另一个线程,同一进程的多个线程之间可以并发执行。
线程有就绪,阻塞,运行3中基本状态。
计算密集型任务-多进程
from multiprocessing import Process
import os, time
#计算密集型任务
def work():
res = 0
for i in range(100000000):
res *= i
if __name__ == "__main__":
l = []
print("本机为",os.cpu_count(),"核 CPU") # 本机为4核
start = time.time()
for i in range(4):
p = Process(target=work) # 多进程
l.append(p)
p.start()
for p in l:
p.join()
stop = time.time()
print("计算密集型任务,多进程耗时 %s" % (stop - start))
计算密集型任务-多线程
from threading import Thread
import os, time
#计算密集型任务
def work():
res = 0
for i in range(100000000):
res *= i
if __name__ == "__main__":
l = []
print("本机为",os.cpu_count(),"核 CPU") # 本机为4核
start = time.time()
for i in range(4):
p = Thread(target=work) # 多进程
l.append(p)
p.start()
for p in l:
p.join()
stop = time.time()
print("计算密集型任务,多线程耗时 %s" % (stop - start))
I/O密集型任务-多进程
from multiprocessing import Process
import os, time
#I/0密集型任务
def work():
time.sleep(2)
print("===>", file=open("tmp.txt", "w"))
if __name__ == "__main__":
l = []
print("本机为", os.cpu_count(), "核 CPU") # 本机为4核
start = time.time()
for i in range(400):
p = Process(target=work) # 多进程
l.append(p)
p.start()
for p in l:
p.join()
stop = time.time()
print("I/0密集型任务,多进程耗时 %s" % (stop - start))
I/O密集型任务-多线程
from threading import Thread
import os, time
#I/0密集型任务
def work():
time.sleep(2)
print("===>", file=open("tmp.txt", "w"))
if __name__ == "__main__":
l = []
print("本机为", os.cpu_count(), "核 CPU") # 本机为4核
start = time.time()
for i in range(400):
p = Thread(target=work) # 多线程
l.append(p)
p.start()
for p in l:
p.join()
stop = time.time()
print("I/0密集型任务,多线程耗时 %s" % (stop - start))
结论:在python中,对于计算密集型任务,多进程占优势;对于IO密集型任务,多线程占优势。
二. threading模块
2.1 通过实例化threading.Thread类来创建线程
调用start()方法。
import time
import threading
def task_thread(counter):
print(
f'线程名称:{threading.current_thread().name} 参数:{counter} 开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}'
)
num = counter
while num:
time.sleep(3)
num -= 1
print(
f'线程名称:{threading.current_thread().name} 参数:{counter} 结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}'
)
if __name__ == "__main__":
print(f'主线程开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
# 初始化3个线程,传递不同的参数
t1 = threading.Thread(target=task_thread, args=(3,))
t2 = threading.Thread(target=task_thread, args=(2,))
t3 = threading.Thread(target=task_thread, args=(1,))
# 开启三个线程
t1.start()
t2.start()
t3.start()
# 等待运行结束
t1.join()
t2.join()
t3.join()
print(f'主线程结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
2.2 继承Thread类创建线程
在子类中重写run()
和init()
方法
import time
import threading
class MyThread(threading.Thread):
def __init__(self, counter):
super().__init__()
self.counter = counter
def run(self):
print(
f'线程名称:{threading.current_thread().name} 参数:{self.counter} 开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}'
)
counter = self.counter
while counter:
time.sleep(3)
counter -= 1
print(
f'线程名称:{threading.current_thread().name} 参数:{self.counter} 结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}'
)
if __name__ == "__main__":
print(f'主线程开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
# 初始化3个线程,传递不同的参数
t1 = MyThread(3)
t2 = MyThread(2)
t3 = MyThread(1)
# 开启三个线程
t1.start()
t2.start()
t3.start()
# 等待运行结束
t1.join()
t2.join()
t3.join()
print(f'主线程结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
2.3 继承Thread类调用外部传入参数
import time
import threading
def task_thread(counter):
print(f'线程名称:{threading.current_thread().name} 参数:{counter} 开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
num = counter
while num:
time.sleep(3)
num -= 1
print(f'线程名称:{threading.current_thread().name} 参数:{counter} 结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
class MyThread(threading.Thread):
def __init__(self, target, args):
super().__init__()
self.target = target
self.args = args
def run(self):
self.target(*self.args)
if __name__ == "__main__":
print(f'主线程开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
# 初始化3个线程,传递不同的参数
t1 = MyThread(target=task_thread,args=(3,))
t2 = MyThread(target=task_thread,args=(2,))
t3 = MyThread(target=task_thread,args=(1,))
# 开启三个线程
t1.start()
t2.start()
t3.start()
# 等待运行结束
t1.join()
t2.join()
t3.join()
print(f'主线程结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
三. 多线程同步之lock(互斥锁)
不加锁的意外情况:
3个线程对共同mun进行100万次加减操作之后,num的结果不为零
#!/usr/local/bin/python3
#-*- coding: utf-8 -*-
import time, threading
num = 0
def task_thread(n):
global num
for i in range(1000000):
num = num + n
num = num - n
t1 = threading.Thread(target=task_thread, args=(6,))
t2 = threading.Thread(target=task_thread, args=(17,))
t3 = threading.Thread(target=task_thread, args=(11,))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
print(num)
加互斥锁之后的运行结果始终一致
import time, threading
num = 0
lock = threading.Lock()
def task_thread(n):
global num
# 获取锁,用于线程同步
lock.acquire()
for i in range(1000000):
num = num + n
num = num - n
#释放锁,开启下一个线程
lock.release()
t1 = threading.Thread(target=task_thread, args=(6,))
t2 = threading.Thread(target=task_thread, args=(17,))
t3 = threading.Thread(target=task_thread, args=(11,))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
print(num)
四. 多线程同步之Semaphore(信号量)
互斥锁只允许一个线程访问共享数据,而信号量同时允许一定数量的线程访问共享数据。
比如柜台有5个窗口,允许同时有5个人办理业务,后面的人只能等待,5人中有人办理完业务,等待的人才能去办理。
使用信号量控制多线程并发数,代码如下:
import threading
import time
# 同时只有5个人办理业务
semaphore = threading.BoundedSemaphore(5)
# 模拟银行业务办理
def yewubanli(name):
semaphore.acquire()
time.sleep(3)
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} {name} 正在办理业务")
semaphore.release()
thread_list = []
for i in range(12):
t = threading.Thread(target=yewubanli, args=(i,))
thread_list.append(t)
for thread in thread_list:
thread.start()
for thread in thread_list:
thread.join()
# while threading.active_count() != 1:
# time.sleep(1)
可以看出,同一时刻只有5个人正在办理业务,即同一时刻只有5个线程获得资源运行。
五. 多线程同步之Condition
条件对象能让一个线程A停下来,等待其他线程B,线程B满足某个条件后通知线程B继续运行
import threading
class Boy(threading.Thread):
def __init__(self, cond, name):
super(Boy, self).__init__()
self.cond = cond
self.name = name
def run(self):
self.cond.acquire()
print(self.name + ": 嫁给我吧!?")
self.cond.notify() # 唤醒一个挂起的线程,让hanmeimei表态
self.cond.wait() # 释放内部所占用的琐,同时线程被挂起,直至接收到通知被唤醒或超时,等待hanmeimei回答
print(self.name + ": 我单下跪,送上戒指!")
self.cond.notify()
self.cond.wait()
print(self.name + ": Li太太,你的选择太明智了。")
self.cond.release()
class Girl(threading.Thread):
def __init__(self, cond, name):
super(Girl, self).__init__()
self.cond = cond
self.name = name
def run(self):
self.cond.acquire()
self.cond.wait() # 等待Lilei求婚
print(self.name + ": 没有情调,不够浪漫,不答应")
self.cond.notify()
self.cond.wait()
print(self.name + ": 好吧,答应你了")
self.cond.notify()
self.cond.release()
cond = threading.Condition()
boy = Boy(cond, "LiLei")
girl = Girl(cond, "HanMeiMei")
girl.start()
boy.start()
上面程序先启动了girl线程,gitl虽然获取到了条件变量锁cond, 但又执行了wait并释放条件变量锁,自身进入阻塞状态。
boy线程启动后,就获得了条件变量锁cond并发出了消息,之后通过notify唤醒一个挂起的线程。
最后通过release程序释放资源。
六. 多线程同步之Event
事件用于线程之间的通信。一个线程发出一个信号,其他一个或者多个线程等待,调用Event对象的wait方法,线程则会阻塞等待,直到别的线程set之后才会被唤醒。
import threading, time
class Boy(threading.Thread):
def __init__(self, cond, name):
super(Boy, self).__init__()
self.cond = cond
self.name = name
def run(self):
print(self.name + ": 嫁给我吧!?")
self.cond.set() # 唤醒一个挂起的线程,让hanmeimei表态
time.sleep(0.5)
self.cond.wait()
print(self.name + ": 我单下跪,送上戒指!")
self.cond.set()
time.sleep(0.5)
self.cond.wait()
self.cond.clear()
print(self.name + ": Li太太,你的选择太明智了。")
class Girl(threading.Thread):
def __init__(self, cond, name):
super(Girl, self).__init__()
self.cond = cond
self.name = name
def run(self):
self.cond.wait() # 等待Lilei求婚
self.cond.clear()
print(self.name + ": 没有情调,不够浪漫,不答应")
self.cond.set()
time.sleep以上是关于python3多线程实战(python3经典编程案例)的主要内容,如果未能解决你的问题,请参考以下文章