Python Web Scraping: Maintaining a Proxy Pool

Posted by nikecode


Introduction

We can gather large numbers of proxies from free sites or paid providers, but many of them will still be unusable, so building an efficient proxy pool that screens proxy IPs is essential.

 

Preparation

Install the Redis database, plus the aiohttp, requests, redis-py, pyquery, and Flask libraries; installation instructions are easy to find online.
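
For reference, a typical way to install the Python libraries (assuming pip is available and a Redis server is already running; note that redis-py is published on PyPI as "redis"):

pip install aiohttp requests redis pyquery flask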

Since there is quite a lot of code, it won't be explained line by line; instead we create a Python package that can be imported and reused directly later.

 

Create a Python package named proxypool.
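
The package layout used in the rest of this post looks roughly like this (one module per step below):

proxypool/
    __init__.py
    setting.py      # configuration
    error.py        # custom exceptions
    db.py           # Redis storage module
    importer.py     # manual proxy entry
    utils.py        # page-fetching helper
    crawler.py      # free-proxy crawlers
    getter.py       # getter module
    tester.py       # tester module
    api.py          # Flask API
    scheduler.py    # scheduler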

 

1. Create a setting.py file to store the configuration. The code is as follows:

# Redis host
REDIS_HOST = '127.0.0.1'

# Redis port
REDIS_PORT = 6379

# Redis password; use None if there is no password
REDIS_PASSWORD = None

REDIS_KEY = 'proxies'

# Proxy scores
MAX_SCORE = 100
MIN_SCORE = 0
INITIAL_SCORE = 10

VALID_STATUS_CODES = [200, 302]

# Upper limit on the number of proxies in the pool
POOL_UPPER_THRESHOLD = 50000

# Tester cycle (seconds)
TESTER_CYCLE = 20
# Getter cycle (seconds)
GETTER_CYCLE = 300

# Test URL; test against the site you actually intend to scrape
TEST_URL = 'http://www.baidu.com'

# API configuration
API_HOST = '0.0.0.0'
API_PORT = 5555

# Batch size used by the tester (referenced in tester.py)
BATCH_TEST_SIZE = 10

# Switches for the three modules (referenced in scheduler.py)
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True

 

2. Create an error.py file to handle the case where the proxy pool runs dry:

class PoolEmptyError(Exception):

    def __init__(self):
        Exception.__init__(self)

    def __str__(self):
        return repr('The proxy pool has been exhausted')

 

3. Create the storage module (db.py), which stores, removes, and updates proxy IPs:

import redis
from proxypool.error import PoolEmptyError
from proxypool.setting import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_KEY
from proxypool.setting import MAX_SCORE, MIN_SCORE, INITIAL_SCORE
from random import choice
import re


class RedisClient(object):
    def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
        """
        Initialize the Redis connection
        :param host: Redis host
        :param port: Redis port
        :param password: Redis password
        """
        self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)

    def add(self, proxy, score=INITIAL_SCORE):
        """
        Add a proxy with the initial score
        :param proxy: proxy
        :param score: score
        :return: result of the add operation
        """
        if not re.match(r'\d+\.\d+\.\d+\.\d+\:\d+', proxy):
            print('Malformed proxy', proxy, 'discarded')
            return
        if not self.db.zscore(REDIS_KEY, proxy):
            # redis-py 2.x signature: zadd(key, score, member)
            return self.db.zadd(REDIS_KEY, score, proxy)

    def random(self):
        """
        Get a random valid proxy: try max-score proxies first,
        otherwise fall back to the top-ranked ones; raise if the pool is empty
        :return: a random proxy
        """
        result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
        if len(result):
            return choice(result)
        else:
            result = self.db.zrevrange(REDIS_KEY, 0, 100)
            if len(result):
                return choice(result)
            else:
                raise PoolEmptyError

    def decrease(self, proxy):
        """
        Decrease a proxy's score by one; remove it once it drops to the minimum
        :param proxy: proxy
        :return: the proxy's new score
        """
        score = self.db.zscore(REDIS_KEY, proxy)
        if score and score > MIN_SCORE:
            print('Proxy', proxy, 'current score', score, 'decrease by 1')
            # redis-py 2.x signature: zincrby(key, member, amount)
            return self.db.zincrby(REDIS_KEY, proxy, -1)
        else:
            print('Proxy', proxy, 'current score', score, 'removing')
            return self.db.zrem(REDIS_KEY, proxy)

    def exists(self, proxy):
        """
        Check whether a proxy already exists
        :param proxy: proxy
        :return: whether it exists
        """
        return self.db.zscore(REDIS_KEY, proxy) is not None

    def max(self, proxy):
        """
        Set a proxy's score to MAX_SCORE
        :param proxy: proxy
        :return: result of the operation
        """
        print('Proxy', proxy, 'is valid, setting score to', MAX_SCORE)
        return self.db.zadd(REDIS_KEY, MAX_SCORE, proxy)

    def count(self):
        """
        Get the number of proxies
        :return: count
        """
        return self.db.zcard(REDIS_KEY)

    def all(self):
        """
        Get all proxies
        :return: list of all proxies
        """
        return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)

    def batch(self, start, stop):
        """
        Get a batch of proxies
        :param start: start index
        :param stop: stop index
        :return: list of proxies
        """
        return self.db.zrevrange(REDIS_KEY, start, stop - 1)


if __name__ == '__main__':
    conn = RedisClient()
    result = conn.batch(680, 688)
    print(result)

 

4. Create an importer.py file for entering proxies manually:

from proxypool.db import RedisClient

conn = RedisClient()


def set(proxy):
    result = conn.add(proxy)
    print(proxy)
    print('Added successfully' if result else 'Failed to add')


def scan():
    print('Please enter proxies, type "exit" to stop')
    while True:
        proxy = input()
        if proxy == 'exit':
            break
        set(proxy)


if __name__ == '__main__':
    scan()

 

5. Create a utils.py file for fetching web pages. The code is as follows:

import requests
from requests.exceptions import ConnectionError

base_headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36',
    'Accept-Encoding': 'gzip, deflate, sdch',
    'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7'
}


def get_page(url, options={}):
    """
    Fetch a page
    :param url: target URL
    :param options: extra request headers
    :return: page HTML on success, otherwise None
    """
    headers = dict(base_headers, **options)
    print('Fetching', url)
    try:
        response = requests.get(url, headers=headers)
        print('Fetched', url, response.status_code)
        if response.status_code == 200:
            return response.text
    except ConnectionError:
        print('Failed to fetch', url)
        return None

 

6. Crawling module: create a crawler.py file that scrapes proxies from various free proxy sites:

import json
import re
from .utils import get_page
from pyquery import PyQuery as pq


class ProxyMetaclass(type):
    def __new__(cls, name, bases, attrs):
        count = 0
        attrs['__CrawlFunc__'] = []
        for k, v in attrs.items():
            if 'crawl_' in k:
                attrs['__CrawlFunc__'].append(k)
                count += 1
        attrs['__CrawlFuncCount__'] = count
        return type.__new__(cls, name, bases, attrs)


class Crawler(object, metaclass=ProxyMetaclass):
    def get_proxies(self, callback):
        proxies = []
        for proxy in eval("self.{}()".format(callback)):
            print('Got proxy', proxy)
            proxies.append(proxy)
        return proxies

    def crawl_daili66(self, page_count=4):
        """
        Crawl daili66
        :param page_count: number of pages
        :return: proxies
        """
        start_url = 'http://www.66ip.cn/{}.html'
        urls = [start_url.format(page) for page in range(1, page_count + 1)]
        for url in urls:
            print('Crawling', url)
            html = get_page(url)
            if html:
                doc = pq(html)
                trs = doc('.containerbox table tr:gt(0)').items()
                for tr in trs:
                    ip = tr.find('td:nth-child(1)').text()
                    port = tr.find('td:nth-child(2)').text()
                    yield ':'.join([ip, port])

    def crawl_ip3366(self):
        for page in range(1, 4):
            start_url = 'http://www.ip3366.net/free/?stype=1&page={}'.format(page)
            html = get_page(start_url)
            ip_address = re.compile(r'<tr>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')
            # \s* matches whitespace, including line breaks
            re_ip_address = ip_address.findall(html)
            for address, port in re_ip_address:
                result = address + ':' + port
                yield result.replace(' ', '')

    def crawl_kuaidaili(self):
        for i in range(1, 4):
            start_url = 'http://www.kuaidaili.com/free/inha/{}/'.format(i)
            html = get_page(start_url)
            if html:
                ip_address = re.compile('<td data-title="IP">(.*?)</td>')
                re_ip_address = ip_address.findall(html)
                port = re.compile('<td data-title="PORT">(.*?)</td>')
                re_port = port.findall(html)
                for address, port in zip(re_ip_address, re_port):
                    address_port = address + ':' + port
                    yield address_port.replace(' ', '')

    def crawl_xicidaili(self):
        for i in range(1, 3):
            start_url = 'http://www.xicidaili.com/nn/{}'.format(i)
            headers = {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                'Cookie': '_free_proxy_session=BAh7B0kiD3Nlc3Npb25faWQGOgZFVEkiJWRjYzc5MmM1MTBiMDMzYTUzNTZjNzA4NjBhNWRjZjliBjsAVEkiEF9jc3JmX3Rva2VuBjsARkkiMUp6S2tXT3g5a0FCT01ndzlmWWZqRVJNek1WanRuUDBCbTJUN21GMTBKd3M9BjsARg%3D%3D--2a69429cb2115c6a0cc9a86e0ebe2800c0d471b3',
                'Host': 'www.xicidaili.com',
                'Referer': 'http://www.xicidaili.com/nn/3',
                'Upgrade-Insecure-Requests': '1',
            }
            html = get_page(start_url, options=headers)
            if html:
                find_trs = re.compile('<tr class.*?>(.*?)</tr>', re.S)
                trs = find_trs.findall(html)
                for tr in trs:
                    find_ip = re.compile(r'<td>(\d+\.\d+\.\d+\.\d+)</td>')
                    re_ip_address = find_ip.findall(tr)
                    find_port = re.compile(r'<td>(\d+)</td>')
                    re_port = find_port.findall(tr)
                    for address, port in zip(re_ip_address, re_port):
                        address_port = address + ':' + port
                        yield address_port.replace(' ', '')

    def crawl_ip3366(self):
        # NOTE: this definition overrides the crawl_ip3366 defined above
        for i in range(1, 4):
            start_url = 'http://www.ip3366.net/?stype=1&page={}'.format(i)
            html = get_page(start_url)
            if html:
                find_tr = re.compile('<tr>(.*?)</tr>', re.S)
                trs = find_tr.findall(html)
                for s in range(1, len(trs)):
                    find_ip = re.compile(r'<td>(\d+\.\d+\.\d+\.\d+)</td>')
                    re_ip_address = find_ip.findall(trs[s])
                    find_port = re.compile(r'<td>(\d+)</td>')
                    re_port = find_port.findall(trs[s])
                    for address, port in zip(re_ip_address, re_port):
                        address_port = address + ':' + port
                        yield address_port.replace(' ', '')

    def crawl_iphai(self):
        start_url = 'http://www.iphai.com/'
        html = get_page(start_url)
        if html:
            find_tr = re.compile('<tr>(.*?)</tr>', re.S)
            trs = find_tr.findall(html)
            for s in range(1, len(trs)):
                find_ip = re.compile(r'<td>\s+(\d+\.\d+\.\d+\.\d+)\s+</td>', re.S)
                re_ip_address = find_ip.findall(trs[s])
                find_port = re.compile(r'<td>\s+(\d+)\s+</td>', re.S)
                re_port = find_port.findall(trs[s])
                for address, port in zip(re_ip_address, re_port):
                    address_port = address + ':' + port
                    yield address_port.replace(' ', '')

    def crawl_data5u(self):
        start_url = 'http://www.data5u.com/free/gngn/index.shtml'
        headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
            'Cache-Control': 'max-age=0',
            'Connection': 'keep-alive',
            'Cookie': 'JSESSIONID=47AA0C887112A2D83EE040405F837A86',
            'Host': 'www.data5u.com',
            'Referer': 'http://www.data5u.com/free/index.shtml',
            'Upgrade-Insecure-Requests': '1',
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.108 Safari/537.36',
        }
        html = get_page(start_url, options=headers)
        if html:
            ip_address = re.compile(r'<span><li>(\d+\.\d+\.\d+\.\d+)</li>.*?<li class="port.*?>(\d+)</li>', re.S)
            re_ip_address = ip_address.findall(html)
            for address, port in re_ip_address:
                result = address + ':' + port
                yield result.replace(' ', '')

 

7. Create a getter.py file that dynamically calls every method whose name starts with crawl_, collects the crawled proxies, and stores them in the database:

from proxypool.tester import Tester
from proxypool.db import RedisClient
from proxypool.crawler import Crawler
from proxypool.setting import *
import sys


class Getter():
    def __init__(self):
        self.redis = RedisClient()
        self.crawler = Crawler()

    def is_over_threshold(self):
        """
        Check whether the proxy pool has reached its upper limit
        """
        if self.redis.count() >= POOL_UPPER_THRESHOLD:
            return True
        else:
            return False

    def run(self):
        print('Getter is running')
        if not self.is_over_threshold():
            for callback_label in range(self.crawler.__CrawlFuncCount__):
                callback = self.crawler.__CrawlFunc__[callback_label]
                # Fetch proxies from this crawler
                proxies = self.crawler.get_proxies(callback)
                sys.stdout.flush()
                for proxy in proxies:
                    self.redis.add(proxy)

 

8. Testing module: create a tester.py file to test the proxy IPs:

import asyncio
import aiohttp
import time
import sys

try:
    from aiohttp import ClientError
except ImportError:
    # Older aiohttp versions expose a differently named exception; alias it
    from aiohttp import ClientProxyConnectionError as ClientError
from proxypool.db import RedisClient
from proxypool.setting import *


class Tester(object):
    def __init__(self):
        self.redis = RedisClient()

    async def test_single_proxy(self, proxy):
        """
        Test a single proxy
        :param proxy: proxy to test
        :return:
        """
        conn = aiohttp.TCPConnector(verify_ssl=False)
        async with aiohttp.ClientSession(connector=conn) as session:
            try:
                if isinstance(proxy, bytes):
                    proxy = proxy.decode('utf-8')
                real_proxy = 'http://' + proxy
                print('Testing', proxy)
                async with session.get(TEST_URL, proxy=real_proxy, timeout=15, allow_redirects=False) as response:
                    if response.status in VALID_STATUS_CODES:
                        self.redis.max(proxy)
                        print('Proxy is valid', proxy)
                    else:
                        self.redis.decrease(proxy)
                        print('Invalid response status', response.status, 'IP', proxy)
            except (ClientError, aiohttp.client_exceptions.ClientConnectorError, asyncio.TimeoutError, AttributeError):
                self.redis.decrease(proxy)
                print('Proxy request failed', proxy)

    def run(self):
        """
        Main test routine
        :return:
        """
        print('Tester is running')
        try:
            count = self.redis.count()
            print('Currently', count, 'proxies in the pool')
            for i in range(0, count, BATCH_TEST_SIZE):
                start = i
                stop = min(i + BATCH_TEST_SIZE, count)
                print('Testing proxies', start + 1, 'to', stop)
                test_proxies = self.redis.batch(start, stop)
                loop = asyncio.get_event_loop()
                tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
                loop.run_until_complete(asyncio.wait(tasks))
                sys.stdout.flush()
                time.sleep(5)
        except Exception as e:
            print('Tester error', e.args)

 

9. API module: create an api.py file that exposes the database through a web interface:

from flask import Flask, g

from .db import RedisClient

__all__ = ['app']

app = Flask(__name__)


def get_conn():
    if not hasattr(g, 'redis'):
        g.redis = RedisClient()
    return g.redis


@app.route('/')
def index():
    return '<h2>Welcome to Proxy Pool System</h2>'


@app.route('/random')
def get_proxy():
    """
    Get a random proxy
    :return: a random proxy
    """
    conn = get_conn()
    return conn.random()


@app.route('/count')
def get_counts():
    """
    Get the number of proxies
    :return: total size of the pool
    """
    conn = get_conn()
    return str(conn.count())


if __name__ == '__main__':
    app.run()

10. Scheduler module: create a scheduler.py file that runs the three modules above as separate processes:

import time
from multiprocessing import Process
from proxypool.api import app
from proxypool.getter import Getter
from proxypool.tester import Tester
from proxypool.db import RedisClient
from proxypool.setting import *


class Scheduler():
    def schedule_tester(self, cycle=TESTER_CYCLE):
        """
        Test proxies at regular intervals
        """
        tester = Tester()
        while True:
            print('Tester is running')
            tester.run()
            time.sleep(cycle)

    def schedule_getter(self, cycle=GETTER_CYCLE):
        """
        Fetch proxies at regular intervals
        """
        getter = Getter()
        while True:
            print('Starting to fetch proxies')
            getter.run()
            time.sleep(cycle)

    def schedule_api(self):
        """
        Start the API
        """
        app.run(API_HOST, API_PORT)

    def run(self):
        print('Proxy pool is running')

        if TESTER_ENABLED:
            tester_process = Process(target=self.schedule_tester)
            tester_process.start()

        if GETTER_ENABLED:
            getter_process = Process(target=self.schedule_getter)
            getter_process.start()

        if API_ENABLED:
            api_process = Process(target=self.schedule_api)
            api_process.start()

 

Running

Calling the Scheduler's run() method starts the entire proxy pool.
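
As a minimal sketch, an entry script at the project root (the file name run.py is just a convention here) only needs to instantiate the Scheduler and call run():

from proxypool.scheduler import Scheduler


def main():
    # Starts the tester, getter, and API processes defined above
    Scheduler().run()


if __name__ == '__main__':
    main()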

Then open http://127.0.0.1:5555 in a browser to see the index page.

Visit http://127.0.0.1:5555/random to get a random usable proxy.

 

The code below fetches a proxy from the pool and uses it to request http://httpbin.org/get as a quick check:

import requests

PROXY_POOL_URL = "http://127.0.0.1:5555/random"

def get_proxy():
    try:
        response = requests.get(PROXY_POOL_URL)
        if response.status_code == 200:
            return response.text
    except requests.exceptions.ConnectionError:
        # print("Could not get a proxy; check the proxy pool")
        return None

#---------------------------------------------------------
proxy = get_proxy()
proxies = {
    "http": "http://" + proxy,
    "https": "https://" + proxy,
}

try:
    response = requests.get("http://httpbin.org/get",proxies=proxies)
    print(response.text)
except requests.exceptions.ConnectionError as e:
    print("Error",e.args)

 



 

 

Paid Proxies

Different paid proxy providers vary in the details, but the basic model is much the same; common options include 讯代理 (Xdaili), 阿布云 (Abuyun), and 大象代理 (Daxiang).

Taking 大象代理 as an example: once you have the API link that returns proxies, you can either use those proxy IPs directly, or feed them into the proxy pool for validation by adding a crawl method to crawler.py; after that you only need to swap in a new link.
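
As a rough sketch only (the URL below is a placeholder for your provider's extraction link, the method name is arbitrary as long as it carries the crawl_ prefix so the metaclass picks it up, and the response is assumed to be plain text with one ip:port per line), such a method added to the Crawler class could look like this:

    def crawl_paid_api(self):
        # Placeholder: replace with the proxy-list link from your paid provider
        start_url = 'http://example.com/proxy_api?num=50'
        html = get_page(start_url)
        if html:
            # Assumes the API returns one ip:port per line
            for line in html.split('\n'):
                proxy = line.strip()
                if proxy:
                    yield proxy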

 
