scrapy_redis notes: updating score/request.priority with multiple threads
Posted by my8100
Foreword: compiled by the editors of 小常识网 (cha138.com), this article explains how to update score/request.priority in scrapy_redis with multiple threads.
0. Background
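When crawling with scrapy_redis, it is easy to forget to set request.priority or to set it wrongly (for a Rule, request.priority can also be set through the process_request parameter). The requests that extract items then end up at the tail of the sorted set xxx:requests, where they keep occupying memory.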
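Why the tail matters: scrapy_redis's default PriorityQueue (scrapy_redis/queue.py) stores each request with score = -request.priority and always pops the member with the lowest score, which is what the script's comment "Smaller value/score means higher request.priority" refers to. The following is a minimal in-memory model of that ordering, not scrapy_redis code; `SortedSetModel` and `push` are hypothetical names, and ties are broken by comparing members, as Redis does for equal scores.

```python
import bisect


class SortedSetModel:
    """Hypothetical in-memory stand-in for a Redis sorted set.

    Members are ordered by (score, member); Redis also breaks score ties
    by comparing members lexicographically."""

    def __init__(self):
        self._items = []   # sorted list of (score, member) tuples
        self._scores = {}  # member -> current score

    def zadd(self, score, member):
        # Re-adding an existing member only updates its score, as ZADD does.
        if member in self._scores:
            self._items.remove((self._scores[member], member))
        self._scores[member] = score
        bisect.insort(self._items, (score, member))

    def pop_lowest(self):
        # Mirrors ZRANGE key 0 0 followed by ZREMRANGEBYRANK key 0 0.
        score, member = self._items.pop(0)
        del self._scores[member]
        return member


def push(queue, url, priority=0):
    # score = -priority, so a higher priority is popped earlier.
    queue.zadd(-priority, url)


if __name__ == "__main__":
    q = SortedSetModel()
    push(q, "http://example.com/list?page=2")           # priority 0
    push(q, "http://example.com/detail/1", priority=1)  # item page
    push(q, "http://example.com/list?page=3")           # priority 0
    print([q.pop_lowest() for _ in range(3)])
    # -> ['http://example.com/detail/1', 'http://example.com/list?page=2', 'http://example.com/list?page=3']
```

Giving item (detail-page) requests a positive priority, for example from a Rule's process_request, gives them a negative score so they are popped before list pages; with a lower priority they get a higher score and stay near the tail.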
1. Implementation
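The script iterates over every item of the SortedSet, regex-matches the url contained in each item's data against a predefined dictionary, copies the item with its updated score into the temporary key newkey (items that match no keyword keep their original score), and finally renames newkey back to the original key.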
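The per-item rule just described (first matching keyword wins, otherwise keep the original score) can be isolated as a pure function and checked without a Redis server. This is a minimal sketch: `build_patterns` and `choose_score` are hypothetical names; it assumes the serialized request holds the URL between the 'url' and 'callback' keys, as in the sample bytes quoted in the script; it uses `re.escape` (treating keywords as literal text, a choice not made in the original) plus `re.DOTALL`, because a length byte of 10 in the pickled data decodes to a newline that `.` would otherwise not match.

```python
import pickle
import re


def build_patterns(keyword_score):
    """Compile (pattern, score) pairs like the script's pattern_score list.

    Hypothetical helper: keywords are escaped (matched literally) and
    re.DOTALL lets '.' cross newline bytes inside the pickled payload."""
    return [
        (re.compile(r"url.*?%s.*?callback" % re.escape(k), re.DOTALL), v)
        for k, v in keyword_score.items()
    ]


def choose_score(data, ori_score, pattern_score):
    """Return the new score for one serialized request (bytes)."""
    text = data.decode("utf-8", "replace")
    for pattern, score in pattern_score:
        if pattern.search(text):
            return score  # first matching keyword wins
    return ori_score      # no match: keep the original score


if __name__ == "__main__":
    patterns = build_patterns({"httpbin": -12, "apps/details": 1})
    # Fake payload shaped like a pickled request dict (assumption).
    data = pickle.dumps({"url": "http://httpbin.org/", "callback": "parse"}, protocol=2)
    print(choose_score(data, 0.0, patterns))  # -> -12
```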
```python
# -*- coding: UTF-8 -*-
import sys
import re
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial

try:
    input = raw_input  # For py2
except NameError:
    pass

import redis


def print_line(string):
    print('\n{symbol}{space}{string}'.format(symbol='#'*10, space=' '*5, string=string))


def check_key_scores(key):
    try:
        total = redis_server.zcard(key)
    except redis.exceptions.ResponseError:
        print("The value of '{key}' is not a SortedSet".format(key=key))
        sys.exit()
    except Exception as err:
        print(err)
        sys.exit()
    if total == 0:
        print("key '{key}' does not exist or has no items".format(key=key))
        sys.exit()

    __, min_score = redis_server.zrange(key, 0, 0, withscores=True)[0]
    __, max_score = redis_server.zrange(key, -1, -1, withscores=True)[0]
    print('score amount')
    total_ = 0
    # Assuming that score/request.priority is an integer, rather than a float like 1.1
    for score in range(int(min_score), int(max_score)+1):
        count = redis_server.zcount(key, score, score)
        print(score, count)
        total_ += count
    print("{total_}/{total} items of key '{key}' have an integer priority".format(
        total_=total_, total=total, key=key))


def zadd_with_new_score(startstop, total_items):
    data, ori_score = redis_server.zrange(key, startstop, startstop, withscores=True)[0]
    for pattern, score in pattern_score:
        # data eg: b'\\x80\\x02}q\\x00(X\\x03\\x00\\x00\\x00urlq\\x01X\\x13\\x00\\x00\\x00http://httpbin.org/q\\x02X\\x08\\x00\\x00\\x00callbackq\\x03X\\x
        # See /site-packages/scrapy_redis/queue.py
        # We don't use zadd method as the order of arguments change depending on
        # whether the class is Redis or StrictRedis, and the option of using
        # kwargs only accepts strings, not bytes.
        m = pattern.search(data.decode('utf-8', 'replace'))
        if m:
            redis_server.execute_command('ZADD', newkey, score, data)
            break
    else:
        # No pattern matched: copy the item with its original score
        redis_server.execute_command('ZADD', newkey, ori_score, data)
    print('{startstop} / {total_items}'.format(
        startstop=startstop+1, total_items=total_items))


if __name__ == '__main__':
    password = 'password'
    host = '127.0.0.1'
    port = '6379'
    database_num = 0
    key = 'test:requests'
    newkey = 'temp'
    # Requests whose url matches any key of keyword_score get the corresponding value as their score
    # Smaller value/score means higher request.priority
    keyword_score = {'httpbin': -12, 'apps/details': 1}
    # re.S lets '.' also match newline bytes that can occur inside the pickled data
    pattern_score = [(re.compile(r'url.*?%s.*?callback' % k, re.S), v) for (k, v) in keyword_score.items()]
    threads_amount = 10

    redis_server = redis.StrictRedis.from_url('redis://:{password}@{host}:{port}/{database_num}'.format(
        password=password, host=host, port=port, database_num=database_num))

    print_line('Step 0: pre check')
    check_key_scores(key)

    print_line('Step 1: copy items and update score')
    # total_items = redis_server.zlexcount(key, '-', '+')
    total_items = redis_server.zcard(key)
    input("Press Enter to copy {total_items} items of '{key}' into '{newkey}' with new score".format(
        total_items=total_items, key=key, newkey=newkey))
    p = ThreadPool(threads_amount)
    p.map(partial(zadd_with_new_score, total_items=total_items), range(total_items))
    p.close()  # No more tasks can be submitted; workers exit once all tasks are done.
    p.join()   # Wait for the workers to exit; close() or terminate() must be called first.
    # For py3
    # https://stackoverflow.com/questions/5442910/python-multiprocessing-pool-map-for-multiple-arguments
    # with ThreadPool(threads_amount) as pool:
    #     pool.map(partial(zadd_with_new_score, total_items=total_items), range(total_items))
    # print('zadd_with_new_score done')

    print_line('Step 2: check copy result')
    check_key_scores(key)
    check_key_scores(newkey)

    print_line('Step 3: delete, rename and check key')
    input("Press Enter to DELETE '{key}' and RENAME '{newkey}' to '{key}'".format(
        key=key, newkey=newkey))
    print(redis_server.delete(key))
    print(redis_server.rename(newkey, key))
    check_key_scores(key)
    check_key_scores(newkey)  # newkey no longer exists, so this reports it and exits
```
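The script makes two round trips per item (one ZRANGE, one ZADD) and hides the latency behind a 10-thread pool. A different technique, swapped in here rather than taken from the original, is to read items in slices with ZRANGE ... WITHSCORES and queue the ZADDs on a non-transactional pipeline, so each batch costs one round trip and no threads are needed. This is a sketch under assumptions: `build_patterns`, `choose_score`, `copy_with_new_scores` and `batch_size` are hypothetical names, and `client` is assumed to be a redis-py client.

```python
import re


def build_patterns(keyword_score):
    # Same shape as the script's pattern_score list; re.DOTALL lets '.'
    # cross newline bytes inside the serialized request.
    return [(re.compile(r"url.*?%s.*?callback" % k, re.DOTALL), v)
            for k, v in keyword_score.items()]


def choose_score(data, ori_score, pattern_score):
    # Same rule as the script: the first matching pattern wins,
    # otherwise the original score is kept.
    text = data.decode("utf-8", "replace")
    for pattern, score in pattern_score:
        if pattern.search(text):
            return score
    return ori_score


def copy_with_new_scores(client, key, newkey, pattern_score, batch_size=500):
    """Copy every member of `key` into `newkey`, rescoring matched requests.

    Hypothetical helper. `client` is assumed to be a redis-py client; only
    zrange(..., withscores=True) and a non-transactional pipeline are used.
    Members are addressed by rank, so `key` must not change while this runs
    (stop the spiders first). Returns the number of members copied."""
    copied = 0
    start = 0
    while True:
        # ZRANGE key start stop WITHSCORES -> [(member, score), ...]
        chunk = client.zrange(key, start, start + batch_size - 1, withscores=True)
        if not chunk:
            break
        pipe = client.pipeline(transaction=False)
        for data, ori_score in chunk:
            new_score = choose_score(data, ori_score, pattern_score)
            # Raw ZADD (score before member), as in the script, to sidestep
            # zadd() signature differences across redis-py versions.
            pipe.execute_command("ZADD", newkey, new_score, data)
        pipe.execute()  # one round trip for the whole batch
        copied += len(chunk)
        start += batch_size
    return copied
```

With the objects from the script's `__main__` block this would be called as `copy_with_new_scores(redis_server, key, newkey, pattern_score)` followed by `redis_server.rename(newkey, key)`. Because RENAME overwrites an existing destination key, the rename alone replaces the old queue in one step, so the separate DELETE beforehand is not strictly required.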
2. Run results