Twisted

Posted hpython

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Twisted相关的知识,希望对你有一定的参考价值。

示例一

from twisted.web.client import getPage
from twisted.internet import reactor
from twisted.internet import defer


def callback(result):
    print(\'下载完成了\',result)
    return result

# d = Deferred对象
# 对象内部封装了:url=http://www.bing.com 和 callback
# 为本次请求创建socket对象,添加到while循环中。
d = getPage(bytes(\'http://www.bing.com\', encoding=\'utf8\'))
d.addCallback(callback)


def stop():
    reactor.stop()

# 监听所有的Deferred对象=socket对象,如果列表中所有的Deferred对象=socket对象都已经下载完成
dl = defer.DeferredList([d,])
# 那么就执行stop函数
dl.addBoth(stop)


reactor.run()

示例二

from twisted.web.client import getPage
from twisted.internet import reactor
from twisted.internet import defer


def callback(result):
    print(\'下载完成了\',result)
    return result

@defer.inlineCallbacks
def task():
    # d = Deferred对象
    # 对象内部封装了:url=http://www.bing.com 和 callback
    # 为本次请求创建socket对象,添加到while循环中。
    d = getPage(bytes(\'http://www.bing.com\', encoding=\'utf8\'))
    d.addCallback(callback)
    yield d

def stop():
    reactor.stop()

dlist = []
for i in range(1):
    d = task()
    dlist.append(d)
# 监听所有的Deferred对象=socket对象,如果列表中所有的Deferred对象=socket对象都已经下载完成
dl = defer.DeferredList(dlist)
# 那么就执行stop函数
dl.addBoth(stop)


reactor.run()

示例三

from twisted.web.client import getPage
from twisted.internet import reactor
from twisted.internet import defer


def gggg(result):
    print(\'下载完成了\',result)
    return result


# 如果创建一个 Deferred 对象,表示:创建了一个永远不可能完成的任务
# 用户如果主动调用:d.callback("asdfasdf"),表示手动完成任务了。
d = defer.Deferred()
d.addCallback(gggg)


reactor.run()

示例四

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor
import threading

counter = 0


def callback(arg):
    print("arg----",arg)
    global counter
    counter += 1
    if counter == 5:
        _closewait.callback(None)
    print(\'one\', len(arg))

def funcc(args):
    print("_",args)
    reactor.callLater(0, _next_request)

def _next_request_from_scheduler():
    ret = getPage(bytes(\'http://www.chouti.com\', encoding=\'utf8\'))
    #
    ret.addCallback(callback)
    #
    ret.addCallback(funcc)
    # ret.addCallback(lambda _: reactor.callLater(0, _next_request))


def _next_request():
    _next_request_from_scheduler()

###############################
_closewait = None

@defer.inlineCallbacks
def task(url):
    reactor.callLater(0, _next_request)
    yield engine_start()

@defer.inlineCallbacks
def engine_start():
    global _closewait
    _closewait = defer.Deferred()
    yield _closewait
#################################

def stop(arg):
    print(\'已经全部完成\')
    reactor.stop()


if __name__ == \'__main__\':
    url = \'http://www.cnblogs.com\'

    defer_list = []
    deferObj = task(url)
    defer_list.append(deferObj)

    v = defer.DeferredList(defer_list)
    v.addBoth(stop)
    reactor.run()
View Code

示例五

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from twisted.web.client import getPage, defer
from twisted.internet import reactor
import queue


class Response(object):
    def __init__(self, body, request):
        self.body = body
        self.request = request
        self.url = request.url

    @property
    def text(self):
        return self.body.decode(\'utf-8\')


class Request(object):
    def __init__(self, url, callback=None):
        self.url = url
        self.callback = callback


class Scheduler(object):
    def __init__(self, engine):
        self.q = queue.Queue()
        self.engine = engine

    def enqueue_request(self, request):
        self.q.put(request)

    def next_request(self):
        try:
            req = self.q.get(block=False)
        except Exception as e:
            req = None

        return req

    def size(self):
        return self.q.qsize()


class ExecutionEngine(object):
    def __init__(self):
        self._closewait = None
        self.running = True
        self.start_requests = None
        self.scheduler = Scheduler(self)

        self.inprogress = set()

    def check_empty(self, response):
        if not self.running:
            self._closewait.callback(\'......\')

    def _next_request(self):
        while self.start_requests:
            try:
                request = next(self.start_requests)
            except StopIteration:
                self.start_requests = None
            else:
                self.scheduler.enqueue_request(request)

        while len(self.inprogress) < 5 and self.scheduler.size() > 0:  # 最大并发数为5

            request = self.scheduler.next_request()
            if not request:
                break

            self.inprogress.add(request)
            d = getPage(bytes(request.url, encoding=\'utf-8\'))
            d.addBoth(self._handle_downloader_output, request)
            d.addBoth(lambda x, req: self.inprogress.remove(req), request)
            d.addBoth(lambda x: self._next_request())

        if len(self.inprogress) == 0 and self.scheduler.size() == 0:
            self._closewait.callback(None)

    def _handle_downloader_output(self, body, request):
        """
        获取内容,执行回调函数,并且把回调函数中的返回值获取,并添加到队列中
        :param response:
        :param request:
        :return:
        """
        import types

        response = Response(body, request)
        func = request.callback or self.spider.parse
        gen = func(response)
        if isinstance(gen, types.GeneratorType):
            for req in gen:
                self.scheduler.enqueue_request(req)

    @defer.inlineCallbacks
    def start(self):
        self._closewait = defer.Deferred()
        yield self._closewait

    def open_spider(self, spider, start_requests):
        self.start_requests = start_requests
        self.spider = spider
        reactor.callLater(0, self._next_request)


class Crawler(object):
    def __init__(self, spidercls):
        self.spidercls = spidercls

        self.spider = None
        self.engine = None

    @defer.inlineCallbacks
    def crawl(self):
        self.engine = ExecutionEngine()
        # 创建爬虫对象
        self.spider = self.spidercls()
        start_requests = iter(self.spider.start_requests())
        #start_requests = iter(start_requests)
        self.engine.open_spider(self.spider, start_requests)
        yield self.engine.start()


class CrawlerProcess(object):
    def __init__(self):
        self._active = set() # {d,}
        self.crawlers = set() # Crawler对象(ChoutiSpider类)

    def crawl(self, spidercls, *args, **kwargs):
        # spidercls=ChoutiSpider类
        crawler = Crawler(spidercls)

        self.crawlers.add(crawler)

        # 永远不可能完成的deferred对象
        d = crawler.crawl(*args, **kwargs)
        self._active.add(d)
        return d

    def start(self):
        dl = defer.DeferredList(self._active)
        dl.addBoth(self._stop_reactor)
        reactor.run()

    def _stop_reactor(self, _=None):
        reactor.stop()


class Spider(object):
    def start_requests(self):
        for url in self.start_urls:
            yield Request(url)


class ChoutiSpider(Spider):
    name = "chouti"
    start_urls = [
        \'http://dig.chouti.com/\',
    ]

    def parse(self, response):
        print(response.text)



if __name__ == \'__main__\':

    crawler_process = CrawlerProcess()
    crawler_process.crawl(ChoutiSpider)
    crawler_process.start()
View Code

更多看这里

 

 

 

 

 

1

以上是关于Twisted的主要内容,如果未能解决你的问题,请参考以下文章

爬虫日记(106):Twisted:单元测试怎么样编写

爬虫日记(93):Twisted的设计模型

Pymodbus/Twisted 异步客户端重新连接

爬虫日记(79):Twisted的延时机制

Twisted:使用 pyglet-twisted 时如何从 EndPoint 调用 Deferred

Twisted源码分析3