九进程与线程

Posted liqiongming

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了九进程与线程相关的知识,希望对你有一定的参考价值。

进程:一个任务就是一个进程(Process)

线程:进程内的“子任务”称为线程(Thread)

线程是最小的执行单元,而进程由至少一个线程组成。多进程和多线程的程序涉及到同步、数据共享的问题

一、多进程

  • fork():调用一次,返回两次,把当前进程(父进程)复制了一份(子进程),然后分别在父进程和子进程内返回,子进程永远返回0,父进程返回子进程的ID。 子进程需要调用 getppid() 拿到父进程的ID。(window下无法调用该函数)
     1 import os
     2 
     3 print(Process (%s) start... % os.getpid())
     4 # Only works on Unix/Linux/Mac:
     5 pid = os.fork()
     6 if pid == 0:
     7     print(I am child process (%s) and my parent is %s. % (os.getpid(), os.getppid()))
     8 else:
     9     print(I (%s) just created a child process (%s). % (os.getpid(), pid))
    10 
    11 输出:
    12 Process (876) start...
    13 I (876) just created a child process (877).
    14 I am child process (877) and my parent is 876.
  • multiprocessing:跨平台版本的多进程模块,提供一个 Process 类来代表一个进程对象
     1 from multiprocessing import Process
     2 import os
     3 #子进程要执行的代码
     4 def run_proc(name):
     5     print(运行子进程中,名字:%s,进程ID:%s... %(name,os.getpid()))
     6 
     7 if __name__ == __main__:
     8     print(父进程ID:%s.%os.getpid())
     9     p = Process(target=run_proc, args=(test,))    #创建一个Process实例;args为元组;
    10     print(开始执行子进程.)
    11     p.start()
    12     p.join()    #等待子进程结束后再继续往下运行,通常用于进程间的同步;
    13     print(结束子进程.)
    14     print(os.getpid())
    15     print(os.getppid())
    16 
    17 输出:
    18 父进程ID:1368.
    19 开始执行子进程.
    20 运行子进程中,名字:test,进程ID:8692...
    21 结束子进程.
    22 1368
    23 3324
  • Pool:进程池,批量创建子进程, Pool对象 调用 join() 方法会等待所有子进程执行完毕,调用 join() 之前必须先调用 close() ,调用 close() 之后就不能继续添加新的 Process 了,Pool的默认大小是CPU的核数。
    1 def apply(self, func, args=(), kwds={})函数,相当于‘func(* args,** kwds)’
     1 from multiprocessing import Pool
     2 import os,time,random
     3 
     4 def long_time_task(name):
     5     print(运行任务%s,ID:%s % (name,os.getpid()))
     6 
     7     start = time.time()              #开始运行:0秒开始
     8     time.sleep(random.random()* 3)   #取一个0.0 - 1.0间的随机浮点数乘以3倍,来作为进程在此停留或者运行的时间
     9     end = time.time()                #经过time.sleep后的时间,相当于end-start = time.sleep(random.random()* 3) 的时间
    10 
    11     print(任务%s运行%0.2f秒。%(name,(end-start)))
    12 
    13 if __name__ == __main__:
    14     print(父进程ID:%s % os.getpid())
    15     p = Pool(4)         # Pool的值在本机默认值为4,表示最多同时执行4个进程,后面的需等前面某个进程完成时才能继续执行;
    16     for i in range(5):
    17         # apply_async会阻塞进程直到返回数据才把进程释放,然后有空余的进程来执行下一个进程(不等于挂起进程)
    18         p.apply_async(long_time_task,args=(i,))
    19     print(等待所有子进程完成...)
    20     p.close()
    21     p.join()
    22     print(所有子进程完成。)
    23 
    24 输出:
    25 父进程ID:5048
    26 等待所有子进程完成...
    27 运行任务0,ID:6840
    28 运行任务1,ID:5232
    29 运行任务2,ID:4884
    30 运行任务3,ID:1900
    31 任务2运行0.24秒。
    32 运行任务4,ID:4884
    33 任务1运行1.10秒。
    34 任务4运行0.97秒。
    35 任务0运行2.08秒。
    36 任务3运行2.13秒。
    37 所有子进程完成。
  • 子进程
     1 import subprocess
     2 
     3 print($ nslookup www.python.org)
     4 r = subprocess.call([nslookup, www.python.org])
     5 print(Exit code:, r)
     6 
     7 输出:
     8 $ nslookup www.python.org
     9 Server:        192.168.19.4
    10 Address:    192.168.19.4#53
    11 
    12 Non-authoritative answer:
    13 www.python.org    canonical name = python.map.fastly.net.
    14 Name:    python.map.fastly.net
    15 Address: 199.27.79.223
    16 
    17 Exit code: 0

    通过 communicate() 方法输入对子进程进行输入操作,父进程如果想要和子进程通过  communicate() 方法通信, subprocess.Popen() 里对应的参数必须是  subprocess.PIPE ,如果为默认值 None ,那么子进程使用和父进程相同的标准流文件

    下文相当于在命令行执行命令nslookup,然后手动输入:

    1 set q=mx
    2 python.org
    3 exit
     1 import subprocess
     2 
     3 print($ nslookup)
     4 p = subprocess.Popen([nslookup], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)    #默认为:=None
     5 output, err = p.communicate(bset q=mx
    python.org
    exit
    )
     6 print(output.decode(utf-8))
     7 print(Exit code:, p.returncode)
     8 
     9 输出:
    10 $ nslookup
    11 Server:        192.168.19.4
    12 Address:    192.168.19.4#53
    13 
    14 Non-authoritative answer:
    15 python.org    mail exchanger = 50 mail.python.org.
    16 
    17 Authoritative answers can be found from:
    18 mail.python.org    internet address = 82.94.164.166
    19 mail.python.org    has AAAA address 2001:888:2000:d::a6
    20 
    21 
    22 Exit code: 0

     subprocess.Popen():创建并返回一个子进程,并在这个进程中执行指定的程序

    subprocess.Popen(args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0)
    • args:要执行的命令或可执行文件的路径。一个由字符串组成的序列(通常是列表),列表的第一个元素是可执行程序的路径,剩下的是传给这个程序的参数,如果没有要传给这个程序的参数,args 参数可以仅仅是一个字符串;
    • bufsize:控制 stdin, stdout, stderr 等参数指定的文件的缓冲,和打开文件的 open()函数中的参数 bufsize 含义相同;
    • executable:如果这个参数不是 None,将替代参数 args 作为可执行程序;
    • stdin:指定子进程的标准输入;
    • stdout:指定子进程的标准输出;
    • stderr:指定子进程的标准错误输出;
    • preexec_fn:默认是None,否则必须是一个函数或者可调用对象,在子进程中首先执行这个函数,然后再去执行为子进程指定的程序或Shell。
    • close_fds:布尔型变量,为 True 时,在子进程执行前强制关闭所有除 stdin,stdout和stderr外的文件;
    • shell:布尔型变量,明确要求使用shell运行程序,与参数 executable 一同指定子进程运行在什么 Shell 中——如果executable=None 而 shell=True,则使用 /bin/sh 来执行 args 指定的程序;也就是说,Python首先起一个shell,再用这个shell来解释指定运行的命令。
    • cwd:代表路径的字符串,指定子进程运行的工作目录,要求这个目录必须存在;
    • env:字典,键和值都是为子进程定义环境变量的字符串;
    • universal_newline:布尔型变量,为 True 时,stdout 和 stderr 以通用换行(universal newline)模式打开,
    • startupinfo:见下一个参数;
    • creationfalgs:最后这两个参数是Windows中才有的参数,传递给Win32的CreateProcess API调用

 

  • 进程间的通信: Python的 multiprocessing 模块包装了底层的机制,提供了Queue 、 Pipes 等多种方式来交换数据
     1 #以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据
     2 from multiprocessing import Process,Queue
     3 import os,time,random
     4 
     5 #写数据进程执行的代码
     6 def write(q):
     7     print(Process to write:%s % os.getpid())
     8     for value in [A,B,C]:
     9         print(Put %s to queue...% value)
    10         q.put(value)
    11         time.sleep(random.random())
    12 
    13 #读数据进程执行的代码
    14 def read(q):
    15     print(Process to read:%s % os.getpid())
    16     while True:
    17         value = q.get(True)
    18         print(Get %s from queue. % value)
    19 
    20 if __name__ == __main__:
    21     #父进程创建Queue,并传给各个子进程
    22     q = Queue()
    23     pw = Process(target=write,args=(q,))
    24     pr = Process(target=read,args=(q,))
    25     #启动子进程pw,写入:
    26     pw.start()
    27     #启动子进程pr,读取:
    28     pr.start()
    29     #等待pw结束
    30     pw.join()
    31     #pr进程里是死循环,无法等待期结束,只能强行终止:
    32     pr.terminate()
    33 
    34 输出:
    35 Process to write:7148
    36 Put A to queue...
    37 Process to read:9040
    38 Get A from queue.
    39 Put B to queue...
    40 Get B from queue.
    41 Put C to queue...
    42 Get C from queue

 

二、多线程

  线程包括开始、执行顺序和结束三部分。它有一个指令指针,用于记录当前运行的上下文。当其他线程运行时,它可以被抢占(中断)和临时挂起(也称为睡眠) ——这种做法叫让步yielding。 

使用两个模块:_thread:低级模块;Threading:高级模块

启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行

  • Threading:进程默认就会启动一个线程,称为主线程 MainThread ,主线程又可以启动新的线程, current_thread() 函数永远返回当前线程的实例。子线程的名字在创建时指定,下文用 LoopThread 命名子线程。不起名字Python会自动给线程命名为Thread-1Thread-2...
     1 import time, threading
     2 
     3 # 新线程执行:
     4 def loop():
     5     print(thread %s is running... % threading.current_thread().name)  #返回当前线程的实例
     6     n = 0
     7     while n < 5:
     8         n += 1
     9         print(thread %s >>> %s % (threading.current_thread().name, n))
    10         time.sleep(1)
    11     print(thread %s ended. % threading.current_thread().name)
    12 
    13 print(thread %s is running... % threading.current_thread().name)  #返回主线程的实例:MainThread
    14 t = threading.Thread(target=loop, name=LoopThread)
    15 t.start()
    16 t.join()
    17 print(thread %s ended. % threading.current_thread().name)
    18 输出:
    19 
    20 thread MainThread is running...
    21 thread LoopThread is running...
    22 thread LoopThread >>> 1
    23 thread LoopThread >>> 2
    24 thread LoopThread >>> 3
    25 thread LoopThread >>> 4
    26 thread LoopThread >>> 5
    27 thread LoopThread ended.
    28 thread MainThread ended.

     

  • Lock:多线程中,所有变量都由所有线程共享,任何一个变量都可以被任何一个线程修改,通过 threading.Lock() 来实现执行单个线程,其它线程需等待被锁的线程释放后才能继续执行。防止多个线程同时运行时修改数据数据错误。  
     1 #不使用Look 的情况,当循环次数较多时,交替执行的线程其运行顺序会被改变,从而使结果发生改变
     2 import time,threading
     3 #假设银行存款
     4 balance = 0
     5 def change_it(n):
     6     global balance  #使用全局变量
     7     balance = balance + n
     8     balance = balance - n
     9 
    10 def run_thread(n):
    11     for i in range(100000):
    12         change_it(n)
    13 
    14 t1 = threading.Thread(target=run_thread,args=(4,))
    15 t2 = threading.Thread(target=run_thread,args=(8,))
    16 t1.start()
    17 t2.start()
    18 t1.join()
    19 t2.join()
    20 print(balance)

    理想运行结果为:t1执行后,再进行t2的执行。结果都为0;而线程由操作系统调用,执行顺序可能被改变:

     1 初始值 balance = 0
     2 #t1,t2同时执行
     3 t1: balance = balance + 4  # balance = 4
     4 t2: balance = balance + 8  # balance = 8
     5 
     6 t1: balance = balance - 4  # balance = 0
     7 #t2执行第二条命令时,balance的值为t1刚执行完的值:0
     8 t2: balance = balance - 8  # balance = 0 - 8 = -8
     9 t2: balance = -8
    10 
    11 结果 balance = -8

    使用Look:结果不受其它线程的影响

     1 import threading
     2 balance = 0
     3 lock = threading.Lock()
     4 
     5 def change_it(n):
     6     global balance  #使用全局变量
     7     balance = balance + n
     8     balance = balance - n
     9 
    10 def run_thread(n):
    11     for i in range(10000):
    12         #先获取锁
    13         lock.acquire()
    14         #使用try来保证获得锁的进程用完一定被释放,不让其它线程称为死线程无法执行
    15         try:
    16             change_it(n)
    17         finally:
    18             #释放锁
    19             lock.release()
    20 
    21 t1 = threading.Thread(target=run_thread,args=(4,))
    22 t2 = threading.Thread(target=run_thread,args=(8,))
    23 t1.start()
    24 t2.start()
    25 t1.join()
    26 t2.join()
    27 print(balance)

     坏处:阻止了多线程并发执行;线程与线程互相获取对方的锁会导致线程挂起,无法执行

  • GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。Python解释器由于设计时有GIL全局锁,导致了多线程无法利用多核。只能使用一个核或通过C扩展实现多核处理多线程。(多核任务可通过多进程来实现)

     

  • ThreadLocal:将一个全局变量(如:local_school)作为 ThreadLocal() 的对象,相当于作为一个 dict ,每个线程(Thread)以自身作为key对它都可以读写 student 属性,且互不影响。即 local_school.student() 为局部变量每次都单独赋予每个线程。
     1 import threading
     2 #创建全局变量
     3 local_school = threading.local()
     4 
     5 def process_student():
     6     #当前线程关联的student
     7     std = local_school.student
     8     print(Hello,%s (in %s)%(std,threading.current_thread().name))
     9 
    10 def process_thread(name):
    11     #绑定threadlocal的student,使赋予每个线程的局部变量student的值都不冲突,单独赋予;
    12     local_school.student = name
    13     process_student()
    14 
    15 t1 = threading.Thread(target=process_thread,args=(Jack,),name=Thread-1)
    16 t2 = threading.Thread(target=process_thread,args=(john,),name=Thread=2)
    17 t1.start()
    18 t2.start()
    19 t1.join()
    20 t2.join()
    21 
    22 输出:
    23 Hello,Jack (in Thread-1)
    24 Hello,john (in Thread=2)

     

   计算密集型:任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以要高效利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数

  IO密集型:涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。

  异步IO:用单进程单线程模型来执行多任务,这种全新的模型称为事件驱动模型;单线程的异步编程模型称为协程,有了协程的支持,就可以基于事件驱动编写高效的多任务程序。

 

三、分布式进程

  在Thread和Process中,应优选Process,因为Process更稳定,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。

multiprocessing模块中的managers子模块支持把多进程分布到多台机器上,依靠网络通信。

 1 # task_master.py
 2 #!/user/bin/pytthon
 3 # -*- coding:utf-8 -*-
 4 #已有一个通过Queue通信的多进程程序在同一台机器上运行,希望把发送任务的进程和处理任务的进程分布到两台机器上;
 5 #原有的Queue继续使用,通过managers模块把Queue通过网络暴露出去,就可让其他机器的进程访问Queue了;服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务
 6 import random,time,queue
 7 from multiprocessing.managers import BaseManager
 8 from multiprocessing import freeze_support
 9 
10 task_queue =  queue.Queue()  # 发送任务的队列:
11 result_queue = queue.Queue() # 接收结果的队列:
12 class QueueManager(BaseManager):  # 从BaseManager继承的QueueManager:
13     pass
14 # windows下运行,非Windows系统直接跳到注册部分;
15 def return_task_queue():
16     global task_queue
17     return task_queue  # 返回发送任务队列
18 def return_result_queue ():
19     global result_queue
20     return result_queue # 返回接收结果队列
21 
22 def test():
23     # 把两个Queue都注册到网络上, callable参数关联了Queue对象,它们用来进行进程间通信,交换对象;
24     # 非Windows下的代码,window下的序列化不能使用匿名函数
25     #QueueManager.register(‘get_task_queue‘, callable=lambda: task_queue)   
26     #QueueManager.register(‘get_result_queue‘, callable=lambda: result_queue)
27     QueueManager.register(get_task_queue, callable=return_task_queue)
28     QueueManager.register(get_result_queue, callable=return_result_queue)
29     # 绑定端口5000, 设置验证码‘abc‘:
30     #manager = QueueManager(address=(‘‘, 5000), authkey=b‘abc‘)
31     # windows需要写ip地址
32     manager = QueueManager(address=(127.0.0.1, 5000), authkey=babc)
33     manager.start()  # 启动Queue:
34     # 获得通过网络访问的Queue对象:
35     task = manager.get_task_queue()
36     result = manager.get_result_queue()
37     for i in range(10):   # 放几个任务进去:
38         n = random.randint(0, 10000)
39         print(Put task %d... % n)
40         task.put(n)
41     # 从result队列读取结果:
42     print(Try get results...)
43     for i in range(10):
44         # 这里加了异常捕获
45         try:
46             r = result.get(timeout=5)
47             print(Result: %s % r)
48         except queue.Empty:
49              print(result queue is empty.)
50     # 关闭:
51     manager.shutdown()
52     print(master exit.)
53 if __name__==__main__:
54     freeze_support()
55     print(start!)
56     test()
 1 # task_worker.py
 2 import time, sys, queue
 3 from multiprocessing.managers import BaseManager
 4 
 5 # 创建类似的QueueManager:
 6 class QueueManager(BaseManager):
 7     pass
 8 
 9 # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
10 QueueManager.register(get_task_queue)
11 QueueManager.register(get_result_queue)
12 
13 # 连接到服务器,也就是运行task_master.py的机器:
14 server_addr = 127.0.0.1
15 print(Connect to server %s... % server_addr)
16 # 端口和验证码注意保持与task_master.py设置的完全一致:
17 m = QueueManager(address=(server_addr, 5000), authkey=babc)
18 # 从网络连接:
19 m.connect()
20 # 获取Queue的对象:
21 task = m.get_task_queue()
22 result = m.get_result_queue()
23 # 从task队列取任务,并把结果写入result队列:
24 for i in range(10):
25     try:
26         n = task.get(timeout=1)
27         print(run task %d * %d... % (n, n))
28         r = %d * %d = %d % (n, n, n*n)
29         time.sleep(1)
30         result.put(r)
31     except queue.Empty:
32         print(task queue is empty.)
33 # 处理结束:
34 print(worker exit.)
task_master.py与task_worker.py得到的结果
 1 # task_master.py
 2 start!
 3 Put task 6236...
 4 Put task 4265...
 5 Put task 9257...
 6 Put task 2598...
 7 Put task 181...
 8 Put task 797...
 9 Put task 7652...
10 Put task 6855...
11 Put task 1465...
12 Put task 8195...
13 Try get results...
14 Result: 6236 * 6236 = 38887696
15 Result: 4265 * 4265 = 18190225
16 Result: 9257 * 9257 = 85692049
17 Result: 2598 * 2598 = 6749604
18 Result: 181 * 181 = 32761
19 Result: 797 * 797 = 635209
20 Result: 7652 * 7652 = 58553104
21 Result: 6855 * 6855 = 46991025
22 Result: 1465 * 1465 = 2146225
23 Result: 8195 * 8195 = 67158025
24 master exit.
 1 # task_worker.py
 2 Connect to server 127.0.0.1...
 3 run task 6236 * 6236...
 4 run task 4265 * 4265...
 5 run task 9257 * 9257...
 6 run task 2598 * 2598...
 7 run task 181 * 181...
 8 run task 797 * 797...
 9 run task 7652 * 7652...
10 run task 6855 * 6855...
11 run task 1465 * 1465...
12 run task 8195 * 8195...
13 worker exit.

添加任务到 Queue 不可以直接对原始的 task_queue 进行操作,那样就绕过了 QueueManager的 封装,必须通过 manager.get_task_queue() 获得的 Queue 接口添加;

 task_worker.py 中没有创建 Queue 的代码, Queue 对象存储在 task_master.py 进程中:技术分享图片

 Queue 是用来传递任务和接收结果,每个任务的描述数据量要尽量小。它通过 QueueManager 实现通过网络访问。由于 QueueManager 管理不止一个 Queue ,所以要给每个 Queue 的网络调用接口起个名字,比如 get_task_queue 。(笔记借鉴廖雪峰教程内容。)

 

以上是关于九进程与线程的主要内容,如果未能解决你的问题,请参考以下文章

golang学习九:Go并发编程

多线程编程

第九篇:进程与线程

线程学习知识点总结

八.多进程与多线程

Java线程