python多线程和多进程的实现
Posted sysu_lluozh
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python多线程和多进程的实现相关的知识,希望对你有一定的参考价值。
一、什么是线程和进程
对于操作系统而言,一个任务就是一个进程(Process)
线程是操作系统能够进行运算调度的最小单位(程序执行流的最小单元),它被包含在进程之中,是进程中的实际运作单位。一个进程中可以并发多个线程
每条线程并行执行不同的任务(线程是进程中的一个实体,是被系统独立调度和分派的基本单元)
每一个进程启动时都会最先产生一个线程,即主线程,然后主线程会再创建其他的子线程
单个CPU可以执行多进程和多线程,即由操作系统在多个进程或者线程之间快速切换,使得每个进程或者线程短暂的交替运行,真正实现多线程需要多核 CPU 才可能实现
当要执行多个任务的时候,可以采用多进程、多线程、多进程+多线程的模式来实现
但是多个任务间可能有某种关联,需要相互通信和协调,比方我要完成任务1和任务2,才能开始做任务3和任务4
二、实现多线程
Python 的标准库提供了两个模块:_thread
和**threading
,_thread
是低级模块threading
是高级模块,对_**thread**
进行封装。通常,只需要使用 threading
这个高级模块**
2.1 threading实现线程操作
- 添加线程
- 导入模块:
import threading
- 获取已激活的线程数
threading.active_count()
- 查看现在正在运行的线程
threading.current_thread()
- 添加线程
thread = threading.Thread(target=thread_job,)
注意:接收参数target
代表这个线程要完成的任务,需自行定义
- 一段完成的小代码
import threading
def t_job():
print('current_thread: %s' % threading.current_thread())
def main():
thread = threading.Thread(target=t_job,)
thread.start()
if __name__ == '__main__':
main()
# 输出
current_thread: <Thread(Thread-8, started 15240)>
- 控制线程
- 线程开始运行
thread.start()
- 控制多个线程的执行顺序
thread.join()
为什么要控制多个线程的执行顺序?
假设t1_job和t2_job两个任务,第一个任务执行时间10s,第二个任务执行时间1s,创建代码:
import time
import threading
def t1_job():
print("T1 start\\n")
for i in range(10):
time.sleep(1)
print("T1 finish\\n")
def t2_job():
print("t2 start\\n")
for i in range(10):
time.sleep(0.1)
print("t2 finish\\n")
thread_1 = threading.Thread(target=t1_job, name='t1')
thread_2 = threading.Thread(target=t2_job, name='t2')
thread_1.start()
thread_2.start()
print("all jobs finished\\n")
希望得到的输出是:
T1 start
T2 start
T2 finish
T1 finish
all jobs finished
然而实际的输出是:
T1 start
T2 start
T2 finish
all jobs finished
T1 finish
这种杂乱的执行方式并不是所需要的,因此要使用**join()
加以控制,推荐将每个线程对的join()依次放在所有start()**后面
可将代码修改如下:
import time
import threading
def t1_job():
print("T1 start\\n")
for i in range(10):
time.sleep(1)
print("T1 finish\\n")
def t2_job():
print("t2 start\\n")
for i in range(10):
time.sleep(0.1)
print("t2 finish\\n")
thread_1 = threading.Thread(target=t1_job, name='t1')
thread_2 = threading.Thread(target=t2_job, name='t2')
thread_1.join()
thread_2.join()
print("all jobs finished\\n")
2.2 多线程应用实例
事件驱动生产者和消费者
import threading
import time
def Producer():
print 'chef:等人来买包子'
# 收到了消费者的event.set 也就是把这个flag改为了true,但是我们的包子并没有做好
event.wait()
# 此时应该将flag的值改回去
event.clear()
print 'chef:someone is coming for 包子'
print 'chef:making a 包子 for someone'
time.sleep(5)
# 告诉人家包子做好了
print '你的包子好了~'
event.set()
def Consumer():
print 'tom:去买包子'
# 告诉人家我来了
event.set()
time.sleep(2)
print 'tom:waiting for 包子 to be ready'
# 我在不断检测,但我已经不阻塞了
while True:
if event.is_set():
print 'Thanks~'
break
else:
print '怎么还没好呀~'
# 模拟正在做自己的事情
time.sleep(1)
event = threading.Event()
p1 = threading.Thread(target=Producer)
c1 = threading.Thread(target=Consumer)
p1.start()
c1.start()
输出:
chef:等人来买包子
tom:去买包子
chef:someone is coming for 包子
chef:making a 包子 for someone
tom:waiting for 包子 to be ready
怎么还没好呀~
怎么还没好呀~
怎么还没好呀~
你的包子好了~
Thanks~
2.3 线程锁lock的操作
在多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量导致内容错乱,举个栗子:
import time, threading
c = 0
def t_job(n):
global c
c += n
c -= n
def t_run(n):
for i in range(1000):
t_job(n)
t1 = threading.Thread(target=t_run, args=(5,))
t2 = threading.Thread(target=t_run, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(c)
注意:args 后面传到的参数需要加个逗号
定义了一个共享变量**c
,初始值为0
,并且启动两个线程,先存后取,理论上结果应该为0
,但是,由于线程的调度是由操作系统决定的,当t1**、t2交替执行时,只要循环次数足够多,**c
**的结果就不一定是0
因为当 CPU 运行时候:
c += n
相当于:
tmp = c + n
c = tmp
线程交替运行时:
初始值 c = 0
t1: tmp1 = c + 5 # tmp1 = 0 + 5 = 5
t2: tmp2 = c + 8 # tmp2 = 0 + 8 = 8
t2: c = tmp2 # c = 8
t1: c = x1 # c = 5
t1: tmp1 = c - 5 # tmp1 = 5 - 5 = 0
t1: c = tmp1 # c = 0
t2: tmp2 = c - 8 # tmp2 = 0 - 8 = -8
t2: c = tmp2 # c = -8
结果 c = -8
如果要确保c计算正确,就要给t_job()上一把锁,当某个线程开始执行 t_job()时,该线程因为获得了锁,因此其他线程不能同时执行t_job(),只能等待,直到锁被释放后,获得该锁以后才能改
由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以不会造成修改的冲突
- 创建一个锁
threading.Lock()
c = 0
lock = threading.Lock()
def t_run(n):
for i in range(10000):
lock.acquire()
try:
t_job(n)
finally:
lock.release()
当多个线程同时执行**lock.acquire**()
时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就继续等待直到获得锁为止
注意:获得锁的线程用完后一定要释放锁,否则其他等待锁的线程将成为死线程。所以**try...finally
**来确保锁一定会被释放
2.4 多线程锁应用实例
# -*-* encoding:UTF-8 -*-
import threading
import time
list = [0,0,0,0,0,0,0,0,0,0,0,0]
class myThread(threading.Thread):
def __init__(self,threadId,name,counter):
threading.Thread.__init__(self)
self.threadId = threadId
self.name = name
self.counter = counter
def run(self):
print "开始线程:",self.name
# 获得锁,成功获得锁定后返回 True
# 可选的timeout参数不填时将一直阻塞直到获得锁定
# 否则超时后将返回 False
threadLock.acquire()
print_time(self.name,self.counter,list.__len__())
# 释放锁
threadLock.release()
def __del__(self):
print self.name,"线程结束!"
def print_time(threadName,delay,counter):
while counter:
time.sleep(delay)
list[counter-1] += 1
print "[%s] %s 修改第 %d 个值,修改后值为:%d" % (time.ctime(time.time()),threadName,counter,list[counter-1])
counter -= 1
threadLock = threading.Lock()
threads = []
# 创建新线程
thread1 = myThread(1,"Thread-1",1)
thread2 = myThread(2,"Thread-2",2)
# 开启新线程
thread1.start()
thread2.start()
# 添加线程到线程列表
threads.append(thread1)
threads.append(thread2)
# 等待所有线程完成
for t in threads:
t.join()
print "主进程结束!"
不加锁时
同样是上面实例的代码,注释以下两行代码:
threadLock.acquire()
threadLock.release()
锁的应用使得单线程能从头到尾不受干扰进行,但是也阻碍了多线程的进行。包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降
另外,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止
2.5 GIL 锁
Python 的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何 Python 线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放 GIL 锁,让别的线程有机会执行
这个 GIL 全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在 Python 中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核
三、实现多进程
3.1 os.fork
# -*- coding:utf-8 -*-
"""
pid=os.fork()
1.只用在Unix系统中有效,Windows系统中无效
2.fork函数调用一次,返回两次:在父进程中返回值为子进程id,在子进程中返回值为0
"""
import os
pid=os.fork()
if pid==0:
print("执行子进程,子进程pid=pid,父进程ppid=ppid".format(pid=os.getpid(),ppid=os.getppid()))
else:
print("执行父进程,子进程pid=pid,父进程ppid=ppid".format(pid=pid,ppid=os.getpid()))
3.2 multiprocessing模块
- 创建Process实例
使用multiprocessing模块,创建Process的实例,传入任务执行函数作为参数
# -*- coding:utf-8 -*-
"""
Process常用属性与方法:
name:进程名
pid:进程id
run(),自定义子类时覆写
start(),开启进程
join(timeout=None),阻塞进程
terminate(),终止进程
is_alive(),判断进程是否存活
"""
import os,time
from multiprocessing import Process
def worker():
print("子进程执行中>>> pid=0,ppid=1".format(os.getpid(),os.getppid()))
time.sleep(2)
print("子进程终止>>> pid=0".format(os.getpid()))
def main():
print("主进程执行中>>> pid=0".format(os.getpid()))
ps=[]
# 创建子进程实例
for i in range(2):
p=Process(target=worker,name="worker"+str(i),args=())
ps.append(p)
# 开启进程
for i in range(2):
ps[i].start()
# 阻塞进程
for i in range(2):
ps[i].join()
print("主进程终止")
if __name__ == '__main__':
main()
- 派生Process子类并重写run方法
使用multiprocessing模块,派生Process的子类,重写run方法
# -*- coding:utf-8 -*-
import os,time
from multiprocessing import Process
class MyProcess(Process):
def __init__(self):
Process.__init__(self)
def run(self):
print("子进程开始>>> pid=0,ppid=1".format(os.getpid(),os.getppid()))
time.sleep(2)
print("子进程终止>>> pid=".format(os.getpid()))
def main():
print("主进程开始>>> pid=".format(os.getpid()))
myp=MyProcess()
myp.start()
# myp.join()
print("主进程终止")
if __name__ == '__main__':
main()
3.3 使用进程池Pool
# -*- coding:utf-8 -*-
import os,time
from multiprocessing import Pool
def worker(arg):
print("子进程开始执行>>> pid=,ppid=,编号".format(os.getpid(),os.getppid(),arg))
time.sleep(0.5)
print("子进程终止>>> pid=,ppid=,编号".format(os.getpid(),os.getppid(),arg))
def main():
print("主进程开始执行>>> pid=".format(os.getpid()))
ps=Pool(5)
for i in range(10):
# ps.apply(worker,args=(i,)) # 同步执行
ps.apply_async(worker,args=(i,)) # 异步执行
# 关闭进程池,停止接受其它进程
ps.close()
# 阻塞进程
ps.join()
print("主进程终止")
if __name__ == '__main__':
main()
以上是关于python多线程和多进程的实现的主要内容,如果未能解决你的问题,请参考以下文章