进程数据共享-进程池

Posted sandy-123

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了进程数据共享-进程池相关的知识,希望对你有一定的参考价值。

数据共享

Manager  内部管理了很多数据类型,并不是所有的数据类型都是用来做数据分享,只是顺便包含了能够处理数据共享问题的数据类型 list dict

列表/字典  自带的方法基本都是数据安全的,但是对其中的元素进行+= -= *=  /=   都是数据不安全的

from multiprocessing import Manager,Process,Lock

def func(dic,lock):
    with lock:
        dic[count] -= 1

if __name__ == __main__:
    m = Manager()
    lock = Lock()
    dic = m.dict({count:100})
    p_l = []
    for i in range(100):
        p = Process(target=func,args=(dic,lock))
        p.start()
        p_l.append(p)
    for p in p_l:p.join()
    print(dic)

 

数据共享:速度很慢,牵扯到锁的问题,一般使用数据库进行数据共享,利用第三方工具--消息中间件(消息队列)进行通信

进程池

在一台四核计算机上,如果有上百个没有IO阻塞的计算任务,开启上百个进程会使效率降低,可以利用进程池,开启几个进程,用过异步提交执行任务

异步提交

import time
from multiprocessing import Pool,Process

def func(i):
    i * i

if __name__ == __main__:
    start = time.time()
    p = Pool()    #括号里的参数默认是cpu的个数,开启cpu个数的进程
    for i in range(20):
        p.apply_async(func,(i,))   #apply提交任务,async异步
    p.close()           #关闭进程池,不允许再继续向这个池子中添加任务了
    p.join()            #阻塞  直到已经被提交到进程池中的任务全部结束
    print(time.time() - start)

 

获取返回值

import time
from multiprocessing import Pool

def func(i):
    i * i
    time.sleep(1)
    return i* i

if __name__ == __main__:
    p = Pool()
    ret_l = []
    for i in range(50):
        ret = p.apply_async(func,(i,))     
        ret_l.append(ret)    # ret接收返回值,并存放在列表里
    for ret in ret_l:
        print(ret.get())

 

map方法

import time
from multiprocessing import Pool

def func(i):
    i * i
    time.sleep(1)
    return i* i

if __name__ == __main__:
    p = Pool()
    ret_l = p.map(func,range(5))   #map就是一种简便的apply_async的方式,并且内置了close和join的功能,参数必须规定个数
    for ret in ret_l:
        print(ret)

 

同步提交

按照顺序一个一个执行,而且还有关于进程的开销,反而降低效率,不推荐使用

import os
import time
from multiprocessing import Pool

def func(i):
    time.sleep(1)
    print(i,os.getpid())

if __name__ == __main__:
    p = Pool()
    for i in range(20):
        p.apply(func,(i,))     # 同步提交

 

回调函数

# 利用网址得到网页信息
import os
from urllib import request
from multiprocessing import Pool
def parser_page(content):
    print(os.getpid())
    print(len : ,len(content))

def get_url(url):
    ret = request.urlopen(url)
    content = ret.read().decode(utf-8)
    return content

if __name__ == __main__:
    print(os.getpid())
    url_lst = [
        http://www.cnblogs.com/Eva-J/articles/8253549.html,   
        http://www.cnblogs.com/Eva-J/articles/8306047.html,   
        http://www.baidu.com,
        http://www.sogou.com,
        https://www.cnblogs.com/Eva-J/p/7277026.html
    ]
    p = Pool()  
    # ret_l = []
    for url in url_lst:
        ret = p.apply_async(get_url,(url,),callback=parser_page)  # 异步的方式提交任务
        # callback 将get_url的返回值给parser_page,并且立即执行parser_page
        # 如果异步的任务执行完毕之后需要立即做另外的操作,推荐使用collback和生产者消费者模型
        # ret_l.append(ret)
    p.close()              # 不获取结果就这么写
    p.join()
    # for ret in ret_l:    # 要获取结果就这么写
    #     res = ret.get()
    #     parser_page(res)

 

利用多进程实现并发的socketserver

# server端:
from socket import *
from multiprocessing import Pool

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind((127.0.0.1,8080))
server.listen()

def talk(conn):
    while True:
        try:
            msg = conn.recv(1024)
            if not msg: break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == __main__:
    p = Pool()
    while True:
        conn,addr = server.accept()
        p.apply_async(talk,args=(conn,))


# client端: import socket client = socket.socket() client.connect((127.0.0.1,8080)) while True: msg = input(>>>).strip() if not msg: continue client.send(msg.encode(utf-8)) msg = client.recv(1024) print(msg.decode(utf-8))

 

以上是关于进程数据共享-进程池的主要内容,如果未能解决你的问题,请参考以下文章

《Python》进程之间的通信(IPC)进程之间的数据共享进程池

进程数据共享-进程池

进程间的通信-队列/管道以及进程间的数据共享和进程池

生产者消费者模型,管道,进程之间共享内存,进程池

管道,进程间数据共享,进程池

python-- 多进程的基本语法 进程间数据交互与共享进程锁和进程池的使用