进程 线程
Posted huangqiang97
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了进程 线程相关的知识,希望对你有一定的参考价值。
参考:Process/Threading Process/Threading
# -------------------------------多进程
#
# Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。
# 子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。
# windows不支持fork
# 有了fork调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务。
import multiprocessing
import os
# Only works on Unix/Linux/Mac:
# pid = os.fork()
# if pid == 0:
# print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
# else:
# print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
from multiprocessing import Process, Pool, Queue, Pipe
import time
#getpid()获得当前进程ID,getppid()获得父进程ID
def run_process(args):
time.sleep(1)
print('Ass we can!,parms:%s,super:%s,subs:%s'%(args,os.getppid(),os.getpid()))
if __name__=='__main__':
print('start')
print('super:%s'%(os.getpid()))
#创建进程对象,target:子进程要处理的函数,args:函数参数。
proc=Process(target=run_process,args=(12,))
#启动子进程
proc.start()
#让父进程等待子进程执行完毕,父进程才继续执行,通常用于进程间的同步。
proc.join()
print('finish')
import random
# -------------------------------进程池
# 如果要启动大量的子进程,可以用进程池的方式批量创建子进程.
# 由于进程启动的开销比较大,使用多进程的时候会导致大量内存空间被消耗。
# 为了防止这种情况发生可以使用进程池,(由于启动线程的开销比较小,所以不需要线程池这种概念,
# 多线程只会频繁得切换cpu导致系统变慢,并不会占用过多的内存空间)
# 进程池中常用方法:
# `apply()` 同步执行(串行),多个进程按启动顺序挨个执行,一个执行完再到下一个进程。
# `apply_async()` 异步执行(并行),启动多个进程同步执行,并行数目按Pool大小和CPU 线程数决定。
# `terminate()` 立刻关闭进程池
# `close()` 等待所有进程结束后,才关闭进程池。
# `join()` 主进程等待所有子进程执行完毕。必须在close或terminate()之后。
#示例1
def subs_proc(args):
print('i'm %s ,id"%s'%(args,os.getpid()))
start=time.time()
time.sleep(random.random()*3)
print('%s:run time %s s'%(args,time.time()-start))
if __name__=='__main__':
#线程池。默认大小为CPU线程数,本机为8
pool=Pool(8)
print('id:%s'%os.getpid())
for i in range(10):
#添加任务,如线程池满,要等待其中有线程结束,资源空闲才会获得资源开始执行。
pool.apply_async(subs_proc,args=(i,))
pool.close()
#会等待所有子进程执行完毕,调用join()之前必须先调用close()/terminate(),调用close()/terminate()之后就不能继续添加新的Process了。
pool.join()
print('finish')
#示例1
def Foo(i):
time.sleep(2)
print(i)
return i + 100
def Bar(arg):
print('-->exec done:', arg)
# 允许进程池同时放入5个进程
#进程池内部维护一个进程序列,当使用时,去进程池中获取一个进程,
# 如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
# 在上面的程序中产生了10个进程,但是只能有5同时被放入进程池,剩下的都被暂时挂起,并不占用内存空间,等前面的五个进程执行完后,再执行剩下5个进程。
if __name__=='__main__':
pool = Pool(5)
for i in range(10):
#异步执行。
# func子进程执行完后,才会执行callback,否则callback不执行(而且callback是由父进程来执行了)
# Bar(Foo(i))
pool.apply_async(func=Foo, args=(i,), callback=Bar)
#pool.apply(func=Foo, args=(i,))
print('end')
pool.close()
# 主进程等待所有子进程执行完毕。必须在close()或terminate()之后。
pool.join()
#---------------------------------子进程
# 示例一
#很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。
#import subprocess
#等价于 cmd输入:ping 127.0.0.1
# r = subprocess.call(['ping', '127.0.0.1'])
# print('Exit code:', r)
#实例2
# 如果子进程还需要输入,则可以通过communicate()方法输入
#等价于 cmd输入:my_cmd
#p = subprocess.Popen(['my_cmd'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
#相当于在命令行执行命令my_cmd,然后手动输入:
# a=1
# b=2
# c=3
# output, err = p.communicate(b'a=1
b=2
c=3
')
# print(output.decode('utf-8'))
# print('Exit code:', p.returncode)
#实例三
# p = subprocess.Popen('cmd.exe',shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# output, err = p.communicate("echo HELLW_WORLD".encode("GBK"))
# print(output.decode('GBK'))
# print(p.returncode)
#---------------------------------子进程间通信
def proc_write(queue):
for i in ['A','B','C','D','E','F']:
#写数据put(True,waittime)
#True:表示在queue满下可以等待,waittime 若为None则一直等待,非None则在等待规定事件后还没有数据抛出Queue.Full
#False:表示不等待,queue满就抛出Queue.Full
try:
queue.put(i,True,3)
print('put %s to queue'%i)
time.sleep(random.random()*2)
except Exception as e:
print(e)
break
def proc_read(queue):
while True:
#读数据,get(True,waittime):
# True表示queue空可以等待,waittime 若为None则一直等待,非None则在等待规定事件后还没有数据抛出Queue.Empty
# False表示queue空不等待,没有数据就抛出Queue.Empty
try:
msg=queue.get(True,3)
print('get %s from queue'%msg)
except Exception as e:
print(e)
break
if __name__=='__main__':
queue=Queue()
pip=Pipe()
#创建进程实例
p_w=Process(target=proc_write,args=(queue,))
p_r=Process(target=proc_read,args=(queue,))
p_w.start()
p_r.start()
#等待write结束
p_w.join()
#强制结束read
p_r.terminate()
#pipe
#创建子进程时候,就是把一个端口复制一份再给子线程,各个子线程的端口相互独立。
#同时父线程仍然保有一对端口,可实现父线程与各个子线程间通信。
#设置False,实现单向传输。
#pipe()返回两个连接对象分别表示管道的两端,每端都有send()和recv()方法。
#如果两个进程试图在同一时间的同一端进行读取和写入那么,这可能会损坏管道中的数据。
#父线程与多个子线程间双向通信。
def pipe_write(pipe):
while True:
try:
pipe.send('msg from write')
time.sleep(random.random() * 2)
msg = pipe.recv()
print('write get %s' % msg)
except Exception as e:
print(e)
break
def pipe_read(pipe):
while True:
try:
pipe.send('msg from read')
time.sleep(random.random() * 2)
msg = pipe.recv()
print('read get %s' % msg)
except Exception as e:
print(e)
break
if __name__ == '__main__':
#True:双向通信
#False,单向通行,A只能用来接收消息,B只能用来发送消息
pipe_A, pipe_B = Pipe(True)
#将B传递给多个子进程,会把B端复制后再创递,建立链接: A<---->B,A<----->C
proc_read = Process(target=pipe_read, args=(pipe_B,))
proc_write = Process(target=pipe_write, args=(pipe_B,))
proc_read.start()
proc_write.start()
while True:
try:
pipe_A.send('msg from super')
time.sleep(random.random() * 2)
msg = pipe_A.recv()
print('super get %s' % msg)
except Exception as e:
print(e)
break
#通过Manager可实现进程间数据的共享。Manager()返回的manager对象会通过一个服务进程,来使其他进程通过代理的方式操作python对象。
# manager对象支持 `list`, `dict`, `Namespace`, `Lock`, `RLock`, `Semaphore`, `BoundedSemaphore`, `Condition`, `Event`, `Barrier`, `Queue`, `Value` ,`Array`.
from multiprocessing import Process, Lock
def f(d, l):
d['1'] = 1
l.append(1)
if __name__ == '__main__':
with Manager() as manager:
#构建同步数据对象
d = manager.dict()
l = manager.list()
p_list = []
for i in range(10):
p = Process(target=f, args=(d, l))
p.start()
p_list.append(p)
for res in p_list:
res.join()
print(d)
print('-----------------')
print(l)
#---------------------------------子进程同步锁
#进程同步锁,进程同步。
#数据输出的时候保证不同进程的输出内容在同一块屏幕正常显示,防止数据乱序的情况。
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock =Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
#---------------------------------多线程
#多任务可以由多进程完成,也可以由一个进程内的多线程完成。
#我们前面提到了进程是由若干线程组成的,一个进程至少有一个线程。
#由于线程是操作系统直接支持的执行单元,
import threading
#任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程,
# Python的threading模块有个current_thread()函数,它永远返回当前线程的实例。
# 主线程实例的名字叫MainThread,子线程的名字在创建时指定,我们用LoopThread命名子线程。
# 名字仅仅在打印时用来显示,完全没有其他意义,
# 如果不起名字Python就自动给线程命名为Thread-1,Thread-2……
def subs_thread():
#current_thread()函数,它永远返回当前线程的实例
print('%s is running!'%threading.current_thread().name)
for i in range(3):
print(i)
time.sleep(random.random())
print('%s is finish!' % threading.current_thread().name)
print('%s is running!'%threading.current_thread().name)
#创建子线程,子线程的名字在创建时指定(可选参数)
sub=threading.Thread(target=subs_thread,name='subs_thread')
#启动子线程
sub.start()
sub.join()
print('%s is finish!' % threading.current_thread().name)
#---------------------------------多线程同步锁
# 多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,
# 而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,
# 因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。
#修改z资源值需要多条语句,而执行这几条语句时,线程可能中断,即执行顺序与逻辑顺序不同,从而导致多个线程把同一个对象的内容改乱了。
#同步锁:线程获得了锁,因此其他线程不能同时执行枷锁操作,只能等待,直到锁被释放后,获得该锁以后才执行。
# 由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁
#阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。
# 其次,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁,
# 导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。
count=0
#互斥锁
lock=threading.Lock()
#RLcok类的用法和Lock类一模一样,但它支持嵌套,,在多个锁没有释放的时候一般会使用使用RLcok类。
#lock = threading.RLock()
#互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据
#semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程同时运行
def change(step):
#加锁
lock.acquire()
# 告知Python这个变量名不是局部的,而是 全局 的,要在外部寻找它的定义。
global count
try:
for i in range(100000):
count=count+step
count=count-step
finally:
# 用finally 保证改完了一定会释放锁:
lock.release()
for i in range(50):
t_0=threading.Thread(target=change,args=(5,))
t_1=threading.Thread(target=change,args=(8,))
t_0.start()
t_1.start()
#join 很重要,如果没有join ,子线程还在运行时,主线程就开始执行下面的语句,取到的count就可能是个中间值,而非终值。
t_0.join()
t_1.join()
if count != 0:
print('wrong')
print(count)
break
print('----------------')
#ython的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,
# 任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,
# 让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,
# 所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
#Python解释器由于设计时有GIL全局锁,导致了多线程无法利用多核。
#Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。
# 多个Python进程有各自独立的GIL锁,互不影响。
def loop():
x = 0
while True:
x = x ^ 1
#对线程方式,只能使用一个核。
for i in range(multiprocessing.cpu_count()):
t = threading.Thread(target=loop)
t.start()
#对进程方式,能使用所有核。
if __name__=='__main__':
for i in range(multiprocessing.cpu_count()):
t = Process(target=loop)
t.start()
#---------------------------------多线程局部变量
#在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,
# 因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。
#但是局部变量也有问题,就是在函数调用的时候,传递起来很麻烦。
#一个ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,各个线程数据相互隔离,互不干扰。
# ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题
#ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,
# 这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
thread_data=threading.local()
def stu_read():
#取数据
name=thread_data.stu.name
print('thread %s:%s'%(threading.current_thread().name,name))
def stu_write(name):
#写数据
thread_data.stu=Stu(name)
stu_read()
class Stu(object):
def __init__(self,name):
self.name=name
#各个线程间数据独立
t_0=threading.Thread(target=stu_write,name='thread_A',args=('A',))
t_1=threading.Thread(target=stu_write,name='thread_B',args=('B',))
t_0.start()
t_1.start()
# 多进程和多线程,这是实现多任务最常用的两种方式。
# 首先,要实现多任务,通常我们会设计Master-Worker模式,Master负责分配任务,Worker负责执行任务,
# 因此,多任务环境下,通常是一个Master,多个Worker。
# 如果用多进程实现Master-Worker,主进程就是Master,其他进程就是Worker。
# 如果用多线程实现Master-Worker,主线程就是Master,其他线程就是Worker。
# 多进程模式最大的优点就是稳定性高,因为一个子进程崩溃了,不会影响主进程和其他子进程。
# 主进程挂了所有进程就全挂了,但是Master进程只负责分配任务,挂掉的概率低。
# 多进程模式的缺点是创建进程的代价大,在Unix/Linux系统下,用fork调用还行,在Windows下创建进程开销巨大。
# 另外,操作系统能同时运行的进程数也是有限的。
# 多线程模式通常比多进程快一点,多线程模式致命的缺点就是任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存。
# 在Windows下,多线程的效率比多进程要高。但多线程存在稳定性的问题。
# 线程切换
# 无论是多进程还是多线程,切换时候都要进行保存当前环境,准备下一个任务环境。
# 单任务模型:挨个处理完毕任务。
# 多任务模型:在多任务间频繁切换。
# 多任务一旦多到一个限度,切换操作就会消耗掉系统所有的资源,效率急剧下降。
# 计算密集型 vs. IO密集型
# 计算密集型任务的特点是要进行大量的计算,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,
# 但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。
# 计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。最好用C语言编写。
#
# 第二种任务的类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。
# 对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。
# IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言。
# 异步IO
# 考虑到CPU和IO之间巨大的速度差异,一个任务在执行的过程中大部分时间都在等待IO操作,单进程单线程模型会导致别的任务无法并行执行,
# 因此,需要多进程模型或者多线程模型来支持多任务并发执行。
# 现代操作系统支持异步IO。如果充分利用操作系统提供的异步IO支持,就可以用单进程单线程模型来执行多任务,
# 这种全新的模型称为事件驱动模型,在多核CPU上,可以运行多个进程(数量与CPU核心数相同),充分利用多核CPU。
# 由于系统总的进程数量十分有限,因此操作系统调度非常高效。
# 对应到Python语言,单线程的异步编程模型称为协程,有了协程的支持,就可以基于事件驱动编写高效的多任务程序。
分布式多进程
#master
# 分布式系统
# 在Thread和Process中,应当优选Process,因为Process更稳定,
# 而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。
# Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。
# 一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。
# 通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了,
# 其它程序必须通过manager.get_task_queue()获得的Queue接口添加
import random, queue,time
from multiprocessing.managers import BaseManager
from queue import Queue
from multiprocessing import freeze_support
task_size=10
# 发送任务的队列:
task_queue = queue.Queue(task_size)
# 接收结果的队列:
result_queue = queue.Queue(task_size)
# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
pass
# 服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务
# 把两个Queue都注册到网络上, callable参数关联了Queue对象,
# 由于QueueManager管理的不止一个Queue,所以,要给每个Queue的网络调用接口起个名字
def get_task_q():
return task_queue
def get_result_q():
return result_queue
def func():
# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=get_task_q)
QueueManager.register('get_result_queue', callable=get_result_q)
# 绑定端口5000, 设置验证码'abc':
# 保证两台机器正常通信,不被其他机器恶意干扰。
manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
try:
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(task_size):
n = random.randint(0, 10000)
print('Put task %d...' % n)
try:
task.put(n)
except Queue.Full:
print('task queue is full.')
# while not result_queue.full():
# time.sleep(1)
# 从result队列读取结果:
print('Try get results...')
for i in range(task_size):
#while not result.empty():
try:
r = result.get(timeout=10)
print('Result: %s' % r)
except Queue.Empty:
print('task queue is empty.')
except Exception:
print('master fucked!')
finally:
# 关闭:
manager.shutdown()
print('master exit.')
if __name__ == '__main__':
#windows下多进程可能会炸,添加这句可以缓解
freeze_support()
func()
#worker
# # 分布式系统
import time, sys
from multiprocessing.managers import BaseManager
# 创建类似的QueueManager:
class QueueManager(BaseManager):
pass
# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
try:
# 从网络连接:
m.connect()
except:
print('connect fucked!')
sys.exit(-1)
# task_worker.py中根本没有创建Queue的代码,
# Queue对象存储在task_master.py进程中,其它程序通过master暴露接口访问。
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
while not task.empty():
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except Exception:
print('task queue is empty.')
# 处理结束:
print('worker exit.')
#### Evevt
事件用于主线程控制其他线程的执行,事件是一个简单的线程同步对象,,全局定义了一个“Flag”,当flag值为“False”,那么event.wait()就会阻塞,当flag值为“True”,那么event.wait()便不再阻塞。
#利用Event类模拟红绿灯
import threading
import time
import random
start=time.time()
event = threading.Event()
car_list=[]
car_count=0
lock=threading.Lock()
def lighter():
time_count = 0
# 设置标志位,初始值为绿灯
event.set()
while True:
#5~5.5s:黄灯,5.5~20s:红灯
if 5 < time_count <=20 :
# 红灯,清除标志位
event.clear()
print("33[41;1m<---------------------------->