在 Celery 任务中运行 Scrapy 蜘蛛

Posted

技术标签:

【中文标题】在 Celery 任务中运行 Scrapy 蜘蛛【英文标题】:Running Scrapy spiders in a Celery task 【发布时间】:2012-07-16 17:49:37 【问题描述】:

我有一个 Django 站点,当用户请求它时会发生抓取,我的代码会在新进程中启动一个 Scrapy spider 独立脚本。当然,这不适用于用户的增加。

类似这样的:

class StandAloneSpider(Spider):
    #a regular spider

settings.overrides['LOG_ENABLED'] = True
#more settings can be changed...

crawler = CrawlerProcess( settings )
crawler.install()
crawler.configure()

spider = StandAloneSpider()

crawler.crawl( spider )
crawler.start()

我决定使用 Celery 并使用 worker 来排队抓取请求。

但是,我遇到了 Tornado 反应堆无法重新启动的问题。第一个和第二个蜘蛛运行成功,但是后续的蜘蛛会抛出 ReactorNotRestartable 错误。

任何人都可以分享在 Celery 框架中运行 Spider 的任何技巧吗?

【问题讨论】:

【参考方案1】:

好的,这就是我如何让 Scrapy 与我的 Django 项目一起工作,该项目使用 Celery 对要抓取的内容进行排队。实际的解决方法主要来自 joehillen 的代码,位于 http://snippets.scrapy.org/snippets/13/

首先是tasks.py 文件

from celery import task

@task()
def crawl_domain(domain_pk):
    from crawl import domain_crawl
    return domain_crawl(domain_pk)

然后是crawl.py 文件

from multiprocessing import Process
from scrapy.crawler import CrawlerProcess
from scrapy.conf import settings
from spider import DomainSpider
from models import Domain

class DomainCrawlerScript():

    def __init__(self):
        self.crawler = CrawlerProcess(settings)
        self.crawler.install()
        self.crawler.configure()

    def _crawl(self, domain_pk):
        domain = Domain.objects.get(
            pk = domain_pk,
        )
        urls = []
        for page in domain.pages.all():
            urls.append(page.url())
        self.crawler.crawl(DomainSpider(urls))
        self.crawler.start()
        self.crawler.stop()

    def crawl(self, domain_pk):
        p = Process(target=self._crawl, args=[domain_pk])
        p.start()
        p.join()

crawler = DomainCrawlerScript()

def domain_crawl(domain_pk):
    crawler.crawl(domain_pk)

这里的技巧是“从多处理导入过程”,它解决了 Twisted 框架中的“ReactorNotRestartable”问题。所以基本上 Celery 任务调用“domain_crawl”函数,它一遍又一遍地重用“DomainCrawlerScript”对象来与你的 Scrapy 蜘蛛交互。 (我知道我的示例有点多余,但我在设置多个版本的 python 时这样做是有原因的 [我的 django 网络服务器实际上使用的是 python2.4,而我的工作服务器使用的是 python2.7])

在我的示例中,“DomainSpider”只是一个经过修改的 Scrapy Spider,它接受一个 url 列表,然后将它们设置为“start_urls”。

希望这会有所帮助!

【讨论】:

你使用 Postgresql 吗?我从 celery 收到了最奇怪的“InterfaceError: connection already closed”。 是的,我正在使用 postgresql,如果您想提供错误,我可以尝试帮助您弄清楚,我确实有一个听起来相似的错误,但我不记得它到底是什么或什么我做到了。 (我目前正在将 11,077,910 个项目加载到我的队列中,并且我有 5 台工作机器从中拉出,所以这个设置确实有效) 我发现了问题,这与将结果存储在 postgres 中有关。我认为是以下票:github.com/celery/django-celery/issues/121 我正在使用的解决方法是设置 CELERY_RESULT_BACKEND。你遇到过吗? 是的,你可能是对的,最新版本的 celery 太棒了,所以当我重写这个项目以使用最新版本时,我也会告诉你如何解决这个问题 @byoungb 我也有同样的问题。这个解决方案是否仍然适用于scrapy 1.0?您发布的链接现在已损坏。【参考方案2】:

我在设置文件中将CELERYD_MAX_TASKS_PER_CHILD 设置为 1,这样就解决了这个问题。每次蜘蛛运行后,worker 守护进程都会启动一个新进程,该进程负责处理反应器。

【讨论】:

这是一个聪明的主意,不确定 celery 一直在重新启动任务有多少开销。但这是有道理的。所以是的,对于未来的用户,您可能想尝试一下。 这些设置在最近的 celery 版本中被重命名。您也可以使用此标志从命令行应用它:--max-tasks-per-child 1 当前 Celery 版本 4.3.0 的设置名称是 WORKER_MAX_TASKS_PER_CHILD。参见文档:docs.celeryproject.org/en/latest/userguide/configuration.html & docs.celeryproject.org/en/latest/userguide/…

以上是关于在 Celery 任务中运行 Scrapy 蜘蛛的主要内容,如果未能解决你的问题,请参考以下文章

InterfaceError:连接已关闭(使用 django + celery + Scrapy)

通过扭曲的inlineCallbacks运行Scrapy蜘蛛

检查 celery beat 是不是启动并运行

以受控方式运行数十个 Scrapy 蜘蛛

Celery + Redis - .get()在平稳运行约70个小时后无限期挂起

Scrapy运行2个蜘蛛,使用一个进程将输出到2个不同的文件(AWS Lambda)