yield self.engine.open_spider(): the key is the first nextcall.schedule() call and the heartbeat; next, how the heartbeat drives engine._next_request_from_scheduler()
1. The first nextcall.schedule() takes one request at a time from start_requests and puts it into the Scheduler, repeating until start_requests is exhausted.
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py
```python
class ExecutionEngine(object):

    @defer.inlineCallbacks
    def open_spider(self, spider, start_requests=(), close_if_idle=True):
        assert self.has_capacity(), \
            "No free spider slot when opening %r" % spider.name
        logger.info("Spider opened", extra={'spider': spider})
        nextcall = CallLaterOnce(self._next_request, spider)
        scheduler = self.scheduler_cls.from_crawler(self.crawler)
        start_requests = yield self.scraper.spidermw.process_start_requests(
            start_requests, spider)
        slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
        self.slot = slot
        self.spider = spider
        yield scheduler.open(spider)
        yield self.scraper.open_spider(spider)
        self.crawler.stats.open_spider(spider)
        yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
        # self.slot was instantiated above; this is the first schedule() call.
        # In the next big step, engine.start() will set self.running = True.
        slot.nextcall.schedule()
        # The heartbeat. Then the outer crawler_process.start() calls
        # reactor.run(); only at that point does reactor.callLater(delay, self)
        # actually run _next_request().
        slot.heartbeat.start(5)
```
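One detail in the comments above is worth isolating: reactor.callLater() only queues a call; nothing fires until reactor.run() starts the event loop. A minimal standalone sketch (plain Twisted, not Scrapy code) of that ordering:

```python
from twisted.internet import reactor

def queued():
    print("fires only after reactor.run() starts the loop")
    reactor.stop()  # end the demo

reactor.callLater(0, queued)  # queued now, but not executed yet
print("before reactor.run(): the queued call has not fired")
reactor.run()  # the event loop starts; only now does queued() run
```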
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\utils\reactor.py
```python
class CallLaterOnce(object):
    """Schedule a function to be called in the next reactor loop, but only if
    it hasn't been already scheduled since the last time it ran.
    """

    def __init__(self, func, *a, **kw):
        # func is self._next_request here
        self._func = func
        self._a = a
        self._kw = kw
        self._call = None

    def schedule(self, delay=0):
        # Only re-schedule once the previously scheduled func has run.
        if self._call is None:
            # The callable handed to callLater is self, which triggers the
            # magic method __call__. self._call immediately points at the
            # returned DelayedCall instance, e.g.
            # <DelayedCall 0x2369408 [0.0s] called=0 cancelled=0 <lambda>(0)>
            self._call = reactor.callLater(delay, self)

    def cancel(self):
        if self._call:
            self._call.cancel()

    def __call__(self):
        # Magic method: c = C(); c() invokes __call__.
        # As soon as the scheduled func starts running, reset to None.
        self._call = None
        return self._func(*self._a, **self._kw)
```
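A standalone sketch of the "once" behaviour: however many times schedule() is called before the reactor gets a turn, the wrapped function runs only once. It imports CallLaterOnce from the file shown above; tick is a made-up stand-in for _next_request:

```python
from twisted.internet import reactor
from scrapy.utils.reactor import CallLaterOnce  # the class shown above

def tick():
    print("tick: runs once per scheduled batch")

nextcall = CallLaterOnce(tick)
nextcall.schedule()  # self._call is None, so this one is accepted
nextcall.schedule()  # self._call is set, so this one is ignored
nextcall.schedule()  # ignored as well
reactor.callLater(1, reactor.stop)
reactor.run()        # prints the message exactly once
```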
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py
```python
class ExecutionEngine(object):

    def _next_request(self, spider):
        slot = self.slot
        if not slot:
            return

        if self.paused:
            return

        # Initially self.running is False, so _needs_backout() ("back out")
        # returns True and the while loop does nothing. After
        # engine.open_spider(), engine.start() sets self.running = True and
        # the loop takes effect. Any of slot.closing,
        # self.downloader.needs_backout() or self.scraper.slot.needs_backout()
        # being true also disables the loop.
        while not self._needs_backout(spider):
            # At the very start, start_requests have not been put into the
            # scheduler yet.
            if not self._next_request_from_scheduler(spider):
                break

        # Before engine.start() this branch is not entered either.
        if slot.start_requests and not self._needs_backout(spider):
            try:
                # Take the seed requests one by one. next() returns the next
                # item from an iterator, e.g. next(iter([1, 2, 3])) returns 1.
                request = next(slot.start_requests)
            except StopIteration:
                # The seed requests are exhausted.
                slot.start_requests = None
            except Exception:
                slot.start_requests = None
                logger.error('Error while obtaining start requests',
                             exc_info=True, extra={'spider': spider})
            else:
                self.crawl(request, spider)

        if self.spider_is_idle(spider) and slot.close_if_idle:
            self._spider_idle(spider)
```
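To isolate the start_requests-draining pattern from the method above, here is a plain-Python sketch (the URLs are made up) of how next() plus StopIteration consumes an iterator one item at a time:

```python
# stand-in for slot.start_requests
start_requests = iter(["http://example.com/1", "http://example.com/2"])

while start_requests is not None:
    try:
        request = next(start_requests)  # take the next seed request
    except StopIteration:
        start_requests = None           # seeds exhausted, stop draining
    else:
        print("would call self.crawl(%s, spider)" % request)
```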
Same file (scrapy/core/engine.py):
```python
    def _needs_backout(self, spider):
        slot = self.slot
        return not self.running \
            or slot.closing \
            or self.downloader.needs_backout() \
            or self.scraper.slot.needs_backout()
```
```python
    def crawl(self, request, spider):
        assert spider in self.open_spiders, \
            "Spider %r not opened when crawling: %s" % (spider.name, request)
        # Put the request into the priority queue (the in-memory queue for
        # its priority).
        self.schedule(request, spider)
        # Make sure every seed request reaches the scheduler promptly; in the
        # gaps, requests already in the scheduler are processed right away.
        self.slot.nextcall.schedule()
```
```python
    def schedule(self, request, spider):
        self.signals.send_catch_log(signal=signals.request_scheduled,
                                    request=request, spider=spider)
        if not self.slot.scheduler.enqueue_request(request):
            self.signals.send_catch_log(signal=signals.request_dropped,
                                        request=request, spider=spider)
```
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\scheduler.py
```python
class Scheduler(object):

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        dqok = self._dqpush(request)  # the disk queue is not used by default
        if dqok:
            self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
        else:
            self._mqpush(request)  # the in-memory queue is used by default
            self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
        self.stats.inc_value('scheduler/enqueued', spider=self.spider)
        return True
```
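self.df above is the dupefilter behind request_seen(). A simplified sketch of the idea, keeping a set of fingerprints (the real RFPDupeFilter fingerprints the whole request including method and body; this toy version hashes only the URL):

```python
import hashlib

class TinyDupeFilter(object):
    """Toy stand-in for Scrapy's RFPDupeFilter (URL-only fingerprint)."""

    def __init__(self):
        self.fingerprints = set()

    def request_seen(self, url):
        fp = hashlib.sha1(url.encode('utf-8')).hexdigest()
        if fp in self.fingerprints:
            return True  # duplicate: enqueue_request() would return False
        self.fingerprints.add(fp)
        return False

df = TinyDupeFilter()
print(df.request_seen("http://example.com/"))  # False: first time seen
print(df.request_seen("http://example.com/"))  # True: filtered as duplicate
```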
The request is pushed into the priority queue, i.e. the in-memory queue for the corresponding priority:
```python
    def _mqpush(self, request):
        self.mqs.push(request, -request.priority)
```
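The sign flip in -request.priority is what makes a higher request.priority come out first: the underlying queue pops the smallest stored priority value. A heapq-based sketch (a plain min-heap, not Scrapy's actual queuelib queue) of the effect:

```python
import heapq

# stand-in requests as (name, priority); higher priority should pop first
requests = [("low", 0), ("high", 10), ("medium", 5)]

heap = []
for name, priority in requests:
    # same trick as _mqpush: store -priority so the min-heap
    # (smallest first) yields the highest-priority request first
    heapq.heappush(heap, (-priority, name))

while heap:
    neg_prio, name = heapq.heappop(heap)
    print(name, -neg_prio)  # prints: high 10, medium 5, low 0
```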
2. After that, the heartbeat keeps triggering _next_request_from_scheduler() to pop requests from the scheduler, which forms the crawl's scheduling loop.
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py
```python
class ExecutionEngine(object):

    @defer.inlineCallbacks
    def open_spider(self, spider, start_requests=(), close_if_idle=True):
        assert self.has_capacity(), \
            "No free spider slot when opening %r" % spider.name
        logger.info("Spider opened", extra={'spider': spider})
        nextcall = CallLaterOnce(self._next_request, spider)
        scheduler = self.scheduler_cls.from_crawler(self.crawler)
        start_requests = yield self.scraper.spidermw.process_start_requests(
            start_requests, spider)
        slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
        self.slot = slot
        self.spider = spider
        yield scheduler.open(spider)
        yield self.scraper.open_spider(spider)
        self.crawler.stats.open_spider(spider)
        yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
        slot.nextcall.schedule()
        slot.heartbeat.start(5)  # the heartbeat: schedule() every 5 seconds
```
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py
```python
class Slot(object):

    def __init__(self, start_requests, close_if_idle, nextcall, scheduler):
        self.closing = False
        self.inprogress = set()  # requests in progress
        self.start_requests = iter(start_requests)
        self.close_if_idle = close_if_idle
        self.nextcall = nextcall
        self.scheduler = scheduler
        # The heartbeat: call nextcall.schedule repeatedly.
        self.heartbeat = task.LoopingCall(nextcall.schedule)
```
```python
In [342]: from twisted.internet import defer, task

In [343]: task.LoopingCall?
Init signature: task.LoopingCall(self, f, *a, **kw)
Docstring:
Call a function repeatedly.

If C{f} returns a deferred, rescheduling will not take place until the
deferred has fired. The result value is ignored.
```
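A standalone sketch of the heartbeat pattern: starting a LoopingCall with an interval of 5 mirrors slot.heartbeat.start(5); it fires immediately and then every 5 seconds. The 12-second stop here is arbitrary, just to end the demo:

```python
from twisted.internet import reactor, task

def beat():
    print("heartbeat: would call nextcall.schedule()")

heartbeat = task.LoopingCall(beat)
heartbeat.start(5)                   # fires now, then every 5 seconds
reactor.callLater(12, reactor.stop)  # stop the demo after ~3 beats
reactor.run()
```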
```python
In [349]: d = reactor.callLater(0, lambda x: x)

In [350]: isinstance(d, defer.Deferred)
Out[350]: False

In [351]: d?
Type:        instance
Base Class:  twisted.internet.base.DelayedCall
String form: <DelayedCall 0xacc2d48 [-14.5340001583s] called=0 cancelled=0 <lambda>()>
File:        c:\program files\anaconda2\lib\site-packages\twisted\internet\base.py
Docstring:   <no docstring>
```