在 Python 中发送 100,000 个 HTTP 请求的最快方法是啥?

Posted

技术标签:

【中文标题】在 Python 中发送 100,000 个 HTTP 请求的最快方法是啥?【英文标题】:What is the fastest way to send 100,000 HTTP requests in Python?在 Python 中发送 100,000 个 HTTP 请求的最快方法是什么? 【发布时间】:2011-02-07 14:51:00 【问题描述】:

我正在打开一个包含 100,000 个 URL 的文件。我需要向每个 URL 发送一个 HTTP 请求并打印状态代码。我使用的是 Python 2.6,到目前为止,我研究了 Python 实现线程/并发的许多令人困惑的方式。我什至看过 python concurrence 库,但不知道如何正确编写这个程序。有没有人遇到过类似的问题?我想通常我需要知道如何在 Python 中尽可能快地执行数千个任务 - 我想这意味着“并发”。

【问题讨论】:

确保您只执行 HEAD 请求(这样您就不会下载整个文档)。见:***.com/questions/107405/… 好点,卡尔米。如果 Igor 想要的只是请求的状态,那么这 10 万个请求会变得非常非常快。更快。 你不需要线程;最有效的方法可能是使用像 Twisted 这样的异步库。 这里是gevent, twisted, and asyncio -based code examples(在 1000000 个请求上测试) @TarnayKálmán requests.getrequests.head(即页面请求与头部请求)可能返回不同的状态代码,所以这不是最好的建议 【参考方案1】:

无扭曲解决方案:

from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue

concurrent = 200

def doWork():
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)   
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status, ourl
    except:
        return "error", ourl

def doSomethingWithResult(status, url):
    print status, url

q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

这个比扭曲的解决方案稍微快一点,并且使用更少的 CPU。

【讨论】:

@Kalmi,你为什么将队列设置为concurrent*2 别忘了close the connectionconn.close()。打开过多的 http 连接可能会在某些时候停止您的脚本并占用内存。 @hyh,Queue 模块在 Python 3 中已重命名为 queue。这是 Python 2 代码。 如果您想通过保持连接每次都与同一台服务器通信,您能快多少?这甚至可以跨线程完成,或者每个线程使用一个持久连接吗? @mptevsion,如果您使用的是 CPython,您可以(例如)将“print status, url”替换为“my_global_list.append((status, url))”。由于 GIL,(大多数操作)列表在 CPython(和其他一些 python 实现)中是隐式线程安全的,所以这样做是安全的。【参考方案2】:

自 2010 年发布此内容以来,情况发生了很大变化,我还没有尝试所有其他答案,但我尝试了一些,我发现使用 python3.6 最适合我。

我能够在 AWS 上每秒获取大约 150 个唯一域。

import concurrent.futures
import requests
import time

out = []
CONNECTIONS = 100
TIMEOUT = 5

tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://'.format(x) for x in tlds[1:]]

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    time1 = time.time()
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))
        finally:
            out.append(data)

            print(str(len(out)),end="\r")

    time2 = time.time()

print(f'Took time2-time1:.2f s')

【讨论】:

我只是问,因为我不知道,但这些期货的东西可以用 async/await 代替吗? 可以,但我发现上述方法效果更好。您可以使用 aiohttp,但它不是标准库的一部分,并且变化很大。它确实有效,但我只是没有发现它也有效。当我使用它时,我得到更高的错误率,在我的一生中,我无法让它像并发期货一样工作,尽管理论上它似乎应该工作得更好,请参阅:***.com/questions/45800857/… 如果你让它工作好吧,请发布您的答案,以便我进行测试。 这是一个挑剔的问题,但我认为将 time1 = time.time() 放在 for 循环的顶部和 time2 = time.time() 放在 for 循环之后会更干净。 我测试了你的 sn-p,不知何故它执行了两次。难道我做错了什么?还是意味着要运行两次?如果是后一种情况,你能不能帮我理解一下它是如何触发两次的? 它不应该运行两次。不知道您为什么会看到这种情况。【参考方案3】:

使用tornado异步网络库的解决方案

from tornado import ioloop, httpclient

i = 0

def handle_request(response):
    print(response.code)
    global i
    i -= 1
    if i == 0:
        ioloop.IOLoop.instance().stop()

http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
    i += 1
    http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()

此代码使用非阻塞网络 I/O,没有任何限制。它可以扩展到数万个打开的连接。它将在单个线程中运行,但比任何线程解决方案都要快。结帐non-blocking I/O

【讨论】:

你能解释一下全局 i 变量发生了什么吗?某种错误检查? 这是一个计数器,用于确定何时退出 ``ioloop` -- 所以当你完成时。 @mher - 如果我对响应完全不感兴趣,这意味着只希望尽可能快地向服务器发送尽可能多的请求,我应该在上面的例子?谢谢!! @Guy Avraham 祝你在 ddos​​ 计划上获得帮助。 @Walter - 你找到我了 :) 实际上我是在尝试做一些非常幼稚的“压力测试”【参考方案4】:

我知道这是一个老问题,但在 Python 3.7 中,您可以使用 asyncioaiohttp 来做到这一点。

import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError

async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
    try:
        resp = await session.request(method="GET", url=url, **kwargs)
    except ClientConnectorError:
        return (url, 404)
    return (url, resp.status)

async def make_requests(urls: set, **kwargs) -> None:
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                fetch_html(url=url, session=session, **kwargs)
            )
        results = await asyncio.gather(*tasks)

    for result in results:
        print(f'result[1] - str(result[0])')

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    asyncio.run(make_requests(urls=urls))

您可以阅读更多相关信息并查看示例 here。

【讨论】:

这类似于 C# async/await 和 Kotlin 协程吗? @IgorGanapolsky,是的,它与 C# async/await 非常相似。我对 Kotlin 协程不熟悉。 @sandyp,我不确定它是否有效,但如果你想尝试,你将不得不使用 UnixConnector for aiohttp。在此处阅读更多信息:docs.aiohttp.org/en/stable/client_reference.html#connectors。 谢谢@MariusStănescu。这正是我使用的。 +1 用于显示 asyncio.gather(*tasks)。这是我使用的一个这样的 sn-p:urls= [fetch(construct_fetch_url(u),idx) for idx, u in enumerate(some_URI_list)]results = await asyncio.gather(*urls)【参考方案5】:

线程绝对不是这里的答案。如果总体目标是“最快的方式”,它们将提供进程和内核瓶颈,以及不可接受的吞吐量限制。

一点点twisted 和它的异步HTTP 客户端会给你带来更好的结果。

【讨论】:

ironfroggy:我倾向于你的观点。我尝试使用线程和队列(用于自动互斥锁)来实现我的解决方案,但是你能想象用 100,000 个东西填充队列需要多长时间吗?我仍在尝试这个线程上每个人的不同选项和建议,也许 Twisted 会是一个很好的解决方案。 你可以避免用 100k 的东西填充队列。只需从您的输入中一次处理一个项目,然后启动一个线程来处理与每个项目对应的请求。 (正如我在下面描述的,当您的线程数低于某个阈值时,使用启动器线程启动 HTTP 请求线程。让线程将结果写入到 dict 映射 URL 以响应,或将元组附加到列表。)跨度> ironfroggy:另外,我很好奇您在使用 Python 线程时发现了哪些瓶颈? Python 线程如何与 OS 内核交互? 确保安装了epoll reactor;否则你将使用 select/poll,它会很慢。此外,如果您实际上要尝试同时打开 100,000 个连接(假设您的程序是这样编写的,并且 URL 位于不同的服务器上),您需要调整您的操作系统,这样您就不会用完文件描述符、临时端口等(确保一次没有超过 10,000 个未完成的连接可能更容易)。 erikg:您确实推荐了一个好主意。然而,我能够用 200 个线程实现的最佳结果大约是。 6 分钟。我确信有一些方法可以在更短的时间内完成此任务... Mark N:如果 Twisted 是我决定采用的方式,那么 epoll reactor 肯定是有用的。但是,如果我的脚本将在多台机器上运行,那是否需要在每台机器上安装 Twisted?我不知道我能不能说服我的老板走那条路……【参考方案6】:

使用grequests,它是请求+Gevent模块的组合。

GRequests 允许您将 Requests 与 Gevent 一起使用来轻松地发出异步 HTTP 请求。

用法很简单:

import grequests

urls = [
   'http://www.heroku.com',
   'http://tablib.org',
   'http://httpbin.org',
   'http://python-requests.org',
   'http://kennethreitz.com'
]

创建一组未发送的请求:

>>> rs = (grequests.get(u) for u in urls)

同时发送它们:

>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

【讨论】:

gevent 现在支持 python 3 grequests 不是正常请求的一部分,似乎基本上无人维护【参考方案7】:

(下一个项目的自我说明)

仅使用 requests 的 Python 3 解决方案。它最简单,速度也很快,不需要多处理或复杂的异步库。

最重要的方面是重用连接,尤其是对于 HTTPS(TLS 需要额外的往返才能打开)。请注意,连接特定于子域​​。如果您在许多域上抓取许多页面,则可以对 URL 列表进行排序以最大限度地重复使用连接(它有效地按域排序)。

当给定足够多的线程时,它将与任何异步代码一样快。 (请求在等待响应时释放 python GIL)。

[带有一些日志记录和错误处理的生产级代码]

import logging
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# source: https://***.com/a/68583332/5994461

THREAD_POOL = 16

# This is how to create a reusable connection pool with python requests.
session = requests.Session()
session.mount(
    'https://',
    requests.adapters.HTTPAdapter(pool_maxsize=THREAD_POOL,
                                  max_retries=3,
                                  pool_block=True)
)

def get(url):
    response = session.get(url)
    logging.info("request was completed in %s seconds [%s]", response.elapsed.total_seconds(), response.url)
    if response.status_code != 200:
        logging.error("request failed, error code %s [%s]", response.status_code, response.url)
    if 500 <= response.status_code < 600:
        # server is overloaded? give it a break
        time.sleep(5)
    return response

def download(urls):
    with ThreadPoolExecutor(max_workers=THREAD_POOL) as executor:
        # wrap in a list() to wait for all requests to complete
        for response in list(executor.map(get, urls)):
            if response.status_code == 200:
                print(response.content)

def main():
    logging.basicConfig(
        format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
        level=logging.INFO,
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    urls = [
        "https://httpstat.us/200",
        "https://httpstat.us/200",
        "https://httpstat.us/200",
        "https://httpstat.us/404",
        "https://httpstat.us/503"
    ]

    download(urls)

if __name__ == "__main__":
    main()

【讨论】:

你所说的~"排序网址"是什么意思? 对URL列表排序sorted(urls)【参考方案8】:

解决此问题的一个好方法是首先编写获得一个结果所需的代码,然后合并线程代码以并行化应用程序。

在一个完美的世界中,这仅仅意味着同时启动 100,000 个线程,这些线程将它们的结果输出到字典或列表中以供以后处理,但实际上,您可以以这种方式发出多少并行 HTTP 请求是有限的。在本地,您可以同时打开多少个套接字,您的 Python 解释器将允许多少个执行线程。如果所有请求都针对一台或多台服务器,则远程连接的数量可能会受到限制。这些限制可能需要您编写脚本,以便在任何时候只轮询一小部分 URL(正如另一位海报提到的,100 可能是一个不错的线程池大小,尽管您可能会发现您可以成功部署更多)。

您可以按照此设计模式解决上述问题:

    启动一个线程,该线程启动新的请求线程,直到当前运行的线程数(您可以通过 threading.active_count() 或通过将线程对象推入数据结构来跟踪它们)>= 您的最大同时请求数(说 100),然后休眠一小段时间。当没有更多要处理的 URL 时,该线程应该终止。因此,线程将不断唤醒、启动新线程并休眠,直到您完成。 让请求线程将其结果存储在某个数据结构中,以供以后检索和输出。如果您存储结果的结构是 CPython 中的 listdict,您可以使用 safely append or insert unique items from your threads without locks,但如果您写入文件或需要更复杂的跨线程数据交互您应该使用一个互斥锁来保护这个状态免受破坏

我建议您使用threading 模块。您可以使用它来启动和跟踪正在运行的线程。 Python 的线程支持是裸露的,但您对问题的描述表明它完全可以满足您的需求。

最后,如果您希望看到一个用 Python 编写的并行网络应用程序的非常简单的应用程序,请查看ssh.py。这是一个使用 Python 线程并行化许多 SSH 连接的小型库。该设计非常接近您的要求,您可能会发现它是一个很好的资源。

【讨论】:

erikg:将队列放入您的等式是否合理(用于互斥锁定)?我怀疑 Python 的 GIL 不适合玩数千个线程。 为什么需要互斥锁来防止产生过多线程?我怀疑我误解了这个词。您可以跟踪线程队列中正在运行的线程,在它们完成时将它们删除,并添加更多到所述线程限制。但是在一个简单的情况下,例如有问题的情况,您也可以只观察当前 Python 进程中的活动线程数,等到它低于阈值,然后启动更多线程,直到达到所描述的阈值。我想你可以认为这是一个隐式锁,但 afaik 不需要显式锁。 erikg:多个线程不共享状态吗?在 O'Reilly 的“Python for Unix and Linux System Administration”一书中的第 305 页上,它指出:“......使用没有队列的线程会使它比许多人实际处理的更复杂。总是使用队列是一个更好的主意"再次,我欢迎您对此发表看法。 Igor:您应该使用锁是绝对正确的。我已经编辑了帖子以反映这一点。也就是说,使用 python 的实际经验表明,您不需要锁定从线程中原子修改的数据结构,例如通过 list.append 或通过添加哈希键。我相信原因是 GIL,它提供了诸如 list.append 之类的操作,具有一定程度的原子性。我目前正在运行测试以验证这一点(使用 10k 线程将数字 0-9999 附加到列表中,检查所有附加是否有效)。经过近 100 次迭代,测试没有失败。 Igor:有人问我关于这个主题的另一个问题:***.com/questions/2740435/…【参考方案9】:

如果您希望获得可能的最佳性能,您可能需要考虑使用异步 I/O 而不是线程。与数千个 OS 线程相关的开销是不小的,Python 解释器中的上下文切换在它之上增加了更多。线程肯定会完成工作,但我怀疑异步路由会提供更好的整体性能。

具体来说,我建议使用 Twisted 库 (http://www.twistedmatrix.com) 中的异步 Web 客户端。众所周知,它有一个陡峭的学习曲线,但一旦你掌握了 Twisted 的异步编程风格,它就很容易使用。

关于 Twisted 的异步 Web 客户端 API 的 HowTo 可在以下位置获得:

http://twistedmatrix.com/documents/current/web/howto/client.html

【讨论】:

Rakis:我目前正在研究异步和非阻塞 I/O。在我实施它之前,我需要更好地学习它。我想对您的帖子发表的一条评论是(至少在我的 Linux 发行版下)不可能产生“数千个操作系统线程”。 Python 将允许您在程序中断之前生成的最大线程数。在我的情况下(在 CentOS 5 上)最大线程数是 303。 很高兴知道这一点。我从来没有尝试过在 Python 中一次生成多个,但我希望能够在它被炸毁之前创建更多。【参考方案10】:

解决方案:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

测试时间:

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null 

real    1m10.682s
user    0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu

Ping 时间:

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms

【讨论】:

使用 Twisted 作为线程池忽略了您可以从中获得的大部分好处。您应该改用异步 HTTP 客户端。【参考方案11】:

使用thread pool 是一个不错的选择,并且会使这变得相当容易。不幸的是,python 没有使线程池变得非常简单的标准库。但这里有一个不错的库,可以帮助您入门: http://www.chrisarndt.de/projects/threadpool/

来自他们网站的代码示例:

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()

希望这会有所帮助。

【讨论】:

我建议您像这样为 ThreadPool 指定 q_size: ThreadPool(poolsize, q_size=1000) 这样您的内存中就不会有 100000 个 WorkRequest 对象。 “如果q_size>0 工作请求队列的大小是有限的,当队列满时线程池阻塞并尝试在其中放入更多工作请求(参见putRequest方法) ,除非您还为 putRequest 使用了正的 timeout 值。" 到目前为止,我正在尝试实施线程池解决方案 - 正如建议的那样。但是,我不明白 makeRequests 函数中的参数列表。什么是 some_callable、list_of_args、回调?也许如果我看到一个真正的代码 sn-p 会有所帮助。我很惊讶该库的作者没有发布任何示例。 some_callable 是您完成所有工作的函数(连接到 http 服务器)。 list_of_args 是将传递给 some_callabe 的参数。 callback 是一个在工作线程完成时将被调用的函数。它有两个参数,worker 对象(实际上不需要关心你自己),以及 worker 检索到的结果。【参考方案12】:

这个扭曲的异步网络客户端运行得非常快。

#!/usr/bin/python2.7

from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput

pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = 

def getLock(url, simultaneous = 1):
    return locks[urlparse(url).netloc, randrange(simultaneous)]

@inlineCallbacks
def getMapping(url):
    # Limit ourselves to 4 simultaneous connections per host
    # Tweak this number, but it should be no larger than pool.maxPersistentPerHost 
    lock = getLock(url,4)
    yield lock.acquire()
    try:
        resp = yield agent.request('HEAD', url)
        codes[url] = resp.code
    except Exception as e:
        codes[url] = str(e)
    finally:
        lock.release()


dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())

reactor.run()
pprint(codes)

【讨论】:

【参考方案13】:

创建epoll对象, 打开多个客户端 TCP 套接字, 将它们的发送缓冲区调整为比请求标头多一点, 发送一个请求头——它应该是立即的,只是放入一个缓冲区, 在epoll 对象中注册套接字, 在epoll 上做.poll 对象, 从.poll 读取每个套接字的前 3 个字节, 将它们写到sys.stdout,然后是\n(不要刷新), 关闭客户端套接字。

限制同时打开的套接字数量——在创建套接字时处理错误。仅当另一个套接字关闭时才创建一个新套接字。 调整操作系统限制。 尝试分叉几个(不是很多)进程:这可能有助于更有效地使用 CPU。

【讨论】:

@IgorGanapolsky 一定是。否则我会感到惊讶。但它肯定需要试验。【参考方案14】:

我发现使用tornado 包是实现这一目标的最快和最简单的方法:

from tornado import ioloop, httpclient, gen


def main(urls):
    """
    Asynchronously download the HTML contents of a list of URLs.
    :param urls: A list of URLs to download.
    :return: List of response objects, one for each URL.
    """

    @gen.coroutine
    def fetch_and_handle():
        httpclient.AsyncHTTPClient.configure(None, defaults=dict(user_agent='MyUserAgent'))
        http_client = httpclient.AsyncHTTPClient()
        waiter = gen.WaitIterator(*[http_client.fetch(url, raise_error=False, method='HEAD')
                                    for url in urls])
        results = []
        # Wait for the jobs to complete
        while not waiter.done():
            try:
                response = yield waiter.next()
            except httpclient.HTTPError as e:
                print(f'Non-200 HTTP response returned: e')
                continue
            except Exception as e:
                print(f'An unexpected error occurred querying: e')
                continue
            else:
                print(f'URL \'response.request.url\' has status code <response.code>')
                results.append(response)
        return results

    loop = ioloop.IOLoop.current()
    web_pages = loop.run_sync(fetch_and_handle)

    return web_pages

my_urls = ['url1.com', 'url2.com', 'url100000.com']
responses = main(my_urls)
print(responses[0])

【讨论】:

我的上帝终于非常感谢,我被卡在使用糟糕的并发期货,并且由于某种原因在我的环境中它一直卡在一些 url 调用的无限循环中,我 100% 确定我m 正确使用它。龙卷风的这种解决方案是一流的【参考方案15】:
pip install requests-threads

使用 async/await 的示例用法——发送 100 个并发请求

from requests_threads import AsyncSession

session = AsyncSession(n=100)

async def _main():
    rs = []
    for _ in range(100):
        rs.append(await session.get('http://httpbin.org/get'))
    print(rs)

if __name__ == '__main__':
    session.run(_main)

此示例仅适用于 Python 3。你也可以提供自己的异步事件循环!

使用 Twisted 的示例用法

from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import react
from requests_threads import AsyncSession

session = AsyncSession(n=100)

@inlineCallbacks
def main(reactor):
    responses = []
    for i in range(100):
        responses.append(session.get('http://httpbin.org/get'))

    for response in responses:
        r = yield response
        print(r)

if __name__ == '__main__':
    react(main)

此示例适用于 Python 2 和 Python 3。

也许它可以帮助我的回购,一个基本的例子, WRITING FAST ASYNC HTTP REQUESTS IN PYTHON

【讨论】:

【参考方案16】:

这是一个 "async" 解决方案,它不使用asyncio,但较低级别的机制asyncio 使用(在Linux 上):select()。 (或者可能asyncio 使用pollepoll,但原理类似。)

它是example from PyCurl 的略微修改版本。

(为简单起见,它多次请求同一个 URL,但您可以轻松修改它以检索一堆不同的 URL。)

(再稍加修改,就可以像无限循环一样一遍又一遍地检索相同的 URL。提示:将while urls and handles 更改为while handles,并将while nprocessed&lt;nurls 更改为while 1。)

import pycurl,io,gzip,signal, time, random
signal.signal(signal.SIGPIPE, signal.SIG_IGN)  # NOTE! We should ignore SIGPIPE when using pycurl.NOSIGNAL - see the libcurl tutorial for more info

NCONNS = 2  # Number of concurrent GET requests
url    = 'example.com'
urls   = [url for i in range(0x7*NCONNS)]  # Copy the same URL over and over

# Check args
nurls  = len(urls)
NCONNS = min(NCONNS, nurls)
print("\x1b[32m%s \x1b[0m(compiled against 0x%x)" % (pycurl.version, pycurl.COMPILE_LIBCURL_VERSION_NUM))
print(f'\x1b[37mnurls \x1b[91m@ \x1b[92mNCONNS\x1b[0m')

# Pre-allocate a list of curl objects
m         = pycurl.CurlMulti()
m.handles = []
for i in range(NCONNS):
  c = pycurl.Curl()
  c.setopt(pycurl.FOLLOWLOCATION,  1)
  c.setopt(pycurl.MAXREDIRS,       5)
  c.setopt(pycurl.CONNECTTIMEOUT,  30)
  c.setopt(pycurl.TIMEOUT,         300)
  c.setopt(pycurl.NOSIGNAL,        1)
  m.handles.append(c)

handles    = m.handles  # MUST make a copy?!
nprocessed = 0
while nprocessed<nurls:

  while urls and handles:  # If there is an url to process and a free curl object, add to multi stack
    url   = urls.pop(0)
    c     = handles.pop()
    c.buf = io.BytesIO()
    c.url = url  # store some info
    c.t0  = time.perf_counter()
    c.setopt(pycurl.URL,        c.url)
    c.setopt(pycurl.WRITEDATA,  c.buf)
    c.setopt(pycurl.HTTPHEADER, [f'user-agent: random.randint(0,(1<<256)-1):x', 'accept-encoding: gzip, deflate', 'connection: keep-alive', 'keep-alive: timeout=10, max=1000'])
    m.add_handle(c)

  while 1:  # Run the internal curl state machine for the multi stack
    ret, num_handles = m.perform()
    if ret!=pycurl.E_CALL_MULTI_PERFORM:  break

  while 1:  # Check for curl objects which have terminated, and add them to the handles
    nq, ok_list, ko_list = m.info_read()
    for c in ok_list:
      m.remove_handle(c)
      t1 = time.perf_counter()
      reply = gzip.decompress(c.buf.getvalue())
      print(f'\x1b[33mGET  \x1b[32mt1-c.t0:.3f  \x1b[37mlen(reply):9,  \x1b[0mreply[:32]...')  # \x1b[35mpsutil.Process(os.getpid()).memory_info().rss:, \x1b[0mbytes')
      handles.append(c)
    for c, errno, errmsg in ko_list:
      m.remove_handle(c)
      print('\x1b[31mFAIL c.url errno errmsg')
      handles.append(c)
    nprocessed = nprocessed + len(ok_list) + len(ko_list)
    if nq==0: break

  m.select(1.0)  # Currently no more I/O is pending, could do something in the meantime (display a progress bar, etc.). We just call select() to sleep until some more data is available.

for c in m.handles:
  c.close()
m.close()

【讨论】:

为什么asyncioselect 慢? 我不认为我说它更慢,但是,既然你提到它,这毕竟是 Python,所以asyncio 可能与原始相比有开销致电select(这就是为什么我赞成不使用asyncio,而是直接使用select 如果这很容易,或者如果你有时间或者如果你有有趣)。【参考方案17】:

对于您的情况,线程可能会起到作用,因为您可能会花费大部分时间等待响应。标准库中有一些有用的模块,例如 Queue,可能会有所帮助。

我之前做过类似的并行下载文件的事情,这对我来说已经足够了,但它没有达到你所说的规模。

如果您的任务更受 CPU 限制,您可能需要查看 multiprocessing 模块,该模块将允许您利用更多 CPU/内核/线程(更多进程不会相互阻塞,因为锁定是每个进程)

【讨论】:

我唯一想提的是,产生多个进程可能比产生多个线程更昂贵。此外,使用多进程与多线程发送 100,000 个 HTTP 请求并没有明显的性能提升。【参考方案18】:

考虑使用 Windmill ,尽管 Windmill 可能无法执行那么多线程。

您可以在 5 台机器上使用手动 Python 脚本来实现,每台机器使用 40000-60000 端口连接出站,打开 100,000 个端口连接。

此外,使用线程良好的 QA 应用程序(例如 OpenSTA)进行示例测试可能会有所帮助,以便了解每个服务器可以处理多少。

另外,请尝试使用带有 LWP::ConnCache 类的简单 Perl。这样你可能会获得更高的性能(更多的连接)。

【讨论】:

【参考方案19】:

[工具]

Apache Bench 就是你所需要的。 - 用于测量 HTTP Web 服务器性能的命令行计算机程序 (CLI)

一篇不错的博文:https://www.petefreitag.com/item/689.cfm(来自 Pete Freitag

【讨论】:

OP 并不是要测量一台服务器。它是将许多请求同时发送到许多服务器,以收集响应。有点像网络爬虫。【参考方案20】:

最简单的方法是使用 Python 的内置线程库。 它们不是“真正的”/内核线程它们有问题(如序列化),但已经足够好了。您需要一个队列和线程池。一种选择是here,但自己编写很简单。您无法并行处理所有 100,000 个调用,但您可以同时触发 100 个(左右)它们。

【讨论】:

Python 的线程是非常真实的,例如与 Ruby 的相反。在底层,它们被实现为本地操作系统线程,至少在 Unix/Linux 和 Windows 上是这样。也许你指的是 GIL,但它并没有让线程变得不那么真实...... Eli 关于 Python 的线程是正确的,但是 Pestilence 的观点是你想要使用线程池也是正确的。在这种情况下,您最不想做的事情是尝试同时为 100K 请求中的每一个启动一个单独的线程。 伊戈尔,你不能明智地在 cmets 中发布代码 sn-ps,但你可以编辑你的问题并将它们添加到那里。 瘟疫:您会为我的解决方案推荐多少个队列和每个队列的线程数? 另外这是一个 I/O 绑定任务,不受 CPU 限制,GIL 很大程度上影响 CPU 绑定任务

以上是关于在 Python 中发送 100,000 个 HTTP 请求的最快方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章

在 Python 中聚类约 100,000 个短字符串

100,000 个主题限制的 AWS SNS 解决方法

PyMySQL 在一个查询中进行不同的更新?

使用 Core Data 高效显示 100,000 个项目

在100,000个核心集群上运行100万个作业

100,000 个并发连接的 WebSocket 服务器是啥?