scrapy_redis: updating score/request.priority with multiple threads

Posted by my8100


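0. Background

When crawling with scrapy_redis, forgetting to set request.priority or setting it incorrectly (a Rule can also set request.priority through its process_request parameter) leaves the requests that extract items at the tail of the sorted set xxx:requests, where they keep occupying memory.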
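As a rough illustration of the setting mentioned above, the sketch below shows a process_request hook that raises the priority of item requests. It assumes scrapy_redis's PriorityQueue, which stores each request with score = -request.priority and pops the lowest score first. The names ITEM_URL, set_item_priority, priority_to_score and FakeRequest, as well as the priority value 10, are hypothetical and only serve the example. FakeRequest stands in for scrapy.Request so the snippet runs without Scrapy.

```python
import re

# Hypothetical URL pattern for item pages; adjust it to the target site.
ITEM_URL = re.compile(r'apps/details')


def priority_to_score(priority):
    # Assumption: scrapy_redis's PriorityQueue uses score = -request.priority,
    # so a higher priority ends up with a smaller score.
    return -priority


def set_item_priority(request, response=None):
    # Meant to be passed as Rule(..., process_request=set_item_priority).
    # response is optional because only newer Scrapy versions pass it.
    if ITEM_URL.search(request.url):
        request.priority = 10  # illustrative value, not from the original post
    return request


class FakeRequest(object):
    # Minimal stand-in for scrapy.Request, used only to make the sketch runnable.
    def __init__(self, url, priority=0):
        self.url = url
        self.priority = priority


if __name__ == '__main__':
    for url in ('http://example.com/apps/details?id=1',
                'http://example.com/list?page=2'):
        req = set_item_priority(FakeRequest(url))
        print(url, req.priority, priority_to_score(req.priority))
```

With a hook like this in place from the start, item requests would get the smaller score and be popped before list-page requests, which is the situation the script below tries to restore after the fact.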

 

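1. Implementation

Iterate over every item in the sorted set, match the url inside each item's data against the regular expressions built from a predefined dictionary, write the item with its updated score into a temporary key newkey, and finally run rename.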

# -*- coding: utf-8 -*-
import sys
import re
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial

try:
    input = raw_input  # For py2
except NameError:
    pass

import redis


def print_line(string):
    print('\n{symbol}{space}{string}'.format(symbol='#'*10, space=' '*5, string=string))


def check_key_scores(key):
    try:
        total = redis_server.zcard(key)
    except redis.exceptions.ResponseError:
        print("The value of '{key}' is not a SortedSet".format(key=key))
        sys.exit()
    except Exception as err:
        print(err)
        sys.exit()

    if total == 0:
        print("key '{key}' does not exist or has no items".format(key=key))
        sys.exit()

    __, min_score = redis_server.zrange(key, 0, 0, withscores=True)[0]
    __, max_score = redis_server.zrange(key, -1, -1, withscores=True)[0]

    print('score  amount')
    total_ = 0
    # Assuming that score/request.priority is an integer, rather than a float like 1.1
    for score in range(int(min_score), int(max_score)+1):
        count = redis_server.zcount(key, score, score)
        print(score, count)
        total_ += count
    print("{total_}/{total} items of key '{key}' have an integer priority".format(
            total_=total_, total=total, key=key))


def zadd_with_new_score(startstop, total_items):
    # key, newkey, pattern_score and redis_server are globals defined in the __main__ block
    data, ori_score = redis_server.zrange(key, startstop, startstop, withscores=True)[0]
    for pattern, score in pattern_score:
        # data eg: b'\\x80\\x02}q\\x00(X\\x03\\x00\\x00\\x00urlq\\x01X\\x13\\x00\\x00\\x00http://httpbin.org/q\\x02X\\x08\\x00\\x00\\x00callbackq\\x03X\\x
        # See /site-packages/scrapy_redis/queue.py:
        #     We don't use zadd method as the order of arguments change depending on
        #     whether the class is Redis or StrictRedis, and the option of using
        #     kwargs only accepts strings, not bytes.
        m = pattern.search(data.decode('utf-8', 'replace'))
        if m:
            redis_server.execute_command('ZADD', newkey, score, data)
            break
    else:
        # No pattern matched: copy the item with its original score
        redis_server.execute_command('ZADD', newkey, ori_score, data)
    print('{startstop} / {total_items}'.format(
            startstop=startstop+1, total_items=total_items))


if __name__ == '__main__':

    password = 'password'
    host = '127.0.0.1'
    port = '6379'
    database_num = 0

    key = 'test:requests'
    newkey = 'temp'
    # Any request whose url matches a key of keyword_score gets the corresponding value as its new score
    # Smaller value/score means higher request.priority
    keyword_score = {'httpbin': -12, 'apps/details': 1}
    pattern_score = [(re.compile(r'url.*?%s.*?callback' % k), v) for (k, v) in keyword_score.items()]

    threads_amount = 10

    redis_server = redis.StrictRedis.from_url('redis://:{password}@{host}:{port}/{database_num}'.format(
                                                password=password, host=host,
                                                port=port, database_num=database_num))


    print_line('Step 0: pre check')
    check_key_scores(key)


    print_line('Step 1: copy items and update score')
    # total_items = redis_server.zlexcount(key, '-', '+')
    total_items = redis_server.zcard(key)
    input("Press Enter to copy {total_items} items of '{key}' into '{newkey}' with new score".format(
            total_items=total_items, key=key, newkey=newkey))
    p = ThreadPool(threads_amount)
    p.map(partial(zadd_with_new_score, total_items=total_items), range(total_items))
    p.close()   # Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker threads will exit.
    p.join()    # Wait for the worker threads to exit. One must call close() or terminate() before using join().

    # For py3
    # https://stackoverflow.com/questions/5442910/python-multiprocessing-pool-map-for-multiple-arguments
    # with ThreadPool(threads_amount) as pool:
    #     pool.map(partial(zadd_with_new_score, total_items=total_items), range(total_items))
    # print('zadd_with_new_score done')


    print_line('Step 2: check copy result')
    check_key_scores(key)
    check_key_scores(newkey)


    print_line('Step 3: delete, rename and check key')
    input("Press Enter to DELETE '{key}' and RENAME '{newkey}' to '{key}'".format(
            key=key, newkey=newkey))
    print(redis_server.delete(key))
    print(redis_server.rename(newkey, key))
    check_key_scores(key)
    check_key_scores(newkey)
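The matching step inside zadd_with_new_score can be tried without a Redis server. The sketch below isolates it in a helper called pick_score, a hypothetical name that is not part of the script above. The sample bytes are a hand-written fragment modelled on the truncated data example in the script's comment, not a real pickled request.

```python
import re

# Same dictionary and pattern construction as in the script above.
keyword_score = {'httpbin': -12, 'apps/details': 1}
pattern_score = [(re.compile(r'url.*?%s.*?callback' % k), v)
                 for (k, v) in keyword_score.items()]


def pick_score(data, ori_score, pattern_score):
    # Return the score of the first pattern that matches the decoded data,
    # or the original score when nothing matches.
    text = data.decode('utf-8', 'replace')
    for pattern, score in pattern_score:
        if pattern.search(text):
            return score
    return ori_score


# Hand-written fragment (assumption), shaped like the start of a serialized request.
sample = (b'\x80\x02}q\x00(X\x03\x00\x00\x00urlq\x01X\x13\x00\x00\x00'
          b'http://httpbin.org/q\x02X\x08\x00\x00\x00callbackq\x03')

if __name__ == '__main__':
    print(pick_score(sample, 0.0, pattern_score))
    print(pick_score(b'urlq\x01http://example.com/q\x02callback', 5.0, pattern_score))
```

Because the keywords are inserted into the regular expression unescaped, a keyword containing characters such as '.' or '?' is interpreted as a pattern. Wrapping the keyword in re.escape would be one way to match it literally.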

 

 

2. Results

 
