Python学习记录-多进程和多线程
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python学习记录-多进程和多线程相关的知识,希望对你有一定的参考价值。
Python学习记录-多进程和多线程
[TOC]
1. 进程和线程
进程
狭义定义:进程是正在运行的程序的实例(an instance of a computer program that is being executed)。
广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元。
线程
线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
线程与进程比较
线程与进程的区别:
1)地址空间和其它资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见。
2)通信:进程间通信IPC,线程间可以直接读写进程数据段(如全局变量)来进行通信——需要进程同步和互斥手段的辅助,以保证数据的一致性。
3)创建:创建新线程很简单,创建新进程需要对父进程进行一次克隆。
4)调度和切换:一个线程可以控制和操作同一进程里的其它线程,但是进程只能操作子进程;线程上下文切换比进程上下文切换要快得多。
5)在多线程OS中,进程不是一个可执行的实体。
<font color="red">注意:</font>
线程和进程快慢无法对比,因为线程被包含在进程中。
2. threading模块
线程有2种调用方式,如下:
直接调用
import threading
import time
def sayhi(num): # 定义每个线程要运行的函数
print("running on number:%s" % num)
time.sleep(3)
if __name__ == ‘__main__‘:
t1 = threading.Thread(target=sayhi, args=(1,)) # 生成一个线程实例
t2 = threading.Thread(target=sayhi, args=(2,)) # 生成另一个线程实例
t1.start() # 启动线程
t2.start() # 启动另一个线程
print(t1.getName()) # 获取线程名
print(t2.getName())
继承式调用
import threading
import time
class MyThread(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
self.num = num
def run(self): # 定义每个线程要运行的函数
print("running on number:%s" % self.num)
time.sleep(3)
if __name__ == ‘__main__‘:
t1 = MyThread(1)
t2 = MyThread(2)
t1.start()
t2.start()
平常用到的主要方法:
start
线程准备就绪,等待CPU调度setName
为线程设置名称getName
获取线程名称setDaemon
设置为后台线程或前台线程(默认)
如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止join
逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义run
线程被cpu调度后自动执行线程对象的run方法
2.1 Join & Daemon
import time
import threading
def run(n):
print(‘[%s]------running----\n‘ % n)
time.sleep(2)
print(‘--done--‘)
def main():
for i in range(5):
t = threading.Thread(target=run, args=[i, ])
t.start()
t.join(1)
print(‘starting thread‘, t.getName())
m = threading.Thread(target=main, args=[])
m.setDaemon(True) # 将main线程设置为Daemon线程,它做为程序主线程的守护线程,当主线程退出时,m线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完任务
m.start()
m.join(timeout=2)
print("---main thread done----")
import time
import threading
def addNum():
global num # 在每个线程中都获取这个全局变量
print(‘--get num:‘, num)
time.sleep(1)
num -= 1 # 对此公共变量进行-1操作
num = 100 # 设定一个共享变量
thread_list = []
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)
for t in thread_list: # 等待所有线程执行完毕
t.join()
print(‘final num:‘, num)
2.2 线程锁(互斥锁Mutex)
一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。
import time
import threading
def addNum():
global num # 在每个线程中都获取这个全局变量
print(‘--get num:‘, num)
time.sleep(1)
num -= 1 # 对此公共变量进行-1操作
num = 100 # 设定一个共享变量
thread_list = []
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)
for t in thread_list: # 等待所有线程执行完毕
t.join()
print(‘final num:‘, num)
因为python2.7以上版本,已经自动添加线程锁,所以我们不用细纠。
import time
import threading
def addNum():
global num # 在每个线程中都获取这个全局变量
print(‘--get num:‘, num)
time.sleep(1)
lock.acquire() # 修改数据前加锁
num -= 1 # 对此公共变量进行-1操作
lock.release() # 修改后释放
num = 100 # 设定一个共享变量
thread_list = []
lock = threading.Lock() # 生成全局锁
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)
for t in thread_list: # 等待所有线程执行完毕
t.join()
print(‘final num:‘, num)
2.3 信号量(Semaphore)
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。
import threading, time
def run(n):
semaphore.acquire()
time.sleep(1)
print("run the thread: %s" % n)
semaphore.release()
if __name__ == ‘__main__‘:
num = 0
semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程同时运行
for i in range(20):
t = threading.Thread(target=run, args=(i,))
t.start()
2.4 事件(event)
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set
、wait
、clear
。
事件处理的机制:全局定义了一个“Flag”
,如果“Flag”
值为 False
,那么当程序执行 event.wait
方法时就会阻塞,如果“Flag”
值为True
,那么event.wait
方法时便不再阻塞。
clear
:将“Flag”
设置为False
set
:将“Flag”
设置为True
import threading
def do(event):
print(‘start‘)
event.wait()
print(‘execute‘)
event_obj = threading.Event()
for i in range(10):
t = threading.Thread(target=do, args=(event_obj,))
t.start()
event_obj.clear()
inp = input(‘input:‘)
if inp == ‘true‘:
event_obj.set()
2.5 条件(Condition)
使得线程等待,只有满足某条件时,才释放n个线程。
未使用条件时:
import threading
def run(n):
con.acquire()
con.wait()
print("run the thread: %s" %n)
con.release()
if __name__ == ‘__main__‘:
con = threading.Condition()
for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start()
while True:
inp = input(‘>>>‘)
if inp == ‘q‘:
break
con.acquire()
con.notify(int(inp))
con.release()
使用条件时:
def condition_func():
ret = False
inp = input(‘>>>‘)
if inp == ‘1‘:
ret = True
return ret
def run(n):
con.acquire()
con.wait_for(condition_func)
print("run the thread: %s" %n)
con.release()
if __name__ == ‘__main__‘:
con = threading.Condition()
for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start()
2.6 定时器(Timer)
定时器,指定n秒后执行某操作
from threading import Timer
def hello():
print("hello, world")
t = Timer(1, hello)
t.start() # after 1 seconds, "hello, world" will be printed
3. 进程
from multiprocessing import Process
import threading
import time
def foo(i):
time.sleep(1)
print(‘say hi‘,i)
for i in range(10):
p = Process(target=foo,args=(i,))
p.start()
<font color="red">注意</font>:
由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销。
3.1 进程数据共享
进程各自持有一份数据,默认无法共享数据。
from multiprocessing import Process
import time
li = []
def foo(i):
li.append(i)
print(‘say hi‘,li)
for i in range(10):
p = Process(target=foo,args=(i,))
p.start()
print (‘ending‘,li)
结果类似这样(每次的结果可能排序会有变化)。
say hi [2]
say hi [3]
say hi [5]
say hi [0]
say hi [1]
say hi [4]
say hi [6]
say hi [7]
say hi [8]
ending []
say hi [9]
想要进程间共享数据
方法一:使用Array
from multiprocessing import Process,Array
temp = Array(‘i‘, [11,22,33,44])
def Foo(i):
temp[i] = 100+i
for item in temp:
print(i,‘----->‘,item)
for i in range(2):
p = Process(target=Foo,args=(i,))
p.start()
方法二:manage.dict()共享数据
from multiprocessing import Process,Manager
manage = Manager()
dic = manage.dict()
def Foo(i):
dic[i] = 100+i
print dic.values()
for i in range(2):
p = Process(target=Foo,args=(i,))
p.start()
p.join()
3.2 进程池
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中有两个方法:
- apply
- apply_async
from multiprocessing import Process,Pool
import time
def Foo(i):
time.sleep(2)
return i+100
def Bar(arg):
print(arg)
pool = Pool(5)
print(pool.apply(Foo,(1,)))
print(pool.apply_async(func =Foo, args=(1,)).get())
for i in range(10):
pool.apply_async(func=Foo, args=(i,),callback=Bar)
print(‘end‘)
pool.close()
pool.join() #进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
4. 协程
线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;
4.1 greenlet
from greenlet import greenlet
def test1():
print(12)
gr2.switch()
print(34)
gr2.switch()
def test2():
print(56)
gr1.switch()
print(78)
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
运行结果:
12
56
34
78
4.2 gevent
Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent为Python提供了比较完善的协程支持。
gevent是第三方库,通过greenlet实现协程,其基本思想是:
当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
import gevent
def foo():
print(‘Running in foo‘)
gevent.sleep(0)
print(‘Explicit context switch to foo again‘)
def bar():
print(‘Explicit context to bar‘)
gevent.sleep(0)
print(‘Implicit context switch back to bar‘)
gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])
运行结果:
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar
遇到IO操作自动切换:
from gevent import monkey; monkey.patch_all()
import gevent
import urllib.request
def f(url):
print(‘GET: %s‘ % url)
resp = urllib.request.urlopen(url)
data = resp.read()
print(‘%d bytes received from %s.‘ % (len(data), url))
gevent.joinall([
gevent.spawn(f, ‘https://www.python.org/‘),
gevent.spawn(f, ‘https://www.baidu.com/‘),
gevent.spawn(f, ‘https://www.so.com/‘),
])
以上是关于Python学习记录-多进程和多线程的主要内容,如果未能解决你的问题,请参考以下文章