Scheduling: engine._next_request_from_scheduler() pops a request and hands it to the handler; if the result is a Request, engine.crawl() is executed again; if the result is a Response/Failure, it is passed to the scraper


0. def _next_request_from_scheduler(self, spider):

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py

    def _next_request_from_scheduler(self, spider):
        slot = self.slot
        request = slot.scheduler.next_request()  # first pop a request from the priority queue
        if not request:
            return
        d = self._download(request, spider)  # hand the request to the downloader/handler
        d.addBoth(self._handle_downloader_output, request, spider)  # callback run on the download result
        d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                           exc_info=failure_to_exc_info(f),
                                           extra={'spider': spider}))
        d.addBoth(lambda _: slot.remove_request(request))
        d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                           exc_info=failure_to_exc_info(f),
                                           extra={'spider': spider}))
        d.addBoth(lambda _: slot.nextcall.schedule())
        d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                           exc_info=failure_to_exc_info(f),
                                           extra={'spider': spider}))
        return d
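
The whole method is one Twisted Deferred chain. As a minimal standalone sketch of the pattern used above (plain Twisted, not Scrapy code): addBoth fires on both the success and failure paths, while addErrback only catches a Failure produced by the steps added before it.

from twisted.internet import defer

d = defer.Deferred()
d.addBoth(lambda result: result)   # runs whether the previous step succeeded or failed
d.addErrback(lambda f: None)       # only runs if the step above raised / returned a Failure
d.addBoth(lambda _: 'next step')   # runs again in either case
d.callback('downloaded')           # fire the chain with a successful result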

 

1. request = slot.scheduler.next_request()  # the scheduler pops a request from the priority queue (at the current priority) and returns it to the engine

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\scheduler.py

class Scheduler(object):

    def next_request(self):
        request = self.mqs.pop()  # mqs is the pqclass (PriorityQueue) that manages several mqclass instances, i.e. self.queues = {0: mqclass(), ...}; if a dqclass is configured, the scheduler also loads requests persisted to the configured directory, by default via PickleLifoDiskQueue
        if request:
            self.stats.inc_value('scheduler/dequeued/memory', spider=self.spider)
        else:
            request = self._dqpop()
            if request:
                self.stats.inc_value('scheduler/dequeued/disk', spider=self.spider)
        if request:
            self.stats.inc_value('scheduler/dequeued', spider=self.spider)
        return request
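
The disk branch (_dqpop) only matters when a job directory is configured; otherwise mqs alone serves every pop. For reference, persisting and resuming the queue is typically enabled from the command line via the JOBDIR setting:

    scrapy crawl somespider -s JOBDIR=crawls/somespider-1

With JOBDIR set, the scheduler opens the dqclass-backed queue under that directory, so _dqpop() can resume requests left over from a previous run.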

C:\Program Files\Anaconda2\Lib\site-packages\queuelib\pqueue.py    # the push (enqueue) side is included below for reference

class PriorityQueue(object):
    def push(self, obj, priority=0):  # default priority on push is 0
        if priority not in self.queues:
            self.queues[priority] = self.qfactory(priority)
        q = self.queues[priority]
        q.push(obj) # this may fail (eg. serialization error)
        if self.curprio is None or priority < self.curprio:  # a smaller number means higher priority; bump curprio so that e.g. redirects are processed first
            self.curprio = priority

    def pop(self):
        if self.curprio is None:
            return
        q = self.queues[self.curprio]
        m = q.pop()  # pop one request from the current-priority queue
        if len(q) == 0: # if that queue is now empty, recompute the current priority
            del self.queues[self.curprio]
            q.close()
            prios = [p for p, q in self.queues.items() if len(q) > 0]
            self.curprio = min(prios) if prios else None  # the new current priority is the smallest remaining one
        return m
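
A minimal sketch of those priority semantics in isolation, using queuelib's PriorityQueue with a toy in-memory qfactory (MemQueue below is made up for illustration; Scrapy's real mqclass is a LIFO memory queue): the smaller the number, the sooner the item is popped.

from queuelib.pqueue import PriorityQueue

class MemQueue(object):
    """Toy FIFO exposing just the interface PriorityQueue needs."""
    def __init__(self, priority):
        self.items = []
    def push(self, obj):
        self.items.append(obj)
    def pop(self):
        return self.items.pop(0) if self.items else None
    def close(self):
        pass
    def __len__(self):
        return len(self.items)

pq = PriorityQueue(MemQueue)
pq.push('ordinary request')          # priority defaults to 0
pq.push('redirected request', -1)    # smaller number, popped first
assert pq.pop() == 'redirected request'
assert pq.pop() == 'ordinary request'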

2. d = self._download(request, spider)  # the request is handed to the downloader/handler

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py

    def _download(self, request, spider):
        slot = self.slot
        slot.add_request(request)        # engine slot: self.inprogress = set(); self.inprogress.add(request)
        def _on_success(response):
            assert isinstance(response, (Response, Request))
            if isinstance(response, Response):
                response.request = request # tie request to response received
                logkws = self.logformatter.crawled(request, response, spider)
                logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
                self.signals.send_catch_log(signal=signals.response_received,
                                            response=response, request=request, spider=spider)
            return response

        def _on_complete(_):
            slot.nextcall.schedule()
            return _

        dwld = self.downloader.fetch(request, spider)    # DOWNLOADER = 'scrapy.core.downloader.Downloader'
        dwld.addCallbacks(_on_success)
        dwld.addBoth(_on_complete)
        return dwld

###dwld = self.downloader.fetch(request, spider)

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\__init__.py

from .middleware import DownloaderMiddlewareManager
from .handlers import DownloadHandlers
class Downloader(object):

    def __init__(self, crawler):
        self.handlers = DownloadHandlers(crawler)
        self.middleware = DownloaderMiddlewareManager.from_crawler(crawler)

    def fetch(self, request, spider):
        def _deactivate(response):
            self.active.remove(request)
            return response

        self.active.add(request)
        dfd = self.middleware.download(self._enqueue_request, request, spider)
        return dfd.addBoth(_deactivate) 

#### The request is first processed by the DownloaderMiddlewareManager

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\middleware.py

class DownloaderMiddlewareManager(MiddlewareManager):

    def download(self, download_func, request, spider):
        @defer.inlineCallbacks
        def process_request(request):
            for method in self.methods['process_request']:
                response = yield method(request=request, spider=spider)
                assert response is None or isinstance(response, (Response, Request)), \
                    'Middleware %s.process_request must return None, Response or Request, got %s' % \
                    (six.get_method_self(method).__class__.__name__, response.__class__.__name__)
                if response:
                    defer.returnValue(response)
            defer.returnValue((yield download_func(request=request, spider=spider)))  # the normal path: after every process_request hook has run, yield into the download_func that was passed in (_enqueue_request)

        deferred = mustbe_deferred(process_request, request)  # once the request path finishes, the result flows into process_response below, or into process_exception on failure
        deferred.addErrback(process_exception)
        deferred.addCallback(process_response)
        return deferred
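
For context, a hedged example of the contract that assert enforces: a downloader middleware's process_request may return None to let the chain continue, a Response to short-circuit the actual download, or a Request to send something else back through the engine. The middleware name and meta key below are hypothetical, for illustration only.

from scrapy.http import HtmlResponse

class LocalCacheMiddleware(object):   # hypothetical middleware
    def process_request(self, request, spider):
        cached_body = request.meta.get('cached_body')
        if cached_body is not None:
            # Returning a Response skips download_func entirely; the response
            # goes straight into the process_response chain.
            return HtmlResponse(url=request.url, body=cached_body, encoding='utf-8')
        return None   # fall through to the next middleware and finally the handler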

#### The processed request is stored in the self.slots dict {hostname: slot} maintained by the downloader; requests are then popped from the current slot's queue and handed to the handler until that hostname's concurrency limit is filled

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\__init__.py

class Downloader(object):

    def __init__(self, crawler):
        self.slots = {}    # dict: self.slots[key] = Slot(conc, delay, self.randomize_delay)

    def _enqueue_request(self, request, spider):   # a request popped from the scheduler arrives here via downloader.fetch: the middlewares have processed it, and now a slot is picked from the slots dict maintained by the downloader
        key, slot = self._get_slot(request, spider)  # key defaults to the hostname and maps to one slot; if the dict has no slot for it yet, a new one is created
        request.meta['download_slot'] = key

        def _deactivate(response):
            slot.active.remove(request)
            return response

        slot.active.add(request)
        deferred = defer.Deferred().addBoth(_deactivate)
        slot.queue.append((request, deferred))  # queue is unbounded; the slot also keeps active set() and transferring set()
        self._process_queue(spider, slot)  # if this hostname's slot still has free concurrency, a request is popped and handed to the handler -- not necessarily the request that was just appended
        return deferred

    def _process_queue(self, spider, slot):
        if slot.latercall and slot.latercall.active():
            return

        # Delay queue processing if a download_delay is configured
        now = time()
        delay = slot.download_delay()  # DOWNLOAD_DELAY defaults to 0
        if delay:
            penalty = delay - now + slot.lastseen
            if penalty > 0:
                slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
                return

        # Process enqueued requests if there are free slots to transfer for this slot
        # i.e. keep going while this hostname's queue is non-empty and its concurrency is not saturated
        while slot.queue and slot.free_transfer_slots() > 0:  # one slot per hostname by default; CONCURRENT_REQUESTS_PER_DOMAIN = 8, so this computes 8 minus len(self.transferring); _download below adds to transferring once a request reaches the handler
            slot.lastseen = now
            request, deferred = slot.queue.popleft()
            dfd = self._download(slot, request, spider)
            dfd.chainDeferred(deferred)
            # prevent burst if inter-request delays were configured
            if delay:
                self._process_queue(spider, slot)
                break

    def _download(self, slot, request, spider):
        # The order is very important for the following deferreds. Do not change!

        # 1. Create the download deferred
        dfd = mustbe_deferred(self.handlers.download_request, request, spider)  # hand the request to the download handler

        # 2. Notify response_downloaded listeners about the recent download
        # before querying queue for next request
        def _downloaded(response):
            self.signals.send_catch_log(signal=signals.response_downloaded,
                                        response=response,
                                        request=request,
                                        spider=spider)
            return response
        dfd.addCallback(_downloaded)  # once the handler returns a response, emit the response_downloaded signal

        # 3. After response arrives,  remove the request from transferring
        # state to free up the transferring slot so it can be used by the
        # following requests (perhaps those which came from the downloader
        # middleware itself)
        slot.transferring.add(request)   # counted as "in flight" once handed to the handler

        def finish_transferring(_):
            slot.transferring.remove(request)
            self._process_queue(spider, slot)
            return _

        return dfd.addBoth(finish_transferring)  # when the handler finishes, drop the request from transferring and process the queue again
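
A rough sketch of the per-hostname Slot bookkeeping these methods rely on (simplified from the same file; only the attributes actually used above are shown, and the constructor arguments are paraphrased):

from collections import deque

class Slot(object):
    """One download slot per hostname (or per IP, depending on settings)."""

    def __init__(self, concurrency, delay, randomize_delay):
        self.concurrency = concurrency   # e.g. CONCURRENT_REQUESTS_PER_DOMAIN = 8
        self.delay = delay               # DOWNLOAD_DELAY
        self.randomize_delay = randomize_delay
        self.active = set()              # requests owned by this slot
        self.queue = deque()             # (request, deferred) pairs waiting to download
        self.transferring = set()        # requests currently at the download handler
        self.lastseen = 0
        self.latercall = None

    def free_transfer_slots(self):
        # how many more requests may be handed to the handler right now
        return self.concurrency - len(self.transferring)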

#### Until the slot's concurrency is filled: each request popped from the slot queue is routed to a handler by URL scheme; for http, HTTPDownloadHandler is chosen, which in practice maps to HTTP11DownloadHandler in \core\downloader\handlers\http11.py

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\handlers\__init__.py

class DownloadHandlers(object):

    def __init__(self, crawler):
        self._schemes = {}  # stores acceptable schemes on instancing
        handlers = without_none_values(
            crawler.settings.getwithbase('DOWNLOAD_HANDLERS'))
        for scheme, clspath in six.iteritems(handlers):  # dict of {scheme: clspath}
            self._schemes[scheme] = clspath

    def download_request(self, request, spider):
        scheme = urlparse_cached(request).scheme
        handler = self._get_handler(scheme)  # pick the handler for this scheme
        if not handler:
            raise NotSupported("Unsupported URL scheme '%s': %s" %
                               (scheme, self._notconfigured[scheme]))
        return handler.download_request(request, spider)
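
A hedged illustration of what that {scheme: clspath} dict looks like and how a scheme resolves to a handler; the class paths are the Scrapy defaults for this generation of the code base and are listed here only as examples.

from six.moves.urllib.parse import urlparse

handlers = {
    'file': 'scrapy.core.downloader.handlers.file.FileDownloadHandler',
    'http': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    'https': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    'ftp': 'scrapy.core.downloader.handlers.ftp.FTPDownloadHandler',
}

scheme = urlparse('https://example.com/page').scheme
print('%s -> %s' % (scheme, handlers.get(scheme)))   # https -> ...http.HTTPDownloadHandler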

 

 C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\handlers\http11.py

Tracing all the way down ends up at the following,

where agent comes from: from twisted.web.client import Agent

L{Agent} is a very basic HTTP client. It supports I{HTTP} and I{HTTPS} scheme URIs

        d = agent.request(
            method, to_bytes(url, encoding='ascii'), headers, bodyproducer)
        # set download latency
        d.addCallback(self._cb_latency, request, start_time)
        # response body is ready to be consumed
        d.addCallback(self._cb_bodyready, request)
        d.addCallback(self._cb_bodydone, request, url)
        # check download timeout
        self._timeout_cl = reactor.callLater(timeout, d.cancel)
        d.addBoth(self._cb_timeout, request, url, timeout)
        return d
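
For reference, a minimal standalone use of that Agent API outside of Scrapy (a hedged sketch; the real handler also wires in TLS contexts, proxies and body producers, all omitted here):

from __future__ import print_function
from twisted.internet import reactor
from twisted.web.client import Agent

agent = Agent(reactor)
d = agent.request(b'GET', b'http://example.com/')

def show_status(response):
    print(response.code)   # e.g. 200; the body still has to be delivered to a protocol
    return response

d.addCallback(show_status)
d.addBoth(lambda _: reactor.stop())
reactor.run()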

 

In the normal case, a response is eventually returned:

return respcls(url=url, status=status, headers=headers, body=body, flags=flags)

That is as deep as this trace goes.

 

### Back in fetch: the callbacks attached after the handler returns a result, dwld.addCallbacks(_on_success) and dwld.addBoth(_on_complete)

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py

class ExecutionEngine(object):

    def _download(self, request, spider):
        slot = self.slot
        slot.add_request(request)
        def _on_success(response):
            assert isinstance(response, (Response, Request))
            if isinstance(response, Response):
                response.request = request # tie request to response received
                logkws = self.logformatter.crawled(request, response, spider)
                logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
                self.signals.send_catch_log(signal=signals.response_received,
                                            response=response, request=request, spider=spider)
            return response   # if the result is a Response, the response_received signal is sent

        def _on_complete(_):
            slot.nextcall.schedule()  # trigger the same tick as the heartbeat
            return _

        dwld = self.downloader.fetch(request, spider)
        dwld.addCallbacks(_on_success)
        dwld.addBoth(_on_complete)
        return dwld  # return the result (a Deferred)

3. d.addBoth(self._handle_downloader_output, request, spider)  # callback run on the download result

### Callback: d.addBoth(self._handle_downloader_output, request, spider)

C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py

class ExecutionEngine(object):

    def _handle_downloader_output(self, response, request, spider):
        assert isinstance(response, (Request, Response, Failure)), response    # the result can only be a Request, a Response or a Failure
        # downloader middleware can return requests (for example, redirects)
        if isinstance(response, Request):   # the result is a Request: schedule it again
            self.crawl(response, spider)
            return
        # response is a Response or Failure
        d = self.scraper.enqueue_scrape(response, request, spider)  # the result is a Response/Failure: hand it to the scraper
        d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
                                            exc_info=failure_to_exc_info(f),
                                            extra={'spider': spider}))
        return d
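
A hedged restatement of that branch as a tiny standalone function, just to make the three possible downloader outputs explicit (the return strings are mine, not Scrapy's):

from scrapy.http import Request, Response
from twisted.python.failure import Failure

def classify_downloader_output(result):
    if isinstance(result, Request):
        return 'back to the scheduler via engine.crawl()'
    if isinstance(result, (Response, Failure)):
        return 'to the scraper via scraper.enqueue_scrape()'
    raise AssertionError(result)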

### Callback: d.addBoth(lambda _: slot.remove_request(request))

### Callback: d.addBoth(lambda _: slot.nextcall.schedule())

 
