Python requests - threads/processes vs. IO

Posted: 2016-06-15 07:21:46

【Question】:

I am connecting to a local server (OSRM) over HTTP to submit routes and get back drive-times. I notice that IO is slower than threading because it seems that the wait-time for the calculation is smaller than the time it takes to send the request and process the JSON output (I think IO is better when the server takes some time to process your request -> you don't want it to be blocking because you have to wait; this isn't my case). Threading suffers from the Global Interpreter Lock, so it appears (and the evidence below suggests) that my fastest option is multiprocessing.

The issue with multiprocessing is that it is so fast that it exhausts my sockets and I get an error (requests issues a new connection every time). I can (serially) use a requests.Sessions() object to keep a connection alive, however I cannot get this working in parallel (where each process has its own session).

The closest code I have to working at the moment is this multiprocessing code:

from multiprocessing import Pool, cpu_count
from urllib3 import HTTPConnectionPool
import json

conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=cpu_count())

def ReqOsrm(url_input):
    ul, qid = url_input      
    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            print("Done but no route: %d %s" % (qid, req_url))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, req_url))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

# run:
pool = Pool(cpu_count())
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()

However, I can't get the HTTPConnectionPool to work properly; it creates a new socket each time (I think) and then gives me the error:

HTTPConnectionPool(host='127.0.0.1', port=5005): Max retries exceeded with url: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))


My goal is to get distance calculations as quickly as possible from an OSRM-routing server that I am running locally.

I have a question in two parts - essentially I am trying to convert some code that uses multiprocessing.Pool() into better code (proper asynchronous functions - so that execution never breaks and it runs as fast as possible).

The problem I am having is that everything I try seems slower than multiprocessing (I give several examples of what I have tried below).

Some potential approaches are: gevent, grequests, tornado, requests-futures, asyncio, etc.

A - Multiprocessing.Pool()

I started off with something like this:

def ReqOsrm(url_input):
    req_url, query_id = url_input
    try_c = 0
    #print(req_url)
    while try_c < 5:
        try:
            response = requests.get(req_url)
            json_geocode = response.json()
            status = int(json_geocode['status'])
            # Found route between points
            if status == 200:
            ....

pool = Pool(cpu_count()-1) 
calc_routes = pool.map(ReqOsrm, url_routes)

Here I connect to a local server (localhost, port 5005), which is launched on 8 threads and supports parallel execution.
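
For reference (the original post never shows it), url_routes - used in the pool.map calls throughout - is an iterable of (url, query-id) tuples. A hypothetical way to build it against the /viaroute endpoint seen in the error messages (coord_pairs and the port are illustrative only) might be:

# hypothetical: coord_pairs is a list of ((from_lat, from_lon), (to_lat, to_lon)) tuples
url_routes = [
    ("http://127.0.0.1:5005/viaroute?loc=%f,%f&loc=%f,%f&alt=false&geometry=false"
     % (f_lat, f_lon, t_lat, t_lon), qid)
    for qid, ((f_lat, f_lon), (t_lat, t_lon)) in enumerate(coord_pairs)
]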

After a bit of searching I realised the error I was getting was because requests was opening a new connection/socket for each request. So this was actually too fast and was exhausting the sockets after a while. It seems the way to address this is to use a requests.Session() - however I have not been able to get this working with multiprocessing (where each process has its own session).

Question 1.

On some machines this runs fine, e.g.:

For comparison with later results: 45% server usage and 1700 requests per second.

However, on some machines it does not, and I don't entirely understand why:

HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /viaroute?loc=49.34343,3.30199&loc=49.56655,3.25837&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))

My guess is that, since requests locks the socket while it is in use, sometimes the server is too slow to respond to an old request and a new one is generated. The server supports queueing, but requests does not, so instead of being added to the queue I get the error?

Question 2.

I found the following:

Blocking Or Non-Blocking?

With the default Transport Adapter in place, Requests does not provide any kind of non-blocking IO. The Response.content property will block until the entire response has been downloaded. If you require more granularity, the streaming features of the library (see Streaming Requests) allow you to retrieve smaller quantities of the response at a time. However, these calls will still block.

If you are concerned about the use of blocking IO, there are lots of projects out there that combine Requests with one of Python's asynchronicity frameworks.

Two excellent examples are grequests and requests-futures.
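
(grequests, mentioned in the list above, never made it into the benchmarks in this post; a minimal sketch of how it could be wired up, assuming the same url_routes and a cap of 100 concurrent requests, would be:)

import grequests  # gevent-based wrapper around requests; import it before requests

# build unsent requests lazily, then let grequests send them concurrently
reqs = (grequests.get(url) for url, qid in url_routes)
# size caps how many requests are in flight at once
responses = grequests.map(reqs, size=100)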

B - requests-futures

To address this, I need to rewrite my code to use asynchronous requests, so I tried the following, using:

from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed

(By the way, I start my server with the option to use all threads.)

And the main body of code:

calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
    # Submit requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
    # Process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
        calc_routes.append(row)

My function (ReqOsrm) is now rewritten as:

def ReqOsrm(sess, resp):
    json_geocode = resp.json()
    status = int(json_geocode['status'])
    # Found route between points
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    # Cannot find route between points (code errors as 999)
    else:
        out = [999, 0, 0, 0, 0, 0, 0]
    resp.data = out

However, this code is slower than the multiprocessing code! Before, I was getting around 1700 requests per second; now I get about 600 per second. I guess this is because I don't have full CPU utilisation, but I don't know how to increase it.

C - Threading

I tried another approach (creating threads) - but again I wasn't sure how to get this to maximise CPU usage (ideally I would want to see my server being used at 50%, no?):

def doWork():
    while True:
        url,qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()

def getReq(url):
    try:
        resp = requests.get(url)
        return resp.status_code, resp
    except:
        return 999, None

def processReq(status, resp, qid):
    try:
        json_geocode = resp.json()
        # Found route between points
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from = json_geocode['via_points'][0]
            used_to = json_geocode['via_points'][1]
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        else:
            print("Done but no route")
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("Error: %s" % err)
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    qres.put(out)
    return

#Run:
concurrent = 1000
qres = Queue()
q = Queue(concurrent)

for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in url_routes:
        q.put(url)
    q.join()
except Exception:
    pass

# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

This method is faster than requests_futures, I think, but I don't know how many threads to set to maximise it.
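
(One way to pick the thread count empirically - this snippet is not from the original post, and run_batch and the worker sizes are illustrative - is to time the same batch of URLs at increasing pool sizes and keep the fastest:)

import time
import requests
from concurrent.futures import ThreadPoolExecutor

def run_batch(urls, n_workers):
    session = requests.Session()
    # widen the underlying connection pool so all threads can share one session
    session.mount('http://', requests.adapters.HTTPAdapter(pool_connections=1, pool_maxsize=n_workers))
    with ThreadPoolExecutor(max_workers=n_workers) as ex:
        list(ex.map(session.get, (u for u, qid in urls)))

for n_workers in (8, 16, 32, 64, 128, 256):
    start = time.time()
    run_batch(url_routes, n_workers)
    print("workers=%4d -> %.0f requests/second" % (n_workers, len(url_routes) / (time.time() - start)))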

D - Tornado (not working)

I am now trying Tornado - however I can't quite get it working: it crashes with exit code -1073741819 if I use curl_httpclient; if I use simple_httpclient it works, but then I get timeout errors:

ERROR:tornado.application:Multiple exceptions in yield list
Traceback (most recent call last):
  File "C:\Anaconda3\lib\site-packages\tornado\gen.py", line 789, in callback
    result_list.append(f.result())
  File "C:\Anaconda3\lib\site-packages\tornado\concurrent.py", line 232, in result
    raise_exc_info(self._exc_info)
  File "", line 3, in raise_exc_info
tornado.httpclient.HTTPError: HTTP 599: Timeout

from functools import partial

from tornado import gen, ioloop
from tornado.escape import json_decode
from tornado.httpclient import AsyncHTTPClient

def handle_req(r):
    try:
        json_geocode = json_decode(r)
        status = int(json_geocode['status'])
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        print(out)
    except Exception as err:
        print(err)
        out = [999, 0, 0, 0, 0, 0, 0]
    return out

# Configure
# For some reason curl_httpclient crashes my computer
AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=10)

@gen.coroutine
def run_experiment(urls):
    http_client = AsyncHTTPClient()
    responses = yield [http_client.fetch(url) for url, qid in urls]
    responses_out = [handle_req(r.body) for r in responses]
    raise gen.Return(value=responses_out)

# Initialise
_ioloop = ioloop.IOLoop.instance()
run_func = partial(run_experiment, url_routes)
calc_routes = _ioloop.run_sync(run_func)
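
(Not from the original post: the HTTP 599 above is simple_httpclient's client-side timeout, and with max_clients=10 the time a request spends queued also appears to count toward it. One hedged tweak - reusing handle_req and the imports from the block above, with illustrative numbers - is to raise max_clients and pass longer timeouts to fetch:)

AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=100)

@gen.coroutine
def run_experiment(urls):
    http_client = AsyncHTTPClient()
    # connect_timeout / request_timeout are forwarded to the underlying HTTPRequest
    responses = yield [http_client.fetch(url, connect_timeout=60, request_timeout=600)
                       for url, qid in urls]
    raise gen.Return(value=[handle_req(r.body) for r in responses])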

E - asyncio / aiohttp

I decided to try yet another approach using asyncio and aiohttp (although it would be great to get Tornado working).

import asyncio
import aiohttp

def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    status = int(json_geocode['status'])
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    else:
        print("Done, but not route for 0 - status: 1".format(qid, status))
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    return out

def chunked_http_client(num_chunks):
    # Use semaphore to limit number of requests
    semaphore = asyncio.Semaphore(num_chunks)
    @asyncio.coroutine
    # Return co-routine that will download files asynchronously and respect
    # locking of the semaphore
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            response = yield from aiohttp.request('GET', url)
            body = yield from response.content.read()
            yield from response.wait_for_close()
        return body, qid
    return http_get

def run_experiment(urls):
    http_client = chunked_http_client(500)
    # http_client returns futures
    # save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for 0 - 1".format(qid,err))
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
        response.append(out)
    return response

# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))

This works fine, but it is still slower than multiprocessing!

【Comments】:

An alternative approach is to use an event loop rather than trying to find the best thread-pool size. You can register requests with a callback and let the event loop handle them whenever a response comes back.

@dm03514 Thanks! However, isn't that what I have in my requests-futures example? future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))

I've never used RequestsFutures, but I believe it still delegates to a thread pool. An event loop is a different request model altogether and only exposes a single thread, so you don't have to worry about how many threads to configure :) Python has one in the standard library, and there is pypi.python.org/pypi/aiohttp; I've never used it but it looks relatively straightforward. Tornado is a framework built on OS event libraries with a simple API: tornadokevinlee.readthedocs.org/en/latest/httpclient.html

@dm03514 I tried aiohttp and it does reasonably well (better than requests-futures). However, it is still slower than multiprocessing - unless I'm implementing it incorrectly and hitting a bottleneck.

@mptevsion I am trying to do the same thing. Could you tell me what you mean when you say "...connecting to a local server (localhost, port 5005) which is launched on 8 threads and supports parallel execution"? How do you launch the OSRM server on 8 threads? How do you support parallel execution? Last question: could you give an example of url_routes? What is qid?

【Answer 1】:

Thanks everyone for the help. I wanted to post my conclusions:

Since my HTTP requests go to a local server that processes them instantly, there is little point in using asynchronous approaches in my case (compared with most situations, where requests are sent over the Internet). For me, the costly factor is actually sending the request and processing the response, which means I get much better speeds using multiple processes (threads suffer from the GIL). I should also use sessions to increase speed (no need to close and re-open a connection to the SAME server), which also helps prevent port exhaustion.

Below are all the approaches that were tried (and work), with example RPS figures:

Serial

S1. Serial GET requests (no session) -> 215 RPS

import json
import requests

def ReqOsrm(data):
    url, qid = data
    try:
        response = requests.get(url)
        json_geocode = json.loads(response.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]
# Run:      
calc_routes = [ReqOsrm(x) for x in url_routes]

S2. Serial GET requests (requests.Session()) -> 335 RPS

session = requests.Session()
def ReqOsrm(data):
    url, qid = data
    try:
        response = session.get(url)
        json_geocode = json.loads(response.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]
# Run:      
calc_routes = [ReqOsrm(x) for x in url_routes]

S3. Serial GET requests (urllib3.HTTPConnectionPool) -> 545 RPS

conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=1)
def ReqOsrm(data):
    url, qid = data
    try:
        response = conn_pool.request('GET', url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]
# Run:      
calc_routes = [ReqOsrm(x) for x in url_routes]

Asynchronous IO

A4. asyncio with aiohttp -> 450 RPS

import asyncio
import aiohttp
import json
concurrent = 100
def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    tot_time_s = json_geocode['paths'][0]['time']
    tot_dist_m = json_geocode['paths'][0]['distance']
    return [qid, 200, tot_time_s, tot_dist_m]
def chunked_http_client(num_chunks):
    # Use semaphore to limit number of requests
    semaphore = asyncio.Semaphore(num_chunks)
    @asyncio.coroutine
    # Return co-routine that will download files asynchronously and respect
    # locking of the semaphore
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            with aiohttp.ClientSession() as session:
                response = yield from session.get(url)
                body = yield from response.content.read()
                yield from response.wait_for_close()
        return body, qid
    return http_get
def run_experiment(urls):
    http_client = chunked_http_client(num_chunks=concurrent)
    # http_client returns futures, save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for 0 - 1".format(qid,err))
            out = [qid, 999, 0, 0]
        response.append(out)
    return response
# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))

A5. Threading without a session -> 330 RPS

import json
import requests
from threading import Thread
from queue import Queue
concurrent = 100
def doWork():
    while True:
        url,qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()
def getReq(url):
    try:
        resp = requests.get(url)
        return resp.status_code, resp
    except:
        return 999, None
def processReq(status, resp, qid):
    try:
        json_geocode = json.loads(resp.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        out = [qid, 999, 0, 0]
    qres.put(out)
    return
#Run:
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
for url in url_routes:
    q.put(url)
q.join()
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

A6. Threading with HTTPConnectionPool -> 1550 RPS

import json
from threading import Thread
from queue import Queue
from urllib3 import HTTPConnectionPool
concurrent = 100
conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=concurrent)
def doWork():
    while True:
        url,qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()
def getReq(url):
    try:
        resp = conn_pool.request('GET', url)
        return resp.status, resp
    except:
        return 999, None
def processReq(status, resp, qid):
    try:
        json_geocode = json.loads(resp.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        out = [qid, 999, 0, 0]
    qres.put(out)
    return
#Run:
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
for url in url_routes:
    q.put(url)
q.join()
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

A7. requests-futures -> 520 RPS

from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed
concurrent = 100
def ReqOsrm(sess, resp):
    try:
        json_geocode = resp.json()
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err)
        out = [999, 0, 0]
    resp.data = out
#Run:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=concurrent)) as session:
    # Submit requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
    # Process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0]
        calc_routes.append(row)

Multiple processes

P8. multiprocessing worker + queue + requests.session() -> 1058 RPS

import json
import requests
from multiprocessing import *
class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout
    def run(self):
        s = requests.session()
        while not self.qin.empty():
            url, qid = self.qin.get()
            data = s.get(url)
            self.qout.put(ReqOsrm(data, qid))
            self.qin.task_done()
def ReqOsrm(resp, qid):
    try:
        json_geocode = json.loads(resp.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid)
        return [qid, 999, 0, 0]
# Run:
qout = Queue()
qin = JoinableQueue()
[qin.put(url_q) for url_q in url_routes]
[Worker(qin, qout).start() for _ in range(cpu_count())]
qin.join()
calc_routes = []
while not qout.empty():
    calc_routes.append(qout.get())

P9. multiprocessing worker + queue + HTTPConnectionPool() -> 1230 RPS
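
(The code for P9 is not included in the original post; a sketch, assuming it follows the P8 worker but swaps the per-process requests session for a per-process urllib3 pool, with ghost/gport as in the other snippets, might look like:)

from multiprocessing import Process, JoinableQueue, Queue, cpu_count
from urllib3 import HTTPConnectionPool
import json

class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout
    def run(self):
        # one connection pool per process, reused for every request it handles
        conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=1)
        while not self.qin.empty():
            url, qid = self.qin.get()
            try:
                resp = conn_pool.request('GET', url)
                json_geocode = json.loads(resp.data.decode('utf-8'))
                out = [qid, 200, json_geocode['paths'][0]['time'], json_geocode['paths'][0]['distance']]
            except Exception:
                out = [qid, 999, 0, 0]
            self.qout.put(out)
            self.qin.task_done()
# Driven exactly like P8: fill a JoinableQueue with url_routes, start cpu_count()
# workers, join the queue, then drain the result queue.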

P10. multiprocessing v2 (not sure how this is different) -> 1350 RPS

import json
from multiprocessing import Pool
from urllib3 import HTTPConnectionPool

conn_pool = None
def makePool(host, port):
    global conn_pool
    pool = conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)
def ReqOsrm(data):
    url, qid = data
    try:
        response = conn_pool.request('GET', url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        return [qid, 999, 0, 0]
# Run:
pool = Pool(initializer=makePool, initargs=(ghost, gport))
calc_routes = pool.map(ReqOsrm, url_routes)

All in all, the best method for me seems to be #10 (and, surprisingly, #6).

【Discussion】:

Another approach you could try is to use multiprocessing together with asyncio (or gevent). I have only used gevent, but since coroutines are single-threaded it can only make use of a single core. Coroutine switching should be faster than thread switching, so multiprocessing + coroutines may well be the fastest.

Are you going to accept an answer?

I get an error when running P8: ChunkedEncodingError(ProtocolError('Connection broken: IncompleteRead(162 bytes read)', IncompleteRead(162 bytes read)))

【Answer 2】:

Look at the multiprocessing code at the top of the question. It seems that HTTPConnectionPool() is being called every time ReqOsrm is called, so a new pool is created for each url. Instead, use the initializer and initargs parameters to create a single pool per process.

conn_pool = None

def makePool(host, port):
    global conn_pool
    pool = conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)

def ReqOsrm(url_input):
    ul, qid = url_input

    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out

        else:
            print("Done but no route: %d %s" % (qid, req_url))
            return [qid, 999, 0, 0, 0, 0, 0, 0]

    except Exception as err:
        print("%s: %d %s" % (err, qid, req_url))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

if __name__ == "__main__":
    # run:
    pool = Pool(initializer=makePool, initargs=('127.0.0.1', 5005))
    calc_routes = pool.map(ReqOsrm, url_routes)
    pool.close()
    pool.join()

The requests-futures version appears to have an indentation error. The loop for future in as_completed(futures): is indented under the outer loop for i in range(len(url_routes)):. So a request is made in the outer loop and then the inner loop waits for that future to return before the next iteration of the outer loop. This makes the requests run serially rather than in parallel.

I think the code should be as follows:

calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
    # Submit all the requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid

    # this was indented under the code in section B of the question
    # process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data

        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
        print(row)
        calc_routes.append(row)

【Discussion】:

【Answer 3】:

Question 1

You are getting the error because this approach:

def ReqOsrm(url_input):
    req_url, query_id = url_input
    try_c = 0
    #print(req_url)
    while try_c < 5:
        try:
            response = requests.get(req_url)
            json_geocode = response.json()
            status = int(json_geocode['status'])
            # Found route between points
            if status == 200:
            ....

pool = Pool(cpu_count()-1) 
calc_routes = pool.map(ReqOsrm, url_routes)

creates a new TCP connection for every requested URL, and at some point it fails simply because the system has no free local ports left. To confirm this, you can run netstat while your code is executing:

netstat -a -n | find /c "localhost:5005"

It will show you how many connections you have open to the server.

Also, reaching 1700 RPS with this approach looks quite unrealistic, since requests.get is a fairly expensive operation and it is unlikely you can get even 50 RPS this way. So you probably need to double-check your RPS calculation.
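
(Not part of the original answer - one simple way to sanity-check the figure is to time the whole run and divide by the number of URLs:)

import time

start = time.time()
calc_routes = pool.map(ReqOsrm, url_routes)   # or whichever variant is being measured
elapsed = time.time() - start
print("%d requests in %.1f s -> %.0f RPS" % (len(url_routes), elapsed, len(url_routes) / elapsed))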

To avoid the error, you need to use sessions instead of creating connections from scratch:

import multiprocessing
import requests
import time


class Worker(multiprocessing.Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout

    def run(self):
        s = requests.session()
        while not self.qin.empty():
            result = s.get(self.qin.get())
            self.qout.put(result)
            self.qin.task_done()

if __name__ == '__main__':
    start = time.time()

    qin = multiprocessing.JoinableQueue()
    [qin.put('http://localhost:8080/') for _ in range(10000)]

    qout = multiprocessing.Queue()

    [Worker(qin, qout).start() for _ in range(multiprocessing.cpu_count())]

    qin.join()

    result = []
    while not qout.empty():
        result.append(qout.get())

    print(time.time() - start)
    print(result)

Question 2

You will not get higher RPS with threading or async approaches unless I/O takes more time than the computation (e.g. high network latency, large responses, etc.), because threads are affected by the GIL (they run inside the same Python process) and async libraries can be blocked by long-running computations.

Although threads or async libraries can improve performance, running the same threaded or async code in multiple processes will give you even higher performance anyway.
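
(For illustration - this snippet is not part of the original answer, and the worker counts and placeholder URL are arbitrary - "the same threaded code in multiple processes" could look roughly like this: each process gets its own session and its own small thread pool over a slice of the URLs.)

import multiprocessing
import requests
from concurrent.futures import ThreadPoolExecutor

def process_chunk(urls):
    session = requests.session()                      # one session per process
    with ThreadPoolExecutor(max_workers=50) as ex:    # threads cover the I/O waits
        return [r.status_code for r in ex.map(session.get, urls)]

if __name__ == '__main__':
    urls = ['http://localhost:8080/'] * 10000
    n = multiprocessing.cpu_count()
    chunks = [urls[i::n] for i in range(n)]           # split the work across processes
    with multiprocessing.Pool(n) as pool:
        results = [code for chunk in pool.map(process_chunk, chunks) for code in chunk]
    print(len(results))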

【Discussion】:

【Answer 4】:

Here is a pattern I use with gevent, which is coroutine-based and may not suffer from the GIL. It may be faster than using threads, and is probably fastest when combined with multiprocessing (currently it only uses 1 core):

from gevent import monkey
monkey.patch_all()

import logging
import random
import time
from threading import Thread

from gevent.queue import JoinableQueue
from logger import initialize_logger

initialize_logger()
log = logging.getLogger(__name__)


class Worker(Thread):

    def __init__(self, worker_idx, queue):
        # initialize the base class
        super(Worker, self).__init__()
        self.worker_idx = worker_idx
        self.queue = queue

    def log(self, msg):
        log.info("WORKER %s - %s" % (self.worker_idx, msg))

    def do_work(self, line):
        #self.log(line)
        time.sleep(random.random() / 10)

    def run(self):
        while True:
            line = self.queue.get()
            self.do_work(line)
            self.queue.task_done()


def main(number_of_workers=20):
    start_time = time.time()

    queue = JoinableQueue()
    for idx in range(number_of_workers):
        worker = Worker(idx, queue)
        # "daemonize" a thread to ensure that the threads will
        # close when the main program finishes
        worker.daemon = True
        worker.start()

    for idx in xrange(100):
        queue.put("%s" % idx)

    queue.join()
    time_taken = time.time() - start_time
    log.info("Parallel work took %s seconds." % time_taken)

    start_time = time.time()
    for idx in xrange(100):
        #log.info(idx)
        time.sleep(random.random() / 10)
    time_taken = time.time() - start_time
    log.info("Sync work took %s seconds." % time_taken)


if __name__ == "__main__":
    main()
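
(Not part of the original answer - a rough sketch of the "combine with multiprocessing" idea, where each child process monkey-patches itself and runs its own pool of greenlets over a slice of the URLs; pool sizes and URLs are placeholders:)

import multiprocessing

def fetch_chunk(urls):
    # patch inside the child process so its sockets become cooperative
    from gevent import monkey
    monkey.patch_all()
    import gevent.pool
    import requests

    session = requests.Session()
    pool = gevent.pool.Pool(100)                      # 100 greenlets per process
    return pool.map(lambda u: session.get(u).status_code, urls)

if __name__ == "__main__":
    urls = ["http://localhost:5005/"] * 1000
    n = multiprocessing.cpu_count()
    chunks = [urls[i::n] for i in range(n)]
    with multiprocessing.Pool(n) as procs:
        results = [c for chunk in procs.map(fetch_chunk, chunks) for c in chunk]
    print(len(results), "responses")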

【Discussion】:
