在 Celery 任务中运行 Scrapy 蜘蛛
Posted
技术标签:
【中文标题】在 Celery 任务中运行 Scrapy 蜘蛛【英文标题】:Running Scrapy spiders in a Celery task 【发布时间】:2012-07-16 17:49:37 【问题描述】:我有一个 Django 站点,当用户请求它时会发生抓取,我的代码会在新进程中启动一个 Scrapy spider 独立脚本。当然,这不适用于用户的增加。
类似这样的:
class StandAloneSpider(Spider):
#a regular spider
settings.overrides['LOG_ENABLED'] = True
#more settings can be changed...
crawler = CrawlerProcess( settings )
crawler.install()
crawler.configure()
spider = StandAloneSpider()
crawler.crawl( spider )
crawler.start()
我决定使用 Celery 并使用 worker 来排队抓取请求。
但是,我遇到了 Tornado 反应堆无法重新启动的问题。第一个和第二个蜘蛛运行成功,但是后续的蜘蛛会抛出 ReactorNotRestartable 错误。
任何人都可以分享在 Celery 框架中运行 Spider 的任何技巧吗?
【问题讨论】:
【参考方案1】:好的,这就是我如何让 Scrapy 与我的 Django 项目一起工作,该项目使用 Celery 对要抓取的内容进行排队。实际的解决方法主要来自 joehillen 的代码,位于 http://snippets.scrapy.org/snippets/13/
首先是tasks.py
文件
from celery import task
@task()
def crawl_domain(domain_pk):
from crawl import domain_crawl
return domain_crawl(domain_pk)
然后是crawl.py
文件
from multiprocessing import Process
from scrapy.crawler import CrawlerProcess
from scrapy.conf import settings
from spider import DomainSpider
from models import Domain
class DomainCrawlerScript():
def __init__(self):
self.crawler = CrawlerProcess(settings)
self.crawler.install()
self.crawler.configure()
def _crawl(self, domain_pk):
domain = Domain.objects.get(
pk = domain_pk,
)
urls = []
for page in domain.pages.all():
urls.append(page.url())
self.crawler.crawl(DomainSpider(urls))
self.crawler.start()
self.crawler.stop()
def crawl(self, domain_pk):
p = Process(target=self._crawl, args=[domain_pk])
p.start()
p.join()
crawler = DomainCrawlerScript()
def domain_crawl(domain_pk):
crawler.crawl(domain_pk)
这里的技巧是“从多处理导入过程”,它解决了 Twisted 框架中的“ReactorNotRestartable”问题。所以基本上 Celery 任务调用“domain_crawl”函数,它一遍又一遍地重用“DomainCrawlerScript”对象来与你的 Scrapy 蜘蛛交互。 (我知道我的示例有点多余,但我在设置多个版本的 python 时这样做是有原因的 [我的 django 网络服务器实际上使用的是 python2.4,而我的工作服务器使用的是 python2.7])
在我的示例中,“DomainSpider”只是一个经过修改的 Scrapy Spider,它接受一个 url 列表,然后将它们设置为“start_urls”。
希望这会有所帮助!
【讨论】:
你使用 Postgresql 吗?我从 celery 收到了最奇怪的“InterfaceError: connection already closed”。 是的,我正在使用 postgresql,如果您想提供错误,我可以尝试帮助您弄清楚,我确实有一个听起来相似的错误,但我不记得它到底是什么或什么我做到了。 (我目前正在将 11,077,910 个项目加载到我的队列中,并且我有 5 台工作机器从中拉出,所以这个设置确实有效) 我发现了问题,这与将结果存储在 postgres 中有关。我认为是以下票:github.com/celery/django-celery/issues/121 我正在使用的解决方法是设置 CELERY_RESULT_BACKEND。你遇到过吗? 是的,你可能是对的,最新版本的 celery 太棒了,所以当我重写这个项目以使用最新版本时,我也会告诉你如何解决这个问题 @byoungb 我也有同样的问题。这个解决方案是否仍然适用于scrapy 1.0?您发布的链接现在已损坏。【参考方案2】:我在设置文件中将CELERYD_MAX_TASKS_PER_CHILD 设置为 1,这样就解决了这个问题。每次蜘蛛运行后,worker 守护进程都会启动一个新进程,该进程负责处理反应器。
【讨论】:
这是一个聪明的主意,不确定 celery 一直在重新启动任务有多少开销。但这是有道理的。所以是的,对于未来的用户,您可能想尝试一下。 这些设置在最近的 celery 版本中被重命名。您也可以使用此标志从命令行应用它:--max-tasks-per-child 1
当前 Celery 版本 4.3.0 的设置名称是 WORKER_MAX_TASKS_PER_CHILD
。参见文档:docs.celeryproject.org/en/latest/userguide/configuration.html & docs.celeryproject.org/en/latest/userguide/…以上是关于在 Celery 任务中运行 Scrapy 蜘蛛的主要内容,如果未能解决你的问题,请参考以下文章
InterfaceError:连接已关闭(使用 django + celery + Scrapy)
通过扭曲的inlineCallbacks运行Scrapy蜘蛛