九进程与线程
Posted liqiongming
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了九进程与线程相关的知识,希望对你有一定的参考价值。
进程:一个任务就是一个进程(Process)
线程:进程内的“子任务”称为线程(Thread)
线程是最小的执行单元,而进程由至少一个线程组成。多进程和多线程的程序涉及到同步、数据共享的问题
一、多进程
- fork():调用一次,返回两次,把当前进程(父进程)复制了一份(子进程),然后分别在父进程和子进程内返回,子进程永远返回
0
,父进程返回子进程的ID。 子进程需要调用 getppid() 拿到父进程的ID。(window下无法调用该函数)1 import os 2 3 print(‘Process (%s) start...‘ % os.getpid()) 4 # Only works on Unix/Linux/Mac: 5 pid = os.fork() 6 if pid == 0: 7 print(‘I am child process (%s) and my parent is %s.‘ % (os.getpid(), os.getppid())) 8 else: 9 print(‘I (%s) just created a child process (%s).‘ % (os.getpid(), pid)) 10 11 输出: 12 Process (876) start... 13 I (876) just created a child process (877). 14 I am child process (877) and my parent is 876.
- multiprocessing:跨平台版本的多进程模块,提供一个 Process 类来代表一个进程对象
1 from multiprocessing import Process 2 import os 3 #子进程要执行的代码 4 def run_proc(name): 5 print(‘运行子进程中,名字:%s,进程ID:%s...‘ %(name,os.getpid())) 6 7 if __name__ == ‘__main__‘: 8 print(‘父进程ID:%s.‘%os.getpid()) 9 p = Process(target=run_proc, args=(‘test‘,)) #创建一个Process实例;args为元组; 10 print(‘开始执行子进程.‘) 11 p.start() 12 p.join() #等待子进程结束后再继续往下运行,通常用于进程间的同步; 13 print(‘结束子进程.‘) 14 print(os.getpid()) 15 print(os.getppid()) 16 17 输出: 18 父进程ID:1368. 19 开始执行子进程. 20 运行子进程中,名字:test,进程ID:8692... 21 结束子进程. 22 1368 23 3324
- Pool:进程池,批量创建子进程, Pool对象 调用 join() 方法会等待所有子进程执行完毕,调用 join() 之前必须先调用 close() ,调用 close() 之后就不能继续添加新的 Process 了,
Pool
的默认大小是CPU的核数。1 def apply(self, func, args=(), kwds={})函数,相当于‘func(* args,** kwds)’
1 from multiprocessing import Pool 2 import os,time,random 3 4 def long_time_task(name): 5 print(‘运行任务%s,ID:%s‘ % (name,os.getpid())) 6 7 start = time.time() #开始运行:0秒开始 8 time.sleep(random.random()* 3) #取一个0.0 - 1.0间的随机浮点数乘以3倍,来作为进程在此停留或者运行的时间 9 end = time.time() #经过time.sleep后的时间,相当于end-start = time.sleep(random.random()* 3) 的时间 10 11 print(‘任务%s运行%0.2f秒。‘%(name,(end-start))) 12 13 if __name__ == ‘__main__‘: 14 print(‘父进程ID:%s‘ % os.getpid()) 15 p = Pool(4) # Pool的值在本机默认值为4,表示最多同时执行4个进程,后面的需等前面某个进程完成时才能继续执行; 16 for i in range(5): 17 # apply_async会阻塞进程直到返回数据才把进程释放,然后有空余的进程来执行下一个进程(不等于挂起进程) 18 p.apply_async(long_time_task,args=(i,)) 19 print(‘等待所有子进程完成...‘) 20 p.close() 21 p.join() 22 print(‘所有子进程完成。‘) 23 24 输出: 25 父进程ID:5048 26 等待所有子进程完成... 27 运行任务0,ID:6840 28 运行任务1,ID:5232 29 运行任务2,ID:4884 30 运行任务3,ID:1900 31 任务2运行0.24秒。 32 运行任务4,ID:4884 33 任务1运行1.10秒。 34 任务4运行0.97秒。 35 任务0运行2.08秒。 36 任务3运行2.13秒。 37 所有子进程完成。
- 子进程
1 import subprocess 2 3 print(‘$ nslookup www.python.org‘) 4 r = subprocess.call([‘nslookup‘, ‘www.python.org‘]) 5 print(‘Exit code:‘, r) 6 7 输出: 8 $ nslookup www.python.org 9 Server: 192.168.19.4 10 Address: 192.168.19.4#53 11 12 Non-authoritative answer: 13 www.python.org canonical name = python.map.fastly.net. 14 Name: python.map.fastly.net 15 Address: 199.27.79.223 16 17 Exit code: 0
通过 communicate() 方法输入对子进程进行输入操作,父进程如果想要和子进程通过 communicate() 方法通信, subprocess.Popen() 里对应的参数必须是 subprocess.PIPE ,如果为默认值 None ,那么子进程使用和父进程相同的标准流文件
下文相当于在命令行执行命令
nslookup
,然后手动输入:1 set q=mx 2 python.org 3 exit
1 import subprocess 2 3 print(‘$ nslookup‘) 4 p = subprocess.Popen([‘nslookup‘], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) #默认为:=None 5 output, err = p.communicate(b‘set q=mx python.org exit ‘) 6 print(output.decode(‘utf-8‘)) 7 print(‘Exit code:‘, p.returncode) 8 9 输出: 10 $ nslookup 11 Server: 192.168.19.4 12 Address: 192.168.19.4#53 13 14 Non-authoritative answer: 15 python.org mail exchanger = 50 mail.python.org. 16 17 Authoritative answers can be found from: 18 mail.python.org internet address = 82.94.164.166 19 mail.python.org has AAAA address 2001:888:2000:d::a6 20 21 22 Exit code: 0
subprocess.Popen():创建并返回一个子进程,并在这个进程中执行指定的程序
subprocess.Popen(args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0)
- args:要执行的命令或可执行文件的路径。一个由字符串组成的序列(通常是列表),列表的第一个元素是可执行程序的路径,剩下的是传给这个程序的参数,如果没有要传给这个程序的参数,args 参数可以仅仅是一个字符串;
- bufsize:控制 stdin, stdout, stderr 等参数指定的文件的缓冲,和打开文件的 open()函数中的参数 bufsize 含义相同;
- executable:如果这个参数不是 None,将替代参数 args 作为可执行程序;
- stdin:指定子进程的标准输入;
- stdout:指定子进程的标准输出;
- stderr:指定子进程的标准错误输出;
- preexec_fn:默认是None,否则必须是一个函数或者可调用对象,在子进程中首先执行这个函数,然后再去执行为子进程指定的程序或Shell。
- close_fds:布尔型变量,为 True 时,在子进程执行前强制关闭所有除 stdin,stdout和stderr外的文件;
- shell:布尔型变量,明确要求使用shell运行程序,与参数 executable 一同指定子进程运行在什么 Shell 中——如果executable=None 而 shell=True,则使用 /bin/sh 来执行 args 指定的程序;也就是说,Python首先起一个shell,再用这个shell来解释指定运行的命令。
- cwd:代表路径的字符串,指定子进程运行的工作目录,要求这个目录必须存在;
- env:字典,键和值都是为子进程定义环境变量的字符串;
- universal_newline:布尔型变量,为 True 时,stdout 和 stderr 以通用换行(universal newline)模式打开,
- startupinfo:见下一个参数;
- creationfalgs:最后这两个参数是Windows中才有的参数,传递给Win32的CreateProcess API调用
- 进程间的通信: Python的 multiprocessing 模块包装了底层的机制,提供了Queue 、 Pipes 等多种方式来交换数据
1 #以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据 2 from multiprocessing import Process,Queue 3 import os,time,random 4 5 #写数据进程执行的代码 6 def write(q): 7 print(‘Process to write:%s‘ % os.getpid()) 8 for value in [‘A‘,‘B‘,‘C‘]: 9 print(‘Put %s to queue...‘% value) 10 q.put(value) 11 time.sleep(random.random()) 12 13 #读数据进程执行的代码 14 def read(q): 15 print(‘Process to read:%s‘ % os.getpid()) 16 while True: 17 value = q.get(True) 18 print(‘Get %s from queue.‘ % value) 19 20 if __name__ == ‘__main__‘: 21 #父进程创建Queue,并传给各个子进程 22 q = Queue() 23 pw = Process(target=write,args=(q,)) 24 pr = Process(target=read,args=(q,)) 25 #启动子进程pw,写入: 26 pw.start() 27 #启动子进程pr,读取: 28 pr.start() 29 #等待pw结束 30 pw.join() 31 #pr进程里是死循环,无法等待期结束,只能强行终止: 32 pr.terminate() 33 34 输出: 35 Process to write:7148 36 Put A to queue... 37 Process to read:9040 38 Get A from queue. 39 Put B to queue... 40 Get B from queue. 41 Put C to queue... 42 Get C from queue
二、多线程
线程包括开始、执行顺序和结束三部分。它有一个指令指针,用于记录当前运行的上下文。当其他线程运行时,它可以被抢占(中断)和临时挂起(也称为睡眠) ——这种做法叫做让步(yielding)。
使用两个模块:_thread:低级模块;Threading:高级模块
启动一个线程就是把一个函数传入并创建Thread
实例,然后调用start()
开始执行
- Threading:进程默认就会启动一个线程,称为主线程 MainThread ,主线程又可以启动新的线程, current_thread() 函数永远返回当前线程的实例。子线程的名字在创建时指定,下文用 LoopThread 命名子线程。不起名字Python会自动给线程命名为
Thread-1
,Thread-2...
1 import time, threading 2 3 # 新线程执行: 4 def loop(): 5 print(‘thread %s is running...‘ % threading.current_thread().name) #返回当前线程的实例 6 n = 0 7 while n < 5: 8 n += 1 9 print(‘thread %s >>> %s‘ % (threading.current_thread().name, n)) 10 time.sleep(1) 11 print(‘thread %s ended.‘ % threading.current_thread().name) 12 13 print(‘thread %s is running...‘ % threading.current_thread().name) #返回主线程的实例:MainThread 14 t = threading.Thread(target=loop, name=‘LoopThread‘) 15 t.start() 16 t.join() 17 print(‘thread %s ended.‘ % threading.current_thread().name) 18 输出: 19 20 thread MainThread is running... 21 thread LoopThread is running... 22 thread LoopThread >>> 1 23 thread LoopThread >>> 2 24 thread LoopThread >>> 3 25 thread LoopThread >>> 4 26 thread LoopThread >>> 5 27 thread LoopThread ended. 28 thread MainThread ended.
- Lock:多线程中,所有变量都由所有线程共享,任何一个变量都可以被任何一个线程修改,通过 threading.Lock() 来实现执行单个线程,其它线程需等待被锁的线程释放后才能继续执行。防止多个线程同时运行时修改数据数据错误。
1 #不使用Look 的情况,当循环次数较多时,交替执行的线程其运行顺序会被改变,从而使结果发生改变 2 import time,threading 3 #假设银行存款 4 balance = 0 5 def change_it(n): 6 global balance #使用全局变量 7 balance = balance + n 8 balance = balance - n 9 10 def run_thread(n): 11 for i in range(100000): 12 change_it(n) 13 14 t1 = threading.Thread(target=run_thread,args=(4,)) 15 t2 = threading.Thread(target=run_thread,args=(8,)) 16 t1.start() 17 t2.start() 18 t1.join() 19 t2.join() 20 print(balance)
理想运行结果为:t1执行后,再进行t2的执行。结果都为0;而线程由操作系统调用,执行顺序可能被改变:
1 初始值 balance = 0 2 #t1,t2同时执行 3 t1: balance = balance + 4 # balance = 4 4 t2: balance = balance + 8 # balance = 8 5 6 t1: balance = balance - 4 # balance = 0 7 #t2执行第二条命令时,balance的值为t1刚执行完的值:0 8 t2: balance = balance - 8 # balance = 0 - 8 = -8 9 t2: balance = -8 10 11 结果 balance = -8
使用Look:结果不受其它线程的影响
1 import threading 2 balance = 0 3 lock = threading.Lock() 4 5 def change_it(n): 6 global balance #使用全局变量 7 balance = balance + n 8 balance = balance - n 9 10 def run_thread(n): 11 for i in range(10000): 12 #先获取锁 13 lock.acquire() 14 #使用try来保证获得锁的进程用完一定被释放,不让其它线程称为死线程无法执行 15 try: 16 change_it(n) 17 finally: 18 #释放锁 19 lock.release() 20 21 t1 = threading.Thread(target=run_thread,args=(4,)) 22 t2 = threading.Thread(target=run_thread,args=(8,)) 23 t1.start() 24 t2.start() 25 t1.join() 26 t2.join() 27 print(balance)
坏处:阻止了多线程并发执行;线程与线程互相获取对方的锁会导致线程挂起,无法执行
- GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。Python解释器由于设计时有GIL全局锁,导致了多线程无法利用多核。只能使用一个核或通过C扩展实现多核处理多线程。(多核任务可通过多进程来实现)
- ThreadLocal:将一个全局变量(如:local_school)作为 ThreadLocal() 的对象,相当于作为一个 dict ,每个线程(Thread)以自身作为key对它都可以读写 student 属性,且互不影响。即 local_school.student() 为局部变量每次都单独赋予每个线程。
1 import threading 2 #创建全局变量 3 local_school = threading.local() 4 5 def process_student(): 6 #当前线程关联的student 7 std = local_school.student 8 print(‘Hello,%s (in %s)‘%(std,threading.current_thread().name)) 9 10 def process_thread(name): 11 #绑定threadlocal的student,使赋予每个线程的局部变量student的值都不冲突,单独赋予; 12 local_school.student = name 13 process_student() 14 15 t1 = threading.Thread(target=process_thread,args=(‘Jack‘,),name=‘Thread-1‘) 16 t2 = threading.Thread(target=process_thread,args=(‘john‘,),name=‘Thread=2‘) 17 t1.start() 18 t2.start() 19 t1.join() 20 t2.join() 21 22 输出: 23 Hello,Jack (in Thread-1) 24 Hello,john (in Thread=2)
计算密集型:任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以要高效利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数
IO密集型:涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。
异步IO:用单进程单线程模型来执行多任务,这种全新的模型称为事件驱动模型;单线程的异步编程模型称为协程,有了协程的支持,就可以基于事件驱动编写高效的多任务程序。
三、分布式进程
在Thread和Process中,应优选Process,因为Process更稳定,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。
multiprocessing
模块中的managers
子模块支持把多进程分布到多台机器上,依靠网络通信。
1 # task_master.py 2 #!/user/bin/pytthon 3 # -*- coding:utf-8 -*- 4 #已有一个通过Queue通信的多进程程序在同一台机器上运行,希望把发送任务的进程和处理任务的进程分布到两台机器上; 5 #原有的Queue继续使用,通过managers模块把Queue通过网络暴露出去,就可让其他机器的进程访问Queue了;服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务 6 import random,time,queue 7 from multiprocessing.managers import BaseManager 8 from multiprocessing import freeze_support 9 10 task_queue = queue.Queue() # 发送任务的队列: 11 result_queue = queue.Queue() # 接收结果的队列: 12 class QueueManager(BaseManager): # 从BaseManager继承的QueueManager: 13 pass 14 # windows下运行,非Windows系统直接跳到注册部分; 15 def return_task_queue(): 16 global task_queue 17 return task_queue # 返回发送任务队列 18 def return_result_queue (): 19 global result_queue 20 return result_queue # 返回接收结果队列 21 22 def test(): 23 # 把两个Queue都注册到网络上, callable参数关联了Queue对象,它们用来进行进程间通信,交换对象; 24 # 非Windows下的代码,window下的序列化不能使用匿名函数 25 #QueueManager.register(‘get_task_queue‘, callable=lambda: task_queue) 26 #QueueManager.register(‘get_result_queue‘, callable=lambda: result_queue) 27 QueueManager.register(‘get_task_queue‘, callable=return_task_queue) 28 QueueManager.register(‘get_result_queue‘, callable=return_result_queue) 29 # 绑定端口5000, 设置验证码‘abc‘: 30 #manager = QueueManager(address=(‘‘, 5000), authkey=b‘abc‘) 31 # windows需要写ip地址 32 manager = QueueManager(address=(‘127.0.0.1‘, 5000), authkey=b‘abc‘) 33 manager.start() # 启动Queue: 34 # 获得通过网络访问的Queue对象: 35 task = manager.get_task_queue() 36 result = manager.get_result_queue() 37 for i in range(10): # 放几个任务进去: 38 n = random.randint(0, 10000) 39 print(‘Put task %d...‘ % n) 40 task.put(n) 41 # 从result队列读取结果: 42 print(‘Try get results...‘) 43 for i in range(10): 44 # 这里加了异常捕获 45 try: 46 r = result.get(timeout=5) 47 print(‘Result: %s‘ % r) 48 except queue.Empty: 49 print(‘result queue is empty.‘) 50 # 关闭: 51 manager.shutdown() 52 print(‘master exit.‘) 53 if __name__==‘__main__‘: 54 freeze_support() 55 print(‘start!‘) 56 test()
1 # task_worker.py 2 import time, sys, queue 3 from multiprocessing.managers import BaseManager 4 5 # 创建类似的QueueManager: 6 class QueueManager(BaseManager): 7 pass 8 9 # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字: 10 QueueManager.register(‘get_task_queue‘) 11 QueueManager.register(‘get_result_queue‘) 12 13 # 连接到服务器,也就是运行task_master.py的机器: 14 server_addr = ‘127.0.0.1‘ 15 print(‘Connect to server %s...‘ % server_addr) 16 # 端口和验证码注意保持与task_master.py设置的完全一致: 17 m = QueueManager(address=(server_addr, 5000), authkey=b‘abc‘) 18 # 从网络连接: 19 m.connect() 20 # 获取Queue的对象: 21 task = m.get_task_queue() 22 result = m.get_result_queue() 23 # 从task队列取任务,并把结果写入result队列: 24 for i in range(10): 25 try: 26 n = task.get(timeout=1) 27 print(‘run task %d * %d...‘ % (n, n)) 28 r = ‘%d * %d = %d‘ % (n, n, n*n) 29 time.sleep(1) 30 result.put(r) 31 except queue.Empty: 32 print(‘task queue is empty.‘) 33 # 处理结束: 34 print(‘worker exit.‘)
task_master.py与task_worker.py得到的结果
1 # task_master.py 2 start! 3 Put task 6236... 4 Put task 4265... 5 Put task 9257... 6 Put task 2598... 7 Put task 181... 8 Put task 797... 9 Put task 7652... 10 Put task 6855... 11 Put task 1465... 12 Put task 8195... 13 Try get results... 14 Result: 6236 * 6236 = 38887696 15 Result: 4265 * 4265 = 18190225 16 Result: 9257 * 9257 = 85692049 17 Result: 2598 * 2598 = 6749604 18 Result: 181 * 181 = 32761 19 Result: 797 * 797 = 635209 20 Result: 7652 * 7652 = 58553104 21 Result: 6855 * 6855 = 46991025 22 Result: 1465 * 1465 = 2146225 23 Result: 8195 * 8195 = 67158025 24 master exit.
1 # task_worker.py 2 Connect to server 127.0.0.1... 3 run task 6236 * 6236... 4 run task 4265 * 4265... 5 run task 9257 * 9257... 6 run task 2598 * 2598... 7 run task 181 * 181... 8 run task 797 * 797... 9 run task 7652 * 7652... 10 run task 6855 * 6855... 11 run task 1465 * 1465... 12 run task 8195 * 8195... 13 worker exit.
添加任务到 Queue 不可以直接对原始的 task_queue 进行操作,那样就绕过了 QueueManager的 封装,必须通过 manager.get_task_queue() 获得的 Queue 接口添加;
task_worker.py 中没有创建 Queue 的代码, Queue 对象存储在 task_master.py 进程中:
Queue 是用来传递任务和接收结果,每个任务的描述数据量要尽量小。它通过 QueueManager 实现通过网络访问。由于 QueueManager 管理不止一个 Queue ,所以要给每个 Queue 的网络调用接口起个名字,比如 get_task_queue 。(笔记借鉴廖雪峰教程内容。)
以上是关于九进程与线程的主要内容,如果未能解决你的问题,请参考以下文章