爬虫入门第9课:实现代理池的检测模块
Posted 黑马程序员官方
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了爬虫入门第9课:实现代理池的检测模块相关的知识,希望对你有一定的参考价值。
爬虫学习知识点及案例篇(汇总):
Python爬虫项目:Bilibili模拟登陆(滑动验证码)
本阶段带大家从代理池的设计开始,学习Python爬虫及项目实战,详情关注上方专栏 ↑↑↑
目的
: 检查代理IP可用性, 保证代理池中代理IP基本可用思路
- 在proxy_test.py中, 创建ProxyTester类
- 提供一个
run
方法, 用于处理检测代理IP核心逻辑- 从数据库中获取所有代理IP
- 遍历代理IP列表
- 检查代理可用性
- 如果代理不可用, 让代理分数-1, 如果代理分数等于0就从数据库中删除该代理, 否则更新该代理IP
- 如果代理可用, 就恢复该代理的分数, 更新到数据库中
- 为了提高检查的速度, 使用异步来执行检测任务
- 把要检测的代理IP, 放到队列中
- 把检查一个代理可用性的代码, 抽取到一个方法中; 从队列中获取代理IP, 进行检查; 检查完毕, 调度队列的task_done方法
- 通过异步回调, 使用死循环不断执行这个方法,
- 开启多个一个异步任务, 来处理代理IP的检测; 可以通过配置文件指定异步数量
- 使用
schedule
模块, 每隔一定的时间, 执行一次检测任务- 定义类方法
start
, 用于启动检测模块 - 在
start
方法中- 创建本类对象
- 调用run方法
- 每间隔一定时间, 执行一下, run方法
- 定义类方法
-
步骤
- 在proxy_test.py中, 创建ProxyTester类
-
提供一个
run
方法, 用于检查代理IP的可用性class ProxyTester(object): def __init__(self): self.proxy_pool = MongoPool() # 基于MongoDB的代理池 def run(self): # 1. 获取所有代理IP proxies = self.proxy_pool.find() # 2. 如果代理池为空, 直接返回 if proxies is None or len(proxies) == 0: print("代理池为空") return # 获取所有的代理, 放到队列中 for proxy in proxies: try: # 验证当前的代理 proxy = check_proxy(proxy) # 如果速度为-1就说明请求失败了 if proxy.speed == -1: # 代理的分数-1 proxy.score -= 1 # 如果分数为0, 就删除该代理 if proxy.score == 0: self.proxy_pool.delete(proxy) logger.info('删除代理:'.format(proxy)) else: # 如果分数不为0 ,就更新当前的代理 self.proxy_pool.update(proxy) else: # 如果请求成功了, 恢复为最高分数 proxy.score = settings.MAX_SCORE self.proxy_pool.update(proxy) except Exception as ex: logger.exception(ex)
-
为了提高检查的速度, 使用异步来执行检测任务
- 在初始化方法中
- 创建一个队列, 用于存储代理IP
- 创建协程池, 用于实现异步检查
- 修改
run
方法:- 从数据库中获取代理IP, 添加到队列中
- 把检查代理可用性的代码抽取一个方法;
- 在该方法中, 队列中取出一个代理来检查
- 检查完毕调用一下队列的task_done方法
- 使用协程池的异步来执行提取的方法, 但是只能检查一个代理IP;
- 通过异步回调, 实现死循环检查.
-
通过配置文件,配置异步数量
# settings.py文件中. # 检查代理IP的异步数量 TESTER_ANSYC_COUNT = 20 # run_test.py 文件中 ''' 代理检测类 ''' class ProxyTester(object): def __init__(self): self.proxy_pool = MongoPool() # 基于MongoDB的代理池 self.queue = Queue() self.pool = Pool() # 协程池 def _test_proxy(self): # 从代理队列中, 获取请求 proxy = self.queue.get() try: # 验证当前的代理 proxy = check_proxy(proxy) # 如果速度为-1就说明请求失败了 if proxy.speed == -1: # 代理的分数-1 proxy.score -= 1 # 如果分数为0, 就删除该代理 if proxy.score == 0: self.proxy_pool.delete(proxy) logger.info('删除代理:'.format(proxy)) else: # 如果分数不为0 ,就更新当前的代理 self.proxy_pool.update(proxy) else: # 如果请求成功了, 恢复为最高分数 proxy.score = settings.MAX_SCORE self.proxy_pool.update(proxy) except Exception as ex: logger.exception(ex) self.queue.task_done() def _test_proxy_finish(self, temp): self.pool.apply_async(self._test_proxy, callback=self._test_proxy_finish) def run(self): # 1. 获取所有代理IP proxies = self.proxy_pool.find() # 2. 如果代理池为空, 直接返回 if proxies is None or len(proxies) == 0: print("代理池为空") return # 获取所有的代理, 放到队列中 for proxy in proxies: self.queue.put(proxy) # 开启多个异步任务执行检查IP的任务 for i in range(settings.TESTER_ANSYC_COUNT): self.pool.apply_async(self._test_proxy,callback=self._test_proxy_finish) # 让主线程等待异步任务完成 self.queue.join()
- 在初始化方法中
- 每隔一定的时间, 执行一次爬取任务
- 修改
setting.py
文件, 检查代理IP可用性间隔时间的配置# # 检查可用IP的时间间隔, 单位分钟 TESTER_INTERVAL = 60
- 在
ProxyTester
中提供start的类方法, 用于启动爬虫的运行, 每间隔指定时间, 重新运行一次.@staticmethod def start(): tester = ProxyTester() tester.run() # 每隔2小时检查下代理是否可用 schedule.every(settings.TESTER_INTERVAL).hours.do(tester.run) while True: schedule.run_pending() time.sleep(1)
- 修改
-
完整代码
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import gevent.monkey
gevent.monkey.patch_all()
from gevent.pool import Pool
from queue import Queue
from db.mongo_pool import MongoPool
from validator.httpbin_validator import check_proxy
from utils.log import logger
import settings
import schedule
import time
'''
代理检测者
'''
class ProxyTester(object):
def __init__(self):
self.queue = Queue()
self.pool = Pool() # 协程池
self.proxy_pool = MongoPool() # 基于MongoDB的代理池
def _test_proxy(self):
# 从代理队列中, 获取请求
proxy = self.queue.get()
try:
# 验证当前的代理
proxy = check_proxy(proxy)
# 如果速度为-1就说明请求失败了
if proxy.speed == -1:
# 代理的分数-1
proxy.score -= 1
# 如果分数为0, 就删除该代理
if proxy.score == 0:
self.proxy_pool.delete(proxy)
logger.info('删除代理:'.format(proxy))
else:
# 如果分数不为0 ,就更新当前的代理
self.proxy_pool.update(proxy)
else:
# 如果请求成功了, 恢复为最高分数
proxy.score = settings.MAX_SCORE
self.proxy_pool.update(proxy)
except Exception as ex:
logger.exception(ex)
self.queue.task_done()
def _test_proxy_finish(self, temp):
self.pool.apply_async(self._test_proxy, callback=self._test_proxy_finish)
def run(self):
# 1. 获取所有代理IP
proxies = self.proxy_pool.find()
# 2. 如果代理池为空, 直接返回
if proxies is None or len(proxies) == 0:
print("代理池为空")
return
# 获取所有的代理, 放到队列中
for proxy in proxies:
self.queue.put(proxy)
# 开启多个异步任务执行检查IP的任务
for i in range(settings.TESTER_ANSYC_COUNT):
self.pool.apply_async(self._test_proxy,callback=self._test_proxy_finish)
# 让主线程等待异步任务完成
self.queue.join()
@staticmethod
def start():
tester = ProxyTester()
tester.run()
# 每隔2小时检查下代理是否可用
schedule.every(settings.TESTER_INTERVAL).hours.do(tester.run)
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == '__main__':
ProxyTester.start()
小结
- 从数据库中获取所有代理IP
- 检查每一个代理IP的可用性
- 如果代理不可用, 让代理分数-1, 当分数为时, 从数据库中移除.
- 如果代理可用, 就恢复该代理的分数.
- 为了提高检查的速度, 使用异步来执行检测任务
以上是关于爬虫入门第9课:实现代理池的检测模块的主要内容,如果未能解决你的问题,请参考以下文章