The scrapy-redis component


Installation

pip3 install scrapy-redis 

Purpose: help developers build distributed crawlers.
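As a quick orientation before diving into the source, here is a minimal settings.py sketch (assuming a redis server on localhost:6379; the values simply restate the configuration explained later in this post):

# settings.py -- minimal sketch, assuming a redis server on localhost:6379
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"   # dedup via a shared redis set
SCHEDULER = "scrapy_redis.scheduler.Scheduler"               # requests queued in redis
SCHEDULER_PERSIST = True                                     # keep the queue/fingerprints between runs

REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379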


Source code:

# Connecting to redis
    @classmethod
    def from_settings(cls, settings):
        # read the settings and connect to redis
        server = get_redis_from_settings(settings)
        # XXX: This creates one-time key. needed to support to use this
        # class as standalone dupefilter with scrapy's default scheduler
        # if scrapy passes spider on open() method this wouldn't be needed
        # TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
        key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
        debug = settings.getbool('DUPEFILTER_DEBUG')
        return cls(server, key=key, debug=debug)
---------------------------------------------------------------------------
 cls(server, key=key, debug=debug): class name plus () --> instantiation --> calls __init__
    def __init__(self, server, key, debug=False):
        # redis connection
        self.server = server
        self.key = key
        self.debug = debug
        self.logdupes = True
---------------------------------------------------------------------------------
From server = get_redis_from_settings(settings):
def get_redis_from_settings(settings):
    params = defaults.REDIS_PARAMS.copy()
    params.update(settings.getdict('REDIS_PARAMS'))
    # XXX: Deprecate REDIS_* settings.
    for source, dest in SETTINGS_PARAMS_MAP.items():
        val = settings.get(source)
        if val:
            params[dest] = val

    # Allow ``redis_cls`` to be a path to a class.
    if isinstance(params.get('redis_cls'), six.string_types):
        params['redis_cls'] = load_object(params['redis_cls'])

    return get_redis(**params)


# Backwards compatible alias.
from_settings = get_redis_from_settings

From return get_redis(**params):
def get_redis(**kwargs):
    redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS)
    url = kwargs.pop('url', None)
    if url:
        return redis_cls.from_url(url, **kwargs)
    else:
        # equivalent to opening the connection ourselves: redis.Redis(host='xxx', port=...)
        return redis_cls(**kwargs)



    def request_seen(self, request):
        # turn the request into a unique fingerprint
        fp = self.request_fingerprint(request)
        # This returns the number of values added, zero if already exists.
        # self.server ----> the redis connection
        added = self.server.sadd(self.key, fp)
        return added == 0

# turn the request into a unique fingerprint
       fp = self.request_fingerprint(request)
# sadd --> add the fingerprint to a redis set
  added = self.server.sadd(self.key, fp)
Quick source-code flow

Detailed walkthrough:

Custom dedup rules:

Groundwork

import redis

conn = redis.Redis(host='127.0.0.1', port=6379)

'''
# add a value to a redis set; returns 1 on success, 0 if it already exists
v = conn.sadd("xxx", 'x1')
print(v)   # 1
'''


# list the members of the set
val = conn.smembers("xxx")
print(val)

The dedup source code


 

# Custom dedup rule
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# When from_crawler is defined, it runs first
class RFPDupeFilter(BaseDupeFilter):

    logger = logger

    def __init__(self, server, key, debug=False):
        # redis connection
        self.server = server
        self.key = key
        self.debug = debug
        self.logdupes = True

    @classmethod
    def from_settings(cls, settings):
        # read the settings and connect to redis
        server = get_redis_from_settings(settings)
        # XXX: This creates one-time key. needed to support to use this
        # class as standalone dupefilter with scrapy's default scheduler
        # if scrapy passes spider on open() method this wouldn't be needed
        # TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
        key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
        debug = settings.getbool('DUPEFILTER_DEBUG')
        return cls(server, key=key, debug=debug)

    # when from_crawler is defined, it runs first
    @classmethod
    def from_crawler(cls, crawler):
        return cls.from_settings(crawler.settings)
 5:
    def request_seen(self, request):
        # turn the request into a unique fingerprint
        fp = self.request_fingerprint(request)
        # This returns the number of values added, zero if already exists.
        # self.server ----> the redis connection
        added = self.server.sadd(self.key, fp)
        return added == 0

    def request_fingerprint(self, request):
        return request_fingerprint(request)

    def close(self, reason=''):
        self.clear()

    def clear(self):
        self.server.delete(self.key)

    def log(self, request, spider):
        if self.debug:
            msg = "Filtered duplicate request: %(request)s"
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
        elif self.logdupes:
            msg = ("Filtered duplicate request %(request)s"
                   " - no more duplicates will be shown"
                   " (see DUPEFILTER_DEBUG to show all duplicates)")
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            self.logdupes = False

1:
@classmethod
def from_crawler(cls, crawler):
    return cls.from_settings(crawler.settings)


@classmethod
def from_settings(cls, settings):

    # read the settings and connect to redis
    server = get_redis_from_settings(settings)
    # key = dupefilter:%(timestamp)s --> a timestamp
    key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
    debug = settings.getbool('DUPEFILTER_DEBUG')

    # 4: class name plus () --> runs __init__
    return cls(server, key=key, debug=debug)

2: connection.py
From server = get_redis_from_settings(settings):

def get_redis_from_settings(settings):
    params = defaults.REDIS_PARAMS.copy()               # 2.1 ----> read the library's own defaults
    params.update(settings.getdict('REDIS_PARAMS'))     # 2.2 ----> read the settings we configured
    # XXX: Deprecate REDIS_* settings.
    for source, dest in SETTINGS_PARAMS_MAP.items():
        val = settings.get(source)
        if val:
            params[dest] = val

    # Allow ``redis_cls`` to be a path to a class.
    if isinstance(params.get('redis_cls'), six.string_types):
        params['redis_cls'] = load_object(params['redis_cls'])

    return get_redis(**params)
2.1
# the defaults
From params = defaults.REDIS_PARAMS.copy():
REDIS_PARAMS = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
    'encoding': REDIS_ENCODING,
}

# the encoding is set here
'encoding': REDIS_ENCODING,

# and the default encoding is
REDIS_ENCODING = 'utf-8'


2.2
# our own configuration

params.update(settings.getdict('REDIS_PARAMS'))
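For example, anything you place in REDIS_PARAMS in your own settings.py is merged on top of those defaults (a sketch; the db and password values are made-up examples):

# settings.py -- illustrative override only
REDIS_PARAMS = {
    'db': 1,                  # added on top of the defaults above
    'password': 'secret',     # assumption: the redis server requires auth
    'socket_timeout': 10,     # replaces the default of 30
}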

3:
# if redis_cls in the params is a string (a dotted path)
if isinstance(params.get('redis_cls'), six.string_types):
    params['redis_cls'] = load_object(params['redis_cls'])
return get_redis(**params)

From load_object:
it resolves the dotted class path and imports it
def load_object(path):
    try:
        dot = path.rindex('.')
    except ValueError:
        raise ValueError("Error loading object '%s': not a full path" % path)
    module, name = path[:dot], path[dot+1:]
    mod = import_module(module)
    try:
        obj = getattr(mod, name)   # ---> finds the class
    except AttributeError:
        raise NameError("Module '%s' doesn't define any object named '%s'" % (module, name))
    return obj
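A small usage sketch of load_object (Scrapy ships it as scrapy.utils.misc.load_object; the dotted path here is just an example):

from scrapy.utils.misc import load_object

cls = load_object('redis.StrictRedis')     # import the class from its dotted path
conn = cls(host='127.0.0.1', port=6379)    # then instantiate it like any other class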
3.1
after that, return get_redis(**params)
def get_redis(**kwargs):
    # use the user's class if configured, otherwise the default
    redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS)
    url = kwargs.pop('url', None)
    # note that a url takes priority
    if url:
        return redis_cls.from_url(url, **kwargs)
    else:
        # equivalent to opening the connection ourselves: redis.Redis(host='xxx', port=...)
        return redis_cls(**kwargs)

4: class name plus () --> runs __init__
return cls(server, key=key, debug=debug)

def __init__(self, server, key, debug=False):
    # self.server = the redis connection
    self.server = server
    # self.key = dupefilter:%(timestamp)s
    self.key = key
    self.debug = debug
    self.logdupes = True

 5: this is where the dedup actually happens
def request_seen(self, request):

    # turn the request into a unique fingerprint
    fp = self.request_fingerprint(request)
    # self.server ----> the redis connection
    # sadd into a redis set: returns 1 if added, 0 if it already exists
    added = self.server.sadd(self.key, fp)
    return added == 0   # 0 --> True (seen before), 1 --> False (new request)
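To see why the fingerprint matters, here is a small standalone sketch using scrapy.utils.request.request_fingerprint (the URLs are arbitrary examples): the URL is canonicalized first, so two requests that differ only in query-parameter order produce the same fingerprint, and the second sadd returns 0.

from scrapy.http import Request
from scrapy.utils.request import request_fingerprint

r1 = Request('https://dig.chouti.com/?a=1&b=2')
r2 = Request('https://dig.chouti.com/?b=2&a=1')   # same params, different order

print(request_fingerprint(r1) == request_fingerprint(r2))   # True -> treated as a duplicate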
 

Dedup configuration

The source does not ship with your credentials or redis connection details, so you have to add them to your own settings:

# =========================== redis connection settings
REDIS_HOST = '127.0.0.1'                             # host
REDIS_PORT = 6379                                    # port
REDIS_URL = 'redis://user:pass@hostname:9001'        # connection URL (takes priority over the settings above)
REDIS_PARAMS  = {}                                   # Redis connection parameters. Default: {'socket_timeout': 30, 'socket_connect_timeout': 30, 'retry_on_timeout': True, 'encoding': REDIS_ENCODING}
REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient'  # Python class used to connect to Redis. Default: redis.StrictRedis
REDIS_ENCODING = "utf-8"                             # redis encoding

Question:

How do we use scrapy-redis as the dedup rule? Just add the configuration below; the source already handles the rest.

# ############ redis connection settings #################
REDIS_HOST = '127.0.0.1'                              # host
REDIS_PORT = 6379                                     # port
# REDIS_URL = 'redis://user:pass@hostname:9001'       # connection URL (takes priority over the settings above)
REDIS_PARAMS  = {}                                    # Redis connection parameters. Default: {'socket_timeout': 30, 'socket_connect_timeout': 30, 'retry_on_timeout': True, 'encoding': REDIS_ENCODING}
# REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient' # Python class used to connect to Redis. Default: redis.StrictRedis
REDIS_ENCODING = "utf-8"


# Custom dedup rule
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

Customizing the dedup rule

To extend the scrapy-redis dedup rule, subclass RFPDupeFilter (see the sketch after this snippet):
    from scrapy_redis.dupefilter import RFPDupeFilter
    class MyRFPDupeFilter(RFPDupeFilter):
        pass


    # custom dedup rule: point it at your own class's path
    DUPEFILTER_CLASS = "jassin03.dup.MyRFPDupeFilter"

Basic redis operations

# 2. basic list operations

# insert at the head of the list
# conn.lpush('user_list', 'xianglong')
# conn.lpush('user_list', 'meikai')
# conn.lpush('user_list', 'dawei')

# remove from the head of the list
# v = conn.lpop('user_list')
# print(v)
# remove from the head of the list; if the list is empty, block
# v = conn.blpop('user_list')
# print(v)


# insert at the tail of the list
# conn.rpush('user_list', 'xinglong')
# conn.rpush('user_list', 'jinjie')

# remove from the tail of the list
# v = conn.rpop('user_list')
# print(v)

# remove from the tail of the list; if the list is empty, block
# v = conn.brpop('user_list')
# print(v)

# sorted set
# conn.zadd('s8_score', 'meikai', 60, 'guotong', 30, 'liushuo', 90)
# sort by score descending and fetch the member with the highest score
# val = conn.zrange('s8_score', 0, 0, desc=True)
# print(val)

# sort by score ascending, fetch the member with the lowest score (and remove it from redis)
# pipe = conn.pipeline()
# pipe.multi()
# pipe.zrange('s8_score', 0, 0).zremrangebyrank('s8_score', 0, 0)
# results, count = pipe.execute()
# print(results, count)

# conn.lpush('xx', '123')
# conn.lpop('xx')
# v = conn.keys()
# print(v)
# conn.flushall()
 
Breadth-first crawl --> first-in, first-out (queue)
Depth-first crawl --> last-in, first-out (stack)
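A tiny redis sketch of the difference (the list name req_queue is made up): pushing on the left and popping on the right behaves like a queue (breadth-first), while pushing and popping on the same side behaves like a stack (depth-first):

import redis

conn = redis.Redis(host='127.0.0.1', port=6379)
conn.lpush('req_queue', 'r1', 'r2', 'r3')   # list is now [r3, r2, r1]

print(conn.rpop('req_queue'))   # b'r1'  -- FIFO: oldest request first (breadth-first)
print(conn.lpop('req_queue'))   # b'r3'  -- LIFO: newest request first (depth-first)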

The three queues in scrapy-redis

============ queue.py: the scrapy-redis queue source ============
from scrapy.utils.reqser import request_to_dict, request_from_dict

from . import picklecompat


class Base(object):
    """Per-spider base queue class"""

    def __init__(self, server, spider, key, serializer=None):

        if serializer is None:
            # Backward compatibility.
            # TODO: deprecate pickle.
            serializer = picklecompat
        if not hasattr(serializer, 'loads'):
            raise TypeError("serializer does not implement 'loads' function: %r"
                            % serializer)
        if not hasattr(serializer, 'dumps'):
            raise TypeError("serializer '%s' does not implement 'dumps' function: %r"
                            % serializer)

        self.server = server
        self.spider = spider
        self.key = key % {'spider': spider.name}
        self.serializer = serializer

    def _encode_request(self, request):
        """Encode a request object"""
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)

    def _decode_request(self, encoded_request):
        """Decode an request previously encoded"""
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)

    def __len__(self):
        """Return the length of the queue"""
        raise NotImplementedError

    def push(self, request):
        """Push a request"""
        raise NotImplementedError

    def pop(self, timeout=0):
        """Pop a request"""
        raise NotImplementedError

    def clear(self):
        """Clear queue/stack"""
        self.server.delete(self.key)


class FifoQueue(Base):
    # first in, first out
    """Per-spider FIFO queue"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        # lpush inserts at the head (left) of the list
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.brpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.rpop(self.key)
        if data:
            return self._decode_request(data)


class PriorityQueue(Base):
    # a priority queue
    """Per-spider priority queue abstraction using redis' sorted set"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.zcard(self.key)

    def push(self, request):
        """Push a request"""
        data = self._encode_request(request)
        score = -request.priority
        # We don't use zadd method as the order of arguments change depending on
        # whether the class is Redis or StrictRedis, and the option of using
        # kwargs only accepts strings, not bytes.
        self.server.execute_command('ZADD', self.key, score, data)

    def pop(self, timeout=0):
        """
        Pop a request
        timeout not support in this queue class
        """
        # use atomic range/remove using multi/exec
        pipe = self.server.pipeline()
        pipe.multi()
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
        results, count = pipe.execute()
        if results:
            return self._decode_request(results[0])


class LifoQueue(Base):
    """Per-spider LIFO queue."""

    def __len__(self):
        """Return the length of the stack"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.blpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.lpop(self.key)

        if data:
            return self._decode_request(data)

# TODO: Deprecate the use of these names.
SpiderQueue = FifoQueue    # first-in, first-out queue
SpiderStack = LifoQueue    # last-in, first-out stack
SpiderPriorityQueue = PriorityQueue
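Which of the three is used is controlled by the SCHEDULER_QUEUE_CLASS setting (see the configuration section below); a sketch of the choice and the redis structure behind each:

# settings.py -- pick one (paths match the configuration section later in this post)
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'  # sorted set, ordered by -request.priority (default)
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.FifoQueue'    # redis list, breadth-first behaviour
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.LifoQueue'    # redis list, depth-first behaviour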


1
    def _encode_request(self, request):
        """Encode a request object"""
        # convert the request into a dict
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)

From return self.serializer.dumps(obj):
serialization with pickle (cPickle on Python 2)
try:
    import cPickle as pickle  # PY2
except ImportError:
    import pickle


def loads(s):
    return pickle.loads(s)


def dumps(obj):
    return pickle.dumps(obj, protocol=-1)
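Putting the encode/decode steps together, the round trip looks roughly like this (a sketch; the URL is arbitrary and picklecompat is called directly instead of through a queue instance):

from scrapy.http import Request
from scrapy.utils.reqser import request_to_dict, request_from_dict
from scrapy_redis import picklecompat

req = Request('https://dig.chouti.com/', priority=1)

data = picklecompat.dumps(request_to_dict(req))         # what gets pushed into the redis list
restored = request_from_dict(picklecompat.loads(data))  # what pop() hands back to the engine
print(restored.url, restored.priority)                  # https://dig.chouti.com/ 1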

Example spider

# -*- coding: utf-8 -*-
import scrapy
import scrapy_redis
from scrapy.http import Request


class ChuotiSpider(scrapy.Spider):
    name = 'chuoti'
    allowed_domains = ['chouti.com']
    start_urls = ['http://chouti.com/']

    def parse(self, response):
        print(response, response.request.priority)
        obj = Request(url='https://dig.chouti.com/r/scoff/hot/1', callback=self.parse, dont_filter=True)
        obj.priority = response.request.priority + 1
        yield obj


 

The scheduler

# look at the scheduler source
from scrapy_redis.scheduler import Scheduler
# executed by the engine: the custom scheduler
SCHEDULER = "scrapy_redis.scheduler.Scheduler"

Source code

class Scheduler(object):
    def __init__(self, server,
                 persist=False,
                 flush_on_start=False,
                 queue_key=defaults.SCHEDULER_QUEUE_KEY,
                 queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                 dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
                 dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
                 idle_before_close=0,
                 serializer=None):

        if idle_before_close < 0:
            raise TypeError("idle_before_close cannot be negative")

        self.server = server
        self.persist = persist
        self.flush_on_start = flush_on_start
        self.queue_key = queue_key
        self.queue_cls = queue_cls
        self.dupefilter_cls = dupefilter_cls
        self.dupefilter_key = dupefilter_key
        self.idle_before_close = idle_before_close
        self.serializer = serializer
        self.stats = None

    def __len__(self):
        return len(self.queue)

    @classmethod
    def from_settings(cls, settings):
        kwargs = {
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }

        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }
        for name, setting_name in optional.items():
            val = settings.get(setting_name)
            if val:
                kwargs[name] = val

        # Support serializer as a path to a module.
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])

        server = connection.from_settings(settings)
        # Ensure the connection is working.
        server.ping()

        return cls(server=server, **kwargs)

    @classmethod
    def from_crawler(cls, crawler):
        instance = cls.from_settings(crawler.settings)
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance

    def open(self, spider):
        self.spider = spider

        try:
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                key=self.queue_key % {'spider': spider.name},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class ‘%s‘: %s",
                             self.queue_cls, e)

        try:
            self.df = load_object(self.dupefilter_cls)(
                server=self.server,
                key=self.dupefilter_key % {'spider': spider.name},
                debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class ‘%s‘: %s",
                             self.dupefilter_cls, e)

        if self.flush_on_start:
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

    def close(self, reason):
        if not self.persist:
            self.flush()

    def flush(self):
        self.df.clear()
        self.queue.clear()

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request

    def has_pending_requests(self):
        return len(self) > 0


The two key methods
    # enqueue_request: when the spider yields a request, the engine hands it to the scheduler
    def enqueue_request(self, request):
        # has this request been seen before?
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    # next_request: take a request back out of the scheduler
    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request

Settings

# executed by the engine: the custom scheduler
SCHEDULER = "scrapy_redis.scheduler.Scheduler"

# scheduler settings
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'  # default: PriorityQueue (sorted set); alternatives: FifoQueue (list), LifoQueue (list)
SCHEDULER_QUEUE_KEY = '%(spider)s:requests'                 # redis key where the scheduler stores pending requests
SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"          # serializer for the data stored in redis, pickle by default
SCHEDULER_PERSIST = True                                    # keep the queue and dedup records when the spider closes; True = keep, False = flush
SCHEDULER_FLUSH_ON_START = True                             # flush the queue and dedup records before starting; True = flush, False = keep
SCHEDULER_IDLE_BEFORE_CLOSE = 10                            # max seconds to block when popping from an empty queue before giving up
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'          # redis key for the dedup fingerprints; for a spider named chouti --> chouti:dupefilter
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'  # class implementing the dedup rule

 
