使用redis+flask维护动态代理池
Posted 辛侠平
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用redis+flask维护动态代理池相关的知识,希望对你有一定的参考价值。
在进行网络爬虫时,会经常有封ip的现象。可以使用代理池来进行代理ip的处理。
代理池的要求:多站抓取,异步检测。定时筛选,持续更新。提供接口,易于提取。
代理池架构:获取器,过滤器,代理队列,定时检测。
使用https://github.com/Germey/ProxyPool/tree/master/proxypool代码进行分析。
run.py里面的代码
"""Entry point: start the proxy-pool scheduler, then serve the API app."""
from proxypool.api import app
from proxypool.schedule import Schedule


def main():
    """Start the scheduler, then run the API application.

    The scheduler is started first; the API app is run afterwards in the
    current process.
    """
    s = Schedule()
    s.run()
    # NOTE(review): app.run() is presumably Flask's blocking server call,
    # so it must come last -- confirm against proxypool.api.
    app.run()


if __name__ == '__main__':
    main()
首先运行了一个调度器,接着运行了一个接口。
调度器schedule.py代码
import time
from multiprocessing import Process
import asyncio
import aiohttp
try:
    from aiohttp.errors import ProxyConnectionError, ServerDisconnectedError, ClientResponseError, ClientConnectorError
except ImportError:
    # Fallback for aiohttp versions without the ``aiohttp.errors`` module.
    from aiohttp import ClientProxyConnectionError as ProxyConnectionError, ServerDisconnectedError, ClientResponseError, ClientConnectorError
from proxypool.db import RedisClient
from proxypool.error import ResourceDepletionError
from proxypool.getter import FreeProxyGetter
from proxypool.setting import *
# Imported after the star import on purpose: the name ``TimeoutError`` used
# below must be asyncio's, whatever ``proxypool.setting`` exports.
from asyncio import TimeoutError


class ValidityTester(object):
    """Test raw proxies concurrently and store the working ones in Redis."""

    test_api = TEST_API

    def __init__(self):
        self._raw_proxies = None
        self._usable_proxies = []

    def set_raw_proxies(self, proxies):
        """Set the batch of proxies to test and open the Redis client."""
        self._raw_proxies = proxies
        self._conn = RedisClient()

    async def test_single_proxy(self, proxy):
        """Test one proxy; if it is valid, put it into the Redis queue.

        ``proxy`` is a ``host:port`` value, either ``str`` or ``bytes``.
        A proxy counts as valid when a GET of ``test_api`` through it
        returns HTTP status 200.
        """
        try:
            async with aiohttp.ClientSession() as session:
                try:
                    if isinstance(proxy, bytes):
                        proxy = proxy.decode('utf-8')
                    real_proxy = 'http://' + proxy
                    print('Testing', proxy)
                    async with session.get(self.test_api, proxy=real_proxy, timeout=get_proxy_timeout) as response:
                        if response.status == 200:
                            self._conn.put(proxy)
                            print('Valid proxy', proxy)
                except (ProxyConnectionError, TimeoutError, ValueError):
                    print('Invalid proxy', proxy)
        except (ServerDisconnectedError, ClientResponseError, ClientConnectorError) as s:
            print(s)

    def test(self):
        """Run ``test_single_proxy`` for every raw proxy on the event loop."""
        print('ValidityTester is working')
        if not self._raw_proxies:
            # Nothing to test; asyncio.wait() raises ValueError on an empty
            # set, so skip the event loop entirely.
            return
        try:
            loop = asyncio.get_event_loop()
            # asyncio.wait() no longer accepts bare coroutines (removed in
            # Python 3.11), so wrap each one in a Task first.
            tasks = [loop.create_task(self.test_single_proxy(proxy))
                     for proxy in self._raw_proxies]
            loop.run_until_complete(asyncio.wait(tasks))
        except ValueError:
            print('Async Error')


class PoolAdder(object):
    """Crawl new proxies and add the valid ones to the pool."""

    def __init__(self, threshold):
        self._threshold = threshold
        self._conn = RedisClient()
        self._tester = ValidityTester()
        self._crawler = FreeProxyGetter()

    def is_over_threshold(self):
        """Return True when the queue length has reached the threshold."""
        return self._conn.queue_len >= self._threshold

    def add_to_queue(self):
        """Crawl and test proxies until the pool reaches the threshold.

        Raises:
            ResourceDepletionError: if a full pass over all crawl functions
                has produced no raw proxy at all so far.
        """
        print('PoolAdder is working')
        proxy_count = 0
        while not self.is_over_threshold():
            for callback in self._crawler.__CrawlFunc__:
                raw_proxies = self._crawler.get_raw_proxies(callback)
                # test crawled proxies
                self._tester.set_raw_proxies(raw_proxies)
                self._tester.test()
                proxy_count += len(raw_proxies)
                if self.is_over_threshold():
                    print('IP is enough, waiting to be used')
                    break
            if proxy_count == 0:
                raise ResourceDepletionError


class Schedule(object):
    """Run the two background loops that maintain the proxy pool."""

    @staticmethod
    def valid_proxy(cycle=VALID_CHECK_CYCLE):
        """Every ``cycle`` seconds, take half of the proxies in Redis and re-test them."""
        conn = RedisClient()
        tester = ValidityTester()
        while True:
            print('Refreshing ip')
            count = int(0.5 * conn.queue_len)
            if count == 0:
                print('Waiting for adding')
                time.sleep(cycle)
                continue
            raw_proxies = conn.get(count)
            tester.set_raw_proxies(raw_proxies)
            tester.test()
            time.sleep(cycle)

    @staticmethod
    def check_pool(lower_threshold=POOL_LOWER_THRESHOLD,
                   upper_threshold=POOL_UPPER_THRESHOLD,
                   cycle=POOL_LEN_CHECK_CYCLE):
        """Every ``cycle`` seconds, refill the pool if it is below ``lower_threshold``.

        Refilling continues until the pool reaches ``upper_threshold``.
        """
        conn = RedisClient()
        adder = PoolAdder(upper_threshold)
        while True:
            if conn.queue_len < lower_threshold:
                adder.add_to_queue()
            time.sleep(cycle)

    def run(self):
        """Start the validity-check loop and the pool-size loop, each in its own process."""
        print('Ip processing running')
        valid_process = Process(target=Schedule.valid_proxy)
        check_process = Process(target=Schedule.check_pool)
        valid_process.start()
        check_process.start()
调度器里面有个run方法,里面运行了两个进程。一个进程valid_process(对应valid_proxy方法)是定时地从数据库拿出代理进行检测;另外一个进程check_process(对应check_pool方法)是定时检查代理池的大小,当代理数量低于下限时从网上获取代理并放入数据库。
valid_proxy是定时检测器,里面传入一个时间的参数cycle=VALID_CHECK_CYCLE,定义定时检测的时间间隔。方法里首先创建一个RedisClient()对象进行数据库的连接,该类定义在db.py中
import redis
from proxypool.error import PoolEmptyError
from proxypool.setting import HOST, PORT, PASSWORD


class RedisClient(object):
    """Thin wrapper around the Redis list ``"proxies"`` used as the proxy queue."""

    def __init__(self, host=HOST, port=PORT):
        """Create the Redis client, using PASSWORD only when it is set."""
        if PASSWORD:
            self._db = redis.Redis(host=host, port=port, password=PASSWORD)
        else:
            self._db = redis.Redis(host=host, port=port)

    def get(self, count=1):
        """Remove and return up to ``count`` proxies from the left of the list.

        Returns an empty list when ``count`` is not positive.
        """
        if count <= 0:
            # LRANGE 0 -1 means "the whole list", so a zero count must not
            # reach Redis.
            return []
        # Batch fetch from the left side. Read and trim run in one
        # MULTI/EXEC transaction so that other processes using the same
        # list cannot interleave between the two commands.
        pipe = self._db.pipeline()
        pipe.lrange("proxies", 0, count - 1)
        pipe.ltrim("proxies", count, -1)
        proxies, _ = pipe.execute()
        return proxies

    def put(self, proxy):
        """Append a proxy to the right end of the list."""
        self._db.rpush("proxies", proxy)

    def pop(self):
        """Remove and return the right-most proxy as ``str``.

        Raises:
            PoolEmptyError: if the list is empty.
        """
        proxy = self._db.rpop("proxies")
        if proxy is None:
            raise PoolEmptyError
        return proxy.decode('utf-8')

    @property
    def queue_len(self):
        """Return the current length of the proxy list."""
        return self._db.llen("proxies")

    def flush(self):
        """Flush the database.

        NOTE: FLUSHALL removes every key on the Redis server, not only the
        ``"proxies"`` list.
        """
        self._db.flushall()


if __name__ == '__main__':
    conn = RedisClient()
    print(conn.pop())
接着还声明了ValidityTester(),用来检测代理是否可用,其中的test_single_proxy()方法是实现异步检测的关键。
check_pool()方法里面需要传入三个参数:两个代理池的上下界限,一个时间。里面有个PoolAdder的add_to_queue()方法。
add_to_queue()方法中使用了一个从网站抓取ip的类FreeProxyGetter(),在getter.py里面
from .utils import get_page
from pyquery import PyQuery as pq
import re


class ProxyMetaclass(type):
    """Metaclass that registers the crawl functions of a class.

    Adds two attributes to the class being created: ``__CrawlFunc__``, the
    list of attribute names containing ``crawl_``, and
    ``__CrawlFuncCount__``, the number of such names.
    """

    def __new__(cls, name, bases, attrs):
        crawl_funcs = [k for k in attrs if 'crawl_' in k]
        attrs['__CrawlFunc__'] = crawl_funcs
        attrs['__CrawlFuncCount__'] = len(crawl_funcs)
        return type.__new__(cls, name, bases, attrs)


class FreeProxyGetter(object, metaclass=ProxyMetaclass):
    """Crawlers for free proxy sites; each ``crawl_*`` generator yields ``ip:port`` strings.

    Every crawler skips a page when ``get_page`` returns a falsy value
    (assumed to signal a failed download -- confirm against ``utils.get_page``).
    """

    def get_raw_proxies(self, callback):
        """Run the crawl method named ``callback`` and return its proxies as a list."""
        proxies = []
        print('Callback', callback)
        # Look the method up by name instead of eval()-ing a built string.
        for proxy in getattr(self, callback)():
            print('Getting', proxy, 'from', callback)
            proxies.append(proxy)
        return proxies

    def crawl_ip181(self):
        start_url = 'http://www.ip181.com/'
        html = get_page(start_url)
        if not html:
            return
        # \s* matches the whitespace (including newlines) between cells.
        ip_adress = re.compile(r'<tr.*?>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')
        re_ip_adress = ip_adress.findall(html)
        for adress, port in re_ip_adress:
            result = adress + ':' + port
            yield result.replace(' ', '')

    def crawl_kuaidaili(self):
        ip_adress = re.compile(
            r'<td data-title="IP">(.*)</td>\s*<td data-title="PORT">(\w+)</td>'
        )
        for page in range(1, 4):
            # Domestic high-anonymity proxies
            start_url = 'https://www.kuaidaili.com/free/inha/{}/'.format(page)
            html = get_page(start_url)
            if not html:
                continue
            re_ip_adress = ip_adress.findall(html)
            for adress, port in re_ip_adress:
                result = adress + ':' + port
                yield result.replace(' ', '')

    def crawl_xicidaili(self):
        # \s* matches the whitespace (including newlines) between cells.
        ip_adress = re.compile(
            r'<td class="country"><img src="http://fs.xicidaili.com/images/flag/cn.png" alt="Cn" /></td>\s*<td>(.*?)</td>\s*<td>(.*?)</td>'
        )
        for page in range(1, 4):
            start_url = 'http://www.xicidaili.com/wt/{}'.format(page)
            html = get_page(start_url)
            if not html:
                continue
            re_ip_adress = ip_adress.findall(html)
            for adress, port in re_ip_adress:
                result = adress + ':' + port
                yield result.replace(' ', '')

    def crawl_daili66(self, page_count=4):
        start_url = 'http://www.66ip.cn/{}.html'
        urls = [start_url.format(page) for page in range(1, page_count + 1)]
        for url in urls:
            print('Crawling', url)
            html = get_page(url)
            if html:
                doc = pq(html)
                # Skip the first table row (tr:gt(0)).
                trs = doc('.containerbox table tr:gt(0)').items()
                for tr in trs:
                    ip = tr.find('td:nth-child(1)').text()
                    port = tr.find('td:nth-child(2)').text()
                    yield ':'.join([ip, port])

    def crawl_data5u(self):
        # \s* matches the whitespace (including newlines) between elements.
        ip_adress = re.compile(
            r' <ul class="l2">\s*<span><li>(.*?)</li></span>\s*<span style="width: 100px;"><li class=".*">(.*?)</li></span>'
        )
        for i in ['gngn', 'gnpt']:
            start_url = 'http://www.data5u.com/free/{}/index.shtml'.format(i)
            html = get_page(start_url)
            if not html:
                continue
            re_ip_adress = ip_adress.findall(html)
            for adress, port in re_ip_adress:
                result = adress + ':' + port
                yield result.replace(' ', '')

    def crawl_kxdaili(self):
        # \s* matches the whitespace (including newlines) between cells.
        ip_adress = re.compile(r'<tr.*?>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')
        for i in range(1, 4):
            start_url = 'http://www.kxdaili.com/ipList/{}.html#ip'.format(i)
            html = get_page(start_url)
            if not html:
                continue
            re_ip_adress = ip_adress.findall(html)
            for adress, port in re_ip_adress:
                result = adress + ':' + port
                yield result.replace(' ', '')

    def crawl_premproxy(self):
        ip_adress = re.compile(r'<td data-label="IP:port ">(.*?)</td>')
        for i in ['China-01', 'China-02', 'China-03', 'China-04', 'Taiwan-01']:
            start_url = 'https://premproxy.com/proxy-by-country/{}.htm'.format(
                i)
            html = get_page(start_url)
            if html:
                re_ip_adress = ip_adress.findall(html)
                for adress_port in re_ip_adress:
                    yield adress_port.replace(' ', '')

    def crawl_xroxy(self):
        ip_adress1 = re.compile(
            r"title='View this Proxy details'>\s*(.*).*")
        ip_adress2 = re.compile(
            r"title='Select proxies with port number .*'>(.*)</a>")
        for i in ['CN', 'TW']:
            start_url = 'http://www.xroxy.com/proxylist.php?country={}'.format(
                i)
            html = get_page(start_url)
            if html:
                re_ip_adress1 = ip_adress1.findall(html)
                re_ip_adress2 = ip_adress2.findall(html)
                # Addresses and ports are matched separately and paired by position.
                for adress, port in zip(re_ip_adress1, re_ip_adress2):
                    adress_port = adress + ':' + port
                    yield adress_port.replace(' ', '')
具体使用方法可以参考GitHub上的项目说明(https://github.com/Germey/ProxyPool)。
以上是关于使用redis+flask维护动态代理池的主要内容,如果未能解决你的问题,请参考以下文章