Parallel (proxy) requests and take the fastest result

Posted: 2021-05-08 07:54:07

Question:

I'm trying to optimize requests that go through an external (rotating) proxy. Sometimes the response is fast, sometimes slow, so the idea is to send several parallel requests for the same URL, take the fastest response, return its data, and exit the function without waiting for the slower responses.

There are plenty of tutorials and SO questions about parallel requests in Python, but they all deal with parallel requests to different URLs rather than duplicating the same request. In addition, the code always waits until every request has finished. I want to terminate the parallel-request logic (preferably in a clean way) as soon as the fastest response comes back.

My app runs in Python Flask under Gunicorn + Eventlet. I tried Eventlet green pools and Python concurrent.futures; Eventlet GreenPool seems the better fit, since the code will run in Gunicorn + Eventlet workers and in Celery with Eventlet workers.

I currently use the Luminati Proxy Manager (LPM) to retry failed requests. An older version apparently supported parallel requests out of the box, but the current version no longer does. So I can either solve it with code inside my Python app, or add another service/tool (like LPM) that handles the parallel requests and picks the fastest one.

The proxy service Luminati.io provides a "high performance parallel requests" code example (based on Eventlet GreenPool); see "Original example" below.

I edited that code to run without proxies and logins, to make it reproducible and to avoid unpredictable proxy response times. I'm not getting any support from Luminati, so I'm trying to sort this out myself. For this test I use a simulated 5-second slow response and a fast response from httpstat.us:

['http://httpstat.us/200?sleep=5000','http://httpstat.us/200']

In the edited code I added print statements with timestamps to see which response returns first. There are two problems with this code. Sometimes the fast response comes back first and its data ('OK') is printed, with the slow response following 5 seconds later. More often, though, the code appears to wait until both responses have returned (both print exactly the same time).

The other problem is that although I can print and see the data of the "fast" response immediately, the logic still waits until all responses have completed. I want to return the data and exit the function as soon as the first response comes back. In my edited code you can see several (commented-out) lines where I unsuccessfully tried to kill the processing (that only restarted the eventlet process).
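Conceptually, this is the first-result-wins behaviour I'm after, sketched with plain Eventlet primitives (my own untested sketch, not from the Luminati example; error handling is omitted, so a failed fetch simply runs into the timeout):

import eventlet
from eventlet.green.urllib import request

def fetch_fastest(urls, timeout=10):
    queue = eventlet.Queue()
    # Spawn one greenthread per URL; each pushes its response body onto the queue.
    threads = [eventlet.spawn(lambda u=u: queue.put(request.urlopen(u).read()))
               for u in urls]
    try:
        with eventlet.Timeout(timeout):
            body = queue.get()  # unblocks as soon as the first fetch finishes
    finally:
        for gt in threads:
            gt.kill()  # killing an already-finished greenthread is a no-op
    return body

print(fetch_fastest(['http://httpstat.us/200?sleep=5000', 'http://httpstat.us/200']))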

Original example

import eventlet
from eventlet.green.urllib import request
import random
import socket

super_proxy = socket.gethostbyname('zproxy.lum-superproxy.io')

class SingleSessionRetriever:

    url = "http://%s-session-%s:%s@"+super_proxy+":%d"
    port = 22225

    def __init__(self, username, password, requests_limit, failures_limit):
        self._username = username
        self._password = password
        self._requests_limit = requests_limit
        self._failures_limit = failures_limit
        self._reset_session()

    def _reset_session(self):
        session_id = random.random()
        proxy = SingleSessionRetriever.url % (self._username, session_id, self._password,
                                              SingleSessionRetriever.port)
        proxy_handler = request.ProxyHandler({'http': proxy, 'https': proxy})
        self._opener = request.build_opener(proxy_handler)
        self._requests = 0
        self._failures = 0

    def retrieve(self, url, timeout):
        while True:
            if self._requests == self._requests_limit:
                self._reset_session()
            self._requests += 1
            try:
                timer = eventlet.Timeout(timeout)
                result = self._opener.open(url).read()
                timer.cancel()
                return result
            except:
                timer.cancel()
                self._failures += 1
                if self._failures == self._failures_limit:
                    self._reset_session()


class MultiSessionRetriever:

    def __init__(self, username, password, session_requests_limit, session_failures_limit):
        self._username = username
        self._password = password
        self._sessions_stack = []
        self._session_requests_limit = session_requests_limit
        self._session_failures_limit = session_failures_limit

    def retrieve(self, urls, timeout, parallel_sessions_limit, callback):
        pool = eventlet.GreenPool(parallel_sessions_limit)
        for url, body in pool.imap(lambda url: self._retrieve_single(url, timeout), urls):
            callback(url, body)

    def _retrieve_single(self, url, timeout):
        if self._sessions_stack:
            session = self._sessions_stack.pop()
        else:
            session = SingleSessionRetriever(self._username, self._password,
                                             self._session_requests_limit, self._session_failures_limit)
        body = session.retrieve(url, timeout)
        self._sessions_stack.append(session)
        return url, body

def output(url, body):
    print(body)

n_total_req = 100
req_timeout = 10
n_parallel_exit_nodes = 10
switch_ip_every_n_req = 10
max_failures = 2

MultiSessionRetriever('lum-customer-c_ba028d72-zone-static', 'akssw3iy6h3y', switch_ip_every_n_req, max_failures).retrieve(
    ["http://lumtest.com/myip.json"] * n_total_req, req_timeout, n_parallel_exit_nodes, output)

Edited code (without login and proxies)

def high_perf_parallel_requests(search_url):

    try:
        import datetime
        import eventlet  # needed for eventlet.Timeout and eventlet.GreenPool below
        from eventlet.green.urllib import request

        results2 = []
        results1 = []

        class SingleSessionRetriever:


            def __init__(self, username, password, requests_limit, failures_limit):
                self._username = username
                self._password = password
                self._requests_limit = requests_limit
                self._failures_limit = failures_limit
                self._reset_session()

            def _reset_session(self):
                
                self._requests = 0
                self._failures = 0

            def retrieve(self, url, timeout):

                print("\n SingleSessionRetriever.retrieve init")
                print(url)
                print(datetime.datetime.now())

                while True:

                    if self._requests == self._requests_limit:
                        self._reset_session()
                    self._requests += 1
                    try:
                        timer = eventlet.Timeout(timeout)

                        result = request.urlopen(url).read()
                        print("\n SingleSessionRetriever.retrieve result")
                        print(url)
                        print(result)
                        print(datetime.datetime.now())

                        results1.append(result)

                        timer.cancel()
                        # eventlet.kill(pool)
                        # raise Exception("Got fastest result. Kill eventlet")
                        #eventlet.kill(self)
                        #pool.kill()
                        return result

                    except:
                        timer.cancel()
                        self._failures += 1
                        if self._failures == self._failures_limit:
                            self._reset_session()


        class MultiSessionRetriever:
        

            def __init__(self, username, password, session_requests_limit, session_failures_limit):
                self._returned = False
                self._username = username
                self._password = password
                self._sessions_stack = []
                self._session_requests_limit = session_requests_limit
                self._session_failures_limit = session_failures_limit

            def retrieve(self, urls, timeout, parallel_sessions_limit, callback):
                pool = eventlet.GreenPool(parallel_sessions_limit)
                try:
                    # for url in urls:
                    #     print("spawn ".format(url))
                    #     pool.spawn_n(self._retrieve_single(url, timeout))
                    #pool.waitall()
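                    # Note: pool.imap yields results in the order of the input
                    # iterable, so this loop sees the slow URL's body first even
                    # when the fast request finished earlier.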
                    for url, body in pool.imap(lambda url: self._retrieve_single(url, timeout), urls):


                        if body:
                            print("\n MultiSessionRetriever.retrieve: Body received")
                            print(datetime.datetime.now())
                            # eventlet.Event.send_exception
                            #return body
                            #eventlet.kill(self)
                            # pool.kill()
                    
                        print("\n MultiSessionRetriever.retrieve: in for loop")
                        print(url)
                        print(body)
                        print(datetime.datetime.now())
                        callback(url, body)

                except Exception as e:
                    # eventlet.kill(pool)
                    # eventlet.kill(self)
                    print(e)

                print("\n MultiSessionRetriever.retrieve: after loop")
                print(datetime.datetime.now())
                # eventlet.kill(self)


            def _retrieve_single(self, url, timeout):
                print("\n MultiSessionRetriever._retrieve_single url:")
                print(url)
                print(datetime.datetime.now())
                if self._sessions_stack:
                    session = self._sessions_stack.pop()
                else:
                    session = SingleSessionRetriever(self._username, self._password,
                                                    self._session_requests_limit, self._session_failures_limit)
                body = session.retrieve(url, timeout)
                print("\n MultiSessionRetriever._retrieve_single body:")
                print(body)
                print(datetime.datetime.now())
                self._sessions_stack.append(session)
                return url, body


        def output(url, body):
            print("\n MultiSessionRetriever.output:")
            print(url)
            print(body)
            print(datetime.datetime.now())
            results2.append(body)


        # n_total_req = 2
        req_timeout = 10
        n_parallel_exit_nodes = 2
        switch_ip_every_n_req = 1
        max_failures = 2

        urls = ['http://httpstat.us/200?sleep=5000','http://httpstat.us/200']

        print("start")
        print(datetime.datetime.now())

        x = MultiSessionRetriever('', '', switch_ip_every_n_req, max_failures).retrieve(
            urls, req_timeout, n_parallel_exit_nodes, output)

        print("result1:")
        print(results1)
        
        print("result2:")
        print(results2)

        return results2

Console output (here I used two other URLs with fast and slow responses; their bodies appear as the response text).

web_1          | high_perf_parallel_requests: start
web_1          | start
web_1          | 2021-02-04 02:28:17.503574
web_1          | 
web_1          |  MultiSessionRetriever._retrieve_single url:
web_1          | http://httpstat.us/200?sleep=5000
web_1          | 2021-02-04 02:28:17.503903
web_1          | 
web_1          |  SingleSessionRetriever.retrieve init
web_1          | http://httpstat.us/200?sleep=5000
web_1          | 2021-02-04 02:28:17.503948
web_1          | 
web_1          |  MultiSessionRetriever._retrieve_single url:
web_1          | http://httpstat.us/200
web_1          | 2021-02-04 02:28:17.511720
web_1          | 
web_1          |  SingleSessionRetriever.retrieve init
web_1          | http://httpstat.us/200
web_1          | 2021-02-04 02:28:17.511783
web_1          | 
web_1          |  SingleSessionRetriever.retrieve result
web_1          | http://httpstat.us/200
web_1          | b'"fast response result"\n'
web_1          | 2021-02-04 02:28:18.269042
web_1          | 
web_1          |  MultiSessionRetriever._retrieve_single body:
web_1          | b'"fast response result"\n'
web_1          | 2021-02-04 02:28:18.269220
web_1          | 
web_1          |  SingleSessionRetriever.retrieve result
web_1          | http://httpstat.us/200?sleep=5000
web_1          | b'"slow response result"\n'
web_1          | 2021-02-04 02:28:24.458372
web_1          | 
web_1          |  MultiSessionRetriever._retrieve_single body:
web_1          | b'"slow response result"\n'
web_1          | 2021-02-04 02:28:24.458499
web_1          | 
web_1          |  MultiSessionRetriever.retrieve: Body received
web_1          | 2021-02-04 02:28:24.458814
web_1          | 
web_1          |  MultiSessionRetriever.retrieve: in for loop
web_1          | http://httpstat.us/200?sleep=5000
web_1          | b'"slow response result"\n'
web_1          | 2021-02-04 02:28:24.458857
web_1          | 
web_1          |  MultiSessionRetriever.output:
web_1          | http://httpstat.us/200?sleep=5000
web_1          | b'"slow response result"\n'
web_1          | 2021-02-04 02:28:24.458918
web_1          | 
web_1          |  MultiSessionRetriever.retrieve: Body received
web_1          | 2021-02-04 02:28:24.459057
web_1          | 
web_1          |  MultiSessionRetriever.retrieve: in for loop
web_1          | http://httpstat.us/200
web_1          | b'"fast response result"\n'
web_1          | 2021-02-04 02:28:24.459158
web_1          | 
web_1          |  MultiSessionRetriever.output:
web_1          | http://httpstat.us/200
web_1          | b'"fast response result"\n'
web_1          | 2021-02-04 02:28:24.459206
web_1          | 
web_1          |  MultiSessionRetriever.retrieve: after loop
web_1          | 2021-02-04 02:28:24.459482
web_1          | result1
web_1          | [b'"fast response result"\n', b'"slow response result"\n']
web_1          | result2
web_1          | [b'"slow response result"\n', b'"fast response result"\n']
web_1          | Parallel resp = [b'"slow response result"\n', b'"fast response result"\n']

Other attempts with Eventlet and Concurrent Futures


def parallel_request(url):

    fastest_result = None

    try:
        import datetime
        import eventlet
        from eventlet.green.urllib.request import urlopen

        # urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
        #     "https://www.python.org/static/img/python-logo.png",
        #     "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]

        urls = ['http://httpstat.us/200?sleep=5000','http://httpstat.us/200']

        def fetch(url):
            print("\n Fetch start")
            print(url)
            print(datetime.datetime.now())
            result = urlopen(url).read()
            print("\n Fetch result")
            print(result)
            print(datetime.datetime.now())

            return result

        pool = eventlet.GreenPool()
        print("\n Parallel start")
        print(datetime.datetime.now())
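        # As above, imap yields in input order: the slow URL's body comes out first.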
        for body in pool.imap(fetch, urls):
            print("\n Pool result")
            print(body)
            print(datetime.datetime.now())

        print("\n Parallel end")
        print(datetime.datetime.now())
    
    except Exception as e:
            print(e)

    print("Fastest result= ".format(fastest_result))


Futures

def request_futures(url):

    try:
        import datetime
        import concurrent.futures
        import urllib.request

        urls = ['http://httpstat.us/200?sleep=5000','http://httpstat.us/200']

        print("\n Start Futures")
        print(datetime.datetime.now())

        # Retrieve a single page and report the URL and contents
        def load_url(url, timeout):
            with urllib.request.urlopen(url, timeout=timeout) as conn:
                print("\n load url")
                print(datetime.datetime.now())
                result = conn.read()
                print(result)
                print(datetime.datetime.now())

                return result

        # We can use a with statement to ensure threads are cleaned up promptly
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Start the load operations and mark each future with its URL
            future_to_url = {executor.submit(load_url, url, 60): url for url in urls}
            for future in concurrent.futures.as_completed(future_to_url):
                print("\n Iterate future")  
                print(datetime.datetime.now())

                url = future_to_url[future]
                try:
                    print("\n Try future")
                    print(url)
                    print(datetime.datetime.now())
                    data = future.result()
                    print("\n Data future")
                    print(data)
                    print(datetime.datetime.now())
                    
                except Exception as exc:
                    print('%r generated an exception: %s' % (url, exc))
                else:
                    print('%r page is %d bytes' % (url, len(data)))

        print("\n End Futures")
        print(datetime.datetime.now())

    except Exception as e:
            print(e)
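The as_completed loop above does yield the fast response first, but the with block still joins every worker thread on exit, so the function as a whole waits for the slow request anyway. A variant that returns as soon as the first future completes could look like this (an untested sketch; the losing threads still run to completion in the background, since a urllib request cannot be interrupted, and error handling is omitted):

import concurrent.futures
import urllib.request

def fastest_response(urls, timeout=10):
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(urls))
    futures = [executor.submit(urllib.request.urlopen, u, timeout=timeout)
               for u in urls]
    # Block only until the first request has finished.
    done, pending = concurrent.futures.wait(
        futures, return_when=concurrent.futures.FIRST_COMPLETED)
    for f in pending:
        f.cancel()  # only cancels futures that have not started yet
    executor.shutdown(wait=False)  # do not join the slower threads
    return next(iter(done)).result().read()

print(fastest_response(['http://httpstat.us/200?sleep=5000', 'http://httpstat.us/200']))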


Answer 1:

I was overcomplicating things. The simplest way turned out to be sending the parallel URL requests as multiple tasks to a Celery background worker (which I was already using). The Celery worker runs Eventlet with multiple workers, which handles a large number of concurrent tasks well (especially ones with lots of I/O wait time).

With the code below I call the same Celery task twice with the same URL, then check every x milliseconds whether one of the requests is ready. If so, I take the first finished request and revoke the other Celery task. The only limitation of this setup is that Celery does not support fully terminating a task when running with Eventlet. In the future I may improve this by having both parallel tasks check a key in Redis to see whether the other has already finished, so the remaining task can abandon its work; a rough sketch of that idea follows the code below.

from datetime import datetime
from time import sleep

# revoke() comes from Celery's control API (in Celery < 5:
# `from celery.task.control import revoke`; newer versions can call
# job_cancel.revoke() on the AsyncResult directly).
from celery.task.control import revoke
from app.blueprints.api.v1.tasks import parallel_request

t_start = datetime.now()

# Request two requests in parallel using Celery background tasks 
job1 = parallel_request.apply_async(args=[search_url])

job2 = parallel_request.apply_async(args=[search_url])

        
ready = False
while not ready:
    if job1.ready():
        ready = True
        print("Parallel job 1 finished first")
        job = job1
        job_cancel = job2
        proxy = proxy0   # proxy0/proxy4 are defined elsewhere in my app
        break
    if job2.ready():
        ready = True
        print("Parallel job 2 finished first")
        proxy = proxy4
        job = job2
        job_cancel = job1
        break
    # Neither job is ready yet; poll again in 100 ms
    sleep(0.1)

t_end = datetime.now()
proxy_time = int((t_end - t_start).total_seconds() * 1000)

print("Result in  ms".format(proxy_time))
data = job.get()

# Revoke the other parallel request in Celery (terminate/SIGKILL does not work with Eventlet)
revoke(job_cancel.id)
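A rough sketch of that Redis idea (untested; r, token, and do_request are placeholder names, with token being unique per pair of parallel tasks):

import redis

r = redis.Redis()  # hypothetical shared Redis connection

def parallel_request_with_flag(token, url):
    # Skip the work entirely if a sibling task has already produced a result.
    if r.get(token):
        return None
    body = do_request(url)  # do_request stands in for the real proxied fetch
    # SET NX: only the first finisher writes the flag; it expires after 60 s.
    if r.set(token, 1, nx=True, ex=60):
        return body
    return None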
