如何在 ThreadPool 中延迟执行?

Posted

技术标签:

【中文标题】如何在 ThreadPool 中延迟执行?【英文标题】:How to delay execution within ThreadPool? 【发布时间】:2019-03-08 10:56:56 【问题描述】:

我使用multiprocessing.pool.ThreadPool 在python 中编写了一个脚本,以同时处理多个请求并执行稳健的抓取过程。解析器完美地完成了它的工作。

正如我在几个脚本中注意到的那样,当使用 multiprocessing 创建抓取过程时应该会有延迟,我想在我的下面的脚本也是如此。

但是,这就是我卡住的地方,找不到合适的位置来放置延迟。

到目前为止,这是我的脚本:

import requests
from urllib.parse import urljoin
from bs4 import BeautifulSoup
from multiprocessing.pool import ThreadPool

url = "http://srar.com/roster/index.php?agent_search=a"

def get_links(link):
    completelinks = []
    res = requests.get(link)  
    soup = BeautifulSoup(res.text,'lxml')
    for items in soup.select("table.border tr"):
        if not items.select("td a[href^='index.php?agent']"):continue
        data = [urljoin(link,item.get("href")) for item in items.select("td a[href^='index.php?agent']")]
        completelinks.extend(data)
    return completelinks

def get_info(nlink):
    req = requests.get(nlink)
    sauce = BeautifulSoup(req.text,"lxml")
    for tr in sauce.select("table[style$='1px;'] tr")[1:]:
        table = [td.get_text(strip=True) for td in tr.select("td")]
        print(table)

if __name__ == '__main__':
    ThreadPool(20).map(get_info, get_links(url))

再说一次:我只需要知道脚本中的正确位置即可延迟。

【问题讨论】:

在您返回之前return completelinks 仍有两种方法可以延迟。你的意思是@stovfl?请查看 this image 进行澄清。非常感谢。 我们喜欢图片! Edit 你的问题。 Edit您的问题并详细说明两种方式的procon?阅读Section: Lines and Indentation。 别误会@stovfl。我知道这里不接受图片。但是,我上传了该图片以确定您的意思。我提出这个问题(根据您的评论)只是因为您建议的延迟位置并不那么具体,因为我可以通过两种方式使用它。顺便说一下,我对缩进有点了解。 你对这两种方式的procon得出了哪个结论? 【参考方案1】:

要在位于get_info 内的多个requests.get() 调用之间设置延迟,您必须使用延迟参数扩展get_info,它可以将其作为time.sleep() 调用的输入。由于您的所有工作线程同时启动,因此每次调用的延迟都必须是累积的。这意味着,当您希望 requests.get() 调用之间的延迟为 0.5 秒时,您传递给池方法的延迟列表将类似于 [0.0, 0.5, 1.0, 1.5, 2.0, 2.5 ...] .

为了不必改变 get_info 本身,我在下面的示例中使用装饰器来扩展 get_info 并带有延迟参数和 time.sleep(delay) 调用。请注意,我在 pool.starmap 调用中传递了 get_info 的另一个参数的延迟。

import logging
from multiprocessing.pool import ThreadPool
from functools import wraps

def delayed(func):
    @wraps(func)
    def wrapper(delay, *args, **kwargs):
        time.sleep(delay)  # <--
        return func(*args, **kwargs)
    return wrapper

@delayed
def get_info(nlink):
    info = nlink + '_info'
    logger.info(msg=info)
    return info


def get_links(n):
    return [f'linki' for i in range(n)]


def init_logging(level=logging.DEBUG):
    fmt = '[%(asctime)s %(levelname)-8s %(threadName)s' \
          ' %(funcName)s()] --- %(message)s'
    logging.basicConfig(format=fmt, level=level)


if __name__ == '__main__':

    DELAY = 0.5

    init_logging()
    logger = logging.getLogger(__name__)

    links = get_links(10) # ['link0', 'link1', 'link2', ...]
    delays = (x * DELAY for x in range(0, len(links)))
    arguments = zip(delays, links) # (0.0, 'link0'), (0.5, 'link1'), ...

    with ThreadPool(10) as pool:
        result = pool.starmap(get_info, arguments)
        print(result)

示例输出:

[2018-10-03 22:04:14,221 INFO     Thread-8 get_info()] --- link0_info
[2018-10-03 22:04:14,721 INFO     Thread-5 get_info()] --- link1_info
[2018-10-03 22:04:15,221 INFO     Thread-3 get_info()] --- link2_info
[2018-10-03 22:04:15,722 INFO     Thread-4 get_info()] --- link3_info
[2018-10-03 22:04:16,223 INFO     Thread-1 get_info()] --- link4_info
[2018-10-03 22:04:16,723 INFO     Thread-6 get_info()] --- link5_info
[2018-10-03 22:04:17,224 INFO     Thread-7 get_info()] --- link6_info
[2018-10-03 22:04:17,723 INFO     Thread-10 get_info()] --- link7_info
[2018-10-03 22:04:18,225 INFO     Thread-9 get_info()] --- link8_info
[2018-10-03 22:04:18,722 INFO     Thread-2 get_info()] --- link9_info
['link0_info', 'link1_info', 'link2_info', 'link3_info', 'link4_info', 
'link5_info', 'link6_info', 'link7_info', 'link8_info', 'link9_info']

【讨论】:

迟到才注意到您的回答。非常感谢。

以上是关于如何在 ThreadPool 中延迟执行?的主要内容,如果未能解决你的问题,请参考以下文章

如何在java中实现延迟几秒钟

如何延迟执行 Segue

如何在执行一组工作人员之间正确延迟

如何在 Flutter 中延迟 SnackBar

如何延迟android的某段代码执行时间

如何延迟执行 Web 测试的第一个请求?