一个非常简单的多线程并行 URL 获取(无队列)
Posted
技术标签:
【中文标题】一个非常简单的多线程并行 URL 获取(无队列)【英文标题】:A very simple multithreading parallel URL fetching (without queue) 【发布时间】:2013-04-17 08:42:05 【问题描述】:我花了一整天时间寻找最简单的 Python 多线程 URL 提取器,但我发现的大多数脚本都使用队列或多处理或复杂库。
最后我自己写了一个,我将其报告为答案。请随时提出任何改进建议。
我猜其他人可能一直在寻找类似的东西。
【问题讨论】:
只是补充一点:在 Python 的情况下,由于 GIL,多线程不是内核原生的。 它仍然看起来并行获取 URL 比串行获取更快。这是为什么?是因为(我假设)Python 解释器在 HTTP 请求期间没有连续运行吗? 如果我想解析我获取的那些网页的内容呢?是在每个线程中进行解析更好,还是应该在将工作线程加入主线程后按顺序进行? 【参考方案1】:尽可能简化您的原始版本:
import threading
import urllib2
import time
start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
def fetch_url(url):
urlHandler = urllib2.urlopen(url)
html = urlHandler.read()
print "'%s\' fetched in %ss" % (url, (time.time() - start))
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print "Elapsed Time: %s" % (time.time() - start)
这里唯一的新技巧是:
跟踪您创建的线程。 如果您只想知道线程何时全部完成,请不要费心计算线程数;join
已经告诉你了。
如果您不需要任何状态或外部 API,则不需要 Thread
子类,只需 target
函数。
【讨论】:
我确保声称这是“尽可能”简化的,因为这是确保聪明人出现并找到进一步简化它的最佳方法,只是为了让我看起来愚蠢的。 :) 我相信要打败它并不容易! :-) 自从我在这里发布的第一个版本以来,这是一个很大的改进 也许我们可以将前两个循环合并为一个?通过在同一个for
循环中实例化和启动线程?
@DanieleB:那么,您必须将列表理解更改为围绕append
的显式循环,例如this。或者,也可以编写一个包装器来创建、启动和返回一个线程,例如this。无论哪种方式,我认为它都不那么简单(虽然第二种方法是重构复杂案例的有用方法,但当事情已经很简单时它就不起作用了)。
@DanieleB:但是,使用不同的语言,您可以这样做。如果thread.start()
返回了线程,您可以将创建和开始放在一个表达式中。在 C++ 或 javascript 中,您可能会这样做。问题是,虽然方法链接和其他“流畅的编程”技术使事情更加简洁,但它们也可以打破表达式/语句的边界,并且通常是模棱两可的。所以 Python 的方向几乎完全相反,几乎 no 方法或运算符返回它们操作的对象。见en.wikipedia.org/wiki/Fluent_interface。【参考方案2】:
multiprocessing
有一个不启动其他进程的线程池:
#!/usr/bin/env python
from multiprocessing.pool import ThreadPool
from time import time as timer
from urllib2 import urlopen
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
def fetch_url(url):
try:
response = urlopen(url)
return url, response.read(), None
except Exception as e:
return url, None, e
start = timer()
results = ThreadPool(20).imap_unordered(fetch_url, urls)
for url, html, error in results:
if error is None:
print("%r fetched in %ss" % (url, timer() - start))
else:
print("error fetching %r: %s" % (url, error))
print("Elapsed Time: %s" % (timer() - start,))
与基于Thread
的解决方案相比的优势:
ThreadPool
允许限制最大并发连接数(代码示例中为20
)
输出没有乱码,因为所有输出都在主线程中
记录错误
代码无需更改即可在 Python 2 和 3 上运行(假设 from urllib.request import urlopen
在 Python 3 上)。
【讨论】:
我有一个关于代码的问题:底部第四行中的打印是否真的返回获取 url 所花费的时间或从“结果”返回 url 所花费的时间目的?据我了解,时间戳应该打印在 fetch_url() 函数中,而不是结果打印部分。 @UweZiegenhagenimap_unordered()
准备好后立即返回结果。我认为与发出 http 请求所需的时间相比,开销可以忽略不计。
谢谢,我用修改后的形式并行编译LaTeX文件:uweziegenhagen.de/?p=3501
这是迄今为止最好、最快和最简单的方法。我一直在尝试使用python 2和python 3的twisted、scrapy等,这个更简单更好
谢谢!有没有办法在通话之间添加延迟?【参考方案3】:
concurrent.futures
中的主要示例可以满足您的所有需求,而且要简单得多。此外,它一次只处理 5 个就可以处理大量的 URL,而且它可以更好地处理错误。
当然,该模块仅内置于 Python 3.2 或更高版本……但如果您使用的是 2.5-3.1,则可以只安装来自 PyPI 的后向端口futures
。您需要对示例代码进行更改,只需将concurrent.futures
替换为futures
,对于2.x,将urllib.request
替换为urllib2
。
这是向后移植到 2.x 的示例,修改为使用您的 URL 列表并添加时间:
import concurrent.futures
import urllib2
import time
start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
# Retrieve a single page and report the url and contents
def load_url(url, timeout):
conn = urllib2.urlopen(url, timeout=timeout)
return conn.readall()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = executor.submit(load_url, url, 60): url for url in urls
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print '%r generated an exception: %s' % (url, exc)
else:
print '"%s" fetched in %ss' % (url,(time.time() - start))
print "Elapsed Time: %ss" % (time.time() - start)
但是你可以让这更简单。真的,您只需要:
def load_url(url):
conn = urllib2.urlopen(url, timeout)
data = conn.readall()
print '"%s" fetched in %ss' % (url,(time.time() - start))
return data
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
pages = executor.map(load_url, urls)
print "Elapsed Time: %ss" % (time.time() - start)
【讨论】:
【参考方案4】:我现在发布了一个不同的解决方案,通过让工作线程不是守护进程并将它们加入主线程(这意味着阻塞主线程直到所有工作线程都完成)而不是通知每个工作线程的执行结束并回调一个全局函数(就像我在上一个答案中所做的那样),正如在某些 cmets 中指出的那样,这种方式不是线程安全的。
import threading
import urllib2
import time
start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
class FetchUrl(threading.Thread):
def __init__(self, url):
threading.Thread.__init__(self)
self.url = url
def run(self):
urlHandler = urllib2.urlopen(self.url)
html = urlHandler.read()
print "'%s\' fetched in %ss" % (self.url,(time.time() - start))
for url in urls:
FetchUrl(url).start()
#Join all existing threads to main thread.
for thread in threading.enumerate():
if thread is not threading.currentThread():
thread.join()
print "Elapsed Time: %s" % (time.time() - start)
【讨论】:
这会起作用,但这不是您想要的方式。如果您的程序的更高版本创建任何其他线程(守护程序,或由其他代码加入),它将中断。此外,thread is threading.currentThread()
不能保证工作(我认为它始终适用于迄今为止的任何 CPython 版本,在任何具有真实线程的平台上,如果在主线程中使用......但仍然,最好不要假设)。将所有 Thread
对象存储在列表 (threads = [FetchUrl(url) for url in urls]
) 中更安全,然后启动它们,然后使用 for thread in threads: thread.join()
加入它们。
另外,对于像这样的简单情况,您可以进一步简化它:不要费心创建Thread
子类,除非您有某种状态要存储或某些 API 与线程交互从外部,只需编写一个简单的函数,然后执行threading.Thread(target=my_thread_function, args=[url])
。
你的意思是说,如果我有相同的脚本在同一台机器上同时运行两次,'for thread in threading.enumerate():' 将包括两个执行的线程?
请参阅pastebin.com/Z5MdeB5x,我认为这与您将获得的显式线程 URL-fetcher 一样简单。
threading.enumerate()
仅包括当前进程中的线程,因此在作为单独进程运行的 Python 的不同实例中运行同一脚本的多个副本不是问题。只是如果您以后决定扩展此代码(或在其他项目中使用它),您可能在代码的另一部分创建了守护线程,或者现在的主代码甚至可能是在某个后台线程中运行的代码。 【参考方案5】:
此脚本从数组中定义的一组 URL 中获取内容。它为每个要获取的 URL 生成一个线程,因此它旨在用于有限的 URL 集。
每个线程不使用队列对象,而是通过对全局函数的回调来通知其结束,该函数会记录正在运行的线程数。
import threading
import urllib2
import time
start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
left_to_fetch = len(urls)
class FetchUrl(threading.Thread):
def __init__(self, url):
threading.Thread.__init__(self)
self.setDaemon = True
self.url = url
def run(self):
urlHandler = urllib2.urlopen(self.url)
html = urlHandler.read()
finished_fetch_url(self.url)
def finished_fetch_url(url):
"callback function called when a FetchUrl thread ends"
print "\"%s\" fetched in %ss" % (url,(time.time() - start))
global left_to_fetch
left_to_fetch-=1
if left_to_fetch==0:
"all urls have been fetched"
print "Elapsed Time: %ss" % (time.time() - start)
for url in urls:
"spawning a FetchUrl thread for each url to fetch"
FetchUrl(url).start()
【讨论】:
我可以看到这非常有用!谢谢:) 在没有锁的情况下修改共享全局变量不是线程安全的。做urlsToFetch-=1
之类的事情尤其很危险。在解释器内部,编译成三个独立的步骤来加载urlsToFetch
、减一和存储urlsToFetch
。如果解释器在加载和存储之间切换线程,你最终会导致线程 1 加载 2,然后线程 2 加载相同的 2,然后线程 2 存储 1,然后线程 1 存储 1。
嗨 abarnert,感谢您的回答,您能建议一个线程安全的解决方案吗?非常感谢
您可以在对变量的每个访问或许多其他可能性周围放置一个threading.Lock
(使用计数信号量而不是普通整数,或使用屏障而不是显式计数,...),但你真的根本不需要这个全球性的。只需join
所有线程,而不是对它们进行守护,当您将它们全部加入时就完成了。
事实上......像这样守护线程然后不等待任何东西意味着您的程序退出,终止所有工作线程,在大多数工作线程完成之前。在速度较快但网络连接速度较慢的 MacBook Pro 上,我经常在退出之前没有完成任何。以上是关于一个非常简单的多线程并行 URL 获取(无队列)的主要内容,如果未能解决你的问题,请参考以下文章