yield self.engine.open_spider(): the key points are the first call to nextcall.schedule() and the heartbeat; next we analyze how the heartbeat drives engine._next_request_from_scheduler()


1. The first nextcall.schedule() takes requests from start_requests one at a time and puts them into the Scheduler, repeating until start_requests is exhausted.

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py

class ExecutionEngine(object):

    @defer.inlineCallbacks
    def open_spider(self, spider, start_requests=(), close_if_idle=True):
        assert self.has_capacity(), "No free spider slot when opening %r" % \
            spider.name
        logger.info("Spider opened", extra={'spider': spider})
        nextcall = CallLaterOnce(self._next_request, spider)
        scheduler = self.scheduler_cls.from_crawler(self.crawler)
        start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
        slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
        self.slot = slot
        self.spider = spider
        yield scheduler.open(spider)
        yield self.scraper.open_spider(spider)
        self.crawler.stats.open_spider(spider)
        yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
        slot.nextcall.schedule()  # self.slot was set above, but this is the first time schedule() actually runs; in the next big step, engine.start() sets self.running = True,
        slot.heartbeat.start(5)   # heartbeat. The outer crawler_process.start() then calls reactor.run(); only at that point does reactor.callLater(delay, self) really execute _next_request()
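To make the comment on the last two lines concrete: nothing registered with reactor.callLater runs until the reactor itself is running. A minimal, hypothetical usage sketch (MySpider and the settings dict are placeholders, not part of the code above):

from scrapy.crawler import CrawlerProcess

# MySpider is a hypothetical spider class defined elsewhere.
process = CrawlerProcess({'LOG_LEVEL': 'INFO'})
process.crawl(MySpider)   # sets up the crawl; engine.open_spider() is reached through this path and schedules nextcall + the heartbeat
process.start()           # blocks and calls reactor.run(); only now do the callLater callbacks and the heartbeat actually fire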

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\utils\reactor.py

class CallLaterOnce(object):
    """Schedule a function to be called in the next reactor loop, but only if
    it hasn't been already scheduled since the last time it ran.
    """

    def __init__(self, func, *a, **kw):  # func is self._next_request here
        self._func = func
        self._a = a
        self._kw = kw
        self._call = None

    def schedule(self, delay=0):
        if self._call is None:  # only re-schedule if the previously scheduled func has already run
            self._call = reactor.callLater(delay, self)  # the callable invoked later is self, i.e. the __call__ magic method below; self._call immediately points at the DelayedCall returned by callLater, e.g. <DelayedCall 0x2369408 [0.0s] called=0 cancelled=0 <lambda>(0)>

    def cancel(self):
        if self._call:
            self._call.cancel()

    def __call__(self):  # magic method: for c = C(), calling c() invokes __call__
        self._call = None  # reset to None as soon as the scheduled func starts running, so schedule() can fire again
        return self._func(*self._a, **self._kw)
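A minimal standalone sketch of the behaviour described in the comments (it imports the real CallLaterOnce from scrapy.utils.reactor): several schedule() calls made before the reactor gets a chance to run collapse into a single invocation of func, because self._call stays non-None until __call__ resets it.

from twisted.internet import reactor
from scrapy.utils.reactor import CallLaterOnce

def tick():
    print('tick')

call = CallLaterOnce(tick)
call.schedule()
call.schedule()   # ignored: the previously scheduled call has not run yet
call.schedule()   # ignored as well
reactor.callLater(1, reactor.stop)
reactor.run()     # prints 'tick' exactly once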

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py

class ExecutionEngine(object):

    def _next_request(self, spider):
        slot = self.slot
        if not slot:
            return

        if self.paused:
            return
      
        while not self._needs_backout(spider):          # initially self.running is False, so _needs_backout() is True and the while loop is skipped
            if not self._next_request_from_scheduler(spider):  # after engine.open_spider(), engine.start() sets self.running = True and the while loop takes effect
                break  # at the very start nothing from start_requests has been put into the scheduler yet; any of slot.closing, self.downloader.needs_backout() or self.scraper.slot.needs_backout() being true also disables the while loop

        if slot.start_requests and not self._needs_backout(spider):  # the if body does not run before engine.start() either
            try:
                request = next(slot.start_requests)  # take the seed requests one by one; next: return the next item from the iterator, e.g. next(iter([1, 2, 3])) returns 1
            except StopIteration:
                slot.start_requests = None   # the seed requests are exhausted
            except Exception:
                slot.start_requests = None
                logger.error('Error while obtaining start requests',
                             exc_info=True, extra={'spider': spider})
            else:
                self.crawl(request, spider)

        if self.spider_is_idle(spider) and slot.close_if_idle:
            self._spider_idle(spider)
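As a side note, the start_requests handling above is just the standard iterator protocol; a tiny standalone illustration (plain Python, not Scrapy code):

start_requests = iter(['r1', 'r2', 'r3'])
while True:
    try:
        request = next(start_requests)   # take the next seed request
    except StopIteration:
        start_requests = None            # seeds exhausted, mirroring slot.start_requests = None above
        break
    print('crawl', request)              # stands in for self.crawl(request, spider)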

Same file (engine.py):

    def _needs_backout(self, spider):
        slot = self.slot
        return not self.running \
            or slot.closing \
            or self.downloader.needs_backout() \
            or self.scraper.slot.needs_backout()

Same file:

    def crawl(self, request, spider):
        assert spider in self.open_spiders, \
            "Spider %r not opened when crawling: %s" % (spider.name, request)
        self.schedule(request, spider)  # put the request into the priority queue (the in-memory queue for its priority level)
        self.slot.nextcall.schedule()  # make sure every seed request gets into the scheduler promptly; in the gaps, requests already in the scheduler are processed right away

Same file:

    def schedule(self, request, spider):
        self.signals.send_catch_log(signal=signals.request_scheduled,
                request=request, spider=spider)
        if not self.slot.scheduler.enqueue_request(request):
            self.signals.send_catch_log(signal=signals.request_dropped,
                                        request=request, spider=spider)
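For reference, the request_scheduled / request_dropped signals sent here can be observed from user code; a minimal sketch of an extension that listens to them (the class and handler names are illustrative, not part of Scrapy, and it would still need to be enabled via the EXTENSIONS setting):

from scrapy import signals

class RequestLogExtension(object):

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls()
        crawler.signals.connect(ext.on_scheduled, signal=signals.request_scheduled)
        crawler.signals.connect(ext.on_dropped, signal=signals.request_dropped)
        return ext

    def on_scheduled(self, request, spider):
        spider.logger.debug('scheduled: %s' % request.url)

    def on_dropped(self, request, spider):
        spider.logger.debug('dropped by the scheduler: %s' % request.url)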

 

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\scheduler.py

class Scheduler(object):

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        dqok = self._dqpush(request)  # the disk queue is not used by default
        if dqok:
            self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
        else:
            self._mqpush(request)  # the in-memory queue is used by default
            self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
        self.stats.inc_value('scheduler/enqueued', spider=self.spider)
        return True
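A quick note on the first branch: a request is dropped when the dupefilter's request_seen() says it was already seen, unless dont_filter=True is set on the request (standard Request API):

from scrapy import Request

r1 = Request('http://example.com/page')                     # goes through the dupefilter
r2 = Request('http://example.com/page', dont_filter=True)   # always enqueued; request_seen() is not even consulted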

The request is pushed into the priority queue (the in-memory queue for the corresponding priority level):

    def _mqpush(self, request):
        self.mqs.push(request, -request.priority)
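The negation of request.priority is easy to miss: the underlying priority queue pops the lowest key first, so a higher request.priority has to map to a lower queue key. A standalone illustration with heapq (not the actual queuelib-backed queue Scrapy uses, just the same trick):

import heapq

heap = []
for url, priority in [('low', 0), ('high', 10), ('mid', 5)]:
    heapq.heappush(heap, (-priority, url))   # same idea as self.mqs.push(request, -request.priority)

while heap:
    key, url = heapq.heappop(heap)
    print(url)   # prints high, mid, low: higher priority comes out first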

2. Afterwards, the heartbeat keeps triggering _next_request_from_scheduler(), which takes requests out of the scheduler and drives the crawl loop.

 C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py

class ExecutionEngine(object):

    @defer.inlineCallbacks
    def open_spider(self, spider, start_requests=(), close_if_idle=True):
        assert self.has_capacity(), "No free spider slot when opening %r" % \
            spider.name
        logger.info("Spider opened", extra={'spider': spider})
        nextcall = CallLaterOnce(self._next_request, spider)
        scheduler = self.scheduler_cls.from_crawler(self.crawler)
        start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
        slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
        self.slot = slot
        self.spider = spider
        yield scheduler.open(spider)
        yield self.scraper.open_spider(spider)
        self.crawler.stats.open_spider(spider)
        yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
        slot.nextcall.schedule()
        slot.heartbeat.start(5)

 C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py

class Slot(object):

    def __init__(self, start_requests, close_if_idle, nextcall, scheduler):
        self.closing = False
        self.inprogress = set() # requests in progress
        self.start_requests = iter(start_requests)
        self.close_if_idle = close_if_idle
        self.nextcall = nextcall
        self.scheduler = scheduler
        self.heartbeat = task.LoopingCall(nextcall.schedule)

Notes on the Twisted primitives used above (task.LoopingCall and reactor.callLater):

In [342]: from twisted.internet import defer, task

In [343]: task.LoopingCall?
Init signature: task.LoopingCall(self, f, *a, **kw)
Docstring:
Call a function repeatedly.

If C{f} returns a deferred, rescheduling will not take place until the
deferred has fired. The result value is ignored.


In [349]: d=reactor.callLater(0,lambda x:x)

In [350]: isinstance(d,defer.Deferred)
Out[350]: False

In [351]: d?
Type:           instance
Base Class:     twisted.internet.base.DelayedCall
String form:    <DelayedCall 0xacc2d48 [-14.5340001583s] called=0 cancelled=0 <lambda>()>
File:           c:\program files\anaconda2\lib\site-packages\twisted\internet\base.py
Docstring:      <no docstring>
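Putting the two primitives together, the heartbeat in Slot is simply a LoopingCall that invokes nextcall.schedule every 5 seconds. A minimal standalone sketch of the same pattern (not Scrapy's engine code, though it uses the real CallLaterOnce):

from twisted.internet import reactor, task
from scrapy.utils.reactor import CallLaterOnce

def next_request():
    print('_next_request tick')

nextcall = CallLaterOnce(next_request)
heartbeat = task.LoopingCall(nextcall.schedule)

nextcall.schedule()     # the explicit first schedule, as in open_spider()
heartbeat.start(5)      # the heartbeat re-schedules it every 5 seconds (its immediate first call is collapsed with the line above by CallLaterOnce)
reactor.callLater(12, reactor.stop)
reactor.run()           # prints the tick roughly three times, then stops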


 
