一个非常简单的多线程并行 URL 获取(无队列)

Posted

技术标签:

【中文标题】一个非常简单的多线程并行 URL 获取(无队列)【英文标题】:A very simple multithreading parallel URL fetching (without queue) 【发布时间】:2013-04-17 08:42:05 【问题描述】:

我花了一整天时间寻找最简单的 Python 多线程 URL 提取器,但我发现的大多数脚本都使用队列或多处理或复杂库。

最后我自己写了一个,我将其报告为答案。请随时提出任何改进建议。

我猜其他人可能一直在寻找类似的东西。

【问题讨论】:

只是补充一点:在 Python 的情况下,由于 GIL,多线程不是内核原生的。 它仍然看起来并行获取 URL 比串行获取更快。这是为什么?是因为(我假设)Python 解释器在 HTTP 请求期间没有连续运行吗? 如果我想解析我获取的那些网页的内容呢?是在每个线程中进行解析更好,还是应该在将工作线程加入主线程后按顺序进行? 【参考方案1】:

尽可能简化您的原始版本:

import threading
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

def fetch_url(url):
    urlHandler = urllib2.urlopen(url)
    html = urlHandler.read()
    print "'%s\' fetched in %ss" % (url, (time.time() - start))

threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print "Elapsed Time: %s" % (time.time() - start)

这里唯一的新技巧是:

跟踪您创建的线程。 如果您只想知道线程何时全部完成,请不要费心计算线程数; join 已经告诉你了。 如果您不需要任何状态或外部 API,则不需要 Thread 子类,只需 target 函数。

【讨论】:

我确保声称这是“尽可能”简化的,因为这是确保聪明人出现并找到进一步简化它的最佳方法,只是为了让我看起来愚蠢的。 :) 我相信要打败它并不容易! :-) 自从我在这里发布的第一个版本以来,这是一个很大的改进 也许我们可以将前两个循环合并为一个?通过在同一个 for 循环中实例化和启动线程? @DanieleB:那么,您必须将列表理解更改为围绕append 的显式循环,例如this。或者,也可以编写一个包装器来创建、启动和返回一个线程,例如this。无论哪种方式,我认为它都不那么简单(虽然第二种方法是重构复杂案例的有用方法,但当事情已经很简单时它就不起作用了)。 @DanieleB:但是,使用不同的语言,您可以这样做。如果thread.start() 返回了线程,您可以将创建和开始放在一个表达式中。在 C++ 或 javascript 中,您可能会这样做。问题是,虽然方法链接和其他“流畅的编程”技术使事情更加简洁,但它们也可以打破表达式/语句的边界,并且通常是模棱两可的。所以 Python 的方向几乎完全相反,几乎 no 方法或运算符返回它们操作的对象。见en.wikipedia.org/wiki/Fluent_interface。【参考方案2】:

multiprocessing 有一个不启动其他进程的线程池:

#!/usr/bin/env python
from multiprocessing.pool import ThreadPool
from time import time as timer
from urllib2 import urlopen

urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

def fetch_url(url):
    try:
        response = urlopen(url)
        return url, response.read(), None
    except Exception as e:
        return url, None, e

start = timer()
results = ThreadPool(20).imap_unordered(fetch_url, urls)
for url, html, error in results:
    if error is None:
        print("%r fetched in %ss" % (url, timer() - start))
    else:
        print("error fetching %r: %s" % (url, error))
print("Elapsed Time: %s" % (timer() - start,))

与基于Thread的解决方案相比的优势:

ThreadPool 允许限制最大并发连接数(代码示例中为20) 输出没有乱码,因为所有输出都在主线程中 记录错误 代码无需更改即可在 Python 2 和 3 上运行(假设 from urllib.request import urlopen 在 Python 3 上)。

【讨论】:

我有一个关于代码的问题:底部第四行中的打印是否真的返回获取 url 所花费的时间或从“结果”返回 url 所花费的时间目的?据我了解,时间戳应该打印在 fetch_url() 函数中,而不是结果打印部分。 @UweZiegenhagen imap_unordered() 准备好后立即返回结果。我认为与发出 http 请求所需的时间相比,开销可以忽略不计。 谢谢,我用修改后的形式并行编译LaTeX文件:uweziegenhagen.de/?p=3501 这是迄今为止最好、最快和最简单的方法。我一直在尝试使用python 2和python 3的twisted、scrapy等,这个更简单更好 谢谢!有没有办法在通话之间添加延迟?【参考方案3】:

concurrent.futures 中的主要示例可以满足您的所有需求,而且要简单得多。此外,它一次只处理 5 个就可以处理大量的 URL,而且它可以更好地处理错误。

当然,该模块仅内置于 Python 3.2 或更高版本……但如果您使用的是 2.5-3.1,则可以只安装来自 PyPI 的后向端口futures。您需要对示例代码进行更改,只需将concurrent.futures 替换为futures,对于2.x,将urllib.request 替换为urllib2

这是向后移植到 2.x 的示例,修改为使用您的 URL 列表并添加时间:

import concurrent.futures
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

# Retrieve a single page and report the url and contents
def load_url(url, timeout):
    conn = urllib2.urlopen(url, timeout=timeout)
    return conn.readall()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = executor.submit(load_url, url, 60): url for url in urls
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print '%r generated an exception: %s' % (url, exc)
        else:
            print '"%s" fetched in %ss' % (url,(time.time() - start))
print "Elapsed Time: %ss" % (time.time() - start)

但是你可以让这更简单。真的,您只需要:

def load_url(url):
    conn = urllib2.urlopen(url, timeout)
    data = conn.readall()
    print '"%s" fetched in %ss' % (url,(time.time() - start))
    return data
    
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    pages = executor.map(load_url, urls)

print "Elapsed Time: %ss" % (time.time() - start)

【讨论】:

【参考方案4】:

我现在发布了一个不同的解决方案,通过让工作线程不是守护进程并将它们加入主线程(这意味着阻塞主线程直到所有工作线程都完成)而不是通知每个工作线程的执行结束并回调一个全局函数(就像我在上一个答案中所做的那样),正如在某些 cmets 中指出的那样,这种方式不是线程安全的。

import threading
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

class FetchUrl(threading.Thread):
    def __init__(self, url):
        threading.Thread.__init__(self)
        self.url = url

    def run(self):
        urlHandler = urllib2.urlopen(self.url)
        html = urlHandler.read()
        print "'%s\' fetched in %ss" % (self.url,(time.time() - start))

for url in urls:
    FetchUrl(url).start()

#Join all existing threads to main thread.
for thread in threading.enumerate():
    if thread is not threading.currentThread():
        thread.join()

print "Elapsed Time: %s" % (time.time() - start)

【讨论】:

这会起作用,但这不是您想要的方式。如果您的程序的更高版本创建任何其他线程(守护程序,或由其他代码加入),它将中断。此外,thread is threading.currentThread() 不能保证工作(我认为它始终适用于迄今为止的任何 CPython 版本,在任何具有真实线程的平台上,如果在主线程中使用......但仍然,最好不要假设)。将所有 Thread 对象存储在列表 (threads = [FetchUrl(url) for url in urls]) 中更安全,然后启动它们,然后使用 for thread in threads: thread.join() 加入它们。 另外,对于像这样的简单情况,您可以进一步简化它:不要费心创建Thread 子类,除非您有某种状态要存储或某些 API 与线程交互从外部,只需编写一个简单的函数,然后执行threading.Thread(target=my_thread_function, args=[url]) 你的意思是说,如果我有相同的脚本在同一台机器上同时运行两次,'for thread in threading.enumerate():' 将包括两个执行的线程? 请参阅pastebin.com/Z5MdeB5x,我认为这与您将获得的显式线程 URL-fetcher 一样简单。 threading.enumerate() 仅包括当前进程中的线程,因此在作为单独进程运行的 Python 的不同实例中运行同一脚本的多个副本不是问题。只是如果您以后决定扩展此代码(或在其他项目中使用它),您可能在代码的另一部分创建了守护线程,或者现在的主代码甚至可能是在某个后台线程中运行的代码。 【参考方案5】:

此脚本从数组中定义的一组 URL 中获取内容。它为每个要获取的 URL 生成一个线程,因此它旨在用于有限的 URL 集。

每个线程不使用队列对象,而是通过对全局函数的回调来通知其结束,该函数会记录正在运行的线程数。

import threading
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
left_to_fetch = len(urls)

class FetchUrl(threading.Thread):
    def __init__(self, url):
        threading.Thread.__init__(self)
        self.setDaemon = True
        self.url = url

    def run(self):
        urlHandler = urllib2.urlopen(self.url)
        html = urlHandler.read()
        finished_fetch_url(self.url)


def finished_fetch_url(url):
    "callback function called when a FetchUrl thread ends"
    print "\"%s\" fetched in %ss" % (url,(time.time() - start))
    global left_to_fetch
    left_to_fetch-=1
    if left_to_fetch==0:
        "all urls have been fetched"
        print "Elapsed Time: %ss" % (time.time() - start)


for url in urls:
    "spawning a FetchUrl thread for each url to fetch"
    FetchUrl(url).start()

【讨论】:

我可以看到这非常有用!谢谢:) 在没有锁的情况下修改共享全局变量不是线程安全的。做urlsToFetch-=1 之类的事情尤其很危险。在解释器内部,编译成三个独立的步骤来加载urlsToFetch、减一和存储urlsToFetch。如果解释器在加载和存储之间切换线程,你最终会导致线程 1 加载 2,然后线程 2 加载相同的 2,然后线程 2 存储 1,然后线程 1 存储 1。 嗨 abarnert,感谢您的回答,您能建议一个线程安全的解决方案吗?非常感谢 您可以在对变量的每个访问或许多其他可能性周围放置一个threading.Lock(使用计数信号量而不是普通整数,或使用屏障而不是显式计数,...),但你真的根本不需要这个全球性的。只需join 所有线程,而不是对它们进行守护,当您将它们全部加入时就完成了。 事实上......像这样守护线程然后不等待任何东西意味着您的程序退出,终止所有工作线程,在大多数工作线程完成之前。在速度较快但网络连接速度较慢的 MacBook Pro 上,我经常在退出之前没有完成任何

以上是关于一个非常简单的多线程并行 URL 获取(无队列)的主要内容,如果未能解决你的问题,请参考以下文章

kafka浅谈Kafka的多线程消费的设计

为啥我的多线程并行求和函数的向量受限于线程数?

8简单的多线程爬取网页数据 并通过xpath解析存到本地

[转]TestNG的多线程并行

处理 SQS 项目队列的多线程方法

Python中的多线程并行运行