进程数据共享-进程池
Posted sandy-123
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了进程数据共享-进程池相关的知识,希望对你有一定的参考价值。
数据共享
Manager 内部管理了很多数据类型,并不是所有的数据类型都是用来做数据分享,只是顺便包含了能够处理数据共享问题的数据类型 list dict
列表/字典 自带的方法基本都是数据安全的,但是对其中的元素进行+= -= *= /= 都是数据不安全的
from multiprocessing import Manager,Process,Lock def func(dic,lock): with lock: dic[‘count‘] -= 1 if __name__ == ‘__main__‘: m = Manager() lock = Lock() dic = m.dict({‘count‘:100}) p_l = [] for i in range(100): p = Process(target=func,args=(dic,lock)) p.start() p_l.append(p) for p in p_l:p.join() print(dic)
数据共享:速度很慢,牵扯到锁的问题,一般使用数据库进行数据共享,利用第三方工具--消息中间件(消息队列)进行通信
进程池
在一台四核计算机上,如果有上百个没有IO阻塞的计算任务,开启上百个进程会使效率降低,可以利用进程池,开启几个进程,用过异步提交执行任务
异步提交
import time from multiprocessing import Pool,Process def func(i): i * i if __name__ == ‘__main__‘: start = time.time() p = Pool() #括号里的参数默认是cpu的个数,开启cpu个数的进程 for i in range(20): p.apply_async(func,(i,)) #apply提交任务,async异步 p.close() #关闭进程池,不允许再继续向这个池子中添加任务了 p.join() #阻塞 直到已经被提交到进程池中的任务全部结束 print(time.time() - start)
获取返回值
import time from multiprocessing import Pool def func(i): i * i time.sleep(1) return ‘i‘* i if __name__ == ‘__main__‘: p = Pool() ret_l = [] for i in range(50): ret = p.apply_async(func,(i,)) ret_l.append(ret) # ret接收返回值,并存放在列表里 for ret in ret_l: print(ret.get())
map方法
import time from multiprocessing import Pool def func(i): i * i time.sleep(1) return ‘i‘* i if __name__ == ‘__main__‘: p = Pool() ret_l = p.map(func,range(5)) #map就是一种简便的apply_async的方式,并且内置了close和join的功能,参数必须规定个数 for ret in ret_l: print(ret)
同步提交
按照顺序一个一个执行,而且还有关于进程的开销,反而降低效率,不推荐使用
import os import time from multiprocessing import Pool def func(i): time.sleep(1) print(i,os.getpid()) if __name__ == ‘__main__‘: p = Pool() for i in range(20): p.apply(func,(i,)) # 同步提交
回调函数
# 利用网址得到网页信息 import os from urllib import request from multiprocessing import Pool def parser_page(content): print(os.getpid()) print(‘len : ‘,len(content)) def get_url(url): ret = request.urlopen(url) content = ret.read().decode(‘utf-8‘) return content if __name__ == ‘__main__‘: print(os.getpid()) url_lst = [ ‘http://www.cnblogs.com/Eva-J/articles/8253549.html‘, ‘http://www.cnblogs.com/Eva-J/articles/8306047.html‘, ‘http://www.baidu.com‘, ‘http://www.sogou.com‘, ‘https://www.cnblogs.com/Eva-J/p/7277026.html‘ ] p = Pool() # ret_l = [] for url in url_lst: ret = p.apply_async(get_url,(url,),callback=parser_page) # 异步的方式提交任务 # callback 将get_url的返回值给parser_page,并且立即执行parser_page # 如果异步的任务执行完毕之后需要立即做另外的操作,推荐使用collback和生产者消费者模型 # ret_l.append(ret) p.close() # 不获取结果就这么写 p.join() # for ret in ret_l: # 要获取结果就这么写 # res = ret.get() # parser_page(res)
利用多进程实现并发的socketserver
# server端: from socket import * from multiprocessing import Pool server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind((‘127.0.0.1‘,8080)) server.listen() def talk(conn): while True: try: msg = conn.recv(1024) if not msg: break conn.send(msg.upper()) except Exception: break if __name__ == ‘__main__‘: p = Pool() while True: conn,addr = server.accept() p.apply_async(talk,args=(conn,))
# client端: import socket client = socket.socket() client.connect((‘127.0.0.1‘,8080)) while True: msg = input(‘>>>‘).strip() if not msg: continue client.send(msg.encode(‘utf-8‘)) msg = client.recv(1024) print(msg.decode(‘utf-8‘))
以上是关于进程数据共享-进程池的主要内容,如果未能解决你的问题,请参考以下文章