爬虫设计模式-twisted

Posted 老王的农场

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了爬虫设计模式-twisted相关的知识,希望对你有一定的参考价值。

from twisted.web.client import getPage
from twisted.internet import reactor
from twisted.internet import defer

url_list = [\'http://www.bing.com\', \'http://www.baidu.com\', ]


def callback(arg):
    print(\'回来一个\', arg)


defer_list = []
for url in url_list:
    ret = getPage(bytes(url, encoding=\'utf8\'))
    ret.addCallback(callback)
    defer_list.append(ret)


def stop(arg):
    print(\'已经全部现在完毕\', arg)
    reactor.stop()


d = defer.DeferredList(defer_list)
d.addBoth(stop)

reactor.run()
twisted示例一
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from twisted.web.client import getPage
from twisted.internet import reactor
from twisted.internet import defer


@defer.inlineCallbacks
def task(url):
    ret = getPage(bytes(url, encoding=\'utf8\'))
    ret.addCallback(callback)
    yield ret


def callback(arg):
    print(\'回来一个\', arg)


url_list = [\'http://www.bing.com\', \'http://www.baidu.com\', ]
defer_list = []
for url in url_list:
    ret = task(url)
    defer_list.append(ret)


def stop(arg):
    print(\'已经全部现在完毕\', arg)
    reactor.stop()


d = defer.DeferredList(defer_list)
d.addBoth(stop)
reactor.run()
twisted示例二
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor
import threading


def _next_request():
    _next_request_from_scheduler()


def _next_request_from_scheduler():
    ret = getPage(bytes(\'http://www.chouti.com\', encoding=\'utf8\'))
    ret.addCallback(callback)
    ret.addCallback(lambda _: reactor.callLater(0, _next_request))


_closewait = None

@defer.inlineCallbacks
def engine_start():
    global _closewait
    _closewait = defer.Deferred()
    yield _closewait


@defer.inlineCallbacks
def task(url):
    reactor.callLater(0, _next_request)
    yield engine_start()


counter = 0
def callback(arg):
    global counter
    counter +=1
    if counter == 10:
        _closewait.callback(None)
    print(\'one\', len(arg))


def stop(arg):
    print(\'all done\', arg)
    reactor.stop()


if __name__ == \'__main__\':
    url = \'http://www.cnblogs.com\'

    defer_list = []
    deferObj = task(url)
    defer_list.append(deferObj)

    v = defer.DeferredList(defer_list)
    v.addBoth(stop)
    reactor.run()
twisted示例三
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from twisted.web.client import getPage, defer
from twisted.internet import reactor
import queue


class Response(object):
    def __init__(self, body, request):
        self.body = body
        self.request = request
        self.url = request.url

    @property
    def text(self):
        return self.body.decode(\'utf-8\')


class Request(object):
    def __init__(self, url, callback=None):
        self.url = url
        self.callback = callback


class Scheduler(object):
    def __init__(self, engine):
        self.q = queue.Queue()
        self.engine = engine

    def enqueue_request(self, request):
        self.q.put(request)

    def next_request(self):
        try:
            req = self.q.get(block=False)
        except Exception as e:
            req = None

        return req

    def size(self):
        return self.q.qsize()


class ExecutionEngine(object):
    def __init__(self):
        self._closewait = None
        self.running = True
        self.start_requests = None
        self.scheduler = Scheduler(self)

        self.inprogress = set()

    def check_empty(self, response):
        if not self.running:
            self._closewait.callback(\'......\')

    def _next_request(self):
        while self.start_requests:
            try:
                request = next(self.start_requests)
            except StopIteration:
                self.start_requests = None
            else:
                self.scheduler.enqueue_request(request)

        while len(self.inprogress) < 5 and self.scheduler.size() > 0:  # 最大并发数为5

            request = self.scheduler.next_request()
            if not request:
                break

            self.inprogress.add(request)
            d = getPage(bytes(request.url, encoding=\'utf-8\'))
            d.addBoth(self._handle_downloader_output, request)
            d.addBoth(lambda x, req: self.inprogress.remove(req), request)
            d.addBoth(lambda x: self._next_request())

        if len(self.inprogress) == 0 and self.scheduler.size() == 0:
            self._closewait.callback(None)

    def _handle_downloader_output(self, body, request):
        """
        获取内容,执行回调函数,并且把回调函数中的返回值获取,并添加到队列中
        :param response: 
        :param request: 
        :return: 
        """
        import types

        response = Response(body, request)
        func = request.callback or self.spider.parse
        gen = func(response)
        if isinstance(gen, types.GeneratorType):
            for req in gen:
                self.scheduler.enqueue_request(req)

    @defer.inlineCallbacks
    def start(self):
        self._closewait = defer.Deferred()
        yield self._closewait

    def open_spider(self, spider, start_requests):
        self.start_requests = start_requests
        self.spider = spider
        reactor.callLater(0, self._next_request)


class Crawler(object):
    def __init__(self, spidercls):
        self.spidercls = spidercls

        self.spider = None
        self.engine = None

    @defer.inlineCallbacks
    def crawl(self):
        self.engine = ExecutionEngine()
        self.spider = self.spidercls()
        start_requests = iter(self.spider.start_requests())
        start_requests = iter(start_requests)
        self.engine.open_spider(self.spider, start_requests)
        yield self.engine.start()


class CrawlerProcess(object):
    def __init__(self):
        self._active = set()
        self.crawlers = set()

    def crawl(self, spidercls, *args, **kwargs):
        crawler = Crawler(spidercls)

        self.crawlers.add(crawler)
        d = crawler.crawl(*args, **kwargs)
        self._active.add(d)
        return d

    def start(self):
        dl = defer.DeferredList(self._active)
        dl.addBoth(self._stop_reactor)
        reactor.run()

    def _stop_reactor(self, _=None):
        reactor.stop()


class Spider(object):
    def start_requests(self):
        for url in self.start_urls:
            yield Request(url)


class ChoutiSpider(Spider):
    name = "chouti"
    start_urls = [
        \'http://dig.chouti.com/\',
    ]

    def parse(self, response):
        print(response.text)


class CnblogsSpider(Spider):
    name = "cnblogs"
    start_urls = [
        \'http://www.cnblogs.com/\',
    ]

    def parse(self, response):
        print(response.text)


if __name__ == \'__main__\':

    spider_cls_list = [ChoutiSpider, CnblogsSpider]

    crawler_process = CrawlerProcess()
    for spider_cls in spider_cls_list:
        crawler_process.crawl(spider_cls)

    crawler_process.start()
模拟scrapy框架
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import types
from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor



class Request(object):
    def __init__(self, url, callback):
        self.url = url
        self.callback = callback
        self.priority = 0


class HttpResponse(object):
    def __init__(self, content, request):
        self.content = content
        self.request = request


class ChouTiSpider(object):

    def start_requests(self):
        url_list = [\'http://www.cnblogs.com/\', \'http://www.bing.com\']
        for url in url_list:
            yield Request(url=url, callback=self.parse)

    def parse(self, response):
        print(response.request.url)
        # yield Request(url="http://www.baidu.com", callback=self.parse)




from queue import Queue
Q = Queue()


class CallLaterOnce(object):
    def __init__(self, func, *a, **kw):
        self._func = func
        self._a = a
        self._kw = kw
        self._call = None

    def schedule(self, delay=0):
        if self._call is None:
            self._call = reactor.callLater(delay, self)

    def cancel(self):
        if self._call:
            self._call.cancel()

    def __call__(self):
        self._call = None
        return self._func(*self._a, **self._kw)


class Engine(object):
    def __init__(self):
        self.nextcall = None
        self.crawlling = []
        self.max = 5
        self._closewait = None

    def get_response(self,content, request):
        response = HttpResponse(content, request)
        gen = request.callback(response)
        if isinstance(gen, types.GeneratorType):
            for req in gen:
                req.priority = request.priority + 1
                Q.put(req)


    def rm_crawlling(self,response,d):
        self.crawlling.remove(d)

    def _next_request(self,spider):
        if Q.qsize() == 0 and len(self.crawlling) == 0:
            self._closewait.callback(None)

        if len(self.crawlling) >= 5:
            return
        while len(self.crawlling) < 5:
            try:
                req = Q.get(block=False)
            except Exception as e:
                req = None
            if not req:
                return
            d = getPage(req.url.encode(\'utf-8\'))
            self.crawlling.append(d)
            d.addCallback(self.get_response, req)
            d.addCallback(self.rm_crawlling,d)
            d.addCallback(lambda _: self.nextcall.schedule())


    @defer.inlineCallbacks
    def crawl(self):
        spider = ChouTiSpider()
        start_requests = iter(spider.start_requests())
        flag = True
        while flag:
            try:
                req = next(start_requests)
                Q.put(req)
            except StopIteration as e:
                flag = False

        self.nextcall = CallLaterOnce(self._next_request,spider)
        self.nextcall.schedule()

        self._closewait = defer.Deferred()
        yield self._closewait

    @defer.inlineCallbacks
    def pp(self):
        yield self.crawl()

_active = set()
obj = Engine()
d = obj.crawl()
_active.add(d)

li = defer.DeferredList(_active)
li.addBoth(lambda _,*a,**kw: reactor.stop())

reactor.run()
参考

 

以上是关于爬虫设计模式-twisted的主要内容,如果未能解决你的问题,请参考以下文章

爬虫日记(94):Twisted的reactor设计来源

Python爬虫编程思想(27):Twisted框架基础

爬虫日记(79):Twisted的延时机制

爬虫日记(106):Twisted:单元测试怎么样编写

爬虫日记(82):Twisted的线程返回值

爬虫日记(81):Twisted的线程池使用