Python多进程
Posted leixiansheng6f6
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python多进程相关的知识,希望对你有一定的参考价值。
第一:multiprocessing 多进程模块
属性(继承后,子类可以直接使用) daemon: 守护进程,和线程的setDaemon()一样 name: 进程名字,会根据子类名+id,生成一个新名字 pid: 进程编号 实例方法: is_alive():返回进程是否在运行。 join([timeout]):阻塞当前上下文环境的进程,直到调用此方法的进程终止或 到达指定的timeout(可选参数)。 start():进程准备就绪,等待CPU调度 run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。 terminate():不管任务是否完成,立即停止工作进程 构造方法: Process([group [, target [, name [, args [, kwargs]]]]]) group: 线程组,目前还没有实现,库引用中提示必须是None; target: 要执行的方法; name: 进程名; args/kwargs: 要传入方法的参数。
import multiprocessing def pro(p_name): print(‘嗨,%s,我是一个进程‘ % p_name) if __name__ == ‘__main__‘: for i in range(5): # 函数名最好别用 p,否则会和Process的一些变量或者方法冲突 # p = multiprocessing.Process(target=p, args=(‘tom‘,)) # 会报异常 p = multiprocessing.Process(target=pro, args=(‘tom‘,)) p.start() p.join()
import multiprocessing class CusProcess(multiprocessing.Process): def run(self): print(‘多进程创建了,我是编号[-%s-]进程‘ % self.name) ‘‘‘ # 注意:必须要加这行代码if __name__ == ‘__main__‘:,不加会报异常 这是 Windows 上多进程的实现问题。在 Windows 上,子进程会自动 import 启动它的这个文件, 而在 import 的时候是会执行这些语句的。如果你这么写的话就会无限递归子进程报错。 所以必须把创建子进程的部分用那个 if 判断保护起来,import 的时候 name 不是 main , 就不会递归运行了。 ‘‘‘ if __name__ == ‘__main__‘: for i in range(3): c = CusProcess() c.start() ‘‘‘ # 小结: 对比多线程,多进程可以说是真正的并行了,我的CPU是四核的,它代表能同一时刻运行4个进程,以此类推。 另外,C++,JAVA这些语言是可以实现线程并行的.但唯独Python不行,因为GIL的存在。支持超线程技术的 CPU是能够并行两个线程的 ‘‘‘
from multiprocessing import Process import time,random num = 258 class MyPro(Process): def run(self): global num print(‘子进程‘, id(num)) if __name__ == ‘__main__‘: for i in range(3): m = MyPro() m.start() time.sleep(3) print(‘主进程‘, id(num)) 结果: 子进程 2152790437040 子进程 1688574045360 子进程 2599376727216 主进程 2029646163664 由此可见,进程的内存地址都是独立的
第二:join([timeout])
from multiprocessing import Process
import time,random
num = 258
class MyPro(Process):
def run(self):
time.sleep(10)
print(‘%s子进程退出‘ % self.name)
if __name__ == ‘__main__‘:
m1 = MyPro()
m2 = MyPro()
m1.start()
m2.start()
m1.join()
m2.join()
print(‘主进程的主线程退出‘)
# 注意:
1.多进程的join()和多线程的join()卡住的不是进程,而是主线程,谁调用了join,就等于告诉主线程:我执行完了,你主线程才能往下执行。
另外,子进程也是有自己的主线程的
2.所有子线程或者子进程集中调用了join后,主线程的等待的总时间以 耗费时间最长的那个进程或者线程的运行时间 为准
3.如果你创建了一个子进程或者线程运行后立马跟着一个join(),会导致串行执行,比如以上代码改成:
m1.start()
m1.join()
m2.start()
m2.join()
from multiprocessing import Process import time,random num = 258 class MyPro(Process): def run(self): print(‘%s子进程的主线程退出‘ % self.name) if __name__ == ‘__main__‘: m1 = MyPro() m1.start() print(m1.pid) # 查看子进程PID m1.terminate() # 关闭进程,但不会立马关闭,会稍微等一小会 print(m1.is_alive()) # 进程是否存活,存在返回True,否则,返回False time.sleep(1) print(m1.is_alive()) # 进程是否存活,存在返回True,否则,返回False print(‘主进程的主线程退出‘)
第三:守护进程
from multiprocessing import Process import time,random num = 258 class MyPro(Process): def run(self): time.sleep(1) print(‘%s子进程退出‘ % self.name) if __name__ == ‘__main__‘: m1 = MyPro() m1.daemon = True # 设置守护进程,要在start()之前设置。主进程执行退出,无论子进程执行到哪里,都跟着主进程退出 m1.start() print(‘主进程退出‘)
第四:进程同步
# 每个进程内部的数据是独立的,但是,当共同访问一个文件、数据库、打印机、其他设备等是会带来竞争的。
# 解决的办法是加锁,但这种做法会牺牲效率,并发执行变成了串行执行
如:3个进程,要排队上公共汽车,过程应该是,一个先上完车,下一个跟进
from multiprocessing import Process,Lock
import time
class MyPro(Process):
def run(self):
print(‘%s在排队上车‘ % self.name)
time.sleep(2)
print(‘%s上车完毕‘ % self.name)
if __name__ == ‘__main__‘:
pro_list = []
for i in range(3):
m = MyPro()
m.start()
# 结果:
MyPro-2在排队上车
MyPro-1在排队上车
MyPro-3在排队上车
MyPro-2上车完毕
MyPro-1上车完毕
MyPro-3上车完毕
#得到的结果是 三个同时挤上车
解决办法:
from multiprocessing import Process, Lock
import time
lock = Lock() # 生成一个锁对象
class MyPro(Process):
def __init__(self, lock):
super().__init__()
self.lock = lock
def run(self):
self.lock.acquire() # 谁拿到锁,就有执行权
print(‘%s正在排队上车‘ % self.name)
time.sleep(2)
print(‘%s已经上车‘ % self.name)
self.lock.release() # 释放锁,让其他进程去抢
if __name__ == ‘__main__‘:
for i in range(3):
m = MyPro(lock)
m.start()
from multiprocessing import Process, Lock import json, time class BuyTicket(Process): def __init__(self, lock): super().__init__() self.lock = lock def run(self): self.lock.acquire() # 谁先拿到,谁有执行权 dic = getattr(self, ‘get‘)() if dic[‘count‘] > 0: dic[‘count‘] -= 1 getattr(self, ‘put‘)(dic) print(‘买票成功‘) self.lock.release() # 释放锁 def get(self): with open(‘db.txt‘, ‘r‘) as f: dic = json.loads(f.read()) print(‘剩余票数:%s‘ % dic) return dic def put(self, dic): with open(‘db.txt‘, ‘w‘) as f: json.dump(dic, f) if __name__ == ‘__main__‘: lock = Lock() # 锁对象 for i in range(3): bt = BuyTicket(lock) bt.start()
第五:Queue(maxsize)
1.q.put(blocked,timeout): 将数据推入队列里。可选参数:blocked=True,timeout是正整数,该方法会阻塞到timeout的指定时间,阻塞到指定时间后,队列没有空间,抛出
queue.Full() 异常;blocked=False,队列满了就会抛queue.Full()异常
2.q.get(blocked, timeout): 从队列里取出一个元素并删除。可选参数:blocked=True,timeout是正整数,这等待时间内没有取到元素,抛出queue.Empty()异常。
blocked=False,队列为空,抛出queue.Empty()异常
3.q.get_nowait():同q.get(False)
4.q.put_nowait():同q.put(False)
熟悉的味道:生产者消费者
from multiprocessing import Process, Queue
import time
class Chef(Process):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
n = 5
while n > 0:
time.sleep(1)
print(‘厨师生产了编号[-%s-]的包子‘ % n)
self.queue.put(n)
n -= 1
class Foodie(Process):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
while True:
data = self.queue.get()
if data == None:
break
print(‘食客[-%s-]吃了编号[-%s-]的包子‘ % (self.name, data))
time.sleep(1)
if __name__ == ‘__main__‘:
q = Queue()
ch = Chef(q)
ch.start()
for i in range(3):
fd = Foodie(q)
fd.start()
ch.join() # 主线程等待生产者生产完成后才往下执行
for i in range(3): # 有多少个消费者就发送多少个None,接受到None的子进程就退出
q.put(None)
JoinableQueue(maxsize)
JoinableQueue继承了Queue,拥有Queue之外的方法:
1.q.task_done():向q.join()发送信号,表示数据已经被取走
2.q.join() :调用此方法,将会阻塞,直到 q.task_done()的调用
from multiprocessing import Process, Queue, JoinableQueue import time class Chef(Process): def __init__(self, joinableQueue): super().__init__() self.queue = joinableQueue def run(self): n = 5 while n > 0: time.sleep(1) print(‘厨师生产了编号[-%s-]的包子‘ % n) self.queue.put(n) n -= 1 self.queue.join() # 生产一个包子,就阻塞,直到消费者处理完队列里的所有数据(包子) class Foodie(Process): def __init__(self, joinableQueue): super().__init__() self.queue = joinableQueue def run(self): while True: print(‘等待包子‘) time.sleep(1) data = self.queue.get() if data is None: break print(‘食客[-%s-]吃了编号[-%s-]的包子‘ % (self.name, data)) self.queue.task_done() # 向join发送一个信号,队列里已经没东西了 if __name__ == ‘__main__‘: q = JoinableQueue() ch = Chef(q) ch.start() for i in range(3): fd = Foodie(q) fd.start() ch.join() # 生产者生产完成后,主线程执行下面代码,向子进程发送None,代表要子进程退出的信号 for i in range(3): q.put(None)
尽量使用消息队列进程并发编程,另外,应要避免共享数据的使用
第八:共享内存(了解,不推荐使用)
转自 : https://www.cnblogs.com/gengyi/p/8661235.html
第九:进程池
#在远程控制多主机或者同时操作一些系统管理的时候,并行可以节省时间。另外,如果任务过多,频繁的创建进程太麻烦和繁琐,任务少可以选择不用进程池。
# 参数
from multiprocessing import Pool
Pool(self, processes=None, initializer=None, initargs=(),maxtasksperchild=None, context=None)
processes: 指定创建进程数量
initializer: 每个进程启动时要执行的可调用对象
initargs: 进程的参数,元祖
# 方法
pool.apply(func, args=(), kwds={}) 同步执行,并返回结果
pool.apply_async(func, args=(), kwds={}, callback=None,error_callback=None)
‘‘‘
异步执行,CPU有多少个核,就能同时跑多少个任务,参数callback代表当前一次的任务执行完毕后,通知主进程调用一个函数(回调),另外,callback的函数不能有阻塞操作,因为会影响到其他异步任务的结果。
当进程池里的进程数量大于CPU核数和任务数量比进程池数大时,剩余的进程会切换着运行。如,进程池=5,CPU核数=4,任务数量=10,那么同时能跑4个任务,剩下的一个切换着跑,只是太快了感觉不出来
‘‘‘
pool.close() 关闭进程池
pool.join() 等待所有工作进程退出,另外,join必须要在close的前面
from multiprocessing import Pool import os,time def start(num): print(‘创建了PID为[-%s-]的进程‘ % os.getpid()) print(‘进程[-%s-]执行完毕‘ % os.getpid()) time.sleep(3) return num if __name__ == ‘__main__‘: pool = Pool(4) # 指定最大进程数 for i in range(100): # 创建10个任务 ‘‘‘ 同步执行,当本次任务执行完成并返回后, 无论其他子进程是否阻塞,它只能在原地等着, 等着上一个任务的执行完毕,即使任务发生了阻塞,也会被夺走执行权 ‘‘‘ result = pool.apply(func=start, args=(i,)) print(result)
from multiprocessing import Pool import os,time def start(num): print(‘创建了PID为[-%s-]的进程‘ % os.getpid()) print(‘进程[-%s-]执行完毕‘ % os.getpid()) time.sleep(3) return num if __name__ == ‘__main__‘: pool = Pool(4) # 指定最大进程数 result = [] for i in range(10): # 创建10个任务 res = pool.apply_async(func=start, args=(i,)) result.append(res) ‘‘‘ apply_async()不同于apply(),异步执行,主进程要使用join()等待,等所有任务执行完毕使用get()来收集, 如果不这么做,主进程结束,进程池没来的及执行,就跟着主进程一起结束了 ‘‘‘ pool.close() pool.join() for res in result: print(res.get()) # 获取结果
回调:
from multiprocessing import Pool import os,time def start(num): print(‘创建了PID为[-%s-]的进程‘ % os.getpid()) time.sleep(1) print(‘进程[-%s-]执行完毕‘ % os.getpid()) return num def end(arg): print(arg) # 打印每个任务的结果 print(‘结束了‘, os.getpid()) # 查看是哪个进程打印(其实是Python解释器的进程) if __name__ == ‘__main__‘: pool = Pool(2) # 指定最大进程数 result = [] for i in range(10): # 创建10个任务 ‘‘‘ 当一个任务执行完毕后,通知主进程调用end, 参数为当前任务的返回结果 ‘‘‘ res = pool.apply_async(func=start, args=(i,), callback=end) result.append(res) ‘‘‘ apply_async()不同于apply(),异步执行,主进程要使用join()等待,等所有任务执行完毕使用get()来收集, 如果不这么做,主进程结束,进程池没来的及执行,就跟着主进程一起结束了 ‘‘‘ pool.close() pool.join() for res in result: print(res.get()) # 获取结果,apply()可直接获取
soket + pool的简单应用
Server:
from socket import * from multiprocessing import Pool import os s = socket(AF_INET, SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind((‘127.0.0.1‘, 888)) s.listen(5) def Server_Pro(conn, addr): while True: data = conn.recv(1024) print(‘收到客户端发来的消息%s‘ % data) talk = ‘Server收到你发送过来的消息‘.encode(‘utf-8‘) conn.send(talk) if __name__ == ‘__main__‘: pool = Pool(3) # 能同时处理三个任务 while True: conn, addr = s.accept() pool.apply_async(Server_Pro,args=(conn, addr))
Client:
from socket import * c = socket(AF_INET, SOCK_STREAM) c.connect((‘127.0.0.1‘, 888)) while True: data = input(‘输入发送的消息:‘).encode(‘utf-8‘) c.send(data) msg = c.recv(1024) print(msg.decode(‘utf8‘))
以上是关于Python多进程的主要内容,如果未能解决你的问题,请参考以下文章